aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author udovichenko-r <udovichenko-r@yandex-team.ru> 2022-06-01 10:09:04 +0300
committer udovichenko-r <udovichenko-r@yandex-team.ru> 2022-06-01 10:09:04 +0300
commit 047e759274848e8f522c9dbd3066959a04242f7d (patch)
tree 5fbc39b88bb9369ee0b9aa002a1fa82427e768da
parent 6d7a8fa5ef8cf1d8cbdad87ec7e2f8a97f56285c (diff)
download ydb-047e759274848e8f522c9dbd3066959a04242f7d.tar.gz
[yql] DqPrecompute handling
YQL-12393 ref:0c15dc1d57056b00e77e0eb74925400cefb79949
-rw-r--r-- ydb/core/yq/libs/actors/run_actor.cpp | 4
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_peephole.cpp | 52
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_peephole.h | 1
-rw-r--r-- ydb/library/yql/dq/opt/dq_opt_phy.cpp | 3
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_settings.cpp | 1
-rw-r--r-- ydb/library/yql/providers/dq/common/yql_dq_settings.h | 1
-rw-r--r-- ydb/library/yql/providers/dq/opt/CMakeLists.txt | 12
-rw-r--r-- ydb/library/yql/providers/dq/opt/dqs_opt.cpp | 82
-rw-r--r-- ydb/library/yql/providers/dq/opt/dqs_opt.h | 11
-rw-r--r-- ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 74
-rw-r--r-- ydb/library/yql/providers/dq/opt/physical_optimize.h | 3
-rw-r--r-- ydb/library/yql/providers/dq/planner/execution_planner.cpp | 10
-rw-r--r-- ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 534
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp | 2
14 files changed, 703 insertions, 87 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index d4e9c50c4d..b93b8c83b8 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -877,6 +877,10 @@ private:
attr->SetValue(queryTimeoutMs);
attr = dqSettings.Add();
+ attr->SetName("_EnablePrecompute");
+ attr->SetValue("0"); // TODO: enable together with removing TEmptyGateway
+
+ attr = dqSettings.Add();
attr->SetName("_LiteralTimeout");
attr->SetValue(queryTimeoutMs);
diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
index 039a41477f..223cc2c9be 100644
--- a/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_peephole.cpp
@@ -3,10 +3,12 @@
#include <ydb/library/yql/core/yql_join.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/utils/log/log.h>
#include <util/generic/size_literals.h>
+#include <util/generic/bitmap.h>
namespace NYql::NDq {
@@ -614,4 +616,54 @@ NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExp
.Done();
}
+NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx) { // Peephole rule: rebuilds a DQ stage without the inputs whose program argument is never found in the program body; any other node (or a stage with no unused inputs) is returned unchanged.
+ if (!node.Maybe<TDqStageBase>()) { // Only TDqStageBase nodes are rewritten.
+ return node;
+ }
+
+ auto stage = node.Cast<TDqStageBase>();
+
+ auto isArgumentUsed = [](const TExprNode::TPtr& node, const TExprNode* argument) { // True when FindNode locates `argument` (compared by pointer) starting from `node`.
+ return !!FindNode(node,
+ [](const TExprNode::TPtr& node) { // Returns false for nested stages and TDqPhyPrecompute nodes; presumably this callback gates descent, so those subtrees are not searched - TODO confirm against FindNode's contract.
+ return !TDqStageBase::Match(node.Get()) && !TDqPhyPrecompute::Match(node.Get());
+ },
+ [argument](const TExprNode::TPtr& node) { // Match predicate: pointer identity with the argument node.
+ return node.Get() == argument;
+ });
+ };
+
+ TDynBitMap unusedInputs; // Bit i is set when input i is not used by the program body.
+ for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { // Input i is paired with program argument i (assumes Inputs and Args have equal size - TODO confirm).
+ if (!isArgumentUsed(stage.Program().Body().Ptr(), stage.Program().Args().Arg(i).Raw())) {
+ unusedInputs.Set(i);
+ }
+ }
+
+ if (unusedInputs.Empty()) { // Nothing to drop: keep the original node.
+ return node;
+ }
+
+ TExprNode::TListType newInputs; // Surviving inputs, in original order.
+ TExprNode::TListType newArgs; // Freshly created arguments for the surviving inputs.
+ TNodeOnNodeOwnedMap replaces; // Old argument node -> new argument node, applied to the body below.
+
+ for (ui64 i = 0; i < stage.Inputs().Size(); ++i) {
+ if (!unusedInputs.Test(i)) { // Keep only the inputs that are actually used.
+ newInputs.push_back(stage.Inputs().Item(i).Ptr());
+ auto arg = stage.Program().Args().Arg(i).Raw();
+ newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content())); // New argument node with the same position and content as the old one.
+ replaces[arg] = newArgs.back();
+ }
+ }
+
+ auto children = node.Ref().ChildrenList(); // Copy of the stage children; only the Inputs and Program slots are replaced.
+ children[TDqStageBase::idx_Inputs] = ctx.NewList(stage.Inputs().Pos(), std::move(newInputs));
+ children[TDqStageBase::idx_Program] = ctx.NewLambda(stage.Program().Pos(),
+ ctx.NewArguments(stage.Program().Args().Pos(), std::move(newArgs)),
+ ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces)); // Body rewritten to reference the new argument nodes.
+
+ return NNodes::TExprBase(ctx.ChangeChildren(node.Ref(), std::move(children)));
+}
+
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_peephole.h b/ydb/library/yql/dq/opt/dq_opt_peephole.h
index 565ee4296d..f403c8d39d 100644
--- a/ydb/library/yql/dq/opt/dq_opt_peephole.h
+++ b/ydb/library/yql/dq/opt/dq_opt_peephole.h
@@ -12,5 +12,6 @@ NNodes::TExprBase DqPeepholeRewriteJoinDict(const NNodes::TExprBase& node, TExpr
NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx);
+NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx);
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index 063fdba022..e93e5ba7d1 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -254,6 +254,9 @@ TMaybeNode<TDqStage> DqPushLambdaToStage(const TDqStage& stage, const TCoAtom& o
{
TNodeOnNodeOwnedMap precomputesInsideLambda;
VisitExpr(newProgram, [&precomputesInsideLambda](const TExprNode::TPtr& node) {
+ if (node->IsCallable() && node->Content().StartsWith("DqRead")) {
+ return false;
+ }
if (TDqPhyPrecompute::Match(node.Get())) {
precomputesInsideLambda[node.Get()] = node;
return false;
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index f9ba89a33d..4a5af653e0 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -45,6 +45,7 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, _FallbackOnRuntimeErrors);
REGISTER_SETTING(*this, WorkerFilter);
+ REGISTER_SETTING(*this, _EnablePrecompute);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 3c37e4d48f..e357bd0f8e 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -65,6 +65,7 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> EnableFullResultWrite;
NCommon::TConfSetting<bool, false> _OneGraphPerQuery;
NCommon::TConfSetting<TString, false> _FallbackOnRuntimeErrors;
+ NCommon::TConfSetting<bool, false> _EnablePrecompute;
NCommon::TConfSetting<TString, false> WorkerFilter;
diff --git a/ydb/library/yql/providers/dq/opt/CMakeLists.txt b/ydb/library/yql/providers/dq/opt/CMakeLists.txt
index 2d11078641..36378b4938 100644
--- a/ydb/library/yql/providers/dq/opt/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/opt/CMakeLists.txt
@@ -14,11 +14,19 @@ target_compile_options(providers-dq-opt PRIVATE
target_link_libraries(providers-dq-opt PUBLIC
contrib-libs-cxxsupp
yutil
+ providers-dq-expr_nodes
+ providers-common-mkql
+ providers-common-codec
+ providers-common-transform
yql-utils-log
yql-dq-opt
yql-dq-type_ann
- providers-common-transform
- providers-dq-expr_nodes
+ library-yql-core
+ yql-core-peephole_opt
+ yql-core-type_ann
+ library-yql-minikql
+ yql-minikql-computation
+ cpp-yson-node
)
target_sources(providers-dq-opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
index 455adef20b..edfcc4232c 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp
@@ -1,11 +1,14 @@
#include "dqs_opt.h"
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
+#include <ydb/library/yql/providers/common/codec/yql_codec.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.h>
#include <ydb/library/yql/core/type_ann/type_ann_core.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
@@ -16,6 +19,15 @@
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_mem_info.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
#include <util/string/split.h>
#define PERFORM_RULE(func, ...) \
@@ -112,11 +124,81 @@ namespace NYql::NDqs {
PERFORM_RULE(DqPeepholeRewriteMapJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewritePureJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewriteReplicate, node, ctx);
+ PERFORM_RULE(DqPeepholeDropUnusedInputs, node, ctx);
return inputExpr;
}, ctx, optSettings);
});
}
+ THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) { // Creates a functor transformer that, for each stage, replaces TDqPhyPrecompute inputs that already carry a result with literal expressions inside the stage program.
+ return CreateFunctorTransformer([typesCtx, funcRegistry](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ TProcessedNodesSet ignoreNodes; // UniqueIds of nested read-wrap / precompute nodes, handed to OptimizeExpr as ProcessedNodes below.
+ VisitExpr(input, [&](const TExprNode::TPtr& node) {
+ if (node != input && (TDqReadWrapBase::Match(node.Get()) || TDqPhyPrecompute::Match(node.Get()))) { // The root itself is never put into the ignore set.
+ ignoreNodes.insert(node->UniqueId());
+ return false; // Presumably stops the visitor from descending into this subtree - TODO confirm.
+ }
+ return true;
+ });
+
+ TOptimizeExprSettings settings(typesCtx);
+ settings.ProcessedNodes = &ignoreNodes; // Presumably makes OptimizeExpr treat these nodes as already processed - TODO confirm.
+
+ NKikimr::NMiniKQL::TScopedAlloc alloc; // MiniKQL objects below are needed to build the type and parse the stored result into a value.
+ NKikimr::NMiniKQL::TTypeEnvironment env(alloc);
+ NKikimr::NMiniKQL::TProgramBuilder pgmBuilder(env, *funcRegistry); // funcRegistry is dereferenced here, so it must not be null.
+ NKikimr::NMiniKQL::TMemoryUsageInfo memInfo("Precompute");
+ NKikimr::NMiniKQL::THolderFactory holderFactory(alloc.Ref(), memInfo);
+
+ return OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ if (TDqStageBase::Match(node.Get())) { // Only stage nodes are rewritten; everything else is returned as is.
+ auto stage = TDqStageBase(node);
+ TNodeOnNodeOwnedMap replaces; // Program argument -> literal built from the precompute result.
+ std::vector<TCoArgument> newArgs; // Arguments of the inputs that are kept.
+ std::vector<TExprNode::TPtr> newInputs; // Inputs that are kept.
+ for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
+ const auto& input = stage.Inputs().Item(i);
+ if (input.Maybe<TDqPhyPrecompute>() && input.Ref().HasResult()) { // Only precomputes with an attached result are inlined.
+ auto yson = input.Ref().GetResult().Content(); // The result node content is parsed as YSON.
+ auto dataNode = NYT::NodeFromYsonString(yson);
+ YQL_ENSURE(dataNode.IsList() && !dataNode.AsList().empty()); // The stored result must be a non-empty list.
+ dataNode = dataNode[0]; // Only the first list element is used as the value.
+ TStringStream err;
+ NKikimr::NMiniKQL::TType* mkqlType = NCommon::BuildType(*input.Ref().GetTypeAnn(), pgmBuilder, err); // MiniKQL type built from the precompute's type annotation.
+ if (!mkqlType) { // Type conversion failure is reported as an error on the context.
+ ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Failed to process " << TDqPhyPrecompute::CallableName() << " type: " << err.Str()));
+ return nullptr;
+ }
+
+ auto value = NCommon::ParseYsonNodeInResultFormat(holderFactory, dataNode, mkqlType, &err);
+ if (!value) { // Parse failure is reported as an error on the context.
+ ctx.AddError(TIssue(ctx.GetPosition(input.Pos()), TStringBuilder() << "Failed to parse " << TDqPhyPrecompute::CallableName() << " value: " << err.Str()));
+ return nullptr;
+ }
+
+ replaces[stage.Program().Args().Arg(i).Raw()] = NCommon::ValueToExprLiteral(input.Ref().GetTypeAnn(), *value, ctx, input.Pos()); // Argument i will be substituted by the literal.
+ } else {
+ newArgs.push_back(stage.Program().Args().Arg(i)); // NOTE(review): the existing argument node is reused in the rebuilt lambda rather than recreated - confirm this is allowed.
+ newInputs.push_back(input.Ptr());
+ }
+ }
+
+ if (!replaces.empty()) { // Rebuild the stage only when at least one input was inlined.
+ YQL_CLOG(DEBUG, ProviderDq) << "DqsReplacePrecomputes";
+ auto children = stage.Ref().ChildrenList();
+ children[TDqStageBase::idx_Program] = Build<TCoLambda>(ctx, stage.Program().Pos())
+ .Args(newArgs)
+ .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces))
+ .Done().Ptr();
+ children[TDqStageBase::idx_Inputs] = ctx.NewList(stage.Inputs().Pos(), std::move(newInputs));
+ return ctx.ChangeChildren(stage.Ref(), std::move(children));
+ }
+ }
+ return node;
+ }, ctx, settings);
+ });
+ }
+
namespace NPeephole {
class TDqsPeepholeTransformer: public TSyncTransformerBase {
diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.h b/ydb/library/yql/providers/dq/opt/dqs_opt.h
index 6c9e1a7c18..25da467f41 100644
--- a/ydb/library/yql/providers/dq/opt/dqs_opt.h
+++ b/ydb/library/yql/providers/dq/opt/dqs_opt.h
@@ -1,8 +1,14 @@
#pragma once
#include <ydb/library/yql/core/yql_graph_transformer.h>
-#include <ydb/library/yql/core/yql_type_annotation.h>
-#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+
+namespace NKikimr::NMiniKQL {
+ class IFunctionRegistry;
+}
+
+namespace NYql {
+ struct TTypeAnnotationContext;
+}
namespace NYql::NDqs {
class TDatabaseManager;
@@ -11,6 +17,7 @@ namespace NYql::NDqs {
THolder<IGraphTransformer> CreateDqsFinalizingOptTransformer();
THolder<IGraphTransformer> CreateDqsBuildTransformer();
THolder<IGraphTransformer> CreateDqsRewritePhyCallablesTransformer();
+ THolder<IGraphTransformer> CreateDqsReplacePrecomputesTransformer(TTypeAnnotationContext* typesCtx, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry);
THolder<IGraphTransformer> CreateDqsPeepholeTransformer(THolder<IGraphTransformer>&& typeAnnTransformer, TTypeAnnotationContext& typesCtx);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
index 657d7453e4..2464b27b87 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp
@@ -17,8 +17,9 @@ using namespace NYql::NNodes;
class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
- TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx)
+ TDqsPhysicalOptProposalTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config)
: TOptimizeTransformerBase(typeCtx, NLog::EComponent::ProviderDq, {})
+ , Config(config)
{
#define HNDL(name) "DqsPhy-"#name, Hndl(&TDqsPhysicalOptProposalTransformer::name)
AddHandler(0, &TDqSourceWrap::Match, HNDL(BuildStageWithSourceWrap));
@@ -41,10 +42,13 @@ public:
AddHandler(0, &TCoAssumeSorted::Match, HNDL(BuildSortStage<false>));
AddHandler(0, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<false>));
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
-#if 0
- AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
- AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>));
-#endif
+ if (Config->_EnablePrecompute.Get().GetOrElse(false)) {
+ AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems));
+ AddHandler(0, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<false>));
+ AddHandler(0, &TCoHead::Match, HNDL(BuildScalarPrecompute<false>));
+ AddHandler(0, &TDqPrecompute::Match, HNDL(BuildPrecompute));
+ AddHandler(0, &TDqStage::Match, HNDL(PrecomputeToInput));
+ }
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
@@ -58,6 +62,10 @@ public:
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
+ if (Config->_EnablePrecompute.Get().GetOrElse(false)) { // Precompute rules are opt-in through the _EnablePrecompute setting (treated as false when unset).
+ AddHandler(1, &TCoToOptional::Match, HNDL(BuildScalarPrecompute<true>)); // The <true> (IsGlobal) variants go to stage 1, like every other <true> handler in this run; stage 0 already holds the <false> variants.
+ AddHandler(1, &TCoHead::Match, HNDL(BuildScalarPrecompute<true>)); // Same stage-index fix for the TCoHead matcher.
+ }
#undef HNDL
SetGlobal(1u);
@@ -296,10 +304,62 @@ protected:
TMaybeNode<TExprBase> BuildScalarPrecompute(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
return DqBuildScalarPrecompute(node, ctx, optCtx, *getParents(), IsGlobal);
}
+
+ TMaybeNode<TExprBase> BuildPrecompute(TExprBase node, TExprContext& ctx) { // Optimizer handler (registered for TDqPrecompute nodes) that simply forwards to DqBuildPrecompute.
+ return DqBuildPrecompute(node, ctx);
+ }
+
+ TMaybeNode<TExprBase> PrecomputeToInput(TExprBase node, TExprContext& ctx) { // Optimizer handler for TDqStage: turns TDqPhyPrecompute nodes found inside the stage program into extra stage inputs bound to new program arguments.
+ auto stage = node.Cast<TDqStage>();
+
+ TExprNode::TListType innerPrecomputes = FindNodes(stage.Program().Ptr(), // Collects the precompute nodes located in the program lambda.
+ [](const TExprNode::TPtr& node) { // Returns false for read wraps and precomputes; presumably gates descent so their subtrees are not searched - TODO confirm against FindNodes' contract.
+ return !TDqReadWrapBase::Match(node.Get()) && !TDqPhyPrecompute::Match(node.Get());
+ },
+ [](const TExprNode::TPtr& node) { // Match predicate: the node is a TDqPhyPrecompute.
+ return TDqPhyPrecompute::Match(node.Get());
+ }
+ );
+
+ if (innerPrecomputes.empty()) { // No precomputes inside the program: nothing to rewrite.
+ return node;
+ }
+
+ TVector<TExprNode::TPtr> newInputs; // Existing inputs followed by the found precomputes.
+ TVector<TExprNode::TPtr> newArgs; // One argument per entry of newInputs, same order.
+ TNodeOnNodeOwnedMap replaces; // Old argument / precompute node -> new argument node.
+
+ for (ui64 i = 0; i < stage.Inputs().Size(); ++i) { // Existing inputs are kept; each gets a fresh argument with the same position and content.
+ newInputs.push_back(stage.Inputs().Item(i).Ptr());
+ auto arg = stage.Program().Args().Arg(i).Raw();
+ newArgs.push_back(ctx.NewArgument(arg->Pos(), arg->Content()));
+ replaces[arg] = newArgs.back();
+ }
+
+ for (auto& precompute: innerPrecomputes) { // Each found precompute becomes an additional input.
+ newInputs.push_back(precompute);
+ newArgs.push_back(ctx.NewArgument(precompute->Pos(), TStringBuilder() << "_dq_precompute_" << newArgs.size())); // The name suffix is the argument's index in the new argument list.
+ replaces[precompute.Get()] = newArgs.back(); // Occurrences of the precompute in the body are replaced by the new argument.
+ }
+
+ return Build<TDqStage>(ctx, stage.Pos())
+ .Inputs()
+ .Add(newInputs)
+ .Build()
+ .Program()
+ .Args(newArgs)
+ .Body(ctx.ReplaceNodes(stage.Program().Body().Ptr(), replaces))
+ .Build()
+ .Settings().Build() // NOTE(review): settings are built fresh here instead of taken from stage.Settings() - confirm that dropping the original stage settings is intended.
+ .Done();
+ }
+
+private:
+ TDqConfiguration::TPtr Config;
};
-THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx) {
- return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx));
+THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config) { // Factory for the DQ physical optimizer; `config` is passed to the transformer, which reads _EnablePrecompute from it.
+ return THolder(new TDqsPhysicalOptProposalTransformer(typeCtx, config));
}
} // NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.h b/ydb/library/yql/providers/dq/opt/physical_optimize.h
index 7ead367cbc..5ed9da9880 100644
--- a/ydb/library/yql/providers/dq/opt/physical_optimize.h
+++ b/ydb/library/yql/providers/dq/opt/physical_optimize.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
@@ -7,6 +8,6 @@
namespace NYql::NDqs {
-THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx);
+THolder<IGraphTransformer> CreateDqsPhyOptTransformer(TTypeAnnotationContext* typeCtx, const TDqConfiguration::TPtr& config);
} // namespace NYql::NDqs
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 03e69a9fef..5cf9450d05 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -135,6 +135,7 @@ namespace NYql::NDqs {
TExprBase expr(DqExprRoot);
auto result = expr.Maybe<TDqCnResult>();
auto query = expr.Maybe<TDqQuery>();
+ auto value = expr.Maybe<TDqPhyPrecompute>();
const auto maxTasksPerOperation = settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation);
YQL_CLOG(DEBUG, ProviderDq) << "Execution Plan " << NCommon::ExprToPrettyString(ExprContext, *DqExprRoot);
@@ -223,8 +224,15 @@ namespace NYql::NDqs {
YQL_ENSURE(!stageInfo.Tasks.empty());
}
+ TMaybeNode<TDqPhyStage> finalStage;
if (result) {
- auto& resultStageInfo = TasksGraph.GetStageInfo(result.Cast().Output().Stage().Cast<TDqPhyStage>());
+ finalStage = result.Cast().Output().Stage().Maybe<TDqPhyStage>();
+ } else if (value) {
+ finalStage = value.Cast().Connection().Output().Stage().Maybe<TDqPhyStage>();
+ }
+
+ if (finalStage) {
+ auto& resultStageInfo = TasksGraph.GetStageInfo(finalStage.Cast());
YQL_ENSURE(resultStageInfo.Tasks.size() == 1);
auto& resultTask = TasksGraph.GetTask(resultStageInfo.Tasks[0]);
YQL_ENSURE(resultTask.Outputs.size() == 1);
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 1809bdc08a..a703de40cc 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -52,11 +52,14 @@
#include <util/stream/file.h>
#include <util/string/builder.h>
+#include <memory>
+
namespace NYql {
using namespace NCommon;
using namespace NKikimr::NMiniKQL;
using namespace NNodes;
+using namespace NThreading;
namespace {
@@ -201,16 +204,40 @@ private:
TDqStatePtr State;
};
+struct TUploadCache { // Holder of upload-related maps, shareable through TPtr; its users are outside the visible hunks.
+ THashMap<TString, TFileLinkPtr> FileLinks; // String key -> file link (key meaning is not visible here - presumably a file name or path, TODO confirm).
+ THashMap<TString, TString> ModulesMapping; // String -> string mapping for modules (exact key/value meaning not visible here).
+
+ using TPtr = std::shared_ptr<TUploadCache>;
+};
+
+struct TPublicIds { // Public operation ids collected from an expression, held through TPtr (std::shared_ptr) so lambdas can capture it.
+ THashMap<ui32, ui32> AllPublicIds; // Public id -> counter; entries are created with 0 and incremented per task in HandlePull.
+ THashMap<ui64, ui32> Stage2publicId; // Stage LogicalId (from TDqStageSettings) -> public id.
+ size_t GraphsCount = 0; // Number of TDqCnResult / TDqQuery nodes counted by GetPublicIds.
+
+ using TPtr = std::shared_ptr<TPublicIds>;
+};
+
struct TDqsPipelineConfigurator : public IPipelineConfigurator {
+public:
+ TDqsPipelineConfigurator(const TDqStatePtr& state) // Keeps the DQ state; its FunctionRegistry is passed to the ReplacePrecomputes transformer below.
+ : State_(state)
+ {
+ }
private:
void AfterCreate(TTransformationPipeline*) const final {}
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
- pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "Build-Phy");
- pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(), "Rewrite-Phy-Callables");
+ pipeline->Add(NDq::CreateDqBuildPhyStagesTransformer(false), "BuildPhy"); // Pipeline steps added after type annotation, in this order.
+ pipeline->Add(NDqs::CreateDqsRewritePhyCallablesTransformer(), "RewritePhyCallables");
+ pipeline->Add(NDqs::CreateDqsReplacePrecomputesTransformer(pipeline->GetTypeAnnotationContext().Get(), State_->FunctionRegistry), "ReplacePrecomputes"); // New step: replaces computed precompute inputs using the pipeline's type context and the state's function registry.
}
void AfterOptimize(TTransformationPipeline*) const final {}
+
+private:
+ TDqStatePtr State_; // DQ state captured at construction.
};
class TDqExecTransformer: public TExecTransformerBase, TCounters
@@ -218,8 +245,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
public:
TDqExecTransformer(const TDqStatePtr& state)
: State(state)
- , DqTypeAnnotationTransformer(
- CreateTypeAnnotationTransformer(NDq::CreateDqTypeAnnotationTransformer(*State->TypeCtx), *State->TypeCtx))
+ , ExecState(MakeIntrusive<TExecState>())
{
AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TDqExecTransformer::HandleResult));
AddHandler({TStringBuf("Pull")}, RequireNone(), Hndl(&TDqExecTransformer::HandlePull));
@@ -228,6 +254,22 @@ public:
}
private:
+ struct TExecState : public TThrRefBase { // Ref-counted state for asynchronous precompute execution; HandlePull captures it in the continuation attached to Promise's future.
+ TAdaptiveLock Lock; // Presumably guards the fields below when touched from completion callbacks - TODO confirm at the use sites.
+
+ struct TItem : public TIntrusiveListItem<TItem> { // Queue entry pairing an expression node with an async transform callback.
+ TExprNode* Node = nullptr; // Raw, non-owning pointer to the node.
+ TAsyncTransformCallback Callback;
+ };
+
+ using TQueueType = TIntrusiveListWithAutoDelete<TExecState::TItem, TDelete>; // Intrusive list that deletes its items (TDelete).
+ TQueueType Completed; // Items queued as completed (filled and drained outside the visible hunks).
+ NThreading::TPromise<void> Promise = NThreading::NewPromise(); // Its future is returned from HandlePull when precomputes run asynchronously.
+ bool HasResult = false;
+ };
+
+ using TExecStatePtr = TIntrusivePtr<TExecState>;
+
void GetResultType(TString* type, TVector<TString>* columns, const TExprNode& resOrPull, const TExprNode& resOrPullInput) const
{
*columns = NCommon::GetResOrPullColumnHints(resOrPull);
@@ -655,17 +697,17 @@ private:
bool enableLocalRun = true;
- THashMap<ui32, ui32> allPublicIds;
+ TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>();
VisitExpr(result.Ptr(), [&](const TExprNode::TPtr& node) {
const TExprBase expr(node);
if (expr.Maybe<TResFill>()) {
if (auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
- allPublicIds.emplace(*publicId, 0U);
+ publicIds->AllPublicIds.emplace(*publicId, 0U);
}
}
return true;
});
- IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds);
+ IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);
bool enableFullResultWrite = settings->EnableFullResultWrite.Get().GetOrElse(false);
if (enableFullResultWrite) {
const auto type = result.Input().Ref().GetTypeAnn();
@@ -859,14 +901,51 @@ private:
}
}
- TStatusCallbackPair FallbackWithMessage(const TExprNode& node, const TString& message, TExprContext& ctx) {
+ IGraphTransformer::TStatus FallbackWithMessage(const TExprNode& node, const TString& message, TExprContext& ctx, bool isRoot) { // Records a fallback (metrics counter + statistics entry), reports `message` as a warning-severity issue with code DQ_GATEWAY_NEED_FALLBACK_ERROR, and always returns Error status.
if (State->Metrics) {
State->Metrics->IncCounter("dq", "Fallback");
}
State->Statistics[State->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
auto issue = TIssue(ctx.GetPosition(node.Pos()), message).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING);
- ctx.AssociativeIssues.emplace(&node, TIssues{std::move(issue)});
- return SyncStatus(IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error));
+ if (isRoot) { // isRoot: the issue is attached to `node` through ctx.AssociativeIssues.
+ ctx.AssociativeIssues.emplace(&node, TIssues{std::move(issue)});
+ } else { // Otherwise the issue is raised through the context's IssueManager.
+ ctx.IssueManager.RaiseIssue(issue);
+ }
+ return IGraphTransformer::TStatus::Error; // Callers wrap this in SyncStatus(...) when they need a TStatusCallbackPair.
+ }
+
+ TPublicIds::TPtr GetPublicIds(const TExprNode::TPtr& root) const { // Walks the expression under `root` and collects public operation ids of connections and not-yet-started stages, plus the number of TDqCnResult / TDqQuery nodes.
+ TPublicIds::TPtr publicIds = std::make_shared<TPublicIds>();
+ VisitExpr(root, [&](const TExprNode::TPtr& node) { // Returning false presumably stops descent into the node's children - TODO confirm.
+ if (TResTransientBase::Match(node.Get())) { // Transient result nodes are not descended into.
+ return false;
+ } else if (TDqReadWrapBase::Match(node.Get())) { // Read wraps are not descended into.
+ return false;
+ } else if (TDqConnection::Match(node.Get())) { // Connections: record the public id when the type context can translate the node id.
+ if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
+ publicIds->AllPublicIds.emplace(*publicId, 0U); // Counter starts at 0.
+ }
+ } else if (TDqPhyPrecompute::Match(node.Get())) {
+ if (node->HasResult()) { // Precomputes that already have a result are skipped together with their subtree.
+ return false;
+ }
+ } else if (TDqStage::Match(node.Get())) {
+ const auto& stage = TDqStage(node);
+ if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) { // Only stages that neither started execution nor have a result.
+ if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
+ if (const auto settings = NDq::TDqStageSettings::Parse(stage); settings.LogicalId) { // A non-zero LogicalId is mapped to the stage's public id.
+ publicIds->Stage2publicId[settings.LogicalId] = *publicId;
+ }
+ publicIds->AllPublicIds.emplace(*publicId, 0U);
+ }
+ }
+ } else if (TDqCnResult::Match(node.Get()) || TDqQuery::Match(node.Get())) { // NOTE(review): if TDqCnResult also matches TDqConnection, the earlier branch wins and only TDqQuery is counted here - confirm.
+ ++publicIds->GraphsCount;
+ }
+ return true;
+ });
+ return publicIds;
+ }
TStatusCallbackPair HandlePull(const TExprNode::TPtr& input, TExprContext& ctx) {
@@ -879,45 +958,29 @@ private:
YQL_ENSURE(!TMaybeNode<TDqQuery>(pull.Input().Ptr()) || State->Settings->EnableComputeActor.Get().GetOrElse(false),
"DqQuery is not supported with worker actor");
- TString type;
- TVector<TString> columns;
- GetResultType(&type, &columns, pull.Ref(), pull.Input().Ref());
-
const bool oneGraphPerQuery = State->Settings->_OneGraphPerQuery.Get().GetOrElse(false);
- size_t graphsCount = 0;
- THashMap<ui32, ui32> allPublicIds;
- THashMap<ui64, ui32> stage2publicId;
- bool hasStageError = false;
- VisitExpr(pull.Ptr(), [&](const TExprNode::TPtr& node) {
- if (TResTransientBase::Match(node.Get()))
- return false;
- if (const TExprBase expr(node); expr.Maybe<TDqConnection>()) {
- if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
- allPublicIds.emplace(*publicId, 0U);
- }
- } else if (const auto& maybeStage = expr.Maybe<TDqStage>()) {
- const auto& stage = maybeStage.Cast();
- if (!(stage.Ref().StartsExecution() || stage.Ref().HasResult())) {
- if (const auto publicId = State->TypeCtx->TranslateOperationId(node->UniqueId())) {
- if (const auto settings = NDq::TDqStageSettings::Parse(maybeStage.Cast()); settings.LogicalId) {
- stage2publicId[settings.LogicalId] = *publicId;
- }
- allPublicIds.emplace(*publicId, 0U);
- }
- }
- } else if (oneGraphPerQuery) {
- if (expr.Maybe<TDqCnResult>() || expr.Maybe<TDqQuery>()) {
- ++graphsCount;
+ auto publicIds = GetPublicIds(pull.Ptr());
+ YQL_ENSURE(!oneGraphPerQuery || publicIds->GraphsCount == 1, "Internal error: only one graph per query is allowed");
+
+ auto precomputes = FindIndependentPrecomputes(pull.Input().Ptr());
+ if (!precomputes.empty()) {
+ auto status = HandlePrecomputes(precomputes, ctx);
+ if (status.Level != TStatus::Ok) {
+ if (status == TStatus::Async) {
+ return std::make_pair(status, ExecState->Promise.GetFuture().Apply([execState = ExecState](const TFuture<void>& completedFuture) {
+ completedFuture.GetValue();
+ return HandlePrecomputeAsyncComplete(execState);
+ }));
+ } else {
+ return SyncStatus(status);
}
}
- return true;
- });
- YQL_ENSURE(!oneGraphPerQuery || graphsCount == 1, "Internal error: only one graph per query is allowed");
-
- if (hasStageError) {
- return SyncError();
}
+ TString type;
+ TVector<TString> columns;
+ GetResultType(&type, &columns, pull.Ref(), pull.Input().Ref());
+
auto optimizedInput = pull.Input().Ptr();
THashMap<TString, TString> secureParams;
NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams);
@@ -926,17 +989,10 @@ private:
optimizedInput->SetTypeAnn(pull.Input().Ref().GetTypeAnn());
optimizedInput->CopyConstraints(pull.Input().Ref());
- TDqsPipelineConfigurator peepholeConfig;
- TPeepholeSettings peepholeSettings;
- peepholeSettings.CommonConfig = &peepholeConfig;
- bool hasNonDeterministicFunctions;
- // TODO: do it per stage
- auto status = PeepHoleOptimizeNode<true>(optimizedInput, optimizedInput, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions, peepholeSettings);
- if (status != TStatus::Ok) {
- ctx.AddError(TIssue(ctx.GetPosition(optimizedInput->Pos()), TString("Peephole optimization failed for Dq stage")));
+ auto status = PeepHole(optimizedInput, optimizedInput, ctx);
+ if (status.Level != TStatus::Ok) {
return SyncStatus(status);
}
- YQL_CLOG(TRACE, ProviderDq) << "After PeepHole\n" << NCommon::ExprToPrettyString(ctx, *optimizedInput);
// copy-paste {
TUserDataTable crutches = State->TypeCtx->UserDataStorageCrutches;
@@ -958,7 +1014,7 @@ private:
// exprRoot must be DqCnResult or DqQuery
- executionPlanner->SetPublicIds(stage2publicId);
+ executionPlanner->SetPublicIds(publicIds->Stage2publicId);
auto settings = std::make_shared<TDqSettings>(*State->Settings);
auto tasksPerStage = settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
@@ -974,12 +1030,12 @@ private:
bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq);
if (stagesCount > maxTasksPerOperation && canFallback) {
- return FallbackWithMessage(
+ return SyncStatus(FallbackWithMessage(
pull.Ref(),
TStringBuilder()
<< "Too many stages: "
<< stagesCount << " > "
- << maxTasksPerOperation, ctx);
+ << maxTasksPerOperation, ctx, true));
}
YQL_ENSURE(stagesCount <= maxTasksPerOperation);
@@ -992,28 +1048,28 @@ private:
}
} catch (const TFallbackError& err) {
YQL_ENSURE(canFallback, "Unexpected TFallbackError: " << err.what());
- return FallbackWithMessage(pull.Ref(), err.what(), ctx);
+ return SyncStatus(FallbackWithMessage(pull.Ref(), err.what(), ctx, true));
}
bool fallbackFlag = false;
if (executionPlanner->MaxDataSizePerJob() > maxDataSizePerJob && canFallback) {
- return FallbackWithMessage(
+ return SyncStatus(FallbackWithMessage(
pull.Ref(),
TStringBuilder()
<< "MaxDataSizePerJob reached: "
<< executionPlanner->MaxDataSizePerJob() << " > "
- << maxDataSizePerJob, ctx);
+ << maxDataSizePerJob, ctx, true));
}
bool localRun = false;
auto& tasks = executionPlanner->GetTasks();
if (tasks.size() > maxTasksPerOperation && canFallback) {
- return FallbackWithMessage(
+ return SyncStatus(FallbackWithMessage(
pull.Ref(),
TStringBuilder()
<< "Too many tasks: "
<< tasks.size() << " > "
- << maxTasksPerOperation, ctx);
+ << maxTasksPerOperation, ctx, true));
}
YQL_ENSURE(tasks.size() <= maxTasksPerOperation);
@@ -1033,15 +1089,15 @@ private:
*taskMeta.AddFiles() = file;
}
t.MutableMeta()->PackFrom(taskMeta);
- if (const auto it = allPublicIds.find(taskMeta.GetStageId()); allPublicIds.cend() != it)
+ if (const auto it = publicIds->AllPublicIds.find(taskMeta.GetStageId()); publicIds->AllPublicIds.cend() != it)
++it->second;
}
}
- MarkProgressStarted(allPublicIds, State->ProgressWriter);
+ MarkProgressStarted(publicIds->AllPublicIds, State->ProgressWriter);
if (fallbackFlag) {
- return FallbackWithMessage(pull.Ref(), "Too big attachment", ctx);
+ return SyncStatus(FallbackWithMessage(pull.Ref(), "Too big attachment", ctx, true));
}
IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(pull.Ref());
@@ -1068,23 +1124,23 @@ private:
if (ref) {
if (!enableFullResultWrite) {
- return FallbackWithMessage(pull.Ref(),
+ return SyncStatus(FallbackWithMessage(pull.Ref(),
TStringBuilder() << "RefSelect mode cannot be used with DQ, because \"" << State->TypeCtx->FullResultDataSink << "\" provider has failed to prepare a result table",
- ctx);
+ ctx, true));
}
// Force write to table
settings->_AllResultsBytesLimit = 0;
settings->_RowsLimitPerWrite = 0;
}
- IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(allPublicIds);
+ IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);
auto future = State->DqGateway->ExecutePlan(State->SessionId, *executionPlanner.Get(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
- future.Subscribe([allPublicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
+ future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
YQL_ENSURE(!completedFuture.HasException());
- MarkProgressFinished(allPublicIds, completedFuture.GetValueSync().Success(), progressWriter);
+ MarkProgressFinished(publicIds->AllPublicIds, completedFuture.GetValueSync().Success(), progressWriter);
});
executionPlanner.Destroy();
@@ -1206,9 +1262,9 @@ private:
}, "");
}
- IDqGateway::TDqProgressWriter MakeDqProgressWriter(const THashMap<ui32, ui32>& allPublicIds) const {
- IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, allPublicIds](const TString& stage) {
- for (const auto& publicId : allPublicIds) {
+ IDqGateway::TDqProgressWriter MakeDqProgressWriter(const TPublicIds::TPtr& publicIds) const {
+ IDqGateway::TDqProgressWriter dqProgressWriter = [progressWriter = State->ProgressWriter, publicIds](const TString& stage) {
+ for (const auto& publicId : publicIds->AllPublicIds) {
auto p = TOperationProgress(TString(DqProviderName), publicId.first, TOperationProgress::EState::InProgress, stage);
if (publicId.second) {
p.Counters.ConstructInPlace();
@@ -1289,8 +1345,340 @@ private:
return nullptr;
}
+ // Walks the expression graph rooted at `node` and returns the precompute nodes
+ // selected by FindIndependentPrecomputesImpl, keyed by raw node pointer.
+ TNodeOnNodeOwnedMap FindIndependentPrecomputes(const TExprNode::TPtr& node) const {
+     // Per-call memo of already inspected nodes; it is not kept between calls.
+     TNodeMap<bool> seen;
+     TNodeOnNodeOwnedMap found;
+     FindIndependentPrecomputesImpl(node, found, seen);
+     return found;
+ }
+
+ bool FindIndependentPrecomputesImpl(const TExprNode::TPtr& node, TNodeOnNodeOwnedMap& precomputes, TNodeMap<bool>& visitedNodes) const { // Recursive worker: returns true when the subgraph under `node` contains a pending precompute; collects the chosen precomputes into `precomputes`.
+ if (auto it = visitedNodes.find(node.Get()); it != visitedNodes.cend()) {
+ return it->second; // Memoized answer (for a node still being processed this is its in-progress value).
+ }
+
+ bool& hasPrecompute = visitedNodes[node.Get()]; // Inserts the entry (default false) before recursing. NOTE(review): assumes TNodeMap keeps references valid across later insertions - confirm.
+ if (TDqStageBase::Match(node.Get())) {
+ auto stage = TDqStageBase(node);
+ for (const auto& input : stage.Inputs()) {
+ if (auto maybePrecompute = input.Maybe<TDqPhyPrecompute>()) {
+ if (!input.Ref().HasResult() && input.Ref().GetState() != TExprNode::EState::Error) { // Only precomputes that have no result yet and are not in the error state are considered.
+ hasPrecompute = true;
+ if (input.Ref().StartsExecution() || !FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes)) { // Select it if it already started executing, or if nothing pending was found beneath it.
+ precomputes[input.Raw()] = input.Ptr();
+ }
+ }
+ } else {
+ hasPrecompute = FindIndependentPrecomputesImpl(input.Ptr(), precomputes, visitedNodes) || hasPrecompute; // Recursive call is the left operand so it always runs, even when the flag is already true.
+ }
+ }
+ } else {
+ for (auto child: node->Children()) {
+ hasPrecompute = FindIndependentPrecomputesImpl(child, precomputes, visitedNodes) || hasPrecompute; // Same ordering as above: always descend into every child.
+ }
+ }
+ return hasPrecompute;
+ }
+
+ // Queues a finished (node, callback) pair on `execState` and, if this is the first
+ // result since the state was last reset, fulfils the state's promise.
+ static void CompleteNode(const TExecStatePtr& execState, TExprNode* node, const TAsyncTransformCallback& callback) {
+     auto finished = MakeHolder<TExecState::TItem>();
+     finished->Node = node;
+     finished->Callback = callback;
+
+     // The promise is copied out under the lock but signalled only after the lock is released.
+     NThreading::TPromise<void> toSignal;
+     {
+         TGuard<TAdaptiveLock> guard(execState->Lock);
+         execState->Completed.PushBack(finished.Release());
+         if (!execState->HasResult) {
+             execState->HasResult = true;
+             toSignal = execState->Promise;
+         }
+     }
+
+     if (toSignal.Initialized()) {
+         toSignal.SetValue();
+     }
+ }
+
+ // Builds the transform callback that drains `execState`: it takes every completed item,
+ // runs the item's own callback on its node and returns the combined status, which is
+ // never lower than Repeat because the combination starts from Repeat.
+ static TAsyncTransformCallback HandlePrecomputeAsyncComplete(TExecStatePtr execState) {
+     return TAsyncTransformCallback([execState](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+         output = input;
+
+         // Under the lock: steal the completed queue, install a fresh promise and
+         // clear the "has result" flag so the next completion signals again.
+         TExecState::TQueueType finishedItems;
+         auto freshPromise = NThreading::NewPromise();
+         with_lock(execState->Lock) {
+             finishedItems.Swap(execState->Completed);
+             execState->Promise.Swap(freshPromise);
+             execState->HasResult = false;
+         }
+
+         TStatus overall = TStatus::Repeat;
+         for (auto& finished : finishedItems) {
+             TExprNode::TPtr rewritten;
+             auto itemStatus = finished.Callback(finished.Node, rewritten, ctx);
+             // Per-node callbacks may fail, but must not replace the node.
+             if (itemStatus.Level != TStatus::Error) {
+                 YQL_ENSURE(rewritten == finished.Node, "Unsupported node rewrite");
+             }
+             overall = overall.Combine(itemStatus);
+         }
+
+         return overall;
+     });
+ }
+
+ // Launches execution of every precompute in `precomputes` through the DQ gateway and
+ // returns the combined status of all of them.
+ //
+ // For each precompute that has not started yet the node is peephole-optimized, its files
+ // are frozen, an execution plan is built and checked against the stage/task/data-size
+ // limits (falling back when allowed), and the plan is submitted. Completion is reported
+ // asynchronously via CompleteNode(), so a successfully submitted precompute contributes
+ // TStatus::Async. Precomputes that are already executing also contribute TStatus::Async.
+ //
+ // Returns early with the failing status on a peephole failure, a file freezing error or
+ // a fallback.
+ IGraphTransformer::TStatus HandlePrecomputes(const TNodeOnNodeOwnedMap& precomputes, TExprContext& ctx) {
+
+     IDataProvider::TFillSettings fillSettings;
+     fillSettings.AllResultsBytesLimit.Clear();
+     fillSettings.RowsLimitPerWrite.Clear();
+     auto commonSettings = State->Settings->WithFillSettings(fillSettings);
+     commonSettings->EnableFullResultWrite = false;
+
+     IGraphTransformer::TStatus combinedStatus = TStatus::Ok;
+
+     for (auto [_, input]: precomputes) {
+         TString uniqId = TStringBuilder() << input->Content() << "(#" << input->UniqueId() << ')';
+         YQL_LOG_CTX_SCOPE(uniqId);
+         if (input->StartsExecution()) {
+             YQL_CLOG(DEBUG, ProviderDq) << "Continue async execution";
+             combinedStatus = combinedStatus.Combine(TStatus::Async);
+             continue;
+         }
+
+         auto logCtx = NLog::CurrentLogContextPath();
+         TInstant startTime = TInstant::Now();
+         YQL_CLOG(DEBUG, ProviderDq) << "Executing " << input->Content();
+
+         auto publicIds = GetPublicIds(input);
+
+         auto optimizedInput = input;
+
+         auto status = PeepHole(input, optimizedInput, ctx);
+         if (status.Level != TStatus::Ok) {
+             return combinedStatus.Combine(status);
+         }
+
+         input->SetState(TExprNode::EState::ExecutionInProgress);
+
+         THashMap<TString, TString> secureParams;
+         NCommon::FillSecureParams(optimizedInput, *State->TypeCtx, secureParams);
+
+         // copy-paste {
+         TUserDataTable crutches = State->TypeCtx->UserDataStorageCrutches;
+         TUserDataTable files;
+         StartCounter("FreezeUsedFiles");
+
+         auto filesRes = NCommon::FreezeUsedFiles(*optimizedInput, files, *State->TypeCtx, ctx, [](const TString&){ return true; }, crutches);
+         if (filesRes.first.Level != TStatus::Ok) {
+             // Combine the freezing status itself. `status` is the peephole status, which is
+             // always Ok here, so combining it would hide an Async/Repeat freeze from the caller.
+             combinedStatus = combinedStatus.Combine(filesRes.first);
+             if (filesRes.first.Level == TStatus::Error) {
+                 return filesRes.first;
+             }
+             YQL_CLOG(DEBUG, ProviderDq) << "Freezing files for " << input->Content();
+             if (filesRes.first.Level == TStatus::Async) {
+                 filesRes.second.Subscribe([execState = ExecState, node = input.Get(), logCtx](const TAsyncTransformCallbackFuture& future) {
+                     YQL_LOG_CTX_ROOT_SCOPE(logCtx);
+                     YQL_ENSURE(!future.HasException());
+                     YQL_CLOG(DEBUG, ProviderDq) << "Finishing freezing files";
+                     CompleteNode(execState, node, future.GetValue());
+                 });
+             }
+             continue;
+         }
+         FlushCounter("FreezeUsedFiles");
+         // copy-paste }
+
+         auto executionPlanner = MakeHolder<TDqsExecutionPlanner>(
+             State->TypeCtx, ctx, State->FunctionRegistry,
+             optimizedInput);
+
+         // exprRoot must be DqCnResult or DqQuery
+
+         executionPlanner->SetPublicIds(publicIds->Stage2publicId);
+
+         auto settings = std::make_shared<TDqSettings>(*commonSettings);
+
+         auto tasksPerStage = settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage);
+         const auto maxTasksPerOperation = State->Settings->MaxTasksPerOperation.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerOperation);
+
+         auto maxDataSizePerJob = settings->MaxDataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::MaxDataSizePerJob);
+         auto stagesCount = executionPlanner->StagesCount();
+
+         if (!executionPlanner->CanFallback()) {
+             settings->FallbackPolicy = State->TypeCtx->DqFallbackPolicy = "never";
+         }
+
+         bool canFallback = (settings->FallbackPolicy.Get().GetOrElse("default") != "never" && !State->TypeCtx->ForceDq);
+
+         if (stagesCount > maxTasksPerOperation && canFallback) {
+             return FallbackWithMessage(
+                 *input,
+                 TStringBuilder()
+                 << "Too many stages: "
+                 << stagesCount << " > "
+                 << maxTasksPerOperation, ctx, false);
+         }
+
+         YQL_ENSURE(stagesCount <= maxTasksPerOperation);
+
+         try {
+             // Halve the per-stage task limit until the plan fits into the per-operation limit.
+             while (executionPlanner->PlanExecution(settings, canFallback) > maxTasksPerOperation && tasksPerStage > 1) {
+                 tasksPerStage /= 2;
+                 settings->MaxTasksPerStage = tasksPerStage;
+                 executionPlanner->Clear();
+             }
+         } catch (const TFallbackError& err) {
+             YQL_ENSURE(canFallback, "Unexpected TFallbackError: " << err.what());
+             return FallbackWithMessage(*input, err.what(), ctx, false);
+         }
+
+         bool fallbackFlag = false;
+         if (executionPlanner->MaxDataSizePerJob() > maxDataSizePerJob && canFallback) {
+             return FallbackWithMessage(
+                 *input,
+                 TStringBuilder()
+                 << "MaxDataSizePerJob reached: "
+                 << executionPlanner->MaxDataSizePerJob() << " > "
+                 << maxDataSizePerJob, ctx, false);
+         }
+
+         auto& tasks = executionPlanner->GetTasks();
+         if (tasks.size() > maxTasksPerOperation && canFallback) {
+             return FallbackWithMessage(
+                 *input,
+                 TStringBuilder()
+                 << "Too many tasks: "
+                 << tasks.size() << " > "
+                 << maxTasksPerOperation, ctx, false);
+         }
+
+         YQL_ENSURE(tasks.size() <= maxTasksPerOperation);
+
+         {
+             TScopedAlloc alloc(NKikimr::TAlignedPagePoolCounters(), State->FunctionRegistry->SupportsSizedAllocators());
+             TTypeEnvironment typeEnv(alloc);
+             for (auto& t : tasks) {
+                 TUploadList uploadList;
+                 TString lambda = t.GetProgram().GetRaw();
+                 fallbackFlag |= BuildUploadList(&uploadList, false, &lambda, typeEnv, files);
+                 t.MutableProgram()->SetRaw(lambda);
+
+                 Yql::DqsProto::TTaskMeta taskMeta;
+                 t.MutableMeta()->UnpackTo(&taskMeta);
+                 for (const auto& file : uploadList) {
+                     *taskMeta.AddFiles() = file;
+                 }
+                 t.MutableMeta()->PackFrom(taskMeta);
+                 // Count the tasks of every stage that has a public id.
+                 if (const auto it = publicIds->AllPublicIds.find(taskMeta.GetStageId()); publicIds->AllPublicIds.cend() != it)
+                     ++it->second;
+             }
+         }
+
+         if (fallbackFlag) {
+             return FallbackWithMessage(*input, "Too big attachment", ctx, false);
+         }
+
+         auto graphParams = GatherGraphParams(optimizedInput);
+
+         MarkProgressStarted(publicIds->AllPublicIds, State->ProgressWriter);
+
+         IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);
+
+         auto future = State->DqGateway->ExecutePlan(State->SessionId, *executionPlanner.Get(), {}, secureParams, graphParams,
+             settings, progressWriter, ModulesMapping, false);
+
+         executionPlanner.Destroy();
+
+         bool neverFallback = settings->FallbackPolicy.Get().GetOrElse("default") == "never";
+         future.Subscribe([publicIds, state = State, startTime, execState = ExecState, node = input.Get(), neverFallback, logCtx](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
+             YQL_LOG_CTX_ROOT_SCOPE(logCtx);
+             YQL_ENSURE(!completedFuture.HasException());
+             const IDqGateway::TResult& res = completedFuture.GetValueSync();
+
+             MarkProgressFinished(publicIds->AllPublicIds, res.Success(), state->ProgressWriter);
+
+             auto duration = TInstant::Now() - startTime;
+             if (state->Metrics) {
+                 state->Metrics->SetCounter("dq", "PrecomputeExecutionTime", duration.MilliSeconds());
+             }
+
+             state->Statistics[state->MetricId++] = res.Statistics;
+
+             if (res.Fallback || !res.Success()) {
+                 YQL_CLOG(DEBUG, ProviderDq) << "Finished with error";
+                 if (res.Fallback) {
+                     if (state->Metrics) {
+                         state->Metrics->IncCounter("dq", "Fallback");
+                     }
+                     state->Statistics[state->MetricId++].Entries.push_back(TOperationStatistics::TEntry("Fallback", 0, 0, 0, 0, 1));
+                 }
+
+                 CompleteNode(execState, node, [resIssues = res.Issues, fallback = res.Fallback](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus {
+                     auto issue = TIssue(ctx.GetPosition(input->Pos()), "Gateway Error");
+                     if (fallback) {
+                         issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING);
+                     } else {
+                         issue.SetCode(TIssuesIds::UNEXPECTED, TSeverityIds::S_ERROR);
+                     }
+                     auto issues = TIssues{};
+                     issues.AddIssue(issue);
+                     issues.AddIssues(resIssues);
+                     ctx.IssueManager.RaiseIssues(issues);
+                     input->SetState(TExprNode::EState::Error);
+                     return IGraphTransformer::TStatus::Error;
+                 });
+             } else if (res.Truncated) {
+                 YQL_CLOG(DEBUG, ProviderDq) << "Finished with truncated result";
+                 CompleteNode(execState, node, [neverFallback, resIssues = res.Issues](const TExprNode::TPtr& input, TExprNode::TPtr&, TExprContext& ctx) -> IGraphTransformer::TStatus {
+                     auto issue = TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "DQ cannot execute the query. Cause: " << "too big precompute result").SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_INFO);
+                     for (const auto& i : resIssues) {
+                         TIssuePtr subIssue = new TIssue(i);
+                         if (neverFallback && subIssue->Severity == TSeverityIds::S_WARNING) {
+                             subIssue->Severity = TSeverityIds::S_ERROR;
+                         }
+                         issue.AddSubIssue(subIssue);
+                     }
+
+                     if (neverFallback) {
+                         issue.Message = "Too big precompute result";
+                         issue.Severity = TSeverityIds::S_ERROR;
+                     }
+                     ctx.IssueManager.RaiseIssue(issue);
+                     input->SetState(TExprNode::EState::Error);
+                     return IGraphTransformer::TStatus::Error;
+                 });
+             } else {
+                 YQL_CLOG(DEBUG, ProviderDq) << "Finished";
+                 CompleteNode(execState, node, [resIssues = res.Issues, data = res.Data](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) -> IGraphTransformer::TStatus {
+                     output = input;
+                     ctx.IssueManager.RaiseIssues(resIssues);
+                     input->SetResult(ctx.NewAtom(input->Pos(), data));
+                     input->SetState(TExprNode::EState::ExecutionComplete);
+                     return IGraphTransformer::TStatus::Ok;
+                 });
+             }
+         });
+         combinedStatus = combinedStatus.Combine(IGraphTransformer::TStatus::Async);
+     }
+     return combinedStatus;
+ }
+
+ // Runs peephole optimization over `input` with the DQ pipeline configuration, storing
+ // the optimized expression in `output`. Any status other than Ok is reported as an
+ // error issue on `ctx` and returned unchanged.
+ IGraphTransformer::TStatus PeepHole(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) const {
+     TDqsPipelineConfigurator pipelineConfig(State);
+     TPeepholeSettings optimizerSettings;
+     optimizerSettings.CommonConfig = &pipelineConfig;
+
+     // Out-parameter required by the optimizer; its value is not used here.
+     bool hasNonDeterministicFunctions;
+     auto result = PeepHoleOptimizeNode<true>(input, output, ctx, *State->TypeCtx, nullptr, hasNonDeterministicFunctions, optimizerSettings);
+
+     if (result.Level == TStatus::Ok) {
+         YQL_CLOG(TRACE, ProviderDq) << "After PeepHole\n" << NCommon::ExprToPrettyString(ctx, *output);
+     } else {
+         ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TString("Peephole optimization failed for Dq stage")));
+     }
+     return result;
+ }
+
+private:
TDqStatePtr State;
- THolder<IGraphTransformer> DqTypeAnnotationTransformer;
+ TExecStatePtr ExecState;
mutable THashMap<TString, TFileLinkPtr> FileLinks;
mutable THashMap<TString, TString> ModulesMapping;
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
index a8b5e598a0..9ab03526cc 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
@@ -28,7 +28,7 @@ public:
TDqDataProviderSink(const TDqStatePtr& state)
: State(state)
, LogOptTransformer([state] () { return CreateDqsLogOptTransformer(/*TODO: State->TypeCtx);*/nullptr, state->Settings); })
- , PhyOptTransformer([] () { return CreateDqsPhyOptTransformer(/*TODO: State->TypeCtx*/nullptr); })
+ , PhyOptTransformer([state] () { return CreateDqsPhyOptTransformer(/*TODO: State->TypeCtx*/nullptr, state->Settings); })
, PhysicalFinalizingTransformer([] () { return CreateDqsFinalizingOptTransformer(); })
, TypeAnnotationTransformer([state] () { return CreateDqsDataSinkTypeAnnotationTransformer(state->TypeCtx); })
, RecaptureTransformer([state] () { return CreateDqsRecaptureTransformer(state); })