diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-02-18 13:35:23 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-02-18 13:35:23 +0000 |
commit | d1f5b91da822b27faad83d50ecfdd2830a1be93e (patch) | |
tree | 78df3bf535cb8a5451afa402c51cb3f8d11b4d06 | |
parent | 22bc9b81495143d67a93bf58c936c5d5a65c8e8e (diff) | |
parent | a2f16dc9eb108ecf11938c7c4275d701a3635bb7 (diff) | |
download | ydb-d1f5b91da822b27faad83d50ecfdd2830a1be93e.tar.gz |
Merge pull request #14716 from ydb-platform/merge-libs-250218-0050
56 files changed, 1643 insertions, 355 deletions
diff --git a/build/external_resources/gdb/a.yaml b/build/external_resources/gdb/a.yaml index 050c7f4896..562f59c672 100644 --- a/build/external_resources/gdb/a.yaml +++ b/build/external_resources/gdb/a.yaml @@ -1,6 +1,50 @@ title: gdb14 service: buildroot +arcanum: + review: + ignore_self_ship: true + min_approvers_count: 1 + disabling_policy: denied + override_policy: append + groups: + - name: reviewers + roles: + - "cc:duty" + - name: owners + roles: + - "cc:duty" + - "cc:developer" + rules: + - reviewers: + - name: reviewers + ship: 0 + assign: 1 + ignore_self_ship: false + + - subpaths: "a.yaml" + reviewers: + - name: reviewers + ship: 0 + assign: 1 + - name: owners + ship: 1 + assign: 0 + ignore_self_ship: false + + auto_merge: + requirements: + - system: arcanum + type: st_issue_linked + disabling_policy: denied + override_policy: final + data: + in_commit_message: true + - system: arcanum + type: comment_issues_closed + disabling_policy: need_reason + override_policy: final + shared: mapping_path: &mapping-path "build/external_resources/gdb/resources.json" input: &base-input diff --git a/build/plugins/lib/nots/typescript/ts_config.py b/build/plugins/lib/nots/typescript/ts_config.py index 435ff4e78b..608e9b8de2 100644 --- a/build/plugins/lib/nots/typescript/ts_config.py +++ b/build/plugins/lib/nots/typescript/ts_config.py @@ -64,7 +64,7 @@ class TsConfig(object): def read(self): try: - with open(self.path) as f: + with open(self.path, encoding="utf-8") as f: self.data = self.rj.load(f, parse_mode=(self.rj.PM_COMMENTS | self.rj.PM_TRAILING_COMMAS)) except Exception as e: diff --git a/build/plugins/nots.py b/build/plugins/nots.py index 061d4bb78b..e5825ac66c 100644 --- a/build/plugins/nots.py +++ b/build/plugins/nots.py @@ -981,3 +981,6 @@ def on_run_javascript_after_build_add_js_script_as_input(unit: NotsUnitType, js_ return __set_append(unit, "_RUN_JAVASCRIPT_AFTER_BUILD_INPUTS", js_script) + + +# Zero-diff commit diff --git a/build/ymake.core.conf b/build/ymake.core.conf index d35952e8da..d3d40074a6 100644 --- a/build/ymake.core.conf +++ b/build/ymake.core.conf @@ -3152,8 +3152,9 @@ macro AR_PLUGIN(name) { ### Register script, which will process all inputs to any link_exe.py call with modules's library ### Script will receive all arguments to link_exe.py, and can output into stdout preprocessed list ### of all arguments, in JSON format -macro LD_PLUGIN(name) { - SRCS(GLOBAL $name.pyplugin) +macro LD_PLUGIN(Name) { + .CMD=$COPY_CMD ${context=TEXT;input:Name} ${noauto;output;global;suf=.pyplugin:Name} + .SEM=_SEM_IGNORED } USE_FLANG=no diff --git a/ydb/ci/rightlib.txt b/ydb/ci/rightlib.txt index a411539c27..c5feedde2d 100644 --- a/ydb/ci/rightlib.txt +++ b/ydb/ci/rightlib.txt @@ -1 +1 @@ -d7d62149f9bae7bdb622300a54f678cc419c81f9 +8fe93946bc369873a7ffbb3a7403463aa80e3117 diff --git a/yql/essentials/ast/yql_type_string.cpp b/yql/essentials/ast/yql_type_string.cpp index fc360d2f90..6dfbaa5664 100644 --- a/yql/essentials/ast/yql_type_string.cpp +++ b/yql/essentials/ast/yql_type_string.cpp @@ -91,6 +91,7 @@ enum EToken TOKEN_TZDATE32 = -56, TOKEN_TZDATETIME64 = -57, TOKEN_TZTIMESTAMP64 = -58, + TOKEN_MULTI = -59, // identifiers TOKEN_IDENTIFIER = -100, @@ -121,6 +122,7 @@ EToken TokenTypeFromStr(TStringBuf str) { TStringBuf("Dict"), TOKEN_DICT }, { TStringBuf("Tuple"), TOKEN_TUPLE }, { TStringBuf("Struct"), TOKEN_STRUCT }, + { TStringBuf("Multi"), TOKEN_MULTI }, { TStringBuf("Resource"), TOKEN_RESOURCE }, { TStringBuf("Void"), TOKEN_VOID }, { TStringBuf("Callable"), TOKEN_CALLABLE }, @@ -267,6 +269,10 @@ private: type = ParseStructType(); break; + case TOKEN_MULTI: + type = ParseMultiType(); + break; + case TOKEN_RESOURCE: type = ParseResourceType(); break; @@ -752,9 +758,9 @@ private: return MakeDictType(keyType, MakeVoidType()); } - TAstNode* ParseTupleTypeImpl() { + TAstNode* ParseTupleTypeImpl(TAstNode* (TTypeParser::*typeCreator)(TSmallVec<TAstNode*>&)) { TSmallVec<TAstNode*> items; - items.push_back(nullptr); // reserve for TupleType + items.push_back(nullptr); // reserve for type callable if (Token != '>') { for (;;) { @@ -773,13 +779,13 @@ private: } } - return MakeTupleType(items); + return (this->*typeCreator)(items); } TAstNode* ParseTupleType() { GetNextToken(); // eat keyword EXPECT_AND_SKIP_TOKEN('<', nullptr); - TAstNode* tupleType = ParseTupleTypeImpl(); + TAstNode* tupleType = ParseTupleTypeImpl(&TTypeParser::MakeTupleType); if (tupleType) { EXPECT_AND_SKIP_TOKEN('>', nullptr); } @@ -836,6 +842,16 @@ private: return structType; } + TAstNode* ParseMultiType() { + GetNextToken(); // eat keyword + EXPECT_AND_SKIP_TOKEN('<', nullptr); + TAstNode* tupleType = ParseTupleTypeImpl(&TTypeParser::MakeMultiType); + if (tupleType) { + EXPECT_AND_SKIP_TOKEN('>', nullptr); + } + return tupleType; + } + TAstNode* ParseVariantType() { GetNextToken(); // eat keyword EXPECT_AND_SKIP_TOKEN('<', nullptr); @@ -844,7 +860,7 @@ private: if (Token == TOKEN_IDENTIFIER || Token == TOKEN_ESCAPED_IDENTIFIER) { underlyingType = ParseStructTypeImpl(); } else if (IsTypeKeyword(Token) || Token == '(') { - underlyingType = ParseTupleTypeImpl(); + underlyingType = ParseTupleTypeImpl(&TTypeParser::MakeTupleType); } else { return AddError("Expected type"); } @@ -995,6 +1011,11 @@ private: return MakeList(items.data(), items.size()); } + TAstNode* MakeMultiType(TSmallVec<TAstNode*>& items) { + items[0] = MakeLiteralAtom(TStringBuf("MultiType")); + return MakeList(items.data(), items.size()); + } + TAstNode* ParseResourceType() { GetNextToken(); // eat keyword EXPECT_AND_SKIP_TOKEN('<', nullptr); diff --git a/yql/essentials/core/expr_nodes/yql_expr_nodes.json b/yql/essentials/core/expr_nodes/yql_expr_nodes.json index 09de074b5e..cd33706ac7 100644 --- a/yql/essentials/core/expr_nodes/yql_expr_nodes.json +++ b/yql/essentials/core/expr_nodes/yql_expr_nodes.json @@ -1613,18 +1613,37 @@ ] }, { + "Name": "TCoBlockStorage", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "BlockStorage"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"} + ] + }, + { + "Name": "TCoBlockMapJoinIndex", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "BlockMapJoinIndex"}, + "Children": [ + {"Index": 0, "Name": "BlockStorage", "Type": "TExprBase"}, + {"Index": 1, "Name": "InputType", "Type": "TExprBase"}, + {"Index": 2, "Name": "KeyColumns", "Type": "TCoAtomList"}, + {"Index": 3, "Name": "Options", "Type": "TExprList"} + ] + }, + { "Name": "TCoBlockMapJoinCore", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "BlockMapJoinCore"}, "Children": [ {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"}, {"Index": 1, "Name": "RightInput", "Type": "TExprBase"}, - {"Index": 2, "Name": "JoinKind", "Type": "TCoAtom"}, - {"Index": 3, "Name": "LeftKeyColumns", "Type": "TCoAtomList"}, - {"Index": 4, "Name": "LeftKeyDrops", "Type": "TCoAtomList"}, - {"Index": 5, "Name": "RightKeyColumns", "Type": "TCoAtomList"}, - {"Index": 6, "Name": "RightKeyDrops", "Type": "TCoAtomList"}, - {"Index": 7, "Name": "Options", "Type": "TExprList"} + {"Index": 2, "Name": "RightInputType", "Type": "TExprBase"}, + {"Index": 3, "Name": "JoinKind", "Type": "TCoAtom"}, + {"Index": 4, "Name": "LeftKeyColumns", "Type": "TCoAtomList"}, + {"Index": 5, "Name": "LeftKeyDrops", "Type": "TCoAtomList"}, + {"Index": 6, "Name": "RightKeyColumns", "Type": "TCoAtomList"}, + {"Index": 7, "Name": "RightKeyDrops", "Type": "TCoAtomList"} ] }, { diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp index 9732079e0f..3051091e05 100644 --- a/yql/essentials/core/facade/yql_facade.cpp +++ b/yql/essentials/core/facade/yql_facade.cpp @@ -895,7 +895,6 @@ TProgram::TFutureStatus TProgram::LineageAsync(const TString& username, IOutputS .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true) .AddPostTypeAnnotation() .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput") - .AddCheckExecution(false) .AddLineageOptimization(LineageStr_) .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput") .Build(); @@ -1161,7 +1160,6 @@ TProgram::TFutureStatus TProgram::LineageAsyncWithConfig( pipeline.AddPostTypeAnnotation(); pipelineConf.AfterTypeAnnotation(&pipeline); - pipeline.AddCheckExecution(false); pipeline.AddLineageOptimization(LineageStr_); Transformer_ = pipeline.Build(); diff --git a/yql/essentials/core/services/yql_transform_pipeline.cpp b/yql/essentials/core/services/yql_transform_pipeline.cpp index 2dd8f2c278..5f7165c10d 100644 --- a/yql/essentials/core/services/yql_transform_pipeline.cpp +++ b/yql/essentials/core/services/yql_transform_pipeline.cpp @@ -200,6 +200,7 @@ TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorl TTransformationPipeline& TTransformationPipeline::AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode) { AddCommonOptimization(issueCode); + AddCheckExecution(false, issueCode); Transformers_.push_back(TTransformStage( CreateFunctorTransformer( [typeCtx = TypeAnnotationContext_, &lineageOut](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp index fdba180472..732edecc45 100644 --- a/yql/essentials/core/type_ann/type_ann_core.cpp +++ b/yql/essentials/core/type_ann/type_ann_core.cpp @@ -12987,17 +12987,18 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["BlockDecimalMul"] = &BlockDecimalBinaryWrapper; Functions["BlockDecimalMod"] = &BlockDecimalBinaryWrapper; Functions["BlockDecimalDiv"] = &BlockDecimalBinaryWrapper; + Functions["BlockStorage"] = &BlockStorageWrapper; ExtFunctions["BlockFunc"] = &BlockFuncWrapper; - Functions["BlockMapJoinCore"] = &BlockMapJoinCoreWrapper; - ExtFunctions["AsScalar"] = &AsScalarWrapper; ExtFunctions["WideToBlocks"] = &WideToBlocksWrapper; ExtFunctions["BlockCombineAll"] = &BlockCombineAllWrapper; ExtFunctions["BlockCombineHashed"] = &BlockCombineHashedWrapper; ExtFunctions["BlockMergeFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper; ExtFunctions["BlockMergeManyFinalizeHashed"] = &BlockMergeFinalizeHashedWrapper; + ExtFunctions["BlockMapJoinIndex"] = &BlockMapJoinIndexWrapper; + ExtFunctions["BlockMapJoinCore"] = &BlockMapJoinCoreWrapper; ExtFunctions["SqlRename"] = &SqlRenameWrapper; ExtFunctions["OrderedSqlRename"] = &SqlRenameWrapper; diff --git a/yql/essentials/core/type_ann/type_ann_impl.h b/yql/essentials/core/type_ann/type_ann_impl.h index 717cb7b3ac..20379c8bfa 100644 --- a/yql/essentials/core/type_ann/type_ann_impl.h +++ b/yql/essentials/core/type_ann/type_ann_impl.h @@ -35,7 +35,9 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus CombineCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus GroupingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus DecimalBinaryWrapperBase(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx, bool blocks); - IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockStorageWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockMapJoinIndexWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); + IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); TMaybe<ui32> FindOrReportMissingMember(TStringBuf memberName, TPositionHandle pos, const TStructExprType& structType, TExprContext& ctx); diff --git a/yql/essentials/core/type_ann/type_ann_join.cpp b/yql/essentials/core/type_ann/type_ann_join.cpp index 78c1c89f28..cc57859624 100644 --- a/yql/essentials/core/type_ann/type_ann_join.cpp +++ b/yql/essentials/core/type_ann/type_ann_join.cpp @@ -1,7 +1,12 @@ #include "type_ann_core.h" #include "type_ann_impl.h" + #include <util/string/join.h> +#include <util/string/split.h> + +#include <yql/essentials/core/type_ann/type_ann_types.h> #include <yql/essentials/core/yql_join.h> +#include <yql/essentials/minikql/mkql_program_builder.h> namespace NYql { namespace NTypeAnnImpl { @@ -980,40 +985,212 @@ namespace NTypeAnnImpl { } } - IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + IGraphTransformer::TStatus BlockStorageWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + + if (!EnsureArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType itemTypes; + if (!EnsureWideStreamBlockType(input->Head(), itemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto streamItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType(); + input->SetTypeAnn(ctx.Expr.MakeType<TResourceExprType>(TStringBuilder() << + NKikimr::NMiniKQL::BlockStorageResourcePrefix << FormatType(streamItemType))); + return IGraphTransformer::TStatus::Ok; + } + + bool EnsureBlockStorageResource(const TExprNode* resource, const TMultiExprType*& streamItemType, TExtContext& ctx) { + using NKikimr::NMiniKQL::BlockStorageResourcePrefix; + + if (!EnsureResourceType(*resource, ctx.Expr)) { + return false; + } + const auto resourceType = resource->GetTypeAnn()->Cast<TResourceExprType>(); + const auto resourceTag = resourceType->GetTag(); + if (!resourceTag.StartsWith(BlockStorageResourcePrefix)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(resource->Pos()), "Expected block storage resource")); + return false; + } + + auto typeString = TStringBuf(resourceTag.data() + BlockStorageResourcePrefix.size(), resourceTag.size() - BlockStorageResourcePrefix.size()); + auto typeNode = ctx.Expr.Builder(resource->Pos()) + .Callable("ParseType") + .Atom(0, typeString) + .Seal() + .Build(); + + auto status = ParseTypeWrapper(typeNode, typeNode, ctx); + if (status == IGraphTransformer::TStatus::Error) { + return false; + } + if (!EnsureType(*typeNode, ctx.Expr)) { + return false; + } + + streamItemType = typeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->UserCast<TMultiExprType>(ctx.Expr.GetPosition(resource->Pos()), ctx.Expr); + return true; + } + + IGraphTransformer::TStatus BlockMapJoinIndexWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { + Y_UNUSED(output); + + using NKikimr::NMiniKQL::BlockMapJoinIndexResourcePrefix; + using NKikimr::NMiniKQL::BlockMapJoinIndexResourceSeparator; + + if (!EnsureArgsCount(*input, 4, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + const TMultiExprType* expectedStreamItemType = nullptr; + if (!EnsureBlockStorageResource(input->Child(0), expectedStreamItemType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + + TTypeAnnotationNode::TListType itemTypes; + if (!EnsureType(*input->Child(1), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + auto inputType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureWideBlockType(input->Child(1)->Pos(), *inputType, itemTypes, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + auto streamItemType = inputType->Cast<TMultiExprType>(); + + if (!IsSameAnnotation(*streamItemType, *expectedStreamItemType)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + TStringBuilder() << "Mismatch between provided stream item type " << static_cast<const TTypeAnnotationNode&>(*streamItemType) + << "and block storage item type " << static_cast<const TTypeAnnotationNode&>(*expectedStreamItemType))); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureTupleOfAtoms(*input->Child(2), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + TVector<ui32> keyColumns; + for (const auto& keyColumnNode : input->Child(2)->Children()) { + auto position = GetWideBlockFieldPosition(*streamItemType, keyColumnNode->Content()); + if (!position) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(keyColumnNode->Pos()), TStringBuilder() << "Unknown key column: " << keyColumnNode->Content())); + return IGraphTransformer::TStatus::Error; + } + keyColumns.push_back(*position); + } + + auto settingsValidator = [&](TStringBuf settingName, TExprNode& node, TExprContext& ctx) { + if (node.ChildrenSize() != 1) { + ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), + TStringBuilder() << "No extra parameters are expected by setting '" << settingName << "'")); + return false; + } + return true; + }; + if (!EnsureValidSettings(input->Tail(), {"any"}, settingsValidator, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(ctx.Expr.MakeType<TResourceExprType>(TStringBuilder() << + BlockMapJoinIndexResourcePrefix << FormatType(streamItemType) << BlockMapJoinIndexResourceSeparator << JoinSeq(",", keyColumns))); + return IGraphTransformer::TStatus::Ok; + } + + bool EnsureBlockMapJoinIndexResource(const TExprNode* resource, const TMultiExprType*& streamItemType, TVector<TStringBuf>& keyColumns, TExtContext& ctx) { + using NKikimr::NMiniKQL::BlockMapJoinIndexResourcePrefix; + using NKikimr::NMiniKQL::BlockMapJoinIndexResourceSeparator; + + if (!EnsureResourceType(*resource, ctx.Expr)) { + return false; + } + const auto resourceType = resource->GetTypeAnn()->Cast<TResourceExprType>(); + const auto resourceTag = resourceType->GetTag(); + if (!resourceTag.StartsWith(BlockMapJoinIndexResourcePrefix)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(resource->Pos()), "Expected block map join index resource")); + return false; + } + + auto resourceIdentifier = TStringBuf(resourceTag.data() + BlockMapJoinIndexResourcePrefix.size(), resourceTag.size() - BlockMapJoinIndexResourcePrefix.size()); + TStringBuf typeString, keyColumnsString; + Split(resourceIdentifier, BlockMapJoinIndexResourceSeparator, typeString, keyColumnsString); + Split(keyColumnsString, ",", keyColumns); + + auto resourceTypeNode = ctx.Expr.Builder(resource->Pos()) + .Callable("ParseType") + .Atom(0, typeString) + .Seal() + .Build(); + + auto status = ParseTypeWrapper(resourceTypeNode, resourceTypeNode, ctx); + if (status == IGraphTransformer::TStatus::Error) { + return false; + } + if (!EnsureType(*resourceTypeNode, ctx.Expr)) { + return false; + } + + streamItemType = resourceTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->UserCast<TMultiExprType>(ctx.Expr.GetPosition(resource->Pos()), ctx.Expr); + return true; + } + + IGraphTransformer::TStatus BlockMapJoinCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); if (!EnsureArgsCount(*input, 8, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } + if (!EnsureAtom(*input->Child(3), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + const auto joinKind = input->Child(3)->Content(); + if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly"&& joinKind != "Cross") { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(3)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind + << ", supported: Inner, Left, LeftSemi, LeftOnly, Cross")); + return IGraphTransformer::TStatus::Error; + } + TTypeAnnotationNode::TListType leftItemTypes; if (!EnsureWideStreamBlockType(input->Head(), leftItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } leftItemTypes.pop_back(); - auto leftItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + auto leftStreamItemType = input->Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + + const TMultiExprType* expectedRightStreamItemType = nullptr; + TVector<TStringBuf> expectedRightKeyColumns; + if (joinKind != "Cross") { + if (!EnsureBlockMapJoinIndexResource(input->Child(1), expectedRightStreamItemType, expectedRightKeyColumns, ctx)) { + return IGraphTransformer::TStatus::Error; + } + } else { + if (!EnsureBlockStorageResource(input->Child(1), expectedRightStreamItemType, ctx)) { + return IGraphTransformer::TStatus::Error; + } + } TTypeAnnotationNode::TListType rightItemTypes; - if (!EnsureWideStreamBlockType(*input->Child(1), rightItemTypes, ctx.Expr)) { + if (!EnsureType(*input->Child(2), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } - rightItemTypes.pop_back(); - auto rightItemType = input->Child(1)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); - - if (!EnsureAtom(*input->Child(2), ctx.Expr)) { + auto rightInputType = input->Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!EnsureWideBlockType(input->Child(2)->Pos(), *rightInputType, rightItemTypes, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } + rightItemTypes.pop_back(); + auto rightStreamItemType = rightInputType->Cast<TMultiExprType>(); - const auto joinKind = input->Child(2)->Content(); - if (joinKind != "Inner" && joinKind != "Left" && joinKind != "LeftSemi" && joinKind != "LeftOnly"&& joinKind != "Cross") { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(2)->Pos()), TStringBuilder() << "Unknown join kind: " << joinKind - << ", supported: Inner, Left, LeftSemi, LeftOnly, Cross")); + if (!IsSameAnnotation(*rightStreamItemType, *expectedRightStreamItemType)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + TStringBuilder() << "Mismatch between provided right stream item type " << static_cast<const TTypeAnnotationNode&>(*rightStreamItemType) + << "and right block storage item type " << static_cast<const TTypeAnnotationNode&>(*expectedRightStreamItemType))); return IGraphTransformer::TStatus::Error; } - if (input->Child(3)->ChildrenSize() != input->Child(5)->ChildrenSize()) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(5)->Pos()), TStringBuilder() << "Mismatch of key column count")); + if (input->Child(4)->ChildrenSize() != input->Child(6)->ChildrenSize()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(6)->Pos()), TStringBuilder() << "Mismatch of key column count")); return IGraphTransformer::TStatus::Error; } @@ -1056,36 +1233,44 @@ namespace NTypeAnnImpl { return true; }; - for (size_t childIdx = 3; childIdx <= 6; childIdx++) { + for (size_t childIdx = 4; childIdx <= 7; childIdx++) { if (!EnsureTupleOfAtoms(*input->Child(childIdx), ctx.Expr)) { return IGraphTransformer::TStatus::Error; } } std::unordered_set<ui32> leftKeyColumns; - if (!checkKeyColumns(leftKeyColumns, true, *input->Child(3), leftItemType)) { + if (!checkKeyColumns(leftKeyColumns, true, *input->Child(4), leftStreamItemType)) { return IGraphTransformer::TStatus::Error; } std::unordered_set<ui32> leftKeyDrops; - if (!checkKeyDrops(leftKeyDrops, true, leftKeyColumns, *input->Child(4), leftItemType)) { + if (!checkKeyDrops(leftKeyDrops, true, leftKeyColumns, *input->Child(5), leftStreamItemType)) { return IGraphTransformer::TStatus::Error; } std::unordered_set<ui32> rightKeyColumns; - if (!checkKeyColumns(rightKeyColumns, false, *input->Child(5), rightItemType)) { + if (!checkKeyColumns(rightKeyColumns, true, *input->Child(6), rightStreamItemType)) { return IGraphTransformer::TStatus::Error; } std::unordered_set<ui32> rightKeyDrops; - if (!checkKeyDrops(rightKeyDrops, false, rightKeyColumns, *input->Child(6), rightItemType)) { + if (!checkKeyDrops(rightKeyDrops, false, rightKeyColumns, *input->Child(7), rightStreamItemType)) { return IGraphTransformer::TStatus::Error; } - auto settingsValidator = [&](TStringBuf, TExprNode& node, TExprContext&) { return node.ChildrenSize() == 1; }; - if (!EnsureValidSettings(input->Tail(), {"rightAny"}, settingsValidator, ctx.Expr)) { + if (input->Child(6)->ChildrenSize() != expectedRightKeyColumns.size()) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + "Provided right key columns list differs from right block storage one")); return IGraphTransformer::TStatus::Error; } + for (size_t i = 0; i < input->Child(6)->ChildrenSize(); i++) { + if (input->Child(6)->Child(i)->Content() != expectedRightKeyColumns[i]) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + "Provided right key columns list differs from right block storage one")); + return IGraphTransformer::TStatus::Error; + } + } std::vector<const TTypeAnnotationNode*> resultItems; for (ui32 pos = 0; pos < leftItemTypes.size(); pos++) { diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 37217aec85..17c549202c 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -239,10 +239,8 @@ private: TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_; }; -template <typename TDerived> -class TBlockStorageBase : public TComputationValue<TDerived> { - using TSelf = TBlockStorageBase<TDerived>; - using TBase = TComputationValue<TDerived>; +class TBlockStorage : public TComputationValue<TBlockStorage> { + using TBase = TComputationValue<TBlockStorage>; public: struct TBlock { @@ -268,7 +266,7 @@ public: }; class TRowIterator { - friend class TBlockStorageBase; + friend class TBlockStorage; public: TRowIterator() = default; @@ -304,7 +302,7 @@ public: } private: - TRowIterator(const TSelf* blockStorage) + TRowIterator(const TBlockStorage* blockStorage) : BlockStorage_(blockStorage) {} @@ -312,19 +310,21 @@ public: size_t CurrentBlockOffset_ = 0; size_t CurrentItemOffset_ = 0; - const TSelf* BlockStorage_ = nullptr; + const TBlockStorage* BlockStorage_ = nullptr; }; - TBlockStorageBase( + TBlockStorage( TMemoryUsageInfo* memInfo, const TVector<TType*>& itemTypes, NUdf::TUnboxedValue stream, + TStringBuf resourceTag, arrow::MemoryPool* pool ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) - , Stream_(stream) + , Stream_(std::move(stream)) , Inputs_(itemTypes.size()) + , ResourceTag_(std::move(resourceTag)) { TBlockTypeHelper helper; for (size_t i = 0; i < itemTypes.size(); i++) { @@ -341,11 +341,14 @@ public: case NUdf::EFetchStatus::Yield: return NUdf::EFetchStatus::Yield; case NUdf::EFetchStatus::Finish: + IsFinished_ = true; return NUdf::EFetchStatus::Finish; case NUdf::EFetchStatus::Ok: break; } + Y_ENSURE(!IsFinished_, "Got data on finished stream"); + std::vector<arrow::Datum> blockColumns; for (size_t i = 0; i < Inputs_.size() - 1; i++) { auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); @@ -384,19 +387,15 @@ public: return TRowIterator(this); } + size_t GetRowCount() const { + return RowCount_; + } + TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const { Y_ENSURE(columnIdx < Inputs_.size() - 1); return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset); } - void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { - Y_ENSURE(row.size() == ioMap.size()); - for (size_t i = 0; i < row.size(); i++) { - row[i] = GetItem(entry, ioMap[i]); - } - } - -protected: TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const { Y_ENSURE(offset < block.Size); const auto& datum = block.Columns[columnIdx]; @@ -408,6 +407,34 @@ protected: } } + void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { + Y_ENSURE(row.size() == ioMap.size()); + for (size_t i = 0; i < row.size(); i++) { + row[i] = GetItem(entry, ioMap[i]); + } + } + + const TVector<NUdf::IBlockItemComparator::TPtr>& GetItemComparators() const { + return Comparators_; + } + + const TVector<NUdf::IBlockItemHasher::TPtr>& GetItemHashers() const { + return Hashers_; + } + + bool IsFinished() const { + return IsFinished_; + } + +private: + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef(ResourceTag_); + } + + void* GetResource() override { + return this; + } + protected: const std::vector<arrow::ValueDescr> InputsDescr_; @@ -418,27 +445,60 @@ protected: std::vector<TBlock> Data_; size_t RowCount_ = 0; + bool IsFinished_ = false; NUdf::TUnboxedValue Stream_; TUnboxedValueVector Inputs_; + + const TStringBuf ResourceTag_; }; -class TBlockStorage: public TBlockStorageBase<TBlockStorage> { -private: - using TBase = TBlockStorageBase<TBlockStorage>; +class TBlockStorageWrapper : public TMutableComputationNode<TBlockStorageWrapper> { + using TBaseComputation = TMutableComputationNode<TBlockStorageWrapper>; + public: - using TBase::TBase; + TBlockStorageWrapper( + TComputationMutables& mutables, + TVector<TType*>&& itemTypes, + IComputationNode* stream, + const TStringBuf& resourceTag + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , ItemTypes_(std::move(itemTypes)) + , Stream_(stream) + , ResourceTag_(resourceTag) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TBlockStorage>( + ItemTypes_, + std::move(Stream_->GetValue(ctx)), + ResourceTag_, + &ctx.ArrowMemoryPool + ); + } + +private: + void RegisterDependencies() const final { + DependsOn(Stream_); + } + +private: + const TVector<TType*> ItemTypes_; + IComputationNode* const Stream_; + + const TString ResourceTag_; }; -class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { - using TBase = TBlockStorageBase<TIndexedBlockStorage>; +class TBlockIndex : public TComputationValue<TBlockIndex> { + using TBase = TComputationValue<TBlockIndex>; struct TIndexNode { - TRowEntry Entry; + TBlockStorage::TRowEntry Entry; TIndexNode* Next; TIndexNode() = delete; - TIndexNode(TRowEntry entry, TIndexNode* next = nullptr) + TIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* next = nullptr) : Entry(entry) , Next(next) {} @@ -450,7 +510,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { : Raw(0) {} - TIndexMapValue(TRowEntry entry) { + TIndexMapValue(TBlockStorage::TRowEntry entry) { TIndexEntryUnion un; un.Entry = entry; @@ -471,7 +531,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { return EntryList; } - TRowEntry GetEntry() const { + TBlockStorage::TRowEntry GetEntry() const { Y_ENSURE(IsInplace()); TIndexEntryUnion un; @@ -481,7 +541,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { private: union TIndexEntryUnion { - TRowEntry Entry; + TBlockStorage::TRowEntry Entry; ui64 Raw; }; @@ -504,7 +564,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { public: class TIterator { - friend class TIndexedBlockStorage; + friend class TBlockIndex; enum class EIteratorType { EMPTY, @@ -547,7 +607,7 @@ public: return *this; } - TMaybe<TRowEntry> Next() { + TMaybe<TBlockStorage::TRowEntry> Next() { Y_ENSURE(IsValid()); switch (Type_) { @@ -560,7 +620,7 @@ public: } EntryConsumed_ = true; - return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing(); + return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TBlockStorage::TRowEntry>(Entry_) : Nothing(); case EIteratorType::LIST: for (; Node_ != nullptr; Node_ = Node_->Next) { @@ -597,12 +657,12 @@ public: } private: - TIterator(const TIndexedBlockStorage* blockIndex) + TIterator(const TBlockIndex* blockIndex) : Type_(EIteratorType::EMPTY) , BlockIndex_(blockIndex) {} - TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TBlockStorage::TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::INPLACE) , BlockIndex_(blockIndex) , Entry_(entry) @@ -610,7 +670,7 @@ public: , ItemsToLookup_(std::move(itemsToLookup)) {} - TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::LIST) , BlockIndex_(blockIndex) , Node_(node) @@ -619,12 +679,12 @@ public: private: EIteratorType Type_; - const TIndexedBlockStorage* BlockIndex_ = nullptr; + const TBlockIndex* BlockIndex_ = nullptr; union { TIndexNode* Node_; struct { - TRowEntry Entry_; + TBlockStorage::TRowEntry Entry_; bool EntryConsumed_; }; }; @@ -633,32 +693,35 @@ public: }; public: - TIndexedBlockStorage( + TBlockIndex( TMemoryUsageInfo* memInfo, - const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, - NUdf::TUnboxedValue stream, + NUdf::TUnboxedValue blockStorage, bool any, - arrow::MemoryPool* pool + TStringBuf resourceTag ) - : TBase(memInfo, itemTypes, stream, pool) + : TBase(memInfo) , KeyColumns_(keyColumns) + , BlockStorage_(std::move(blockStorage)) , Any_(any) + , ResourceTag_(std::move(resourceTag)) {} - NUdf::EFetchStatus FetchStream() { - Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built"); - return TBase::FetchStream(); - } - void BuildIndex() { - Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_)); - for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) { - const auto& block = GetBlock(blockOffset); + if (Index_) { + return; + } + + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + Y_ENSURE(blockStorage.IsFinished(), "Index build should be done after all data has been read"); + + Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(blockStorage.GetRowCount())); + for (size_t blockOffset = 0; blockOffset < blockStorage.GetBlockCount(); blockOffset++) { + const auto& block = blockStorage.GetBlock(blockOffset); auto blockSize = block.Size; std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch; - std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries; + std::array<TBlockStorage::TRowEntry, PrefetchBatchSize> insertBatchEntries; std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys; ui32 insertBatchLen = 0; @@ -692,7 +755,7 @@ public: continue; } - insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset); + insertBatchEntries[insertBatchLen] = TBlockStorage::TRowEntry(blockOffset, itemOffset); insertBatch[insertBatchLen].ConstructKey(keyHash); insertBatchLen++; @@ -709,7 +772,7 @@ public: } template<typename TGetKey> - void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { + void BatchLookup(size_t batchSize, std::array<TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { Y_ENSURE(batchSize <= PrefetchBatchSize); std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch; @@ -737,11 +800,13 @@ public: }); } - bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + bool IsKeyEquals(TBlockStorage::TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + Y_ENSURE(keyItems.size() == KeyColumns_.size()); for (size_t i = 0; i < KeyColumns_.size(); i++) { - auto indexItem = GetItem(entry, KeyColumns_[i]); - if (Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { + auto indexItem = blockStorage.GetItem(entry, KeyColumns_[i]); + if (blockStorage.GetItemComparators()[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { return true; } } @@ -749,25 +814,31 @@ public: return false; } + const NUdf::TUnboxedValue& GetBlockStorage() const { + return BlockStorage_; + } + private: - ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + ui64 GetKey(const TBlockStorage::TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + ui64 keyHash = 0; keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { - auto item = GetItemFromBlock(block, keyColumn, offset); + auto item = blockStorage.GetItemFromBlock(block, keyColumn, offset); if (!item) { keyItems.clear(); return 0; } - keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item)); + keyHash = CombineHashes(keyHash, blockStorage.GetItemHashers()[keyColumn]->Hash(item)); keyItems.push_back(std::move(item)); } return keyHash; } - TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) { + TIndexNode* InsertIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* currentHead = nullptr) { return &IndexNodes_.emplace_back(entry, currentHead); } @@ -787,21 +858,72 @@ private: } } + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef(ResourceTag_); + } + + void* GetResource() override { + return this; + } + private: const TVector<ui32>& KeyColumns_; + NUdf::TUnboxedValue BlockStorage_; std::unique_ptr<TIndexMap> Index_; std::deque<TIndexNode> IndexNodes_; const bool Any_; + const TStringBuf ResourceTag_; +}; + +class TBlockMapJoinIndexWrapper : public TMutableComputationNode<TBlockMapJoinIndexWrapper> { + using TBaseComputation = TMutableComputationNode<TBlockMapJoinIndexWrapper>; + +public: + TBlockMapJoinIndexWrapper( + TComputationMutables& mutables, + TVector<ui32>&& keyColumns, + IComputationNode* blockStorage, + bool any, + const TStringBuf& resourceTag + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , KeyColumns_(std::move(keyColumns)) + , BlockStorage_(blockStorage) + , Any_(any) + , ResourceTag_(resourceTag) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TBlockIndex>( + KeyColumns_, + std::move(BlockStorage_->GetValue(ctx)), + Any_, + ResourceTag_ + ); + } + +private: + void RegisterDependencies() const final { + DependsOn(BlockStorage_); + } + +private: + const TVector<ui32> KeyColumns_; + IComputationNode* const BlockStorage_; + const bool Any_; + const TString ResourceTag_; }; -template <bool WithoutRight, bool RightRequired, bool RightAny> -class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>> +template <bool WithoutRight, bool RightRequired> +class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>> { -using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>; -using TJoinState = TBlockJoinState<RightRequired>; -using TIndexState = TIndexedBlockStorage; + using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>; + using TJoinState = TBlockJoinState<RightRequired>; + using TStorageState = TBlockStorage; + using TIndexState = TBlockIndex; + public: TBlockMapJoinCoreWraper( TComputationMutables& mutables, @@ -809,22 +931,18 @@ public: const TVector<TType*>&& leftItemTypes, const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - const TVector<TType*>&& rightItemTypes, - const TVector<ui32>&& rightKeyColumns, const TVector<ui32>&& rightIOMap, IComputationNode* leftStream, - IComputationNode* rightStream + IComputationNode* rightBlockIndex ) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , RightItemTypes_(std::move(rightItemTypes)) - , RightKeyColumns_(std::move(rightKeyColumns)) , RightIOMap_(std::move(rightIOMap)) - , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) + , LeftStream_(leftStream) + , RightBlockIndex_(rightBlockIndex) , KeyTupleCache_(mutables) {} @@ -835,60 +953,49 @@ public: LeftIOMap_, ResultItemTypes_ ); - const auto indexState = ctx.HolderFactory.Create<TIndexState>( - RightItemTypes_, - RightKeyColumns_, - std::move(RightStream_->GetValue(ctx)), - RightAny, - &ctx.ArrowMemoryPool - ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(joinState), - std::move(indexState), - std::move(LeftStream_->GetValue(ctx)), LeftKeyColumns_, - std::move(RightStream_->GetValue(ctx)), - RightKeyColumns_, - RightIOMap_ + RightIOMap_, + std::move(LeftStream_->GetValue(ctx)), + std::move(RightBlockIndex_->GetValue(ctx)) ); } private: class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + using TBase = TComputationValue<TStreamValue>; + public: TStreamValue( TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& joinState, - NUdf::TUnboxedValue&& indexState, - NUdf::TUnboxedValue&& leftStream, const TVector<ui32>& leftKeyColumns, - NUdf::TUnboxedValue&& rightStream, - const TVector<ui32>& rightKeyColumns, - const TVector<ui32>& rightIOMap + const TVector<ui32>& rightIOMap, + NUdf::TUnboxedValue&& leftStream, + NUdf::TUnboxedValue&& rightBlockIndex ) : TBase(memInfo) , JoinState_(joinState) - , IndexState_(indexState) - , LeftStream_(leftStream) , LeftKeyColumns_(leftKeyColumns) - , RightStream_(rightStream) - , RightKeyColumns_(rightKeyColumns) , RightIOMap_(rightIOMap) + , LeftStream_(leftStream) + , RightBlockIndex_(rightBlockIndex) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get()); + auto& indexState = *static_cast<TIndexState*>(RightBlockIndex_.GetResource()); + auto& storageState = *static_cast<TStorageState*>(indexState.GetBlockStorage().GetResource()); if (!RightStreamConsumed_) { auto fetchStatus = NUdf::EFetchStatus::Ok; while (fetchStatus != NUdf::EFetchStatus::Finish) { - fetchStatus = indexState.FetchStream(); + fetchStatus = storageState.FetchStream(); if (fetchStatus == NUdf::EFetchStatus::Yield) { return NUdf::EFetchStatus::Yield; } @@ -931,7 +1038,7 @@ private: while (joinState.IsNotFull() && !iter.IsEmpty()) { auto key = iter.Next(); - indexState.GetRow(*key, RightIOMap_, rightRow); + storageState.GetRow(*key, RightIOMap_, rightRow); joinState.MakeRow(rightRow); } @@ -993,13 +1100,9 @@ private: } NUdf::TUnboxedValue JoinState_; - NUdf::TUnboxedValue IndexState_; - NUdf::TUnboxedValue LeftStream_; const TVector<ui32>& LeftKeyColumns_; - NUdf::TUnboxedValue RightStream_; - const TVector<ui32>& RightKeyColumns_; const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; @@ -1007,12 +1110,15 @@ private: ui32 LookupBatchCurrent_ = 0; ui32 LookupBatchSize_ = 0; + NUdf::TUnboxedValue LeftStream_; + NUdf::TUnboxedValue RightBlockIndex_; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { this->DependsOn(LeftStream_); - this->DependsOn(RightStream_); + this->DependsOn(RightBlockIndex_); } private: @@ -1022,44 +1128,37 @@ private: const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - const TVector<TType*> RightItemTypes_; - const TVector<ui32> RightKeyColumns_; const TVector<ui32> RightIOMap_; IComputationNode* const LeftStream_; - IComputationNode* const RightStream_; + IComputationNode* const RightBlockIndex_; const TContainerCacheOnContext KeyTupleCache_; }; class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper> { -using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>; -using TJoinState = TBlockJoinState<true>; -using TStorageState = TBlockStorage; + using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>; + using TJoinState = TBlockJoinState<true>; + using TStorageState = TBlockStorage; + public: TBlockCrossJoinCoreWraper( TComputationMutables& mutables, const TVector<TType*>&& resultItemTypes, const TVector<TType*>&& leftItemTypes, - const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - const TVector<TType*>&& rightItemTypes, - const TVector<ui32>&& rightKeyColumns, const TVector<ui32>&& rightIOMap, IComputationNode* leftStream, - IComputationNode* rightStream + IComputationNode* rightBlockStorage ) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) - , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , RightItemTypes_(std::move(rightItemTypes)) - , RightKeyColumns_(std::move(rightKeyColumns)) , RightIOMap_(std::move(rightIOMap)) , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) + , RightBlockStorage_(std::move(rightBlockStorage)) , KeyTupleCache_(mutables) {} @@ -1070,47 +1169,40 @@ public: LeftIOMap_, ResultItemTypes_ ); - const auto indexState = ctx.HolderFactory.Create<TStorageState>( - RightItemTypes_, - std::move(RightStream_->GetValue(ctx)), - &ctx.ArrowMemoryPool - ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(joinState), - std::move(indexState), + RightIOMap_, std::move(LeftStream_->GetValue(ctx)), - std::move(RightStream_->GetValue(ctx)), - RightIOMap_ + std::move(RightBlockStorage_->GetValue(ctx)) ); } private: class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + using TBase = TComputationValue<TStreamValue>; + public: TStreamValue( TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& joinState, - NUdf::TUnboxedValue&& storageState, + const TVector<ui32>& rightIOMap, NUdf::TUnboxedValue&& leftStream, - NUdf::TUnboxedValue&& rightStream, - const TVector<ui32>& rightIOMap + NUdf::TUnboxedValue&& rightBlockStorage ) : TBase(memInfo) , JoinState_(joinState) - , StorageState_(storageState) - , LeftStream_(leftStream) - , RightStream_(rightStream) , RightIOMap_(rightIOMap) + , LeftStream_(leftStream) + , RightBlockStorage_(rightBlockStorage) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - auto& storageState = *static_cast<TStorageState*>(StorageState_.AsBoxed().Get()); + auto& storageState = *static_cast<TStorageState*>(RightBlockStorage_.GetResource()); if (!RightStreamConsumed_) { auto fetchStatus = NUdf::EFetchStatus::Ok; @@ -1176,43 +1268,109 @@ private: } NUdf::TUnboxedValue JoinState_; - NUdf::TUnboxedValue StorageState_; - - NUdf::TUnboxedValue LeftStream_; - NUdf::TUnboxedValue RightStream_; const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; TStorageState::TRowIterator RightRowIterator_; + NUdf::TUnboxedValue LeftStream_; + NUdf::TUnboxedValue RightBlockStorage_; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { this->DependsOn(LeftStream_); - this->DependsOn(RightStream_); + this->DependsOn(RightBlockStorage_); } private: const TVector<TType*> ResultItemTypes_; const TVector<TType*> LeftItemTypes_; - const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - const TVector<TType*> RightItemTypes_; - const TVector<ui32> RightKeyColumns_; const TVector<ui32> RightIOMap_; IComputationNode* const LeftStream_; - IComputationNode* const RightStream_; + IComputationNode* const RightBlockStorage_; const TContainerCacheOnContext KeyTupleCache_; }; } // namespace +IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg"); + + const auto resultType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type"); + auto resultResourceType = AS_TYPE(TResourceType, resultType); + MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsStream(), "Expected WideStream as an input stream"); + const auto inputStreamType = AS_TYPE(TStreamType, inputType); + MKQL_ENSURE(inputStreamType->GetItemType()->IsMulti(), + "Expected Multi as a left stream item type"); + const auto inputStreamComponents = GetWideComponents(inputStreamType); + MKQL_ENSURE(inputStreamComponents.size() > 0, "Expected at least one column"); + TVector<TType*> inputStreamItems(inputStreamComponents.cbegin(), inputStreamComponents.cend()); + + const auto inputStream = LocateNode(ctx.NodeLocator, callable, 0); + return new TBlockStorageWrapper( + ctx.Mutables, + std::move(inputStreamItems), + inputStream, + resultResourceType->GetTag() + ); +} + +IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + + const auto resultType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type"); + auto resultResourceType = AS_TYPE(TResourceType, resultType); + MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsResource(), "Expected Resource as an input type"); + auto inputResourceType = AS_TYPE(TResourceType, inputType); + MKQL_ENSURE(inputResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + auto origInputItemType = AS_VALUE(TTypeType, callable.GetInput(1)); + MKQL_ENSURE(origInputItemType->IsMulti(), "Expected Multi as an input item type"); + const auto streamComponents = AS_TYPE(TMultiType, origInputItemType)->GetElements(); + MKQL_ENSURE(streamComponents.size() > 0, "Expected at least one column"); + + const auto keyColumnsLiteral = callable.GetInput(2); + const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral); + TVector<ui32> keyColumns; + keyColumns.reserve(keyColumnsTuple->GetValuesCount()); + for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i)); + keyColumns.emplace_back(item->AsValue().Get<ui32>()); + } + + for (ui32 keyColumn : keyColumns) { + MKQL_ENSURE(keyColumn < streamComponents.size() - 1, "Key column out of range"); + } + + const auto anyNode = callable.GetInput(3); + const auto any = AS_VALUE(TDataLiteral, anyNode)->AsValue().Get<bool>(); + + const auto blockStorage = LocateNode(ctx.NodeLocator, callable, 0); + return new TBlockMapJoinIndexWrapper( + ctx.Mutables, + std::move(keyColumns), + blockStorage, + any, + resultResourceType->GetTag() + ); +} + IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); @@ -1234,22 +1392,28 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column"); const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend()); - const auto rightType = callable.GetInput(1).GetStaticType(); - MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream"); - const auto rightStreamType = AS_TYPE(TStreamType, rightType); - MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(), - "Expected Multi as a right stream item type"); - const auto rightStreamComponents = GetWideComponents(rightStreamType); - MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); - const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); - - const auto joinKindNode = callable.GetInput(2); + const auto joinKindNode = callable.GetInput(3); const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); const auto joinKind = GetJoinKind(rawKind); Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross); - const auto leftKeyColumnsLiteral = callable.GetInput(3); + const auto rightBlockStorageType = callable.GetInput(1).GetStaticType(); + MKQL_ENSURE(rightBlockStorageType->IsResource(), "Expected Resource as a right type"); + auto rightBlockStorageResourceType = AS_TYPE(TResourceType, rightBlockStorageType); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + } + + auto origRightItemType = AS_VALUE(TTypeType, callable.GetInput(2)); + MKQL_ENSURE(origRightItemType->IsMulti(), "Expected Multi as a right stream item type"); + const auto rightStreamComponents = AS_TYPE(TMultiType, origRightItemType)->GetElements(); + MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); + const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); + + const auto leftKeyColumnsLiteral = callable.GetInput(4); const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral); TVector<ui32> leftKeyColumns; leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount()); @@ -1259,7 +1423,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend()); - const auto leftKeyDropsLiteral = callable.GetInput(4); + const auto leftKeyDropsLiteral = callable.GetInput(5); const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral); THashSet<ui32> leftKeyDrops; leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount()); @@ -1273,7 +1437,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo "Only key columns has to be specified in drop column set"); } - const auto rightKeyColumnsLiteral = callable.GetInput(5); + const auto rightKeyColumnsLiteral = callable.GetInput(6); const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral); TVector<ui32> rightKeyColumns; rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount()); @@ -1283,7 +1447,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend()); - const auto rightKeyDropsLiteral = callable.GetInput(6); + const auto rightKeyDropsLiteral = callable.GetInput(7); const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral); THashSet<ui32> rightKeyDrops; rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount()); @@ -1303,9 +1467,6 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch"); - const auto rightAnyNode = callable.GetInput(7); - const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>(); - // XXX: Mind the last wide item, containing block length. TVector<ui32> leftIOMap; for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { @@ -1329,62 +1490,40 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0); - const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1); + const auto rightBlockStorage = LocateNode(ctx.NodeLocator, callable, 1); -#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \ - return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \ +#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED) \ + return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED>( \ ctx.Mutables, \ std::move(joinItems), \ std::move(leftStreamItems), \ std::move(leftKeyColumns), \ std::move(leftIOMap), \ - std::move(rightStreamItems), \ - std::move(rightKeyColumns), \ std::move(rightIOMap), \ leftStream, \ - rightStream \ + rightBlockStorage \ ) switch (joinKind) { case EJoinKind::Inner: - if (rightAny) { - JOIN_WRAPPER(false, true, true); - } else { - JOIN_WRAPPER(false, true, false); - } + JOIN_WRAPPER(false, true); case EJoinKind::Left: - if (rightAny) { - JOIN_WRAPPER(false, false, true); - } else { - JOIN_WRAPPER(false, false, false); - } + JOIN_WRAPPER(false, false); case EJoinKind::LeftSemi: MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join"); - if (rightAny) { - JOIN_WRAPPER(true, true, true); - } else { - JOIN_WRAPPER(true, true, false); - } + JOIN_WRAPPER(true, true); case EJoinKind::LeftOnly: MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join"); - if (rightAny) { - JOIN_WRAPPER(true, false, true); - } else { - JOIN_WRAPPER(true, false, false); - } + JOIN_WRAPPER(true, false); case EJoinKind::Cross: - MKQL_ENSURE(!rightAny, "rightAny can't be used with cross join"); return new TBlockCrossJoinCoreWraper( ctx.Mutables, std::move(joinItems), std::move(leftStreamItems), - std::move(leftKeyColumns), std::move(leftIOMap), - std::move(rightStreamItems), - std::move(rightKeyColumns), std::move(rightIOMap), leftStream, - rightStream + rightBlockStorage ); default: /* TODO: Display the human-readable join kind name. */ diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h index a556828aae..d38e0c6115 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.h @@ -4,6 +4,8 @@ namespace NKikimr { namespace NMiniKQL { +IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx); IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx); } // NKikimr diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp index 7f5550a46f..3b2119ba21 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp @@ -315,6 +315,8 @@ struct TCallableComputationNodeBuilderFuncMapFiller { {"BlockDecimalDiv", &WrapBlockDecimalDiv}, {"BlockDecimalMod", &WrapBlockDecimalMod}, {"ScalarApply", &WrapScalarApply}, + {"BlockStorage", &WrapBlockStorage}, + {"BlockMapJoinIndex", &WrapBlockMapJoinIndex}, {"BlockMapJoinCore", &WrapBlockMapJoinCore}, {"MakeHeap", &WrapMakeHeap}, {"PushHeap", &WrapPushHeap}, diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index b40da26574..add6f4731a 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -46,53 +46,115 @@ TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList)); - const auto leftStreamItems = ValidateBlockStreamType(leftStream.GetStaticType()); - const auto rightStreamItems = ValidateBlockStreamType(rightStream.GetStaticType()); - - TVector<TType*> joinReturnItems; + const auto joinReturnType = MakeJoinType(pgmBuilder, + joinKind, + leftStream.GetStaticType(), + leftKeyDrops, + rightStream.GetStaticType(), + rightKeyDrops + ); - const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); - for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size - if (leftKeyDropsSet.contains(i)) { - continue; - } - joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many)); + auto rightBlockStorageNode = pgmBuilder.BlockStorage(rightStream, pgmBuilder.NewResourceType(BlockStorageResourcePrefix)); + if (joinKind != EJoinKind::Cross) { + rightBlockStorageNode = pgmBuilder.BlockMapJoinIndex( + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + rightKeyColumns, + rightAny, + pgmBuilder.NewResourceType(BlockMapJoinIndexResourcePrefix) + ); } - if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) { - const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend()); - for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size - if (rightKeyDropsSet.contains(i)) { - continue; - } + auto joinNode = pgmBuilder.BlockMapJoinCore( + leftStream, + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + joinKind, + leftKeyColumns, + leftKeyDrops, + rightKeyColumns, + rightKeyDrops, + joinReturnType + ); - joinReturnItems.push_back(pgmBuilder.NewBlockType( - joinKind == EJoinKind::Inner ? rightStreamItems[i] - : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i] - : pgmBuilder.NewOptionalType(rightStreamItems[i]), - TBlockType::EShape::Many - )); - } - } + return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)); +} - joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); +TRuntimeNode BuildBlockJoinsWithNodeMultipleUsage(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TRuntimeNode leftList, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, + TRuntimeNode rightList, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny +) { + Y_ENSURE(joinKind == EJoinKind::Inner); + Y_ENSURE(!rightAny); + Y_ENSURE(leftKeyDrops.empty() && rightKeyDrops.empty()); + + const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + const auto leftStream2 = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + const auto leftStream3 = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + + const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList)); + + const auto joinReturnType = MakeJoinType(pgmBuilder, + joinKind, + leftStream.GetStaticType(), + leftKeyDrops, + rightStream.GetStaticType(), + rightKeyDrops + ); + + auto rightBlockStorageNode = pgmBuilder.BlockStorage(rightStream, pgmBuilder.NewResourceType(BlockStorageResourcePrefix)); + auto rightBlockIndexNode = pgmBuilder.BlockMapJoinIndex( + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + rightKeyColumns, + rightAny, + pgmBuilder.NewResourceType(BlockMapJoinIndexResourcePrefix) + ); - TType* joinReturnType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems)); auto joinNode = pgmBuilder.BlockMapJoinCore( leftStream, - rightStream, - joinKind, + rightBlockIndexNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + EJoinKind::Inner, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, - rightAny, joinReturnType ); - return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)); + auto joinNode2 = pgmBuilder.BlockMapJoinCore( + leftStream2, + rightBlockIndexNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + EJoinKind::Inner, + leftKeyColumns, + leftKeyDrops, + rightKeyColumns, + rightKeyDrops, + joinReturnType + ); + + auto joinNode3 = pgmBuilder.BlockMapJoinCore( + leftStream3, + rightBlockStorageNode, + AS_TYPE(TStreamType, rightStream.GetStaticType())->GetItemType(), + EJoinKind::Cross, + {}, + {}, + {}, + {}, + joinReturnType + ); + + return pgmBuilder.OrderedExtend({ + FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)), + FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode2)), + FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode3)) + }); } +template<auto BuildBlockJoinFunc> NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny, @@ -112,7 +174,7 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType)); TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType)); - const auto joinNode = BuildBlockJoin(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny); + const auto joinNode = BuildBlockJoinFunc(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny); const auto joinType = joinNode.GetStaticType(); Y_ENSURE(joinType->IsList(), "Join result has to be list"); @@ -137,6 +199,7 @@ NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue()); } +template<auto BuildBlockJoinFunc = BuildBlockJoin> void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, TType* expectedType, const NUdf::TUnboxedValue& expected, TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, @@ -146,7 +209,7 @@ void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, ) { const size_t testSize = leftListValue.GetListLength(); for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) { - const auto got = DoTestBlockJoin(setup, + const auto got = DoTestBlockJoin<BuildBlockJoinFunc>(setup, leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops, rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny, joinKind, blockSize, scalar @@ -1323,5 +1386,84 @@ Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestCross) { } // Y_UNIT_TEST_SUITE +Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestNodeMultipleUsage) { + constexpr size_t testSize = 1 << 7; + constexpr size_t valueSize = 3; + static const TVector<TString> threeLetterValues = GenerateValues(valueSize); + static const TSet<ui64> fibonacci = GenerateFibonacci(testSize); + static const TString hugeString(128, '1'); + + Y_UNIT_TEST(TestBasic) { + TSetup<false> setup(GetNodeFactory()); + + // 1. Make input for the "left" stream. + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. + const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), + [](const auto key) { return std::to_string(key); }); + + // 3. Make "expected" data. + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<ui64> expectedRightKey; + TVector<TString> expectedRightValue; + + TMap<ui64, TString> rightMap; + for (size_t i = 0; i < rightKeyInit.size(); i++) { + rightMap[rightKeyInit[i]] = rightValueInit[i]; + } + + // Two inner joins + for (size_t join = 0; join < 2; join++) { + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& found = rightMap.find(leftKeyInit[i]); + if (found != rightMap.cend()) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightKey.push_back(found->first); + expectedRightValue.push_back(found->second); + } + } + } + + // Cross join + for (size_t i = 0; i < leftKeyInit.size(); i++) { + for (size_t j = 0; j < rightKeyInit.size(); j++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightKey.push_back(rightKeyInit[j]); + expectedRightValue.push_back(rightValueInit[j]); + } + } + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue); + + RunTestBlockJoin<BuildBlockJoinsWithNodeMultipleUsage>(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {} + ); + } + +} // Y_UNIT_TEST_SUITE + } // namespace NMiniKQL } // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp index fdd65bbba2..2dbeb2dbac 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp @@ -153,6 +153,43 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool sc return pgmBuilder.NewTupleType(blockItemTypes); } +TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TType* leftStreamType, const TVector<ui32>& leftKeyDrops, + TType* rightStreamType, const TVector<ui32>& rightKeyDrops +) { + const auto leftStreamItems = ValidateBlockStreamType(leftStreamType); + const auto rightStreamItems = ValidateBlockStreamType(rightStreamType); + + TVector<TType*> joinReturnItems; + + const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); + for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size + if (leftKeyDropsSet.contains(i)) { + continue; + } + joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many)); + } + + if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) { + const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend()); + for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size + if (rightKeyDropsSet.contains(i)) { + continue; + } + + joinReturnItems.push_back(pgmBuilder.NewBlockType( + joinKind == EJoinKind::Inner ? rightStreamItems[i] + : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i] + : pgmBuilder.NewOptionalType(rightStreamItems[i]), + TBlockType::EShape::Many + )); + } + } + + joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + return pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems)); +} + NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values ) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h index ecff426c92..5d4642a87c 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h @@ -10,6 +10,10 @@ inline bool IsOptionalOrNull(const TType* type) { } TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar); +TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TType* leftStreamType, const TVector<ui32>& leftKeyDrops, + TType* rightStreamType, const TVector<ui32>& rightKeyDrops +); NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp index 717eb9ec32..b139c24058 100644 --- a/yql/essentials/minikql/mkql_program_builder.cpp +++ b/yql/essentials/minikql/mkql_program_builder.cpp @@ -321,6 +321,11 @@ void EnsureDataOrOptionalOfData(TRuntimeNode node) { ->GetItemType()->IsData(), "Expected data or optional of data"); } +std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap) { + const auto wideComponents = AS_TYPE(TMultiType, type)->GetElements(); + return ValidateBlockItems(wideComponents, unwrap); +} + std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap) { const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType)); return ValidateBlockItems(wideComponents, unwrap); @@ -6009,29 +6014,82 @@ TRuntimeNode TProgramBuilder::ScalarApply(const TArrayRef<const TRuntimeNode>& a return TRuntimeNode(builder.Build(), false); } -TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, +TRuntimeNode TProgramBuilder::BlockStorage(TRuntimeNode stream, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + ValidateBlockStreamType(stream.GetStaticType()); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(stream); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType) { + if constexpr (RuntimeVersion < 59U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + MKQL_ENSURE(blockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto blockStorageType = AS_TYPE(TResourceType, blockStorage.GetStaticType()); + MKQL_ENSURE(blockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + ValidateBlockType(streamItemType); + + MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type"); + auto returnResourceType = AS_TYPE(TResourceType, returnType); + MKQL_ENSURE(returnResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + TRuntimeNode::TList keyColumnsNodes; + keyColumnsNodes.reserve(keyColumns.size()); + std::transform(keyColumns.cbegin(), keyColumns.cend(), + std::back_inserter(keyColumnsNodes), [this](const ui32 idx) { + return NewDataLiteral(idx); + }); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(blockStorage); + callableBuilder.Add(TRuntimeNode(streamItemType, true)); + callableBuilder.Add(NewTuple(keyColumnsNodes)); + callableBuilder.Add(NewDataLiteral((bool)any)); + + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops, - const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType + const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType ) { - if constexpr (RuntimeVersion < 53U) { + if constexpr (RuntimeVersion < 59U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; } - if (RuntimeVersion < 57U && joinKind == EJoinKind::Cross) { - THROW yexception() << __func__ << " does not support cross join in runtime version (" << RuntimeVersion << ")"; + + MKQL_ENSURE(rightBlockStorage.GetStaticType()->IsResource(), "Expected Resource as an input type"); + auto rightBlockStorageType = AS_TYPE(TResourceType, rightBlockStorage.GetStaticType()); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); } MKQL_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross, "Unsupported join kind"); MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch"); - if (joinKind == EJoinKind::Cross) { - MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); - } else { + if (joinKind != EJoinKind::Cross) { MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); + } else { + MKQL_ENSURE(leftKeyColumns.empty(), "Specifying key columns is not allowed for cross join"); } ValidateBlockStreamType(leftStream.GetStaticType()); - ValidateBlockStreamType(rightStream.GetStaticType()); + ValidateBlockType(rightStreamItemType); ValidateBlockStreamType(returnType); TRuntimeNode::TList leftKeyColumnsNodes; @@ -6064,13 +6122,13 @@ TRuntimeNode TProgramBuilder::BlockMapJoinCore(TRuntimeNode leftStream, TRuntime TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(leftStream); - callableBuilder.Add(rightStream); + callableBuilder.Add(rightBlockStorage); + callableBuilder.Add(TRuntimeNode(rightStreamItemType, true)); callableBuilder.Add(NewDataLiteral((ui32)joinKind)); callableBuilder.Add(NewTuple(leftKeyColumnsNodes)); callableBuilder.Add(NewTuple(leftKeyDropsNodes)); callableBuilder.Add(NewTuple(rightKeyColumnsNodes)); callableBuilder.Add(NewTuple(rightKeyDropsNodes)); - callableBuilder.Add(NewDataLiteral((bool)rightAny)); return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h index b67d80861d..01c0223293 100644 --- a/yql/essentials/minikql/mkql_program_builder.h +++ b/yql/essentials/minikql/mkql_program_builder.h @@ -17,6 +17,9 @@ class TBuiltinFunctionRegistry; constexpr std::string_view RandomMTResource = "MTRand"; constexpr std::string_view ResourceQueuePrefix = "TResourceQueue:"; +constexpr std::string_view BlockStorageResourcePrefix = "TBlockStorage:"; +constexpr std::string_view BlockMapJoinIndexResourcePrefix = "TBlockMapJoinIndex:"; +constexpr std::string_view BlockMapJoinIndexResourceSeparator = ":$YqlKeyColumns:"; enum class EJoinKind { Min = 1, @@ -133,6 +136,7 @@ struct TAggInfo { std::vector<ui32> ArgsColumns; }; +std::vector<TType*> ValidateBlockType(const TType* type, bool unwrap = true); std::vector<TType*> ValidateBlockStreamType(const TType* streamType, bool unwrap = true); std::vector<TType*> ValidateBlockFlowType(const TType* flowType, bool unwrap = true); @@ -258,9 +262,12 @@ public: TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType); TRuntimeNode BlockPgResolvedCall(const std::string_view& name, ui32 id, const TArrayRef<const TRuntimeNode>& args, TType* returnType); - TRuntimeNode BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, + TRuntimeNode BlockStorage(TRuntimeNode stream, TType* returnType); + TRuntimeNode BlockMapJoinIndex(TRuntimeNode blockStorage, TType* streamItemType, const TArrayRef<const ui32>& keyColumns, bool any, TType* returnType); + TRuntimeNode BlockMapJoinCore(TRuntimeNode leftStream, TRuntimeNode rightBlockStorage, TType* rightStreamItemType, EJoinKind joinKind, const TArrayRef<const ui32>& leftKeyColumns, const TArrayRef<const ui32>& leftKeyDrops, - const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, bool rightAny, TType* returnType); + const TArrayRef<const ui32>& rightKeyColumns, const TArrayRef<const ui32>& rightKeyDrops, TType* returnType + ); //-- logical functions TRuntimeNode BlockNot(TRuntimeNode data); diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h index 8782dd1942..d0dc62b939 100644 --- a/yql/essentials/minikql/mkql_runtime_version.h +++ b/yql/essentials/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 58U +#define MKQL_RUNTIME_VERSION 59U #endif // History: diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp index 5d7e22fa2f..123b59cc04 100644 --- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp +++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp @@ -1672,24 +1672,44 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.MapJoinCore(list, dict, joinKind, leftKeyColumns, leftRenames, rightRenames, returnType); }); + AddCallable("BlockStorage", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto stream = MkqlBuildExpr(node.Head(), ctx); + const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockStorage(stream, returnType); + }); + + AddCallable("BlockMapJoinIndex", [](const TExprNode& node, TMkqlBuildContext& ctx) { + const auto blockStorage = MkqlBuildExpr(node.Head(), ctx); + const auto itemType = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>(); + + std::vector<ui32> keyColumns; + node.Child(2)->ForEachChild([&](const TExprNode& child){ keyColumns.emplace_back(*GetWideBlockFieldPosition(*itemType, child.Content())); }); + + const bool any = HasSetting(node.Tail(), "any"); + + const auto itemMkqlType = BuildType(*node.Child(1), *itemType, ctx.ProgramBuilder); + const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockMapJoinIndex(blockStorage, itemMkqlType, keyColumns, any, returnType); + }); + AddCallable("BlockMapJoinCore", [](const TExprNode& node, TMkqlBuildContext& ctx) { const auto leftStream = MkqlBuildExpr(node.Head(), ctx); - const auto rightStream = MkqlBuildExpr(*node.Child(1), ctx); - const auto joinKind = GetJoinKind(node, node.Child(2)->Content()); + const auto rightBlockStorage = MkqlBuildExpr(*node.Child(1), ctx); const auto leftItemType = node.Head().GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); - const auto rightItemType = node.Child(1U)->GetTypeAnn()->Cast<TStreamExprType>()->GetItemType()->Cast<TMultiExprType>(); + const auto rightItemType = node.Child(2)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TMultiExprType>(); - std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops; - node.Child(3)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); - node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); - node.Child(5)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); - node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + const auto joinKind = GetJoinKind(node, node.Child(3)->Content()); - bool rightAny = HasSetting(node.Tail(), "rightAny"); + std::vector<ui32> leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops; + node.Child(4)->ForEachChild([&](const TExprNode& child){ leftKeyColumns.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); + node.Child(5)->ForEachChild([&](const TExprNode& child){ leftKeyDrops.emplace_back(*GetWideBlockFieldPosition(*leftItemType, child.Content())); }); + node.Child(6)->ForEachChild([&](const TExprNode& child){ rightKeyColumns.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + node.Child(7)->ForEachChild([&](const TExprNode& child){ rightKeyDrops.emplace_back(*GetWideBlockFieldPosition(*rightItemType, child.Content())); }); + const auto rightItemMkqlType = BuildType(*node.Child(2), *rightItemType, ctx.ProgramBuilder); const auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); - return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightStream, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, rightAny, returnType); + return ctx.ProgramBuilder.BlockMapJoinCore(leftStream, rightBlockStorage, rightItemMkqlType, joinKind, leftKeyColumns, leftKeyDrops, rightKeyColumns, rightKeyDrops, returnType); }); AddCallable({"GraceJoinCore", "GraceSelfJoinCore"}, [](const TExprNode& node, TMkqlBuildContext& ctx) { diff --git a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json index f48bcd05bc..9da54766c1 100644 --- a/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json +++ b/yql/essentials/tests/s-expressions/minirun/part7/canondata/result.json @@ -1,9 +1,9 @@ { "test.test[Blocks-BlockMapJoinCore-default.txt-Debug]": [ { - "checksum": "78ca432550e55028e5a9754d3698bcb4", - "size": 1864, - "uri": "https://{canondata_backend}/1900335/faec43b5ddae0d95b63fd9929fbf0ba6117189f1/resource.tar.gz#test.test_Blocks-BlockMapJoinCore-default.txt-Debug_/opt.yql" + "checksum": "445a5d7f7918455fc74eefafa41b519d", + "size": 2187, + "uri": "https://{canondata_backend}/1871182/dd0654c5ee04d3c397f5f6c37ba02b2ae503ed1f/resource.tar.gz#test.test_Blocks-BlockMapJoinCore-default.txt-Debug_/opt.yql" } ], "test.test[Blocks-BlockMapJoinCore-default.txt-Results]": [ diff --git a/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls b/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls index 8a6ca3b84c..35154e67ac 100644 --- a/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls +++ b/yql/essentials/tests/s-expressions/suites/Blocks/BlockMapJoinCore.yqls @@ -15,7 +15,11 @@ (let narrowLambdaLeftSemi (lambda '(item1 item2) (AsStruct '('"asubkey" item1) '('"avalue" item2)))) (let doJoin (lambda '(left right narrowMapLambda joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops) (block '( - (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore (WideToBlocks (FromFlow (ExpandMap left expandLambda))) (WideToBlocks (FromFlow (ExpandMap right expandLambda))) joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops '()))) narrowMapLambda))) + (let leftStream (WideToBlocks (FromFlow (ExpandMap left expandLambda)))) + (let rightStream (WideToBlocks (FromFlow (ExpandMap right expandLambda)))) + (let rightStreamItemType (StreamItemType (TypeOf rightStream))) + (let rightBlockIndex (BlockMapJoinIndex (BlockStorage rightStream) rightStreamItemType rightKeyColumns '())) + (return (Collect (NarrowMap (ToFlow (WideFromBlocks (BlockMapJoinCore leftStream rightBlockIndex rightStreamItemType joinKind leftKeyColumns leftKeyDrops rightKeyColumns rightKeyDrops))) narrowMapLambda))) )))) (let innerJoin (Apply doJoin (ToFlow table (DependsOn (String '0))) (ToFlow table (DependsOn (String '1))) narrowLambdaInner 'Inner '('0) '() '('0) '())) diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp index 826247cb57..53216bd3f8 100644 --- a/yt/cpp/mapreduce/common/retry_lib.cpp +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -203,16 +203,11 @@ static bool IsRetriableChunkError(const TSet<int>& codes) static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config) { - int httpCode = errorResponse.GetHttpCode(); - if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) { - return config->RetryInterval; - } - auto allCodes = errorResponse.GetError().GetAllErrorCodes(); using namespace NClusterErrorCodes; - if (httpCode == 429 - || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded) - || allCodes.count(NRpc::RequestQueueSizeLimitExceeded)) + + if (allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded) || + allCodes.count(NRpc::RequestQueueSizeLimitExceeded)) { // request rate limit exceeded return config->RateLimitExceededRetryInterval; diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index 765a96f042..4bddeab86d 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -768,21 +768,21 @@ THttpResponse::THttpResponse( ErrorResponse_ = TErrorResponse(HttpCode_, Context_.RequestId); - auto logAndSetError = [&] (const TString& rawError) { + auto logAndSetError = [&] (int code, const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", Context_.RequestId, HttpCode_, rawError.data()); - ErrorResponse_->SetRawError(rawError); + ErrorResponse_->SetError(TYtError(code, rawError)); }; switch (HttpCode_) { case 429: - logAndSetError("request rate limit exceeded"); + logAndSetError(NClusterErrorCodes::NSecurityClient::RequestQueueSizeLimitExceeded, "request rate limit exceeded"); break; case 500: - logAndSetError(::TStringBuilder() << "internal error in proxy " << Context_.HostName); + logAndSetError(NClusterErrorCodes::NRpc::Unavailable, ::TStringBuilder() << "internal error in proxy " << Context_.HostName); break; default: { @@ -803,6 +803,9 @@ THttpResponse::THttpResponse( if (auto parsedResponse = ParseError(HttpInput_->Headers())) { ErrorResponse_ = parsedResponse.GetRef(); + if (HttpCode_ == 503) { + ExtendGenericError(*ErrorResponse_, NClusterErrorCodes::NBus::TransportError, "transport error"); + } } else { ErrorResponse_->SetRawError( errorString + " - X-YT-Error is missing in headers"); diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 7e9d761c3c..34be58daab 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -7,6 +7,7 @@ #include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/interface/error_codes.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> #include <yt/yt/core/concurrency/thread_pool_poller.h> @@ -42,21 +43,22 @@ TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& TErrorResponse errorResponse(static_cast<int>(httpCode), requestId); - auto logAndSetError = [&] (const TString& rawError) { + auto logAndSetError = [&] (int code, const TString& rawError) { YT_LOG_ERROR("RSP %v - HTTP %v - %v", requestId, httpCode, rawError.data()); - errorResponse.SetRawError(rawError); + errorResponse.SetError(TYtError(code, rawError)); }; + switch (httpCode) { case NHttp::EStatusCode::TooManyRequests: - logAndSetError("request rate limit exceeded"); + logAndSetError(NClusterErrorCodes::NSecurityClient::RequestQueueSizeLimitExceeded, "request rate limit exceeded"); break; case NHttp::EStatusCode::InternalServerError: - logAndSetError("internal error in proxy " + hostName); + logAndSetError(NClusterErrorCodes::NRpc::Unavailable, "internal error in proxy " + hostName); break; default: { @@ -80,6 +82,9 @@ TMaybe<TErrorResponse> GetErrorResponse(const TString& hostName, const TString& if (errorResponse.IsOk()) { return Nothing(); } + if (httpCode == NHttp::EStatusCode::ServiceUnavailable) { + ExtendGenericError(errorResponse, NClusterErrorCodes::NBus::TransportError, "transport error"); + } return errorResponse; } diff --git a/yt/cpp/mapreduce/interface/errors.cpp b/yt/cpp/mapreduce/interface/errors.cpp index 42c7466dcb..fcc23fb3ac 100644 --- a/yt/cpp/mapreduce/interface/errors.cpp +++ b/yt/cpp/mapreduce/interface/errors.cpp @@ -332,7 +332,7 @@ bool TErrorResponse::IsFromTrailers() const bool TErrorResponse::IsTransportError() const { - return HttpCode_ == 503; + return Error_.ContainsErrorCode(NClusterErrorCodes::NBus::TransportError); } TString TErrorResponse::GetRequestId() const @@ -355,6 +355,22 @@ bool TErrorResponse::IsAccessDenied() const return Error_.ContainsErrorCode(NClusterErrorCodes::NSecurityClient::AuthorizationError); } +bool TErrorResponse::IsUnauthorized() const +{ + const auto allCodes = Error_.GetAllErrorCodes(); + for (auto code : { + NClusterErrorCodes::NRpc::AuthenticationError, + NClusterErrorCodes::NRpc::InvalidCsrfToken, + NClusterErrorCodes::NRpc::InvalidCredentials, + NClusterErrorCodes::NSecurityClient::AuthenticationError, + }) { + if (allCodes.contains(code)) { + return true; + } + } + return false; +} + bool TErrorResponse::IsConcurrentTransactionLockConflict() const { return Error_.ContainsErrorCode(NClusterErrorCodes::NCypressClient::ConcurrentTransactionLockConflict); diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h index 749b294f0f..a008ff07b8 100644 --- a/yt/cpp/mapreduce/interface/errors.h +++ b/yt/cpp/mapreduce/interface/errors.h @@ -182,6 +182,9 @@ public: /// Check if error was caused by lack of permissions to execute request. bool IsAccessDenied() const; + /// Check if error was caused by authorization issues. + bool IsUnauthorized() const; + /// Check if error was caused by failure to lock object because of another transaction is holding lock. bool IsConcurrentTransactionLockConflict() const; diff --git a/yt/yql/providers/yt/codec/yt_codec.h b/yt/yql/providers/yt/codec/yt_codec.h index f7ea0e0b78..4e6ef543a5 100644 --- a/yt/yql/providers/yt/codec/yt_codec.h +++ b/yt/yql/providers/yt/codec/yt_codec.h @@ -133,6 +133,10 @@ public: UseBlockOutput_ = true; } + void SetIsTableContent() { + IsTableContent_ = true; + } + void SetTableOffsets(const TVector<ui64>& offsets); void Clear(); @@ -148,6 +152,7 @@ public: bool UseSkiff_ = false; bool UseBlockInput_ = false; bool UseBlockOutput_ = false; + bool IsTableContent_ = false; TString OptLLVM_; TSystemFields SystemFields_; diff --git a/yt/yql/providers/yt/codec/yt_codec_io.cpp b/yt/yql/providers/yt/codec/yt_codec_io.cpp index 82b1a3e2f8..459602d9cf 100644 --- a/yt/yql/providers/yt/codec/yt_codec_io.cpp +++ b/yt/yql/providers/yt/codec/yt_codec_io.cpp @@ -1536,7 +1536,7 @@ public: YQL_ENSURE(inputFields.size() == ColumnConverters_.size()); auto rowIndices = batch->GetColumnByName("$row_index"); - YQL_ENSURE(rowIndices || decoder.Dynamic); + YQL_ENSURE(rowIndices || decoder.Dynamic || Specs_.IsTableContent_); arrow::compute::ExecContext execContext(Pool_); std::vector<arrow::Datum> convertedBatch; diff --git a/yt/yql/providers/yt/common/yql_yt_settings.cpp b/yt/yql/providers/yt/common/yql_yt_settings.cpp index aee7b92e65..03443e6409 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.cpp +++ b/yt/yql/providers/yt/common/yql_yt_settings.cpp @@ -524,6 +524,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) REGISTER_SETTING(*this, MaxColumnGroups); REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount); REGISTER_SETTING(*this, JobBlockInput); + REGISTER_SETTING(*this, JobBlockTableContent); REGISTER_SETTING(*this, JobBlockOutput).Parser([](const TString& v) { return FromString<EBlockOutputMode>(v); }); REGISTER_SETTING(*this, _EnableYtDqProcessWriteConstraints); REGISTER_SETTING(*this, CompactForDistinct); diff --git a/yt/yql/providers/yt/common/yql_yt_settings.h b/yt/yql/providers/yt/common/yql_yt_settings.h index 85cd91402a..88b12afff2 100644 --- a/yt/yql/providers/yt/common/yql_yt_settings.h +++ b/yt/yql/providers/yt/common/yql_yt_settings.h @@ -294,6 +294,7 @@ struct TYtSettings { NCommon::TConfSetting<ui16, false> MaxColumnGroups; NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount; NCommon::TConfSetting<bool, false> JobBlockInput; + NCommon::TConfSetting<bool, false> JobBlockTableContent; NCommon::TConfSetting<TSet<TString>, false> JobBlockInputSupportedTypes; NCommon::TConfSetting<TSet<NUdf::EDataSlot>, false> JobBlockInputSupportedDataTypes; NCommon::TConfSetting<EBlockOutputMode, false> JobBlockOutput; diff --git a/yt/yql/providers/yt/comp_nodes/ya.make.inc b/yt/yql/providers/yt/comp_nodes/ya.make.inc index b0a54d85a4..2e5ae696d4 100644 --- a/yt/yql/providers/yt/comp_nodes/ya.make.inc +++ b/yt/yql/providers/yt/comp_nodes/ya.make.inc @@ -4,11 +4,13 @@ INCLUDE(${ARCADIA_ROOT}/yql/essentials/minikql/invoke_builtins/header.ya.make.in SET(ORIG_SRC_DIR ${ARCADIA_ROOT}/yt/yql/providers/yt/comp_nodes) SET(ORIG_SOURCES + yql_mkql_file_block_stream.cpp yql_mkql_file_input_state.cpp yql_mkql_file_list.cpp yql_mkql_input_stream.cpp yql_mkql_input.cpp yql_mkql_output.cpp + yql_mkql_block_table_content.cpp yql_mkql_table_content.cpp yql_mkql_table.cpp yql_mkql_ungrouping_list.cpp diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp new file mode 100644 index 0000000000..d935da2004 --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.cpp @@ -0,0 +1,73 @@ +#include "yql_mkql_block_table_content.h" +#include "yql_mkql_file_block_stream.h" + +#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h> +#include <yql/essentials/minikql/mkql_node_cast.h> +#include <yql/essentials/minikql/defs.h> + +#include <yql/essentials/public/udf/udf_value.h> + +#include <util/generic/vector.h> +#include <util/generic/string.h> +#include <util/generic/size_literals.h> + +namespace NYql { + +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; + +class TYtBlockTableContentWrapper : public TMutableComputationNode<TYtBlockTableContentWrapper> { + typedef TMutableComputationNode<TYtBlockTableContentWrapper> TBaseComputation; +public: + TYtBlockTableContentWrapper(TComputationMutables& mutables, NCommon::TCodecContext& codecCtx, + TVector<TString>&& files, const TString& inputSpec, TStructType* origStructType, bool decompress, std::optional<ui64> expectedRowCount) + : TBaseComputation(mutables) + , Files_(std::move(files)) + , Decompress_(decompress) + , ExpectedRowCount_(std::move(expectedRowCount)) + { + Spec_.SetUseBlockInput(); + Spec_.SetIsTableContent(); + Spec_.Init(codecCtx, inputSpec, {}, {}, origStructType, {}, TString()); + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TFileWideBlockStreamValue>(Spec_, ctx.HolderFactory, Files_, Decompress_, 4, 1_MB, ExpectedRowCount_); + } + +private: + void RegisterDependencies() const final {} + + TMkqlIOSpecs Spec_; + TVector<TString> Files_; + const bool Decompress_; + const std::optional<ui64> ExpectedRowCount_; +}; + +IComputationNode* WrapYtBlockTableContent(NCommon::TCodecContext& codecCtx, + TComputationMutables& mutables, TCallable& callable, TStringBuf pathPrefix) +{ + MKQL_ENSURE(callable.GetInputsCount() == 6, "Expected 6 arguments"); + TString uniqueId(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); + auto origStructType = AS_TYPE(TStructType, AS_VALUE(TTypeType, callable.GetInput(1))); + const ui32 tablesCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui32>(); + TString inputSpec(AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().AsStringRef()); + const bool decompress = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<bool>(); + + std::optional<ui64> length; + TTupleLiteral* lengthTuple = AS_VALUE(TTupleLiteral, callable.GetInput(5)); + if (lengthTuple->GetValuesCount() > 0) { + MKQL_ENSURE(lengthTuple->GetValuesCount() == 1, "Expect 1 element in the length tuple"); + length = AS_VALUE(TDataLiteral, lengthTuple->GetValue(0))->AsValue().Get<ui64>(); + } + + TVector<TString> files; + for (ui32 index = 0; index < tablesCount; ++index) { + files.push_back(TStringBuilder() << pathPrefix << uniqueId << '_' << index); + } + + return new TYtBlockTableContentWrapper(mutables, codecCtx, std::move(files), inputSpec, + origStructType, decompress, length); +} + +} // NYql diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h new file mode 100644 index 0000000000..444603406e --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h @@ -0,0 +1,18 @@ +#pragma once + +#include <yql/essentials/providers/common/codec/yql_codec.h> + +#include <yql/essentials/minikql/mkql_node.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> + +#include <util/generic/string.h> +#include <util/generic/strbuf.h> + +namespace NYql { + +NKikimr::NMiniKQL::IComputationNode* WrapYtBlockTableContent( + NYql::NCommon::TCodecContext& codecCtx, + NKikimr::NMiniKQL::TComputationMutables& mutables, + NKikimr::NMiniKQL::TCallable& callable, TStringBuf pathPrefix); + +} // NYql diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp new file mode 100644 index 0000000000..3253c3e1b1 --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.cpp @@ -0,0 +1,50 @@ +#include "yql_mkql_file_block_stream.h" +#include "yql_mkql_file_input_state.h" + +namespace NYql { + +using namespace NKikimr::NMiniKQL; + +TFileWideBlockStreamValue::TFileWideBlockStreamValue( + TMemoryUsageInfo* memInfo, + const TMkqlIOSpecs& spec, + const THolderFactory& holderFactory, + const TVector<TString>& filePaths, + bool decompress, + size_t blockCount, + size_t blockSize, + std::optional<ui64> expectedRowCount +) + : TComputationValue(memInfo) + , Spec_(spec) + , HolderFactory_(holderFactory) + , FilePaths_(filePaths) + , Decompress_(decompress) + , BlockCount_(blockCount) + , BlockSize_(blockSize) + , ExpectedRowCount_(expectedRowCount) +{ + State_ = MakeHolder<TFileInputState>(Spec_, HolderFactory_, MakeMkqlFileInputs(FilePaths_, Decompress_), BlockCount_, BlockSize_); +} + +NUdf::EFetchStatus TFileWideBlockStreamValue::WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + if (!AtStart_) { + State_->Next(); + } + AtStart_ = false; + if (!State_->IsValid()) { + MKQL_ENSURE(!ExpectedRowCount_ || *ExpectedRowCount_ == State_->GetRecordIndex(), "Invalid file row count"); + return NUdf::EFetchStatus::Finish; + } + + auto elements = State_->GetCurrent().GetElements(); + for (ui32 i = 0; i < width; i++) { + if (auto out = output++) { + *out = elements[i]; + } + } + + return NUdf::EFetchStatus::Ok; +} + +} diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h new file mode 100644 index 0000000000..f38771c24f --- /dev/null +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_block_stream.h @@ -0,0 +1,43 @@ +#pragma once + +#include "yql_mkql_file_input_state.h" + +#include <yt/yql/providers/yt/codec/yt_codec.h> +#include <yql/essentials/minikql/computation/mkql_computation_node.h> + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/generic/string.h> + +namespace NYql { + +class TFileWideBlockStreamValue : public NKikimr::NMiniKQL::TComputationValue<TFileWideBlockStreamValue> { +public: + TFileWideBlockStreamValue( + NKikimr::NMiniKQL::TMemoryUsageInfo* memInfo, + const TMkqlIOSpecs& spec, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const TVector<TString>& filePaths, + bool decompress, + size_t blockCount, + size_t blockSize, + std::optional<ui64> expectedRowCount + ); + +private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width); + +private: + const TMkqlIOSpecs& Spec_; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; + const TVector<TString> FilePaths_; + const bool Decompress_; + const size_t BlockCount_; + const size_t BlockSize_; + const std::optional<ui64> ExpectedRowCount_; + + bool AtStart_ = true; + THolder<TFileInputState> State_; +}; + +} diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp index 46b2067d5e..d814246f76 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.cpp @@ -1,5 +1,6 @@ #include "yql_mkql_file_input_state.h" +#include <yql/essentials/minikql/computation/mkql_block_impl.h> #include <yql/essentials/utils/yql_panic.h> #include <util/system/fs.h> @@ -55,7 +56,13 @@ bool TFileInputState::NextValue() { } MkqlReader_.Next(); - ++CurrentRecord_; + if (Spec_->UseBlockInput_) { + auto blockCountValue = CurrentValue_.GetElement(Spec_->Inputs[CurrentInput_]->StructSize); + CurrentRecord_ += GetBlockCount(blockCountValue); + } else { + ++CurrentRecord_; + } + return true; } } diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h index 64db03232d..1762e56d2f 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_file_input_state.h @@ -35,7 +35,6 @@ public: OnNextBlockCallback_ = std::move(cb); } -protected: virtual bool IsValid() const override { return Valid_; } @@ -50,6 +49,7 @@ protected: virtual TString DebugInfo() const override; +protected: void Finish() { MkqlReader_.Finish(); } diff --git a/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp b/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp index 4dc5e57243..8914cac7bd 100644 --- a/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp +++ b/yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.cpp @@ -33,6 +33,7 @@ public: if (useSkiff) { Spec_.SetUseSkiff(optLLVM); } + Spec_.SetIsTableContent(); Spec_.Init(codecCtx, inputSpec, {}, {}, AS_TYPE(TListType, listType)->GetItemType(), {}, TString()); } diff --git a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json index d7ff47f544..d881c9f565 100644 --- a/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json +++ b/yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json @@ -204,6 +204,15 @@ ] }, { + "Name": "TYtBlockTableContent", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "YtBlockTableContent"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"}, + {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"} + ] + }, + { "Name": "TYtLength", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "YtLength"}, diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp index 78af8c3d70..36f936b17a 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_comp_nodes.cpp @@ -129,6 +129,9 @@ public: , Length_(std::move(length)) { Spec_.Init(codecCtx, inputSpecs, groups, tableNames, itemType, auxColumns, NYT::TNode()); + if constexpr (TableContent) { + Spec_.SetIsTableContent(); + } if (!rowOffsets.empty()) { Spec_.SetTableOffsets(rowOffsets); } diff --git a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index 2d805612f9..89525fcbc5 100644 --- a/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/yt/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -596,6 +596,56 @@ void RegisterYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { return values; }); + compiler.OverrideCallable(TYtBlockTableContent::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + TYtBlockTableContent tableContent(&node); + + auto origItemStructType = ( + tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back() + )->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TMaybe<ui64> itemsCount; + if (auto setting = NYql::GetSetting(tableContent.Settings().Ref(), EYtSettingType::ItemsCount)) { + itemsCount = FromString<ui64>(setting->Child(1)->Content()); + } + const auto itemType = NCommon::BuildType(node, *origItemStructType, ctx.ProgramBuilder); + TRuntimeNode values; + if (auto maybeRead = tableContent.Input().Maybe<TYtReadTable>()) { + auto read = maybeRead.Cast(); + + const bool hasRangesOrSampling = AnyOf(read.Input(), [](const TYtSection& s) { + return NYql::HasSetting(s.Settings().Ref(), EYtSettingType::Sample) + || AnyOf(s.Paths(), [](const TYtPath& p) { return !p.Ranges().Maybe<TCoVoid>(); }); + }); + if (hasRangesOrSampling) { + itemsCount.Clear(); + } + + const bool forceKeyColumns = HasRangesWithKeyColumns(read.Input().Ref()); + values = BuildTableContentCall( + TYtTableContent::CallableName(), + itemType, + read.DataSource().Cluster().Value(), read.Input().Ref(), itemsCount, ctx, true, THashSet<TString>{"num", "index"}, forceKeyColumns); + values = ApplyPathRangesAndSampling(values, itemType, read.Input().Ref(), ctx); + } else { + auto output = tableContent.Input().Cast<TYtOutput>(); + values = BuildTableContentCall( + TYtTableContent::CallableName(), + itemType, + GetOutputOp(output).DataSink().Cluster().Value(), output.Ref(), itemsCount, ctx, true); + } + + return ctx.ProgramBuilder.WideToBlocks(ctx.ProgramBuilder.FromFlow(ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(values), [&](TRuntimeNode item) -> TRuntimeNode::TList { + TRuntimeNode::TList result; + for (auto& origItem : origItemStructType->GetItems()) { + result.push_back(ctx.ProgramBuilder.Member(item, origItem->GetName())); + } + return result; + }))); + }); + compiler.AddCallable({TYtSort::CallableName(), TYtCopy::CallableName(), TYtMerge::CallableName()}, [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp index 6feb3542dc..cc9981ccf1 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp @@ -6,6 +6,7 @@ #include <yql/essentials/core/yql_opt_utils.h> #include <yt/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_output.h> +#include <yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.h> #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h> @@ -74,6 +75,11 @@ NKikimr::NMiniKQL::TComputationNodeFactory GetGatewayNodeFactory(TCodecContext* return WrapYtTableContent(*codecCtx, ctx.Mutables, callable, "OFF" /* no LLVM for local exec */, filePrefix); } + if (callable.GetType()->GetName() == "YtBlockTableContentJob") { + YQL_ENSURE(codecCtx); + return WrapYtBlockTableContent(*codecCtx, ctx.Mutables, callable, filePrefix); + } + if (!exprContextObject) { exprContextObject = ctx.Mutables.CurValueIndex++; } diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp index d66943f868..f08309f4b9 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_transform.cpp @@ -74,14 +74,19 @@ TGatewayTransformer::TGatewayTransformer(const TExecContextBase& execCtx, TYtSet TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { auto name = internName.Str(); const bool small = name.SkipPrefix("Small"); - if (name == TYtTableContent::CallableName()) { + if (name == TYtTableContent::CallableName() || name == TYtBlockTableContent::CallableName()) { + bool useBlocks = (name == TYtBlockTableContent::CallableName()); *TableContentFlag_ = true; *RemoteExecutionFlag_ = *RemoteExecutionFlag_ || !small; if (EPhase::Content == Phase_ || EPhase::All == Phase_) { - return [&](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) { - YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + return [&, name, useBlocks](NMiniKQL::TCallable& callable, const TTypeEnvironment& env) { + if (useBlocks) { + YQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); + } else { + YQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + } const TString cluster(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); const TString& server = ExecCtx_.Clusters_->GetServer(cluster); @@ -90,7 +95,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { auto tx = entry->Tx; auto deliveryMode = ForceLocalTableContent_ ? ETableContentDeliveryMode::File : Settings_->TableContentDeliveryMode.Get(cluster).GetOrElse(ETableContentDeliveryMode::Native); - bool useSkiff = Settings_->TableContentUseSkiff.Get(cluster).GetOrElse(DEFAULT_USE_SKIFF); + bool useSkiff = !useBlocks && Settings_->TableContentUseSkiff.Get(cluster).GetOrElse(DEFAULT_USE_SKIFF); const bool ensureOldTypesOnly = !useSkiff; const ui64 maxChunksForNativeDelivery = Settings_->TableContentMaxChunksForNativeDelivery.Get().GetOrElse(1000ul); TString contentTmpFolder = ForceLocalTableContent_ ? TString() : Settings_->TableContentTmpFolder.Get(cluster).GetOrElse(TString()); @@ -116,6 +121,7 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { THashMap<TString, ui32> structColumns; if (useSkiff) { + YQL_ENSURE(!useBlocks); auto itemType = AS_TYPE(TListType, callable.GetType()->GetReturnType())->GetItemType(); TStructType* itemTypeStruct = AS_TYPE(TStructType, itemType); if (itemTypeStruct->GetMembersCount() == 0) { @@ -151,7 +157,10 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { refName = res.first->second; } tablesNode.Add(refName); - if (useSkiff) { + if (useBlocks) { + NYT::TNode formatNode("arrow"); + formats.push_back(formatNode); + } else if (useSkiff) { formats.push_back(SingleTableSpecToInputSkiff(specNode, structColumns, false, false, false)); } else { if (ensureOldTypesOnly && specNode.HasKey(YqlRowSpecAttribute)) { @@ -304,15 +313,24 @@ TCallableVisitFunc TGatewayTransformer::operator()(TInternName internName) { } TCallableBuilder call(env, - TStringBuilder() << TYtTableContent::CallableName() << TStringBuf("Job"), + TStringBuilder() << name << TStringBuf("Job"), callable.GetType()->GetReturnType()); + if (useBlocks) { + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); + call.Add(callable.GetInput(4)); // orig struct type + call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); + call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression + call.Add(callable.GetInput(3)); // length + } else { + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); + call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); + call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); + call.Add(PgmBuilder_.NewDataLiteral(useSkiff)); + call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression + call.Add(callable.GetInput(3)); // length + } - call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(uniqueId)); - call.Add(PgmBuilder_.NewDataLiteral(tableList->GetItemsCount())); - call.Add(PgmBuilder_.NewDataLiteral<NUdf::EDataSlot::String>(NYT::NodeToYsonString(specNode))); - call.Add(PgmBuilder_.NewDataLiteral(useSkiff)); - call.Add(PgmBuilder_.NewDataLiteral(ETableContentDeliveryMode::File == deliveryMode)); // use compression - call.Add(callable.GetInput(3)); // length return TRuntimeNode(call.Build(), false); }; } diff --git a/yt/yql/providers/yt/job/yql_job_factory.cpp b/yt/yql/providers/yt/job/yql_job_factory.cpp index 6ff4a02606..6b7cd4fd69 100644 --- a/yt/yql/providers/yt/job/yql_job_factory.cpp +++ b/yt/yql/providers/yt/job/yql_job_factory.cpp @@ -3,6 +3,7 @@ #include <yt/yql/providers/yt/comp_nodes/yql_mkql_input.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_output.h> #include <yt/yql/providers/yt/comp_nodes/yql_mkql_table_content.h> +#include <yt/yql/providers/yt/comp_nodes/yql_mkql_block_table_content.h> #include <yql/essentials/providers/common/comp_nodes/yql_factory.h> #include <yql/essentials/minikql/comp_nodes/mkql_factories.h> #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h> @@ -24,6 +25,9 @@ TComputationNodeFactory GetJobFactory(NYql::NCommon::TCodecContext& codecCtx, co if (name == "TableContent") { return WrapYtTableContent(codecCtx, ctx.Mutables, callable, optLLVM, {} /*empty pathPrefix inside job*/); } + if (name == "BlockTableContent") { + return WrapYtBlockTableContent(codecCtx, ctx.Mutables, callable, {} /*empty pathPrefix inside job*/); + } if (name == "Input") { YQL_ENSURE(reader); YQL_ENSURE(specs); diff --git a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp index f92f152c31..bda09645b4 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_input.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_input.cpp @@ -23,6 +23,7 @@ public: { #define HNDL(name) "YtBlockInput-"#name, Hndl(&TYtBlockInputTransformer::name) AddHandler(0, &TYtMap::Match, HNDL(TryTransformMap)); + AddHandler(0, &TYtTableContent::Match, HNDL(TryTransformTableContent)); #undef HNDL } @@ -71,6 +72,50 @@ private: return EnsureWideFlowType(map.Mapper().Args().Arg(0).Ref(), ctx); } + TMaybeNode<TExprBase> TryTransformTableContent(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { + auto tableContent = node.Cast<TYtTableContent>(); + if (!NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) { + return tableContent; + } + + const TParentsMap* parentsMap = getParents(); + if (auto it = parentsMap->find(tableContent.Raw()); it != parentsMap->end() && it->second.size() > 1) { + return tableContent; + } + + YQL_CLOG(INFO, ProviderYt) << "Rewrite YtTableContent with block input"; + + auto inputStructType = GetSeqItemType(tableContent.Ref().GetTypeAnn())->Cast<TStructExprType>(); + auto asStructBuilder = Build<TCoAsStruct>(ctx, tableContent.Pos()); + TExprNode::TListType narrowMapArgs; + for (auto& item : inputStructType->GetItems()) { + auto arg = ctx.NewArgument(tableContent.Pos(), item->GetName()); + asStructBuilder.Add<TCoNameValueTuple>() + .Name().Build(item->GetName()) + .Value(arg) + .Build(); + narrowMapArgs.push_back(std::move(arg)); + } + + auto settings = RemoveSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady, ctx); + return Build<TCoForwardList>(ctx, tableContent.Pos()) + .Stream<TCoNarrowMap>() + .Input<TCoToFlow>() + .Input<TCoWideFromBlocks>() + .Input<TYtBlockTableContent>() + .Input(tableContent.Input()) + .Settings(settings) + .Build() + .Build() + .Build() + .Lambda() + .Args(narrowMapArgs) + .Body(asStructBuilder.Done()) + .Build() + .Build() + .Done(); + } + private: const TYtState::TPtr State_; }; diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp b/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp index 2493b340f2..3a1e05d9aa 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_io_filter.cpp @@ -23,6 +23,7 @@ public: #define HNDL(name) "YtBlockIOFilter-"#name, Hndl(&YtBlockIOFilterTransformer::name) AddHandler(0, &TYtMap::Match, HNDL(HandleMapInput)); AddHandler(0, &TYtMap::Match, HNDL(HandleMapOutput)); + AddHandler(0, &TYtTableContent::Match, HNDL(HandleTableContent)); #undef HNDL } @@ -112,11 +113,12 @@ private: } } + auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT); auto supportedTypes = State_->Configuration->JobBlockInputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_TYPES); auto supportedDataTypes = State_->Configuration->JobBlockInputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_DATA_TYPES); auto lambdaInputType = map.Mapper().Args().Arg(0).Ref().GetTypeAnn(); - if (!CheckBlockIOSupportedTypes(*lambdaInputType, supportedTypes, supportedDataTypes, [](const TString&) {})) { + if (!CheckBlockIOSupportedTypes(*lambdaInputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit)) { return false; } @@ -128,11 +130,12 @@ private: return false; } + auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT); auto supportedTypes = State_->Configuration->JobBlockOutputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_OUTPUT_SUPPORTED_TYPES); auto supportedDataTypes = State_->Configuration->JobBlockOutputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_OUTPUT_SUPPORTED_DATA_TYPES); auto lambdaOutputType = map.Mapper().Ref().GetTypeAnn(); - if (!CheckBlockIOSupportedTypes(*lambdaOutputType, supportedTypes, supportedDataTypes, [](const TString&) {}, false)) { + if (!CheckBlockIOSupportedTypes(*lambdaOutputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit, false)) { return false; } @@ -150,6 +153,66 @@ private: } } + TMaybeNode<TExprBase> HandleTableContent(TExprBase node, TExprContext& ctx) const { + auto tableContent = node.Cast<TYtTableContent>(); + if (NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::BlockInputReady)) { + return tableContent; + } + + if (!State_->Configuration->JobBlockTableContent.Get().GetOrElse(Types->UseBlocks)) { + return tableContent; + } + + auto settings = tableContent.Settings().Ptr(); + bool canUseBlockInput = CanUseBlockInputForTableContent(tableContent); + bool hasSetting = HasSetting(*settings, EYtSettingType::BlockInputReady); + if (canUseBlockInput && !hasSetting) { + settings = AddSetting(*settings, EYtSettingType::BlockInputReady, TExprNode::TPtr(), ctx); + } else if (!canUseBlockInput && hasSetting) { + settings = RemoveSetting(*settings, EYtSettingType::BlockInputReady, ctx); + } else { + return tableContent; + } + return Build<TYtTableContent>(ctx, node.Pos()) + .InitFrom(tableContent) + .Settings(settings) + .Done(); + } + + bool CanUseBlockInputForTableContent(const TYtTableContent& tableContent) const { + if (auto readTable = tableContent.Input().Maybe<TYtReadTable>()) { + if (readTable.Cast().Input().Size() > 1) { + return false; + } + + for (auto path : readTable.Cast().Input().Item(0).Paths()) { + if (!IsYtTableSuitableForArrowInput(path.Table(), [](const TString&) {})) { + return false; + } + } + + } else if (auto output = tableContent.Input().Maybe<TYtOutput>()) { + auto outTable = GetOutTable(output.Cast()); + if (!IsYtTableSuitableForArrowInput(outTable, [](const TString&) {})) { + return false; + } + + } else { + YQL_ENSURE(false, "Expected " << TYtReadTable::CallableName() << " or " << TYtOutput::CallableName()); + } + + auto wideFlowLimit = State_->Configuration->WideFlowLimit.Get().GetOrElse(DEFAULT_WIDE_FLOW_LIMIT); + auto supportedTypes = State_->Configuration->JobBlockInputSupportedTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_TYPES); + auto supportedDataTypes = State_->Configuration->JobBlockInputSupportedDataTypes.Get().GetOrElse(DEFAULT_BLOCK_INPUT_SUPPORTED_DATA_TYPES); + + auto inputType = tableContent.Ref().GetTypeAnn(); + if (!CheckBlockIOSupportedTypes(*inputType, supportedTypes, supportedDataTypes, [](const TString&) {}, wideFlowLimit)) { + return false; + } + + return true; + } + private: const TYtState::TPtr State_; const THolder<IGraphTransformer> Finalizer_; diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp index 0f1041f4bb..639fa14ca0 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.cpp @@ -1,6 +1,7 @@ #include "yql_yt_block_io_utils.h" #include <yql/essentials/core/yql_opt_utils.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> namespace NYql { @@ -9,21 +10,20 @@ bool CheckBlockIOSupportedTypes( const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler, + size_t wideFlowLimit, bool allowNestedOptionals ) { - auto itemType = type.Cast<TFlowExprType>()->GetItemType(); - if (itemType->GetKind() == ETypeAnnotationKind::Multi) { - auto& itemTypes = itemType->Cast<TMultiExprType>()->GetItems(); - if (itemTypes.empty()) { - return false; - } + auto& itemType = GetSeqItemType(type); + if (itemType.GetKind() == ETypeAnnotationKind::Multi) { + auto& itemTypes = itemType.Cast<TMultiExprType>()->GetItems(); if (!CheckSupportedTypes(itemTypes, supportedTypes, supportedDataTypes, std::move(unsupportedTypeHandler), allowNestedOptionals)) { return false; } - } else if (itemType->GetKind() == ETypeAnnotationKind::Struct) { - auto& items = itemType->Cast<TStructExprType>()->GetItems(); - if (items.empty()) { + } else if (itemType.GetKind() == ETypeAnnotationKind::Struct) { + auto& items = itemType.Cast<TStructExprType>()->GetItems(); + if (items.empty() || items.size() > wideFlowLimit) { + unsupportedTypeHandler("fields count doesn't satisfy wide flow requirements"); return false; } diff --git a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h index 58e94c9443..dc72518fe6 100644 --- a/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h +++ b/yt/yql/providers/yt/provider/yql_yt_block_io_utils.h @@ -9,6 +9,7 @@ bool CheckBlockIOSupportedTypes( const TSet<TString>& supportedTypes, const TSet<NUdf::EDataSlot>& supportedDataTypes, std::function<void(const TString&)> unsupportedTypeHandler, + size_t wideFlowLimit, bool allowNestedOptionals = true ); diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp index 8eada7e706..fcad1669af 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasource_constraints.cpp @@ -27,6 +27,7 @@ public: AddHandler({TYtSection::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleSection)); AddHandler({TYtReadTable::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleReadTable)); AddHandler({TYtTableContent::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleTableContent)); + AddHandler({TYtBlockTableContent::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleBlockTableContent)); AddHandler({TYtIsKeySwitch::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleDefault)); AddHandler({TYqlRowSpec::CallableName()}, Hndl(&TYtDataSourceConstraintTransformer::HandleDefault)); @@ -188,6 +189,45 @@ public: return TStatus::Ok; } + TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) { + TYtBlockTableContent tableContent = input.Cast<TYtBlockTableContent>(); + + auto listType = tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); + auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + auto pathRename = [&](TPartOfConstraintBase::TPathType path) -> std::vector<TPartOfConstraintBase::TPathType> { + YQL_ENSURE(!path.empty()); + + auto fieldIndex = itemStructType->FindItem(path[0]); + YQL_ENSURE(fieldIndex.Defined()); + + path[0] = ctx.GetIndexAsString(*fieldIndex); + return { path }; + }; + + TConstraintSet wideConstraints; + for (auto constraint : tableContent.Input().Ref().GetAllConstraints()) { + if (auto empty = dynamic_cast<const TEmptyConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(ctx.MakeConstraint<TEmptyConstraintNode>()); + } else if (auto sorted = dynamic_cast<const TSortedConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(sorted->RenameFields(ctx, pathRename)); + } else if (auto chopped = dynamic_cast<const TChoppedConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(chopped->RenameFields(ctx, pathRename)); + } else if (auto unique = dynamic_cast<const TUniqueConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(unique->RenameFields(ctx, pathRename)); + } else if (auto distinct = dynamic_cast<const TDistinctConstraintNode*>(constraint)) { + wideConstraints.AddConstraint(distinct->RenameFields(ctx, pathRename)); + } else { + YQL_ENSURE(false, "unexpected constraint"); + } + } + + input.Ptr()->SetConstraints(wideConstraints); + return TStatus::Ok; + } + TStatus HandleDefault(TExprBase input, TExprContext& /*ctx*/) { return UpdateAllChildLambdasConstraints(input.Ref()); } diff --git a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp index 4f3ffa5444..3447fbf35b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_datasource_type_ann.cpp @@ -46,6 +46,7 @@ public: AddHandler({TYtReadTable::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleReadTable)); AddHandler({TYtReadTableScheme::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleReadTableScheme)); AddHandler({TYtTableContent::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleTableContent)); + AddHandler({TYtBlockTableContent::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleBlockTableContent)); AddHandler({TYtLength::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleLength)); AddHandler({TCoConfigure::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleConfigure)); AddHandler({TYtConfigure::CallableName()}, Hndl(&TYtDataSourceTypeAnnotationTransformer::HandleYtConfigure)); @@ -846,7 +847,8 @@ public: return TStatus::Error; } - const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split | EYtSettingType::Small; + const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor + | EYtSettingType::Split | EYtSettingType::Small | EYtSettingType::BlockInputReady; if (!ValidateSettings(tableContent.Settings().Ref(), allowed, ctx)) { return TStatus::Error; } @@ -858,6 +860,49 @@ public: if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) { return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx); } + + return TStatus::Ok; + } + + TStatus HandleBlockTableContent(TExprBase input, TExprContext& ctx) { + if (!EnsureArgsCount(input.Ref(), 2, ctx)) { + return TStatus::Error; + } + + const auto tableContent = input.Cast<TYtBlockTableContent>(); + + if (!tableContent.Input().Ref().IsCallable(TYtReadTable::CallableName()) + && !tableContent.Input().Ref().IsCallable(TYtOutput::CallableName())) { + ctx.AddError(TIssue(ctx.GetPosition(tableContent.Input().Pos()), TStringBuilder() + << "Expected " << TYtReadTable::CallableName() << " or " << TYtOutput::CallableName())); + return TStatus::Error; + } + + if (!EnsureTuple(tableContent.Settings().MutableRef(), ctx)) { + return TStatus::Error; + } + + const EYtSettingTypes allowed = EYtSettingType::MemUsage | EYtSettingType::ItemsCount | EYtSettingType::RowFactor | EYtSettingType::Split | EYtSettingType::Small; + if (!ValidateSettings(tableContent.Settings().Ref(), allowed, ctx)) { + return TStatus::Error; + } + + auto listType = tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); + auto itemStructType = listType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TTypeAnnotationNode::TListType multiTypeItems; + for (auto& item: itemStructType->GetItems()) { + multiTypeItems.emplace_back(ctx.MakeType<TBlockExprType>(item->GetItemType())); + } + multiTypeItems.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))); + input.Ptr()->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TMultiExprType>(multiTypeItems))); + + if (auto columnOrder = State_->Types->LookupColumnOrder(tableContent.Input().Ref())) { + return State_->Types->SetColumnOrder(input.Ref(), *columnOrder, ctx); + } + return TStatus::Ok; } diff --git a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp index ec7c6781fe..51d4184c0b 100644 --- a/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_join_impl.cpp @@ -1622,13 +1622,49 @@ TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr righ } } - auto settingsBuilder = Build<TCoNameValueTupleList>(ctx, pos); - if (isUniqueKey) { - settingsBuilder - .Add() - .Name() - .Value("rightAny") - .Build() + auto rightStream = ctx.Builder(pos) + .Callable("WideToBlocks") + .Callable(0, "FromFlow") + .Callable(0, "ExpandMap") + .Add(0, std::move(rightFlow)) + .Add(1, std::move(rightExpandLambda)) + .Seal() + .Seal() + .Seal() + .Build(); + + auto rightStreamItemTypeNode = ctx.Builder(pos) + .Callable("StreamItemType") + .Callable(0, "TypeOf") + .Add(0, rightStream) + .Seal() + .Seal() + .Build(); + + auto rightBlockStorage = ctx.Builder(pos) + .Callable("BlockStorage") + .Add(0, std::move(rightStream)) + .Seal() + .Build(); + + if (joinType->Content() != "Cross") { + auto indexSettingsBuilder = Build<TCoNameValueTupleList>(ctx, pos); + if (isUniqueKey) { + indexSettingsBuilder + .Add() + .Name() + .Value("any") + .Build() + .Build(); + } + + rightBlockStorage = ctx.Builder(pos) + .Callable("BlockMapJoinIndex") + .Add(0, std::move(rightBlockStorage)) + .Add(1, rightStreamItemTypeNode) + .Add(2, ctx.NewList(pos, TExprNode::TListType(rightKeyColumnPositionNodes))) + .Add(3, indexSettingsBuilder.Done().Ptr()) + .Seal() .Build(); } @@ -1645,20 +1681,13 @@ TExprNode::TPtr BuildBlockMapJoin(TExprNode::TPtr leftFlow, TExprNode::TPtr righ .Seal() .Seal() .Seal() - .Callable(1, "WideToBlocks") - .Callable(0, "FromFlow") - .Callable(0, "ExpandMap") - .Add(0, std::move(rightFlow)) - .Add(1, std::move(rightExpandLambda)) - .Seal() - .Seal() - .Seal() - .Add(2, std::move(joinType)) - .Add(3, ctx.NewList(pos, std::move(leftKeyColumnPositionNodes))) - .Add(4, ctx.NewList(pos, std::move(leftKeyDropPositionNodes))) - .Add(5, ctx.NewList(pos, std::move(rightKeyColumnPositionNodes))) - .Add(6, ctx.NewList(pos, std::move(rightKeyDropPositionNodes))) - .Add(7, settingsBuilder.Done().Ptr()) + .Add(1, std::move(rightBlockStorage)) + .Add(2, std::move(rightStreamItemTypeNode)) + .Add(3, std::move(joinType)) + .Add(4, ctx.NewList(pos, std::move(leftKeyColumnPositionNodes))) + .Add(5, ctx.NewList(pos, std::move(leftKeyDropPositionNodes))) + .Add(6, ctx.NewList(pos, std::move(rightKeyColumnPositionNodes))) + .Add(7, ctx.NewList(pos, std::move(rightKeyDropPositionNodes))) .Seal() .Seal() .Seal() diff --git a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp index 3ad3f683d9..1b51ea6baa 100644 --- a/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp @@ -49,6 +49,8 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, TType* const tupleTypeTables = ctx.ProgramBuilder.NewTupleType({strType, boolType, strType, ui64Type, ui64Type, boolType, ui32Type}); TType* const listTypeGroup = ctx.ProgramBuilder.NewListType(tupleTypeTables); + bool useBlocks = callName.EndsWith(TYtBlockTableContent::CallableName()); + const TExprNode* settings = nullptr; TMaybe<TSampleParams> sampling; TVector<TRuntimeNode> groups; @@ -227,9 +229,23 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, samplingTupleItems.push_back(ctx.ProgramBuilder.NewDataLiteral(isSystemSampling)); } - auto outListType = ctx.ProgramBuilder.NewListType(outItemType); + TType* outType = nullptr; + if (useBlocks) { + auto structType = AS_TYPE(TStructType, outItemType); + + std::vector<TType*> outputItems; + outputItems.reserve(structType->GetMembersCount()); + for (size_t i = 0; i < structType->GetMembersCount(); i++) { + outputItems.push_back(ctx.ProgramBuilder.NewBlockType(structType->GetMemberType(i), TBlockType::EShape::Many)); + } + outputItems.push_back(ctx.ProgramBuilder.NewBlockType(ctx.ProgramBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); + outType = ctx.ProgramBuilder.NewStreamType(ctx.ProgramBuilder.NewMultiType(outputItems)); + + } else { + outType = ctx.ProgramBuilder.NewListType(outItemType); + } - TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outListType); + TCallableBuilder call(ctx.ProgramBuilder.GetTypeEnvironment(), callName, outType); call.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(clusterName)); // cluster name call.Add(ctx.ProgramBuilder.NewList(listTypeGroup, groups)); @@ -241,6 +257,10 @@ TRuntimeNode BuildTableContentCall(TStringBuf callName, call.Add(ctx.ProgramBuilder.NewEmptyTuple()); } + if (useBlocks) { + call.Add(TRuntimeNode(outItemType, true)); + } + auto res = TRuntimeNode(call.Build(), false); if (settings) { @@ -479,6 +499,41 @@ void RegisterYtMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) { } }); + compiler.AddCallable(TYtBlockTableContent::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + TYtBlockTableContent tableContent(&node); + if (node.GetConstraint<TEmptyConstraintNode>()) { + const auto streamType = ctx.BuildType(node, *node.GetTypeAnn()); + return ctx.ProgramBuilder.EmptyIterator(streamType); + } + + auto origItemStructType = ( + tableContent.Input().Maybe<TYtOutput>() + ? tableContent.Input().Ref().GetTypeAnn() + : tableContent.Input().Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back() + )->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + + TMaybe<ui64> itemsCount; + TString name = ToString(TYtBlockTableContent::CallableName()); + if (auto setting = NYql::GetSetting(tableContent.Settings().Ref(), EYtSettingType::ItemsCount)) { + itemsCount = FromString<ui64>(setting->Child(1)->Content()); + } + if (NYql::HasSetting(tableContent.Settings().Ref(), EYtSettingType::Small)) { + name.prepend("Small"); + } + if (auto maybeRead = tableContent.Input().Maybe<TYtReadTable>()) { + auto read = maybeRead.Cast(); + return BuildTableContentCall(name, + ctx.BuildType(node, *origItemStructType), + read.DataSource().Cluster().Value(), read.Input().Ref(), itemsCount, ctx, true); + } else { + auto output = tableContent.Input().Cast<TYtOutput>(); + return BuildTableContentCall(name, + ctx.BuildType(node, *origItemStructType), + GetOutputOp(output).DataSink().Cluster().Value(), output.Ref(), itemsCount, ctx, true); + } + }); + compiler.AddCallable({TYtTablePath::CallableName(), TYtTableRecord::CallableName(), TYtTableIndex::CallableName(), TYtIsKeySwitch::CallableName(), TYtRowNumber::CallableName()}, [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { auto dataSlot = node.GetTypeAnn()->Cast<TDataExprType>()->GetSlot(); diff --git a/yt/yql/tests/sql/suites/lineage/unused_columns.sql b/yt/yql/tests/sql/suites/lineage/unused_columns.sql new file mode 100644 index 0000000000..e8ec1d79eb --- /dev/null +++ b/yt/yql/tests/sql/suites/lineage/unused_columns.sql @@ -0,0 +1,7 @@ +insert into plato.Output +select value from +( + select key as x, value from plato.Input + union all + select AsList(key) as x, value from plato.Input +); |