diff options
author | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-06-01 10:09:04 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.ru> | 2022-06-01 10:09:04 +0300 |
commit | 047e759274848e8f522c9dbd3066959a04242f7d (patch) | |
tree | 5fbc39b88bb9369ee0b9aa002a1fa82427e768da | |
parent | 6d7a8fa5ef8cf1d8cbdad87ec7e2f8a97f56285c (diff) | |
download | ydb-047e759274848e8f522c9dbd3066959a04242f7d.tar.gz |
[yql] DqPrecompute handling
YQL-12393
ref:0c15dc1d57056b00e77e0eb74925400cefb79949
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_peephole.cpp | 52 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_peephole.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/CMakeLists.txt | 12 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/dqs_opt.cpp | 82 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/dqs_opt.h | 11 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 74 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/planner/execution_planner.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 534 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp | 2 |
14 files changed, 703 insertions, 87 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index d4e9c50c4d..b93b8c83b8 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -877,6 +877,10 @@ private: attr->SetValue(queryTimeoutMs); attr = dqSettings.Add(); + attr->SetName("_EnablePrecompute"); + attr->SetValue("0"); // TODO: enable together with removing TEmptyGateway + + attr = dqSettings.Add(); attr->SetName("_LiteralTimeout"); attr->SetValue(queryTimeoutMs); diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp index 039a41477f..223cc2c9be 100644 --- a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp @@ -3,10 +3,12 @@ #include <ydb/library/yql/core/yql_join.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/utils/log/log.h> #include <util/generic/size_literals.h> +#include <util/generic/bitmap.h> namespace NYql::NDq { @@ -614,4 +616,54 @@ NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExp .Done(); } +NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx) { + if (!node.Maybe<TDqStageBase>()) { + return node; + } + + auto stage = node.Cast<TDqStageBase>(); + + auto isArgumentUsed = [](const TExprNode::TPtr& node, const TExprNode* argument) { + return !!FindNode(node, + [](const TExprNode::TPtr& node) { + return !TDqStageBase::Match(node.Get()) && !TDqPhyPrecompute::Match(node.Get()); + }, + [argument](const TExprNode::TPtr& node) { + return node.Get() == argument; + }); + }; + + TDynBitMap unusedInputs; + for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { + if (!isArgumentUsed(stage.Program().Body().Ptr(), stage.Program().Args().Arg(i).Raw())) { + unusedInputs.Set(i); + } + } + + if (unusedInputs.Empty()) { + return node; + } + + TExprNode::TListType newInputs; + TExprNode::TListType newArgs; + TNodeOnNodeOwnedMap replaces; + + for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { + if (!unusedInputs.Test(i)) { + newInputs.push_back(stage.Inputs().Item(i).Ptr()); + auto arg = stage.Program().Args().Arg(i).Raw(); + newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content())); + replaces[arg] = newArgs.back(); + } + } + + auto children = node.Ref().ChildrenList(); + children[TDqStageBase::idx_Inputs] = ctx.NewList(stage.Inputs().Pos(), std::move(newInputs)); + children[TDqStageBase::idx_Program] = ctx.NewLambda(stage.Program().Pos(), + ctx.NewArguments(stage.Program().Args().Pos(), std::move(newArgs)), + ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces)); + + return NNodes::TExprBase(ctx.ChangeChildren(node.Ref(), std::move(children))); +} + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.h b/ydb/library/yql/dq/opt/dq_opt_peephole.h index 565ee4296d..f403c8d39d 100644 --- a/ydb/library/yql/dq/opt/dq_opt_peephole.h +++ b/ydb/library/yql/dq/opt/dq_opt_peephole.h @@ -12,5 +12,6 @@ NNodes::TExprBase DqPeepholeRewriteJoinDict(const NNodes::TExprBase& node, TExpr NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprContext& ctx); NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx); NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx); +NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index 063fdba022..e93e5ba7d1 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -254,6 +254,9 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o { TNodeOnNodeOwnedMap precomputesInsideLambda; VisitExpr(newProgram, [&precomputesInsideLambda](const TExprNode::TPtr& node) { + if (node->IsCallable() && node->Content().StartsWith("DqRead")) { + return false; + } if (TDqPhyPrecompute::Match(node.Get())) { precomputesInsideLambda[node.Get()] = node; return false; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index f9ba89a33d..4a5af653e0 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -45,6 +45,7 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, _FallbackOnRuntimeErrors); REGISTER_SETTING(*this, WorkerFilter); + REGISTER_SETTING(*this, _EnablePrecompute); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 3c37e4d48f..e357bd0f8e 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -65,6 +65,7 @@ struct TDqSettings { NCommon::TConfSetting<bool, false> EnableFullResultWrite; NCommon::TConfSetting<bool, false> _OneGraphPerQuery; NCommon::TConfSetting<TString, false> _FallbackOnRuntimeErrors; + NCommon::TConfSetting<bool, false> _EnablePrecompute; NCommon::TConfSetting<TString, false> WorkerFilter; diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.txt index 2d11078641..36378b4938 100644 --- a/ydb/library/yql/providers/dq/opt/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/opt/CMakeLists.txt @@ -14,11 +14,19 @@ target_compile_options(providers-dq-opt PRIVATE target_link_libraries(providers-dq-opt PUBLIC contrib-libs-cxxsupp yutil + providers-dq-expr_nodes + providers-common-mkql + providers-common-codec + providers-common-transform yql-utils-log yql-dq-opt yql-dq-type_ann - providers-common-transform - providers-dq-expr_nodes + library-yql-core + yql-core-peephole_opt + yql-core-type_ann + library-yql-minikql + yql-minikql-computation + cpp-yson-node ) target_sources(providers-dq-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/opt/dqs_opt.cpp diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index 455adef20b..edfcc4232c 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -1,11 +1,14 @@ #include "dqs_opt.h" #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> +#include <ydb/library/yql/providers/common/codec/yql_codec.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h> #include <ydb/library/yql/core/type_ann/type_ann_core.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/core/yql_type_annotation.h> #include <ydb/library/yql/dq/opt/dq_opt.h> #include <ydb/library/yql/dq/opt/dq_opt_phy.h> @@ -16,6 +19,15 @@ #include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_mem_info.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> + +#include <library/cpp/yson/node/node_io.h> + #include <util/string/split.h> #define PERFORM_RULE(func, ...) \ @@ -112,11 +124,81 @@ namespace NYql::NDqs { PERFORM_RULE(DqPeepholeRewriteMapJoin, node, ctx); PERFORM_RULE(DqPeepholeRewritePureJoin, node, ctx); PERFORM_RULE(DqPeepholeRewriteReplicate, node, ctx); + PERFORM_RULE(DqPeepholeDropUnusedInputs, node, ctx); return inputExpr; }, ctx, optSettings); }); } + THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) { + return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + TProcessedNodesSet ignoreNodes; + VisitExpr(input, [&](const TExprNode::TPtr& node) { + if (node != input && (TDqReadWrapBase::Match(node.Get()) || TDqPhyPrecompute::Match(node.Get()))) { + ignoreNodes.insert(node->UniqueId()); + return false; + } + return true; + }); + + TOptimizeExprSettings settings(typesCtx); + settings.ProcessedNodes = &ignoreNodes; + + NKikimr::NMiniKQL::TScopedAlloc alloc; + NKikimr::NMiniKQL::TTypeEnvironment env(alloc); + NKikimr::NMiniKQL::TProgramBuilder pgmBuilder(env, *funcRegistry); + NKikimr::NMiniKQL::TMemoryUsageInfo memInfo("Precompute"); + NKikimr::NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo); + + return OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { + if (TDqStageBase::Match(node.Get())) { + auto stage = TDqStageBase(node); + TNodeOnNodeOwnedMap replaces; + std::vector<TCoArgument> newArgs; + std::vector<TExprNode::TPtr> newInputs; + for (size_t i = 0; i < stage.Inputs().Size(); ++i) { + const auto& input = stage.Inputs().Item(i); + if (input.Maybe<TDqPhyPrecompute>() && input.Ref().HasResult()) { + auto yson = input.Ref().GetResult().Content(); + auto dataNode = NYT::NodeFromYsonString(yson); + YQL_ENSURE(dataNode.IsList() && !dataNode.AsList().empty()); + dataNode = dataNode[0]; + TStringStream err; + NKikimr::NMiniKQL::TType* mkqlType = NCommon::BuildType(*input.Ref().GetTypeAnn(), pgmBuilder, err); + if (!mkqlType) { + ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Failed to process " << TDqPhyPrecompute::CallableName() << " type: " << err.Str())); + return nullptr; + } + + auto value = NCommon::ParseYsonNodeInResultFormat(holderFactory, dataNode, mkqlType, &err); + if (!value) { + ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Failed to parse " << TDqPhyPrecompute::CallableName() << " value: " << err.Str())); + return nullptr; + } + + replaces[stage.Program().Args().Arg(i).Raw()] = NCommon::ValueToExprLiteral(input.Ref().GetTypeAnn(), *value, ctx, input.Pos()); + } else { + newArgs.push_back(stage.Program().Args().Arg(i)); + newInputs.push_back(input.Ptr()); + } + } + + if (!replaces.empty()) { + YQL_CLOG(DEBUG, ProviderDq) << "DqsReplacePrecomputes"; + auto children = stage.Ref().ChildrenList(); + children[TDqStageBase::idx_Program] = Build<TCoLambda>(ctx, stage.Program().Pos()) + .Args(newArgs) + .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces)) + .Done().Ptr(); + children[TDqStageBase::idx_Inputs] = ctx.NewList(stage.Inputs().Pos(), std::move(newInputs)); + return ctx.ChangeChildren(stage.Ref(), std::move(children)); + } + } + return node; + }, ctx, settings); + }); + } + namespace NPeephole { class TDqsPeepholeTransformer: public TSyncTransformerBase { diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.h b/ydb/library/yql/providers/dq/opt/dqs_opt.h index 6c9e1a7c18..25da467f41 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.h +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.h @@ -1,8 +1,14 @@ #pragma once #include <ydb/library/yql/core/yql_graph_transformer.h> -#include <ydb/library/yql/core/yql_type_annotation.h> -#include <ydb/library/yql/dq/proto/dq_tasks.pb.h> + +namespace NKikimr::NMiniKQL { + class IFunctionRegistry; +} + +namespace NYql { + struct TTypeAnnotationContext; +} namespace NYql::NDqs { class TDatabaseManager; @@ -11,6 +17,7 @@ namespace NYql::NDqs { THolder<IGraphTransformer> CreateDqsFinalizingOptTransformer(); THolder<IGraphTransformer> CreateDqsBuildTransformer(); THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer(); + THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry); THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 657d7453e4..2464b27b87 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -17,8 +17,9 @@ using namespace NYql::NNodes; class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: - TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx) + TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) : TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {}) + , Config(config) { #define HNDL(name) "DqsPhy-"#name, Hndl(&TDqsPhysicalOptProposalTransformer::name) AddHandler(0, &TDqSourceWrap::Match, HNDL(BuildStageWithSourceWrap)); @@ -41,10 +42,13 @@ public: AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>)); AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>)); AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>)); -#if 0 - AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); - AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>)); -#endif + if (Config->_EnablePrecompute.Get().GetOrElse(false)) { + AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems)); + AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>)); + AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>)); + AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute)); + AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput)); + } AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>)); AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>)); @@ -58,6 +62,10 @@ public: AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>)); AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>)); AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>)); + if (Config->_EnablePrecompute.Get().GetOrElse(false)) { + AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); + AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>)); + } #undef HNDL SetGlobal(1u); @@ -296,10 +304,62 @@ protected: TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { return DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal); } + + TMaybeNode<TExprBase> BuildPrecompute(TExprBase node, TExprContext& ctx) { + return DqBuildPrecompute(node, ctx); + } + + TMaybeNode<TExprBase> PrecomputeToInput(TExprBase node, TExprContext& ctx) { + auto stage = node.Cast<TDqStage>(); + + TExprNode::TListType innerPrecomputes = FindNodes(stage.Program().Ptr(), + [](const TExprNode::TPtr& node) { + return !TDqReadWrapBase::Match(node.Get()) && !TDqPhyPrecompute::Match(node.Get()); + }, + [](const TExprNode::TPtr& node) { + return TDqPhyPrecompute::Match(node.Get()); + } + ); + + if (innerPrecomputes.empty()) { + return node; + } + + TVector<TExprNode::TPtr> newInputs; + TVector<TExprNode::TPtr> newArgs; + TNodeOnNodeOwnedMap replaces; + + for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { + newInputs.push_back(stage.Inputs().Item(i).Ptr()); + auto arg = stage.Program().Args().Arg(i).Raw(); + newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content())); + replaces[arg] = newArgs.back(); + } + + for (auto& precompute: innerPrecomputes) { + newInputs.push_back(precompute); + newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size())); + replaces[precompute.Get()] = newArgs.back(); + } + + return Build<TDqStage>(ctx, stage.Pos()) + .Inputs() + .Add(newInputs) + .Build() + .Program() + .Args(newArgs) + .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces)) + .Build() + .Settings().Build() + .Done(); + } + +private: + TDqConfiguration::TPtr Config; }; -THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx) { - return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx)); +THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) { + return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config)); } } // NYql::NDqs diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.h b/ydb/library/yql/providers/dq/opt/physical_optimize.h index 7ead367cbc..5ed9da9880 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.h +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/core/yql_graph_transformer.h> #include <ydb/library/yql/core/yql_type_annotation.h> @@ -7,6 +8,6 @@ namespace NYql::NDqs { -THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx); +THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config); } // namespace NYql::NDqs diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 03e69a9fef..5cf9450d05 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -135,6 +135,7 @@ namespace NYql::NDqs { TExprBase expr(DqExprRoot); auto result = expr.Maybe<TDqCnResult>(); auto query = expr.Maybe<TDqQuery>(); + auto value = expr.Maybe<TDqPhyPrecompute>(); const auto maxTasksPerOperation = settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation); YQL_CLOG(DEBUG, ProviderDq) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot); @@ -223,8 +224,15 @@ namespace NYql::NDqs { YQL_ENSURE(!stageInfo.Tasks.empty()); } + TMaybeNode<TDqPhyStage> finalStage; if (result) { - auto& resultStageInfo = TasksGraph.GetStageInfo(result.Cast().Output().Stage().Cast<TDqPhyStage>()); + finalStage = result.Cast().Output().Stage().Maybe<TDqPhyStage>(); + } else if (value) { + finalStage = value.Cast().Connection().Output().Stage().Maybe<TDqPhyStage>(); + } + + if (finalStage) { + auto& resultStageInfo = TasksGraph.GetStageInfo(finalStage.Cast()); YQL_ENSURE(resultStageInfo.Tasks.size() == 1); auto& resultTask = TasksGraph.GetTask(resultStageInfo.Tasks[0]); YQL_ENSURE(resultTask.Outputs.size() == 1); diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 1809bdc08a..a703de40cc 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -52,11 +52,14 @@ #include <util/stream/file.h> #include <util/string/builder.h> +#include <memory> + namespace NYql { using namespace NCommon; using namespace NKikimr::NMiniKQL; using namespace NNodes; +using namespace NThreading; namespace { @@ -201,16 +204,40 @@ private: TDqStatePtr State; }; +struct TUploadCache { + THashMap<TString, TFileLinkPtr> FileLinks; + THashMap<TString, TString> ModulesMapping; + + using TPtr = std::shared_ptr<TUploadCache>; +}; + +struct TPublicIds { + THashMap<ui32, ui32> AllPublicIds; + THashMap<ui64, ui32> Stage2publicId; + size_t GraphsCount = 0; + + using TPtr = std::shared_ptr<TPublicIds>; +}; + struct TDqsPipelineConfigurator : public IPipelineConfigurator { +public: + TDqsPipelineConfigurator(const TDqStatePtr& state) + : State_(state) + { + } private: void AfterCreate(TTransformationPipeline*) const final {} void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { - pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "Build-Phy"); - pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(), "Rewrite-Phy-Callables"); + pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "BuildPhy"); + pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(), "RewritePhyCallables"); + pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(pipeline->GetTypeAnnotationContext().Get(), State_->FunctionRegistry), "ReplacePrecomputes"); } void AfterOptimize(TTransformationPipeline*) const final {} + +private: + TDqStatePtr State_; }; class TDqExecTransformer: public TExecTransformerBase, TCounters @@ -218,8 +245,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters public: TDqExecTransformer(const TDqStatePtr& state) : State(state) - , DqTypeAnnotationTransformer( - CreateTypeAnnotationTransformer(NDq::CreateDqTypeAnnotationTransformer(*State->TypeCtx), *State->TypeCtx)) + , ExecState(MakeIntrusive<TExecState>()) { AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TDqExecTransformer::HandleResult)); AddHandler({TStringBuf("Pull")}, RequireNone(), Hndl(&TDqExecTransformer::HandlePull)); @@ -228,6 +254,22 @@ public: } private: + struct TExecState : public TThrRefBase { + TAdaptiveLock Lock; + + struct TItem : public TIntrusiveListItem<TItem> { + TExprNode* Node = nullptr; + TAsyncTransformCallback Callback; + }; + + using TQueueType = TIntrusiveListWithAutoDelete<TExecState::TItem, TDelete>; + TQueueType Completed; + NThreading::TPromise<void> Promise = NThreading::NewPromise(); + bool HasResult = false; + }; + + using TExecStatePtr = TIntrusivePtr<TExecState>; + void GetResultType(TString* type, TVector<TString>* columns, const TExprNode& resOrPull, const TExprNode& resOrPullInput) const { *columns = NCommon::GetResOrPullColumnHints(resOrPull); @@ -655,17 +697,17 @@ private: bool enableLocalRun = true; - THashMap<ui32, ui32> allPublicIds; + TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>(); VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) { const TExprBase expr(node); if (expr.Maybe<TResFill>()) { if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { - allPublicIds.emplace(*publicId, 0U); + publicIds->AllPublicIds.emplace(*publicId, 0U); } } return true; }); - IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); + IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); bool enableFullResultWrite = settings->EnableFullResultWrite.Get().GetOrElse(false); if (enableFullResultWrite) { const auto type = result.Input().Ref().GetTypeAnn(); @@ -859,14 +901,51 @@ private: } } - TStatusCallbackPair FallbackWithMessage(const TExprNode& node, const TString& message, TExprContext& ctx) { + IGraphTransformer::TStatus FallbackWithMessage(const TExprNode& node, const TString& message, TExprContext& ctx, bool isRoot) { if (State->Metrics) { State->Metrics->IncCounter("dq", "Fallback"); } State->Statistics[State->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); auto issue = TIssue(ctx.GetPosition(node.Pos()), message).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING); - ctx.AssociativeIssues.emplace(&node, TIssues{std::move(issue)}); - return SyncStatus(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error)); + if (isRoot) { + ctx.AssociativeIssues.emplace(&node, TIssues{std::move(issue)}); + } else { + ctx.IssueManager.RaiseIssue(issue); + } + return IGraphTransformer::TStatus::Error; + } + + TPublicIds::TPtr GetPublicIds(const TExprNode::TPtr& root) const { + TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>(); + VisitExpr(root, [&](const TExprNode::TPtr& node) { + if (TResTransientBase::Match(node.Get())) { + return false; + } else if (TDqReadWrapBase::Match(node.Get())) { + return false; + } else if (TDqConnection::Match(node.Get())) { + if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { + publicIds->AllPublicIds.emplace(*publicId, 0U); + } + } else if (TDqPhyPrecompute::Match(node.Get())) { + if (node->HasResult()) { + return false; + } + } else if (TDqStage::Match(node.Get())) { + const auto& stage = TDqStage(node); + if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) { + if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { + if (const auto settings = NDq::TDqStageSettings::Parse(stage); settings.LogicalId) { + publicIds->Stage2publicId[settings.LogicalId] = *publicId; + } + publicIds->AllPublicIds.emplace(*publicId, 0U); + } + } + } else if (TDqCnResult::Match(node.Get()) || TDqQuery::Match(node.Get())) { + ++publicIds->GraphsCount; + } + return true; + }); + return publicIds; } TStatusCallbackPair HandlePull(const TExprNode::TPtr& input, TExprContext& ctx) { @@ -879,45 +958,29 @@ private: YQL_ENSURE(!TMaybeNode<TDqQuery>(pull.Input().Ptr()) || State->Settings->EnableComputeActor.Get().GetOrElse(false), "DqQuery is not supported with worker actor"); - TString type; - TVector<TString> columns; - GetResultType(&type, &columns, pull.Ref(), pull.Input().Ref()); - const bool oneGraphPerQuery = State->Settings->_OneGraphPerQuery.Get().GetOrElse(false); - size_t graphsCount = 0; - THashMap<ui32, ui32> allPublicIds; - THashMap<ui64, ui32> stage2publicId; - bool hasStageError = false; - VisitExpr(pull.Ptr(), [&](const TExprNode::TPtr& node) { - if (TResTransientBase::Match(node.Get())) - return false; - if (const TExprBase expr(node); expr.Maybe<TDqConnection>()) { - if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { - allPublicIds.emplace(*publicId, 0U); - } - } else if (const auto& maybeStage = expr.Maybe<TDqStage>()) { - const auto& stage = maybeStage.Cast(); - if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) { - if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) { - if (const auto settings = NDq::TDqStageSettings::Parse(maybeStage.Cast()); settings.LogicalId) { - stage2publicId[settings.LogicalId] = *publicId; - } - allPublicIds.emplace(*publicId, 0U); - } - } - } else if (oneGraphPerQuery) { - if (expr.Maybe<TDqCnResult>() || expr.Maybe<TDqQuery>()) { - ++graphsCount; + auto publicIds = GetPublicIds(pull.Ptr()); + YQL_ENSURE(!oneGraphPerQuery || publicIds->GraphsCount == 1, "Internal error: only one graph per query is allowed"); + + auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr()); + if (!precomputes.empty()) { + auto status = HandlePrecomputes(precomputes, ctx); + if (status.Level != TStatus::Ok) { + if (status == TStatus::Async) { + return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) { + completedFuture.GetValue(); + return HandlePrecomputeAsyncComplete(execState); + })); + } else { + return SyncStatus(status); } } - return true; - }); - YQL_ENSURE(!oneGraphPerQuery || graphsCount == 1, "Internal error: only one graph per query is allowed"); - - if (hasStageError) { - return SyncError(); } + TString type; + TVector<TString> columns; + GetResultType(&type, &columns, pull.Ref(), pull.Input().Ref()); + auto optimizedInput = pull.Input().Ptr(); THashMap<TString, TString> secureParams; NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams); @@ -926,17 +989,10 @@ private: optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn()); optimizedInput->CopyConstraints(pull.Input().Ref()); - TDqsPipelineConfigurator peepholeConfig; - TPeepholeSettings peepholeSettings; - peepholeSettings.CommonConfig = &peepholeConfig; - bool hasNonDeterministicFunctions; - // TODO: do it per stage - auto status = PeepHoleOptimizeNode<true>(optimizedInput, optimizedInput, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions, peepholeSettings); - if (status != TStatus::Ok) { - ctx.AddError(TIssue(ctx.GetPosition(optimizedInput->Pos()), TString("Peephole optimization failed for Dq stage"))); + auto status = PeepHole(optimizedInput, optimizedInput, ctx); + if (status.Level != TStatus::Ok) { return SyncStatus(status); } - YQL_CLOG(TRACE, ProviderDq) << "After PeepHole\n" << NCommon::ExprToPrettyString(ctx, *optimizedInput); // copy-paste { TUserDataTable crutches = State->TypeCtx->UserDataStorageCrutches; @@ -958,7 +1014,7 @@ private: // exprRoot must be DqCnResult or DqQuery - executionPlanner->SetPublicIds(stage2publicId); + executionPlanner->SetPublicIds(publicIds->Stage2publicId); auto settings = std::make_shared<TDqSettings>(*State->Settings); auto tasksPerStage = settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage); @@ -974,12 +1030,12 @@ private: bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq); if (stagesCount > maxTasksPerOperation && canFallback) { - return FallbackWithMessage( + return SyncStatus(FallbackWithMessage( pull.Ref(), TStringBuilder() << "Too many stages: " << stagesCount << " > " - << maxTasksPerOperation, ctx); + << maxTasksPerOperation, ctx, true)); } YQL_ENSURE(stagesCount <= maxTasksPerOperation); @@ -992,28 +1048,28 @@ private: } } catch (const TFallbackError& err) { YQL_ENSURE(canFallback, "Unexpected TFallbackError: " << err.what()); - return FallbackWithMessage(pull.Ref(), err.what(), ctx); + return SyncStatus(FallbackWithMessage(pull.Ref(), err.what(), ctx, true)); } bool fallbackFlag = false; if (executionPlanner->MaxDataSizePerJob() > maxDataSizePerJob && canFallback) { - return FallbackWithMessage( + return SyncStatus(FallbackWithMessage( pull.Ref(), TStringBuilder() << "MaxDataSizePerJob reached: " << executionPlanner->MaxDataSizePerJob() << " > " - << maxDataSizePerJob, ctx); + << maxDataSizePerJob, ctx, true)); } bool localRun = false; auto& tasks = executionPlanner->GetTasks(); if (tasks.size() > maxTasksPerOperation && canFallback) { - return FallbackWithMessage( + return SyncStatus(FallbackWithMessage( pull.Ref(), TStringBuilder() << "Too many tasks: " << tasks.size() << " > " - << maxTasksPerOperation, ctx); + << maxTasksPerOperation, ctx, true)); } YQL_ENSURE(tasks.size() <= maxTasksPerOperation); @@ -1033,15 +1089,15 @@ private: *taskMeta.AddFiles() = file; } t.MutableMeta()->PackFrom(taskMeta); - if (const auto it = allPublicIds.find(taskMeta.GetStageId()); allPublicIds.cend() != it) + if (const auto it = publicIds->AllPublicIds.find(taskMeta.GetStageId()); publicIds->AllPublicIds.cend() != it) ++it->second; } } - MarkProgressStarted(allPublicIds, State->ProgressWriter); + MarkProgressStarted(publicIds->AllPublicIds, State->ProgressWriter); if (fallbackFlag) { - return FallbackWithMessage(pull.Ref(), "Too big attachment", ctx); + return SyncStatus(FallbackWithMessage(pull.Ref(), "Too big attachment", ctx, true)); } IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(pull.Ref()); @@ -1068,23 +1124,23 @@ private: if (ref) { if (!enableFullResultWrite) { - return FallbackWithMessage(pull.Ref(), + return SyncStatus(FallbackWithMessage(pull.Ref(), TStringBuilder() << "RefSelect mode cannot be used with DQ, because \"" << State->TypeCtx->FullResultDataSink << "\" provider has failed to prepare a result table", - ctx); + ctx, true)); } // Force write to table settings->_AllResultsBytesLimit = 0; settings->_RowsLimitPerWrite = 0; } - IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds); + IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); auto future = State->DqGateway->ExecutePlan(State->SessionId, *executionPlanner.Get(), columns, secureParams, graphParams, settings, progressWriter, ModulesMapping, fillSettings.Discard); - future.Subscribe([allPublicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) { + future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) { YQL_ENSURE(!completedFuture.HasException()); - MarkProgressFinished(allPublicIds, completedFuture.GetValueSync().Success(), progressWriter); + MarkProgressFinished(publicIds->AllPublicIds, completedFuture.GetValueSync().Success(), progressWriter); }); executionPlanner.Destroy(); @@ -1206,9 +1262,9 @@ private: }, ""); } - IDqGateway::TDqProgressWriter MakeDqProgressWriter(const THashMap<ui32, ui32>& allPublicIds) const { - IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, allPublicIds](const TString& stage) { - for (const auto& publicId : allPublicIds) { + IDqGateway::TDqProgressWriter MakeDqProgressWriter(const TPublicIds::TPtr& publicIds) const { + IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, publicIds](const TString& stage) { + for (const auto& publicId : publicIds->AllPublicIds) { auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress, stage); if (publicId.second) { p.Counters.ConstructInPlace(); @@ -1289,8 +1345,340 @@ private: return nullptr; } + TNodeOnNodeOwnedMap FindIndependentPrecomputes(const TExprNode::TPtr& node) const { + TNodeOnNodeOwnedMap precomputes; + TNodeMap<bool> visitedNodes; + FindIndependentPrecomputesImpl(node, precomputes, visitedNodes); + return precomputes; + } + + bool FindIndependentPrecomputesImpl(const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& precomputes, TNodeMap<bool>& visitedNodes) const { + if (auto it = visitedNodes.find(node.Get()); it != visitedNodes.cend()) { + return it->second; + } + + bool& hasPrecompute = visitedNodes[node.Get()]; + if (TDqStageBase::Match(node.Get())) { + auto stage = TDqStageBase(node); + for (const auto& input : stage.Inputs()) { + if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) { + if (!input.Ref().HasResult() && input.Ref().GetState() != TExprNode::EState::Error) { + hasPrecompute = true; + if (input.Ref().StartsExecution() || !FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes)) { + precomputes[input.Raw()] = input.Ptr(); + } + } + } else { + hasPrecompute = FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes) || hasPrecompute; + } + } + } else { + for (auto child: node->Children()) { + hasPrecompute = FindIndependentPrecomputesImpl(child, precomputes, visitedNodes) || hasPrecompute; + } + } + return hasPrecompute; + } + + static void CompleteNode(const TExecStatePtr& execState, TExprNode* node, const TAsyncTransformCallback& callback) { + auto item = MakeHolder<TExecState::TItem>(); + item->Node = node; + item->Callback = callback; + + NThreading::TPromise<void> promiseToSet; + with_lock(execState->Lock) { + execState->Completed.PushBack(item.Release()); + if (!execState->HasResult) { + execState->HasResult = true; + promiseToSet = execState->Promise; + } + } + + if (promiseToSet.Initialized()) { + promiseToSet.SetValue(); + } + } + + static TAsyncTransformCallback HandlePrecomputeAsyncComplete(TExecStatePtr execState) { + return TAsyncTransformCallback([execState](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + output = input; + TStatus combinedStatus = TStatus::Repeat; + TExecState::TQueueType completed; + auto newPromise = NThreading::NewPromise(); + { + TGuard<TAdaptiveLock> guard(execState->Lock); + completed.Swap(execState->Completed); + execState->Promise.Swap(newPromise); + execState->HasResult = false; + } + + for (auto& item : completed) { + TExprNode::TPtr callableOutput; + auto status = item.Callback(item.Node, callableOutput, ctx); + if (status.Level != TStatus::Error) { + YQL_ENSURE(callableOutput == item.Node, "Unsupported node rewrite"); + } + combinedStatus = combinedStatus.Combine(status); + } + + return combinedStatus; + }); + } + + IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx) { + + IDataProvider::TFillSettings fillSettings; + fillSettings.AllResultsBytesLimit.Clear(); + fillSettings.RowsLimitPerWrite.Clear(); + auto commonSettings = State->Settings->WithFillSettings(fillSettings); + commonSettings->EnableFullResultWrite = false; + + IGraphTransformer::TStatus combinedStatus = TStatus::Ok; + + for (auto [_, input]: precomputes) { + TString uniqId = TStringBuilder() << input->Content() << "(#" << input->UniqueId() << ')'; + YQL_LOG_CTX_SCOPE(uniqId); + if (input->StartsExecution()) { + YQL_CLOG(DEBUG, ProviderDq) << "Continue async execution"; + combinedStatus = combinedStatus.Combine(TStatus::Async); + continue; + } + + auto logCtx = NLog::CurrentLogContextPath(); + TInstant startTime = TInstant::Now(); + YQL_CLOG(DEBUG, ProviderDq) << "Executing " << input->Content(); + + auto publicIds = GetPublicIds(input); + + auto optimizedInput = input; + + auto status = PeepHole(input, optimizedInput, ctx); + if (status.Level != TStatus::Ok) { + return combinedStatus.Combine(status); + } + + input->SetState(TExprNode::EState::ExecutionInProgress); + + THashMap<TString, TString> secureParams; + NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams); + + // copy-paste { + TUserDataTable crutches = State->TypeCtx->UserDataStorageCrutches; + TUserDataTable files; + StartCounter("FreezeUsedFiles"); + + auto filesRes = NCommon::FreezeUsedFiles(*optimizedInput, files, *State->TypeCtx, ctx, [](const TString&){ return true; }, crutches); + if (filesRes.first.Level != TStatus::Ok) { + combinedStatus = combinedStatus.Combine(status); + if (filesRes.first.Level == TStatus::Error) { + return filesRes.first; + } + YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content(); + if (filesRes.first.Level == TStatus::Async) { + filesRes.second.Subscribe([execState = ExecState, node = input.Get(), logCtx](const TAsyncTransformCallbackFuture& future) { + YQL_LOG_CTX_ROOT_SCOPE(logCtx); + YQL_ENSURE(!future.HasException()); + YQL_CLOG(DEBUG, ProviderDq) << "Finishing freezing files"; + CompleteNode(execState, node, future.GetValue()); + }); + } + continue; + } + FlushCounter("FreezeUsedFiles"); + // copy-paste } + + auto executionPlanner = MakeHolder<TDqsExecutionPlanner>( + State->TypeCtx, ctx, State->FunctionRegistry, + optimizedInput); + + // exprRoot must be DqCnResult or DqQuery + + executionPlanner->SetPublicIds(publicIds->Stage2publicId); + + auto settings = std::make_shared<TDqSettings>(*commonSettings); + + auto tasksPerStage = settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage); + const auto maxTasksPerOperation = State->Settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation); + + auto maxDataSizePerJob = settings->MaxDataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::MaxDataSizePerJob); + auto stagesCount = executionPlanner->StagesCount(); + + if (!executionPlanner->CanFallback()) { + settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = "never"; + } + + bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq); + + if (stagesCount > maxTasksPerOperation && canFallback) { + return FallbackWithMessage( + *input, + TStringBuilder() + << "Too many stages: " + << stagesCount << " > " + << maxTasksPerOperation, ctx, false); + } + + YQL_ENSURE(stagesCount <= maxTasksPerOperation); + + try { + while (executionPlanner->PlanExecution(settings, canFallback) > maxTasksPerOperation && tasksPerStage > 1) { + tasksPerStage /= 2; + settings->MaxTasksPerStage = tasksPerStage; + executionPlanner->Clear(); + } + } catch (const TFallbackError& err) { + YQL_ENSURE(canFallback, "Unexpected TFallbackError: " << err.what()); + return FallbackWithMessage(*input, err.what(), ctx, false); + } + + bool fallbackFlag = false; + if (executionPlanner->MaxDataSizePerJob() > maxDataSizePerJob && canFallback) { + return FallbackWithMessage( + *input, + TStringBuilder() + << "MaxDataSizePerJob reached: " + << executionPlanner->MaxDataSizePerJob() << " > " + << maxDataSizePerJob, ctx, false); + } + + auto& tasks = executionPlanner->GetTasks(); + if (tasks.size() > maxTasksPerOperation && canFallback) { + return FallbackWithMessage( + *input, + TStringBuilder() + << "Too many tasks: " + << tasks.size() << " > " + << maxTasksPerOperation, ctx, false); + } + + YQL_ENSURE(tasks.size() <= maxTasksPerOperation); + + { + TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators()); + TTypeEnvironment typeEnv(alloc); + for (auto& t : tasks) { + TUploadList uploadList; + TString lambda = t.GetProgram().GetRaw(); + fallbackFlag |= BuildUploadList(&uploadList, false, &lambda, typeEnv, files); + t.MutableProgram()->SetRaw(lambda); + + Yql::DqsProto::TTaskMeta taskMeta; + t.MutableMeta()->UnpackTo(&taskMeta); + for (const auto& file : uploadList) { + *taskMeta.AddFiles() = file; + } + t.MutableMeta()->PackFrom(taskMeta); + if (const auto it = publicIds->AllPublicIds.find(taskMeta.GetStageId()); publicIds->AllPublicIds.cend() != it) + ++it->second; + } + } + + if (fallbackFlag) { + return FallbackWithMessage(*input, "Too big attachment", ctx, false); + } + + auto graphParams = GatherGraphParams(optimizedInput); + + MarkProgressStarted(publicIds->AllPublicIds, State->ProgressWriter); + + IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds); + + auto future = State->DqGateway->ExecutePlan(State->SessionId, *executionPlanner.Get(), {}, secureParams, graphParams, + settings, progressWriter, ModulesMapping, false); + + executionPlanner.Destroy(); + + bool neverFallback = settings->FallbackPolicy.Get().GetOrElse("default") == "never"; + future.Subscribe([publicIds, state = State, startTime, execState = ExecState, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) { + YQL_LOG_CTX_ROOT_SCOPE(logCtx); + YQL_ENSURE(!completedFuture.HasException()); + const IDqGateway::TResult& res = completedFuture.GetValueSync(); + + MarkProgressFinished(publicIds->AllPublicIds, res.Success(), state->ProgressWriter); + + auto duration = TInstant::Now() - startTime; + if (state->Metrics) { + state->Metrics->SetCounter("dq", "PrecomputeExecutionTime", duration.MilliSeconds()); + } + + state->Statistics[state->MetricId++] = res.Statistics; + + if (res.Fallback || !res.Success()) { + YQL_CLOG(DEBUG, ProviderDq) << "Finished with error"; + if (res.Fallback) { + if (state->Metrics) { + state->Metrics->IncCounter("dq", "Fallback"); + } + state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1)); + } + + CompleteNode(execState, node, [resIssues = res.Issues, fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus { + auto issue = TIssue(ctx.GetPosition(input->Pos()), "Gateway Error"); + if (fallback) { + issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING); + } else { + issue.SetCode(TIssuesIds::UNEXPECTED, TSeverityIds::S_ERROR); + } + auto issues = TIssues{}; + issues.AddIssue(issue); + issues.AddIssues(resIssues); + ctx.IssueManager.RaiseIssues(issues); + input->SetState(TExprNode::EState::Error); + return IGraphTransformer::TStatus::Error; + }); + } else if (res.Truncated) { + YQL_CLOG(DEBUG, ProviderDq) << "Finished with truncated result"; + CompleteNode(execState, node, [neverFallback, resIssues = res.Issues](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus { + auto issue = TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "DQ cannot execute the query. Cause: " << "too big precompute result").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO); + for (const auto& i : resIssues) { + TIssuePtr subIssue = new TIssue(i); + if (neverFallback && subIssue->Severity == TSeverityIds::S_WARNING) { + subIssue->Severity = TSeverityIds::S_ERROR; + } + issue.AddSubIssue(subIssue); + } + + if (neverFallback) { + issue.Message = "Too big precompute result"; + issue.Severity = TSeverityIds::S_ERROR; + } + ctx.IssueManager.RaiseIssue(issue); + input->SetState(TExprNode::EState::Error); + return IGraphTransformer::TStatus::Error; + }); + } else { + YQL_CLOG(DEBUG, ProviderDq) << "Finished"; + CompleteNode(execState, node, [resIssues = res.Issues, data = res.Data, success = res.Success()](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus { + output = input; + ctx.IssueManager.RaiseIssues(resIssues); + input->SetResult(ctx.NewAtom(input->Pos(), data)); + input->SetState(TExprNode::EState::ExecutionComplete); + return IGraphTransformer::TStatus::Ok; + }); + } + }); + combinedStatus = combinedStatus.Combine(IGraphTransformer::TStatus::Async); + } + return combinedStatus; + } + + IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const { + TDqsPipelineConfigurator peepholeConfig(State); + TPeepholeSettings peepholeSettings; + peepholeSettings.CommonConfig = &peepholeConfig; + bool hasNonDeterministicFunctions; + auto status = PeepHoleOptimizeNode<true>(input, output, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions, peepholeSettings); + if (status.Level != TStatus::Ok) { + ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TString("Peephole optimization failed for Dq stage"))); + return status; + } + YQL_CLOG(TRACE, ProviderDq) << "After PeepHole\n" << NCommon::ExprToPrettyString(ctx, *output); + return status; + } + +private: TDqStatePtr State; - THolder<IGraphTransformer> DqTypeAnnotationTransformer; + TExecStatePtr ExecState; mutable THashMap<TString, TFileLinkPtr> FileLinks; mutable THashMap<TString, TString> ModulesMapping; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp index a8b5e598a0..9ab03526cc 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp @@ -28,7 +28,7 @@ public: TDqDataProviderSink(const TDqStatePtr& state) : State(state) , LogOptTransformer([state] () { return CreateDqsLogOptTransformer(/*TODO: State->TypeCtx);*/nullptr, state->Settings); }) - , PhyOptTransformer([] () { return CreateDqsPhyOptTransformer(/*TODO: State->TypeCtx*/nullptr); }) + , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO: State->TypeCtx*/nullptr, state->Settings); }) , PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); }) , TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); }) , RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); }) |