diff options
author | vvvv <vvvv@ydb.tech> | 2023-10-06 14:14:45 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-10-06 15:22:57 +0300 |
commit | e1e4812f5d91f107a24690baceea830a45739062 (patch) | |
tree | 588e030e156fada3811c3ef2187d4c1358620745 | |
parent | beef0fafbcd1e3d89f7dfbbc59476cc3c885b179 (diff) | |
download | ydb-e1e4812f5d91f107a24690baceea830a45739062.tar.gz |
YQL-16809 fix inplace rewrite
5 files changed, 108 insertions, 86 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 6e3984841e6..617a7197708 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -670,85 +670,18 @@ private: const auto tmpFolder = GetTablesTmpFolder(*config); auto clusterStr = TString{cluster.Value()}; - TMaybeNode<TCoSecureParam> secParams; - if (State_->Configuration->Auth.Get().GetOrElse(TString())) { - secParams = Build<TCoSecureParam>(ctx, op.Ref().Pos()).Name().Build(TString("cluster:default_").append(clusterStr)).Done(); - } - - VisitExpr(input, [&](const TExprNode::TPtr& node) { - if (auto maybeWrite = TMaybeNode<TYtDqWideWrite>(node)) { - auto server = State_->Gateway->GetClusterServer(clusterStr); - YQL_ENSURE(server, "Invalid YT cluster: " << clusterStr); - - NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, clusterStr, tmpTable.Name().StringValue(), tmpFolder); - realTable.Append(true); - YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId"); - - NYT::TNode spec; - TYqlRowSpecInfo(tmpTable.RowSpec()).FillCodecNode(spec[YqlRowSpecAttribute]); - NYT::TNode outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec)); - NYT::TNode writerOptions = NYT::TNode::CreateMap(); - - if (config->MaxRowWeight.Get(clusterStr)) { - auto maxRowWeight = config->MaxRowWeight.Get(clusterStr)->GetValue(); - writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight); - } - - auto settings = Build<TCoNameValueTupleList>(ctx, node->Pos()) - .Add() - .Name().Value("table", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build() - .Build() - .Add() - .Name().Value("server", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(server).Build() - .Build() - .Add() - .Name().Value("outSpec", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build() - .Build() - .Add() - .Name().Value("secureParams", TNodeFlags::Default).Build() - .Value(secParams) - .Build() - .Add() - .Name().Value("tx", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build() - .Build() - .Add() - .Name().Value("outTable", TNodeFlags::Default).Build() - .Value(tmpTable) - .Build() - .Add() - .Name().Value("writerOptions", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build() - .Build() - .Done().Ptr(); - - node->ChildRef(TYtDqWideWrite::idx_Settings) = settings; - - auto atomType = ctx.MakeType<TUnitExprType>(); - - VisitExpr(node->ChildRef(TYtDqWideWrite::idx_Settings), [&atomType](const TExprNode::TPtr& tupleNode) { - if (tupleNode->GetTypeAnn() == nullptr) { - tupleNode->SetTypeAnn(atomType); - tupleNode->SetState(TExprNode::EState::ConstrComplete); - } - return true; - }); - - return false; - } - - return true; - }); - delegatedNode = input->ChildPtr(TYtDqProcessWrite::idx_Input); if (const auto status = SubstTables(delegatedNode, State_, false, ctx); status.Level == TStatus::Error) { return SyncStatus(status); } bool hasNonDeterministicFunctions = false; - if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_, hasNonDeterministicFunctions, ctx); status.Level == TStatus::Error) { + TYtExtraPeepHoleSettings settings; + settings.CurrentCluster = clusterStr; + settings.TmpTable = &tmpTable; + settings.TmpFolder = tmpFolder; + settings.Config = config; + if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_, + hasNonDeterministicFunctions, ctx, settings); status.Level == TStatus::Error) { return SyncStatus(status); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp index a77f046af4f..ea1babb5165 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp @@ -633,12 +633,15 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& template<bool WithWideFlow> struct TPeepholePipelineConfigurator : public IPipelineConfigurator { - TPeepholePipelineConfigurator(TYtState::TPtr state): State_(std::move(state)) {} + TPeepholePipelineConfigurator(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) + : State_(std::move(state)) + , Settings_(settings) + {} private: void AfterCreate(TTransformationPipeline*) const final {} void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { - pipeline->Add(CreateTYtPeepholeTransformer(State_), "Peephole"); + pipeline->Add(CreateTYtPeepholeTransformer(State_, Settings_), "Peephole"); if constexpr (WithWideFlow) { pipeline->Add(CreateTYtWideFlowTransformer(State_), "WideFlow"); } @@ -647,11 +650,13 @@ private: void AfterOptimize(TTransformationPipeline*) const final {} const TYtState::TPtr State_; + const TYtExtraPeepHoleSettings Settings_; }; template<bool ForNativeExecution> IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx) + const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, + const TYtExtraPeepHoleSettings& settings) { if constexpr (ForNativeExecution) { if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx); @@ -660,7 +665,7 @@ IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TEx } } - const TPeepholePipelineConfigurator<ForNativeExecution> wideFlowTransformers(state); + const TPeepholePipelineConfigurator<ForNativeExecution> wideFlowTransformers(state, settings); TPeepholeSettings peepholeSettings; peepholeSettings.CommonConfig = &wideFlowTransformers; return PeepHoleOptimizeNode(output, output, ctx, *state->Types, nullptr, hasNonDeterministicFunctions, peepholeSettings); @@ -668,9 +673,9 @@ IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TEx template IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<true>(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx); + const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings); template IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<false>(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx); + const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings); } // NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h index b7d3a29e550..adb45c6196b 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h @@ -1,6 +1,7 @@ #pragma once #include "yql_yt_provider.h" +#include "yql_yt_provider_impl.h" #include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <ydb/library/yql/providers/yt/lib/mkql_helpers/mkql_helpers.h> @@ -32,6 +33,7 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& template<bool ForNativeExecution> IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx); + const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, + const TYtExtraPeepHoleSettings& settings = {}); } //NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp index d405a27059e..26c27515936 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp @@ -6,6 +6,10 @@ #include <ydb/library/yql/providers/common/transform/yql_optimize.h> #include <ydb/library/yql/providers/yt/common/yql_configuration.h> #include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> +#include <ydb/library/yql/providers/yt/common/yql_names.h> + +#include <library/cpp/yson/node/node_io.h> +#include <yt/cpp/mapreduce/common/helpers.h> namespace NYql { @@ -15,11 +19,14 @@ namespace { class TYtPeepholeTransformer : public TOptimizeTransformerBase { public: - TYtPeepholeTransformer(TYtState::TPtr state) - : TOptimizeTransformerBase(state ? state->Types : nullptr, NLog::EComponent::ProviderYt, {}), State_(state) + TYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) + : TOptimizeTransformerBase(state ? state->Types : nullptr, NLog::EComponent::ProviderYt, {}) + , State_(state) + , Settings_(settings) { #define HNDL(name) "Peephole-"#name, Hndl(&TYtPeepholeTransformer::name) AddHandler(0, &TYtLength::Match, HNDL(OptimizeLength)); + AddHandler(0, &TYtDqWideWrite::Match, HNDL(OptimizeYtDqWideWrite)); #undef HNDL } private: @@ -51,13 +58,80 @@ private: return node; } + TMaybeNode<TExprBase> OptimizeYtDqWideWrite(TExprBase node, TExprContext& ctx) { + if (!Settings_.TmpTable) { + return node; + } + + if (GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "outTable")) { + return node; + } + + auto server = State_->Gateway->GetClusterServer(Settings_.CurrentCluster); + YQL_ENSURE(server, "Invalid YT cluster: " << Settings_.CurrentCluster); + + NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, Settings_.CurrentCluster, + Settings_.TmpTable->Name().StringValue(), Settings_.TmpFolder); + realTable.Append(true); + YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId"); + + NYT::TNode spec; + TYqlRowSpecInfo(Settings_.TmpTable->RowSpec()).FillCodecNode(spec[YqlRowSpecAttribute]); + NYT::TNode outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec)); + NYT::TNode writerOptions = NYT::TNode::CreateMap(); + + if (Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)) { + auto maxRowWeight = Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)->GetValue(); + writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight); + } + + TMaybeNode<TCoSecureParam> secParams; + if (State_->Configuration->Auth.Get().GetOrElse(TString())) { + secParams = Build<TCoSecureParam>(ctx, node.Pos()).Name().Build(TString("cluster:default_").append(Settings_.CurrentCluster)).Done(); + } + + auto settings = Build<TCoNameValueTupleList>(ctx, node.Pos()) + .Add() + .Name().Value("table", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build() + .Build() + .Add() + .Name().Value("server", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(server).Build() + .Build() + .Add() + .Name().Value("outSpec", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build() + .Build() + .Add() + .Name().Value("secureParams", TNodeFlags::Default).Build() + .Value(secParams) + .Build() + .Add() + .Name().Value("tx", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build() + .Build() + .Add() + .Name().Value("outTable", TNodeFlags::Default).Build() + .Value(*Settings_.TmpTable) + .Build() + .Add() + .Name().Value("writerOptions", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build() + .Build() + .Done().Ptr(); + + return ctx.ChangeChild(node.Ref(), TYtDqWideWrite::idx_Settings, std::move(settings)); + } + const TYtState::TPtr State_; + const TYtExtraPeepHoleSettings Settings_; }; } -THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state) { - return MakeHolder<TYtPeepholeTransformer>(std::move(state)); +THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) { + return MakeHolder<TYtPeepholeTransformer>(std::move(state), settings); } } // namespace NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h index 1ea47774658..db66a94f691 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h @@ -7,11 +7,19 @@ #include <ydb/library/yql/core/yql_graph_transformer.h> #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h> +#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <util/generic/ptr.h> namespace NYql { +struct TYtExtraPeepHoleSettings { + TString CurrentCluster; + const NNodes::TYtOutTable* TmpTable = nullptr; + TString TmpFolder; + TYtSettings::TConstPtr Config; +}; + THolder<IGraphTransformer> CreateYtIODiscoveryTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtEpochTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtIntentDeterminationTransformer(TYtState::TPtr state); @@ -31,7 +39,7 @@ THolder<IGraphTransformer> CreateYtDataSinkFinalizingTransformer(TYtState::TPtr THolder<IGraphTransformer> CreateYtLogicalOptProposalTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtPhysicalOptProposalTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtPhysicalFinalizingTransformer(TYtState::TPtr state); -THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state); +THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings); THolder<IGraphTransformer> CreateTYtWideFlowTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtDqHybridTransformer(TYtState::TPtr state, THolder<IGraphTransformer>&& finalizer); |