diff options
author | Roman Udovichenko <rvu@ydb.tech> | 2023-12-21 16:07:44 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-21 16:07:44 +0300 |
commit | 1f95d62426d546597a43a819ec9872b6daee8b4e (patch) | |
tree | 3749cc4e6992558d6087220bf676dc7f1875198f | |
parent | 3facfc056204106d2436a8178dd992e7adbbf315 (diff) | |
download | ydb-1f95d62426d546597a43a819ec9872b6daee8b4e.tar.gz |
[dq] Integrate YT peephole transforms into DQ integration (YQL-17386) (#605)
20 files changed, 223 insertions, 180 deletions
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 9486028da4..89ded7a01d 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -590,7 +590,7 @@ namespace NYql::NDqs { BUILD_CONNECTION(TDqCnMerge, BuildMergeChannels); YQL_ENSURE(false, "Unknown stage connection type: " << input.Cast<NNodes::TCallable>().CallableName()); } else { - YQL_ENSURE(input.Maybe<TDqSource>()); + YQL_ENSURE(input.Maybe<TDqSource>(), "Unknown stage input: " << input.Cast<NNodes::TCallable>().CallableName()); } } } diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp index a0ebfe8b17..1040ff5d22 100644 --- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp @@ -7,6 +7,7 @@ #include <ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.h> #include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h> #include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -22,6 +23,7 @@ #include <util/generic/xrange.h> #include <util/string/cast.h> +#include <util/stream/str.h> namespace NYql { @@ -1061,13 +1063,17 @@ void RegisterDqYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler) auto values = NCommon::MkqlBuildExpr(wideWrite.Input().Ref(), ctx); - TYtOutTable table{GetSetting(wideWrite.Settings().Ref(), "outTable")->Child(1)}; - auto inputItemType = NCommon::BuildType(wideWrite.Input().Ref(), GetSeqItemType(*table.Ref().GetTypeAnn()), ctx.ProgramBuilder); + auto tableName = GetSetting(wideWrite.Settings().Ref(), "tableName")->Child(1)->Content(); + auto tableType = GetSetting(wideWrite.Settings().Ref(), "tableType")->Child(1)->Content(); + + TStringStream err; + auto inputItemType = NCommon::ParseTypeFromYson(tableType, ctx.ProgramBuilder, err); + YQL_ENSURE(inputItemType, "Parse type error: " << err.Str()); auto structType = AS_TYPE(TStructType, inputItemType); values = NarrowFlow(values, *structType, ctx); values = ctx.ProgramBuilder.Map(values, [&](TRuntimeNode item) { - return BuildDqWrite(item, table.Name().Value(), ctx); + return BuildDqWrite(item, tableName, ctx); }); return values; }); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 74e69aed08..d82796cd86 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -221,7 +221,7 @@ private: } bool hasNonDeterministicFunctions = false; - if (const auto status = PeepHoleOptimizeBeforeExec<true>(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) { + if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) { return SyncStatus(status); } @@ -628,7 +628,7 @@ private: } bool hasNonDeterministicFunctions = false; - if (const auto status = PeepHoleOptimizeBeforeExec<false>(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) { + if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) { return SyncStatus(status); } @@ -685,50 +685,100 @@ private: auto clusterStr = TString{cluster.Value()}; delegatedNode = input->ChildPtr(TYtDqProcessWrite::idx_Input); - if (const auto status = SubstTables(delegatedNode, State_, false, ctx); status.Level == TStatus::Error) { - return SyncStatus(status); + + auto server = State_->Gateway->GetClusterServer(clusterStr); + YQL_ENSURE(server, "Invalid YT cluster: " << clusterStr); + + NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, clusterStr, tmpTable.Name().StringValue(), tmpFolder); + realTable.Append(true); + YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId"); + + NYT::TNode writerOptions = NYT::TNode::CreateMap(); + if (auto maxRowWeight = config->MaxRowWeight.Get(clusterStr)) { + writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight->GetValue()); + } + + NYT::TNode outSpec; + NYT::TNode type; + { + auto rowSpec = TYqlRowSpecInfo(tmpTable.RowSpec()); + NYT::TNode spec; + rowSpec.FillCodecNode(spec[YqlRowSpecAttribute]); + outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec)); + type = rowSpec.GetTypeNode(); } - bool hasNonDeterministicFunctions = false; - TYtExtraPeepHoleSettings settings; - settings.CurrentCluster = clusterStr; - settings.TmpTable = &tmpTable; - settings.TmpFolder = tmpFolder; - settings.Config = config; - if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_, - hasNonDeterministicFunctions, ctx, settings); status.Level == TStatus::Error) { - return SyncStatus(status); + + // These settings will be passed to YT peephole callback from DQ + auto settings = Build<TCoNameValueTupleList>(ctx, delegatedNode->Pos()) + .Add() + .Name().Value("yt_cluster", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(clusterStr).Build() + .Build() + .Add() + .Name().Value("yt_server", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(server).Build() + .Build() + .Add() + .Name().Value("yt_table", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build() + .Build() + .Add() + .Name().Value("yt_tableName", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(tmpTable.Name().Value()).Build() + .Build() + .Add() + .Name().Value("yt_tableType", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(type)).Build() + .Build() + .Add() + .Name().Value("yt_writeOptions", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build() + .Build() + .Add() + .Name().Value("yt_outSpec", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build() + .Build() + .Add() + .Name().Value("yt_tx", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build() + .Build() + .Done().Ptr(); + + auto atomType = ctx.MakeType<TUnitExprType>(); + + for (auto child: settings->Children()) { + child->Child(0)->SetTypeAnn(atomType); + child->Child(0)->SetState(TExprNode::EState::ConstrComplete); + child->Child(1)->SetTypeAnn(atomType); + child->Child(1)->SetState(TExprNode::EState::ConstrComplete); } delegatedNode = Build<TPull>(ctx, delegatedNode->Pos()) .Input(std::move(delegatedNode)) .BytesLimit() .Value(TString()) - .Build() + .Build() .RowsLimit() .Value(0U) - .Build() + .Build() .FormatDetails() .Value(ui32(NYson::EYsonFormat::Binary)) - .Build() - .Settings() - .Build() + .Build() + .Settings(settings) .Format() .Value(0U) - .Build() + .Build() .PublicId() .Value(ToString(State_->Types->TranslateOperationId(input->UniqueId()))) - .Build() + .Build() .Discard() .Value(ToString(true), TNodeFlags::Default) - .Build() + .Build() .Origin(input) - .Done() - .Ptr(); - - auto atomType = ctx.MakeType<TUnitExprType>(); + .Done().Ptr(); for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails, - TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) { + TResOrPullBase::idx_Settings, TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) { delegatedNode->Child(idx)->SetTypeAnn(atomType); delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp index 923b095417..cadcd548f0 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp @@ -135,7 +135,7 @@ protected: } bool hasNonDeterministicFunctions = false; - if (const auto status = PeepHoleOptimizeBeforeExec<true>(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) { + if (const auto status = PeepHoleOptimizeBeforeExec(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) { return SyncStatus(status); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 7f79548d85..c6762e7b1d 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -143,25 +143,21 @@ private: .Body<TDqWrite>() .Input(CloneCompleteFlow(fill.Content().Body().Ptr(), ctx)) .Provider().Value(YtProviderName).Build() - .Settings<TCoNameValueTupleList>() - .Add() - .Name().Value("table").Build() - .Value(fill.Output().Item(0)).Build() - .Build() - .Build() + .Settings<TCoNameValueTupleList>().Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos())) .Build() - .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) + .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos())) .Build() - .ColumnHints().Build() + .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .ColumnHints().Build() .Build() + .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Build() .Second<TYtFill>() .InitFrom(fill) .Settings(NYql::AddSetting(fill.Settings().Ref(), EYtSettingType::NoDq, {}, ctx)) - .Build() + .Build() .Done(); } } @@ -248,21 +244,17 @@ private: .Body<TDqWrite>() .Input(std::move(work)) .Provider().Value(YtProviderName).Build() - .Settings<TCoNameValueTupleList>() - .Add() - .Name().Value("table", TNodeFlags::Default).Build() - .Value(sort.Output().Item(0)).Build() - .Build() - .Build() + .Settings<TCoNameValueTupleList>().Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos())) .Build() - .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) + .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos())) .Build() - .ColumnHints().Build() + .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .ColumnHints().Build() .Build() + .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Build() .Second(std::move(operation)) .Done(); } @@ -381,21 +373,17 @@ private: .DataSource<TYtDSource>() .Category(map.DataSink().Category()) .Cluster(map.DataSink().Cluster()) - .Build() - .Input(map.Input()) .Build() - .Settings(std::move(settings)) + .Input(map.Input()) .Build() + .Settings(std::move(settings)) .Build() .Build() - .Provider().Value(YtProviderName).Build() - .Settings<TCoNameValueTupleList>() - .Add() - .Name().Value("table", TNodeFlags::Default).Build() - .Value(map.Output().Item(0)).Build() - .Build() .Build() + .Provider().Value(YtProviderName).Build() + .Settings<TCoNameValueTupleList>().Build() .Build() + .Build() .Settings(TDqStageSettings{.PartitionMode = ordered ? TDqStageSettings::EPartitionMode::Single : TDqStageSettings::EPartitionMode::Default}.BuildNode(ctx, map.Pos())) .Done(); @@ -599,27 +587,23 @@ private: .SortDirections(std::move(sortDirs)) .SortKeySelectorLambda(std::move(sortKeys)) .ListHandlerLambda(std::move(reducer)) - .Build() - .Provider().Value(YtProviderName).Build() - .template Settings<TCoNameValueTupleList>() - .Add() - .Name().Value("table", TNodeFlags::Default).Build() - .Value(reduce.Output().Item(0)).Build() - .Build() .Build() + .Provider().Value(YtProviderName).Build() + .template Settings<TCoNameValueTupleList>().Build() .Build() - .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos())) .Build() - .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) + .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos())) .Build() - .ColumnHints().Build() + .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default) .Build() - .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .ColumnHints().Build() .Build() + .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build() + .Build() .template Second<TYtOperation>() .InitFrom(reduce) .Settings(NYql::AddSetting(reduce.Settings().Ref(), EYtSettingType::NoDq, {}, ctx)) - .Build() + .Build() .Done(); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 6bded487cc..d1cd8aa554 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -3,6 +3,7 @@ #include "yql_yt_mkql_compiler.h" #include "yql_yt_helpers.h" #include "yql_yt_op_settings.h" +#include "yql_yt_provider_impl.h" #include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <ydb/library/yql/providers/yt/common/yql_configuration.h> @@ -18,6 +19,7 @@ #include <ydb/library/yql/core/yql_type_helpers.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/core/services/yql_transform_pipeline.h> #include <ydb/library/yql/utils/log/log.h> #include <yt/cpp/mapreduce/common/helpers.h> @@ -667,6 +669,34 @@ public: NYql::WriteTableReference(writer, YtProviderName, cluster.AsString(), refName.AsString(), true, columns); } + virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& providerParams, TTransformationPipeline* pipeline) override { + if (!beforeDqTransforms) { + return; + } + + auto state = TYtState::TPtr(State_); + pipeline->Add(CreateFunctorTransformer([state](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { + return OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { + if (TYtReadTable::Match(node.Get()) && !node->Head().IsWorld()) { + YQL_CLOG(INFO, ProviderYt) << "Peephole-YtTrimWorld"; + return ctx.ChangeChild(*node, 0, ctx.NewWorld(node->Pos())); + } + return node; + }, ctx, TOptimizeExprSettings{state->Types}); + }), "YtTrimWorld", TIssuesIds::DEFAULT_ERROR); + + pipeline->Add(CreateSinglePassFunctorTransformer([state, providerParams](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { + output = input; + auto status = SubstTables(output, state, true, ctx); + if (status.Level != IGraphTransformer::TStatus::Error && input != output) { + YQL_CLOG(INFO, ProviderYt) << "Peephole-YtSubstTables"; + } + return status; + }), "YtSubstTables", TIssuesIds::DEFAULT_ERROR); + + pipeline->Add(CreateYtPeepholeTransformer(TYtState::TPtr(State_), providerParams), "YtPeepHole", TIssuesIds::DEFAULT_ERROR); + } + private: TYtState* State_; }; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp index ea1babb516..d26bceaf3e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp @@ -631,51 +631,35 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& } } -template<bool WithWideFlow> struct TPeepholePipelineConfigurator : public IPipelineConfigurator { - TPeepholePipelineConfigurator(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) - : State_(std::move(state)) - , Settings_(settings) + TPeepholePipelineConfigurator(TYtState::TPtr state) + : State_(std::move(state)) {} private: void AfterCreate(TTransformationPipeline*) const final {} void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final { - pipeline->Add(CreateTYtPeepholeTransformer(State_, Settings_), "Peephole"); - if constexpr (WithWideFlow) { - pipeline->Add(CreateTYtWideFlowTransformer(State_), "WideFlow"); - } + pipeline->Add(CreateYtPeepholeTransformer(State_, {}), "Peephole"); + pipeline->Add(CreateYtWideFlowTransformer(State_), "WideFlow"); } void AfterOptimize(TTransformationPipeline*) const final {} const TYtState::TPtr State_; - const TYtExtraPeepHoleSettings Settings_; }; -template<bool ForNativeExecution> IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, - const TYtExtraPeepHoleSettings& settings) + const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx) { - if constexpr (ForNativeExecution) { - if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx); - status.Level != IGraphTransformer::TStatus::Ok) { - return status; - } + if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx); + status.Level != IGraphTransformer::TStatus::Ok) { + return status; } - const TPeepholePipelineConfigurator<ForNativeExecution> wideFlowTransformers(state, settings); + const TPeepholePipelineConfigurator wideFlowTransformers(state); TPeepholeSettings peepholeSettings; peepholeSettings.CommonConfig = &wideFlowTransformers; return PeepHoleOptimizeNode(output, output, ctx, *state->Types, nullptr, hasNonDeterministicFunctions, peepholeSettings); } -template -IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<true>(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings); - -template -IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<false>(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings); } // NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h index adb45c6196..770b5a4fc5 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h @@ -31,9 +31,7 @@ TExprNode::TPtr OptimizeReadWithSettings(const TExprNode::TPtr& node, bool allow IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& input, TExprNode::TPtr& output, const TYtState::TPtr& state, TExprContext& ctx); -template<bool ForNativeExecution> IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output, - const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, - const TYtExtraPeepHoleSettings& settings = {}); + const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx); } //NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp index 26c2751593..04c6aaf120 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp @@ -19,7 +19,7 @@ namespace { class TYtPeepholeTransformer : public TOptimizeTransformerBase { public: - TYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) + TYtPeepholeTransformer(TYtState::TPtr state, const THashMap<TString, TString>& settings) : TOptimizeTransformerBase(state ? state->Types : nullptr, NLog::EComponent::ProviderYt, {}) , State_(state) , Settings_(settings) @@ -29,6 +29,7 @@ public: AddHandler(0, &TYtDqWideWrite::Match, HNDL(OptimizeYtDqWideWrite)); #undef HNDL } + private: TMaybeNode<TExprBase> OptimizeLength(TExprBase node, TExprContext& ctx) { std::optional<size_t> lengthRes; @@ -59,41 +60,34 @@ private: } TMaybeNode<TExprBase> OptimizeYtDqWideWrite(TExprBase node, TExprContext& ctx) { - if (!Settings_.TmpTable) { + if (Settings_.empty()) { return node; } - if (GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "outTable")) { + auto cluster = Settings_.at("yt_cluster"); + auto server = Settings_.at("yt_server"); + auto table = Settings_.at("yt_table"); + auto tableName = Settings_.at("yt_tableName"); + auto tableType = Settings_.at("yt_tableType"); + auto writeOptions = Settings_.at("yt_writeOptions"); + auto outSpec = Settings_.at("yt_outSpec"); + auto tx = Settings_.at("yt_tx"); + + // Check that we optimize only single YtDqWideWrite + if (auto setting = GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "table")) { + YQL_ENSURE(setting->Child(1)->Content() == table); return node; } - auto server = State_->Gateway->GetClusterServer(Settings_.CurrentCluster); - YQL_ENSURE(server, "Invalid YT cluster: " << Settings_.CurrentCluster); - - NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, Settings_.CurrentCluster, - Settings_.TmpTable->Name().StringValue(), Settings_.TmpFolder); - realTable.Append(true); - YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId"); - - NYT::TNode spec; - TYqlRowSpecInfo(Settings_.TmpTable->RowSpec()).FillCodecNode(spec[YqlRowSpecAttribute]); - NYT::TNode outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec)); - NYT::TNode writerOptions = NYT::TNode::CreateMap(); - - if (Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)) { - auto maxRowWeight = Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)->GetValue(); - writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight); - } - TMaybeNode<TCoSecureParam> secParams; if (State_->Configuration->Auth.Get().GetOrElse(TString())) { - secParams = Build<TCoSecureParam>(ctx, node.Pos()).Name().Build(TString("cluster:default_").append(Settings_.CurrentCluster)).Done(); + secParams = Build<TCoSecureParam>(ctx, node.Pos()).Name().Build(TString("cluster:default_").append(cluster)).Done(); } auto settings = Build<TCoNameValueTupleList>(ctx, node.Pos()) .Add() .Name().Value("table", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build() + .Value<TCoAtom>().Value(table).Build() .Build() .Add() .Name().Value("server", TNodeFlags::Default).Build() @@ -101,23 +95,27 @@ private: .Build() .Add() .Name().Value("outSpec", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build() + .Value<TCoAtom>().Value(outSpec).Build() .Build() .Add() .Name().Value("secureParams", TNodeFlags::Default).Build() .Value(secParams) .Build() .Add() - .Name().Value("tx", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build() + .Name().Value("writerOptions", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(writeOptions).Build() .Build() .Add() - .Name().Value("outTable", TNodeFlags::Default).Build() - .Value(*Settings_.TmpTable) + .Name().Value("tx", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(tx).Build() .Build() - .Add() - .Name().Value("writerOptions", TNodeFlags::Default).Build() - .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build() + .Add() // yt_file gateway specific + .Name().Value("tableName", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(tableName).Build() + .Build() + .Add() // yt_file gateway specific + .Name().Value("tableType", TNodeFlags::Default).Build() + .Value<TCoAtom>().Value(tableType).Build() .Build() .Done().Ptr(); @@ -125,12 +123,12 @@ private: } const TYtState::TPtr State_; - const TYtExtraPeepHoleSettings Settings_; + const THashMap<TString, TString> Settings_; }; } -THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) { +THolder<IGraphTransformer> CreateYtPeepholeTransformer(TYtState::TPtr state, const THashMap<TString, TString>& settings) { return MakeHolder<TYtPeepholeTransformer>(std::move(state), settings); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h index db66a94f69..04c63d7438 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h @@ -13,13 +13,6 @@ namespace NYql { -struct TYtExtraPeepHoleSettings { - TString CurrentCluster; - const NNodes::TYtOutTable* TmpTable = nullptr; - TString TmpFolder; - TYtSettings::TConstPtr Config; -}; - THolder<IGraphTransformer> CreateYtIODiscoveryTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtEpochTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtIntentDeterminationTransformer(TYtState::TPtr state); @@ -39,14 +32,14 @@ THolder<IGraphTransformer> CreateYtDataSinkFinalizingTransformer(TYtState::TPtr THolder<IGraphTransformer> CreateYtLogicalOptProposalTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtPhysicalOptProposalTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtPhysicalFinalizingTransformer(TYtState::TPtr state); -THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings); -THolder<IGraphTransformer> CreateTYtWideFlowTransformer(TYtState::TPtr state); +THolder<IGraphTransformer> CreateYtPeepholeTransformer(TYtState::TPtr state, const THashMap<TString, TString>& settings); +THolder<IGraphTransformer> CreateYtWideFlowTransformer(TYtState::TPtr state); THolder<IGraphTransformer> CreateYtDqHybridTransformer(TYtState::TPtr state, THolder<IGraphTransformer>&& finalizer); void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children); TString MakeTableDisplayName(NNodes::TExprBase table, bool isOutput); - void ScanForUsedOutputTables(const TExprNode& input, TVector<TString>& usedNodeIds); TString MakeUsedNodeId(const TString& cluster, const TString& table); + } // NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp index 6ae1d3c988..751506a4bb 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp @@ -249,7 +249,7 @@ private: } -THolder<IGraphTransformer> CreateTYtWideFlowTransformer(TYtState::TPtr state) { +THolder<IGraphTransformer> CreateYtWideFlowTransformer(TYtState::TPtr state) { return MakeHolder<TYtWideFlowTransformer>(std::move(state)); } diff --git a/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json index d9db89603a..518da5d6ea 100644 --- a/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json @@ -1373,9 +1373,9 @@ ], "test.test[join-mergejoin_force_align3-off-Results]": [ { - "checksum": "4b243c5d9d02571857aa516815a1f210", - "size": 251, - "uri": "https://{canondata_backend}/937458/e5719cd256fe3fd898e8ebe6df280521ffd29040/resource.tar.gz#test.test_join-mergejoin_force_align3-off-Results_/results.txt" + "checksum": "f1c5f5e2353f012207188c4e915487df", + "size": 284, + "uri": "https://{canondata_backend}/1031349/25c52e56e8eeb1d879e04b5b9efcde6fcf2efd65/resource.tar.gz#test.test_join-mergejoin_force_align3-off-Results_/results.txt" } ], "test.test[join-mergejoin_force_per_link-off-Analyze]": [ @@ -1582,9 +1582,9 @@ ], "test.test[join-yql-14829_left-off-Results]": [ { - "checksum": "096f2ba7e9b6515b3fee775695a56a2b", - "size": 108, - "uri": "https://{canondata_backend}/937458/e5719cd256fe3fd898e8ebe6df280521ffd29040/resource.tar.gz#test.test_join-yql-14829_left-off-Results_/results.txt" + "checksum": "ab98e1c5a018c46596c65b0352abfa13", + "size": 117, + "uri": "https://{canondata_backend}/1031349/25c52e56e8eeb1d879e04b5b9efcde6fcf2efd65/resource.tar.gz#test.test_join-yql-14829_left-off-Results_/results.txt" } ], "test.test[join-yql-6297-off-Analyze]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json index bfd7c58825..33e84c3795 100644 --- a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json @@ -1287,9 +1287,9 @@ ], "test.test[join-mergejoin_force_align1-off-Results]": [ { - "checksum": "d415ed6f342f69a608715115c099eff5", - "size": 288, - "uri": "https://{canondata_backend}/1600758/32cfdeb8c6377a2e7e62c6c4adbb95f25af7669b/resource.tar.gz#test.test_join-mergejoin_force_align1-off-Results_/results.txt" + "checksum": "14254ebf1a0a6dffad13177f0cfe847f", + "size": 331, + "uri": "https://{canondata_backend}/1031349/b0c48c4d800c80c9736c376a51869bb4a7f5b31b/resource.tar.gz#test.test_join-mergejoin_force_align1-off-Results_/results.txt" } ], "test.test[join-pullup_left--Analyze]": [ @@ -1462,9 +1462,9 @@ ], "test.test[join-yql-14829_leftonly-off-Results]": [ { - "checksum": "096f2ba7e9b6515b3fee775695a56a2b", - "size": 108, - "uri": "https://{canondata_backend}/1600758/32cfdeb8c6377a2e7e62c6c4adbb95f25af7669b/resource.tar.gz#test.test_join-yql-14829_leftonly-off-Results_/results.txt" + "checksum": "ab98e1c5a018c46596c65b0352abfa13", + "size": 117, + "uri": "https://{canondata_backend}/1031349/b0c48c4d800c80c9736c376a51869bb4a7f5b31b/resource.tar.gz#test.test_join-yql-14829_leftonly-off-Results_/results.txt" } ], "test.test[join-yql-4275-off-Analyze]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json index 4147ed487e..d813f3e4b3 100644 --- a/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json @@ -1350,9 +1350,9 @@ ], "test.test[join-premap_merge_with_remap-off-Results]": [ { - "checksum": "2be66749732358ddfd9a8ef30ec27518", - "size": 471, - "uri": "https://{canondata_backend}/1600758/aad142702907f13e911494c1a7b312bad34f692a/resource.tar.gz#test.test_join-premap_merge_with_remap-off-Results_/results.txt" + "checksum": "114e220ddf7ec64e2faedb36603ea6b1", + "size": 511, + "uri": "https://{canondata_backend}/1942415/46b4dec49c8d2a38bf8ce01066b72bfee9407ffe/resource.tar.gz#test.test_join-premap_merge_with_remap-off-Results_/results.txt" } ], "test.test[join-pushdown_filter_over_inner_with_assume_strict-off-Analyze]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json index 6de618ad86..56aeeb866e 100644 --- a/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json @@ -1125,9 +1125,9 @@ ], "test.test[join-mapjoin_opt_vs_2xopt-off-Results]": [ { - "checksum": "1856c162083394044ab481baa6091e01", - "size": 125, - "uri": "https://{canondata_backend}/1936273/7c78e1e45ae282daee686c006624daa21a7c6ca6/resource.tar.gz#test.test_join-mapjoin_opt_vs_2xopt-off-Results_/results.txt" + "checksum": "5e4a32e9e65c7d5a5f4b7022f1c7404b", + "size": 144, + "uri": "https://{canondata_backend}/937458/15884872d0218004c3baba7fcce722819604019c/resource.tar.gz#test.test_join-mapjoin_opt_vs_2xopt-off-Results_/results.txt" } ], "test.test[join-mapjoin_partial_uniq_keys--Analyze]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json index 0be68ec43d..f0331340d0 100644 --- a/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json @@ -1157,9 +1157,9 @@ ], "test.test[join-lookupjoin_with_cache-off-Results]": [ { - "checksum": "a5a7e398dc30bcb1eb092ef6ee93b65d", - "size": 264, - "uri": "https://{canondata_backend}/1900335/a5a16b7313d07b162a608c1abeab1e68e6175117/resource.tar.gz#test.test_join-lookupjoin_with_cache-off-Results_/results.txt" + "checksum": "4a5a14ca40382e201b3d29c142c0bbdc", + "size": 377, + "uri": "https://{canondata_backend}/1942415/2c7b7e06b3aea4ad2e9b20b79598f6f99d82c3f5/resource.tar.gz#test.test_join-lookupjoin_with_cache-off-Results_/results.txt" } ], "test.test[join-mapjoin_dup_key--Analyze]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json index f0821ccab3..8fbdab9ad8 100644 --- a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json @@ -1158,9 +1158,9 @@ ], "test.test[join-anyjoin_common_nodata_keys-off-Results]": [ { - "checksum": "21d3c1e962a2f1041937d4db4d1b690a", - "size": 453, - "uri": "https://{canondata_backend}/1871002/b59ed2ad938015ca28be6d459030014e4b6ff1ea/resource.tar.gz#test.test_join-anyjoin_common_nodata_keys-off-Results_/results.txt" + "checksum": "6572671713355db88373ea3e154424d3", + "size": 529, + "uri": "https://{canondata_backend}/1936273/edfdc418bcbd1386a4a94dc31aee088a78a549f4/resource.tar.gz#test.test_join-anyjoin_common_nodata_keys-off-Results_/results.txt" } ], "test.test[join-equi_join_three_simple--Analyze]": [ @@ -1452,9 +1452,9 @@ ], "test.test[join-mapjoin_with_anonymous-off-Results]": [ { - "checksum": "f739adcd538e5ea310062e8784476dd9", - "size": 408, - "uri": "https://{canondata_backend}/1924537/40c66e62107c2a9e3733dec809479087bdd8f6d6/resource.tar.gz#test.test_join-mapjoin_with_anonymous-off-Results_/results.txt" + "checksum": "b06c92f700fb33d968deedcec51c32f3", + "size": 440, + "uri": "https://{canondata_backend}/1936273/edfdc418bcbd1386a4a94dc31aee088a78a549f4/resource.tar.gz#test.test_join-mapjoin_with_anonymous-off-Results_/results.txt" } ], "test.test[join-premap_map_inner--Analyze]": [ @@ -1561,9 +1561,9 @@ ], "test.test[join-yql-8125-off-Results]": [ { - "checksum": "660387db59c42b1e4e85f016a6d434db", - "size": 157, - "uri": "https://{canondata_backend}/1871002/b59ed2ad938015ca28be6d459030014e4b6ff1ea/resource.tar.gz#test.test_join-yql-8125-off-Results_/results.txt" + "checksum": "3bdb5911a299f9ea9446297b8d1fbb9c", + "size": 394, + "uri": "https://{canondata_backend}/1936273/edfdc418bcbd1386a4a94dc31aee088a78a549f4/resource.tar.gz#test.test_join-yql-8125-off-Results_/results.txt" } ], "test.test[key_filter-contains-default.txt-Analyze]": [ diff --git a/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json index 42c9dadd82..d04f000e7a 100644 --- a/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json +++ b/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json @@ -1224,9 +1224,9 @@ ], "test.test[join-mergejoin_force_align2-off-Results]": [ { - "checksum": "6a1f85c59016769b85dd5ce2d5b686c9", - "size": 300, - "uri": "https://{canondata_backend}/1936273/1ba42e2c47cd3429011228159c1fdf43dd1881b7/resource.tar.gz#test.test_join-mergejoin_force_align2-off-Results_/results.txt" + "checksum": "360d932ebffae959c40b57a204bb60a5", + "size": 339, + "uri": "https://{canondata_backend}/1599023/753be0e0c576efbfbb51452c05363488f2aa1c64/resource.tar.gz#test.test_join-mergejoin_force_align2-off-Results_/results.txt" } ], "test.test[join-nopushdown_filter_with_depends_on-off-Analyze]": [ diff --git a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp index 0239e00f6c..ebf67df741 100644 --- a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp +++ b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp @@ -104,7 +104,7 @@ public: } void AfterOptimize(TTransformationPipeline* pipeline) const final { - pipeline->Add(CreateTYtWideFlowTransformer(nullptr), "WideFlow"); + pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow"); pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); } }; diff --git a/ydb/library/yql/tools/yqlrun/yqlrun.cpp b/ydb/library/yql/tools/yqlrun/yqlrun.cpp index e56437e8ca..3fce3989cd 100644 --- a/ydb/library/yql/tools/yqlrun/yqlrun.cpp +++ b/ydb/library/yql/tools/yqlrun/yqlrun.cpp @@ -251,7 +251,7 @@ public: } void AfterOptimize(TTransformationPipeline* pipeline) const final { - pipeline->Add(CreateTYtWideFlowTransformer(nullptr), "WideFlow"); + pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow"); pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole"); } }; |