aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoman Udovichenko <rvu@ydb.tech>2023-12-21 16:07:44 +0300
committerGitHub <noreply@github.com>2023-12-21 16:07:44 +0300
commit1f95d62426d546597a43a819ec9872b6daee8b4e (patch)
tree3749cc4e6992558d6087220bf676dc7f1875198f
parent3facfc056204106d2436a8178dd992e7adbbf315 (diff)
downloadydb-1f95d62426d546597a43a819ec9872b6daee8b4e.tar.gz
[dq] Integrate YT peephole transforms into DQ integration (YQL-17386) (#605)
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp2
-rw-r--r--ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp12
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp102
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp2
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp68
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp30
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp34
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_optimize.h4
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp64
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h13
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp2
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json12
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json6
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json18
-rw-r--r--ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json6
-rw-r--r--ydb/library/yql/tools/yqlrun/http/yql_server.cpp2
-rw-r--r--ydb/library/yql/tools/yqlrun/yqlrun.cpp2
20 files changed, 223 insertions, 180 deletions
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 9486028da4..89ded7a01d 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -590,7 +590,7 @@ namespace NYql::NDqs {
BUILD_CONNECTION(TDqCnMerge, BuildMergeChannels);
YQL_ENSURE(false, "Unknown stage connection type: " << input.Cast<NNodes::TCallable>().CallableName());
} else {
- YQL_ENSURE(input.Maybe<TDqSource>());
+ YQL_ENSURE(input.Maybe<TDqSource>(), "Unknown stage input: " << input.Cast<NNodes::TCallable>().CallableName());
}
}
}
diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
index a0ebfe8b17..1040ff5d22 100644
--- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file_mkql_compiler.cpp
@@ -7,6 +7,7 @@
#include <ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.h>
#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -22,6 +23,7 @@
#include <util/generic/xrange.h>
#include <util/string/cast.h>
+#include <util/stream/str.h>
namespace NYql {
@@ -1061,13 +1063,17 @@ void RegisterDqYtFileMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler)
auto values = NCommon::MkqlBuildExpr(wideWrite.Input().Ref(), ctx);
- TYtOutTable table{GetSetting(wideWrite.Settings().Ref(), "outTable")->Child(1)};
- auto inputItemType = NCommon::BuildType(wideWrite.Input().Ref(), GetSeqItemType(*table.Ref().GetTypeAnn()), ctx.ProgramBuilder);
+ auto tableName = GetSetting(wideWrite.Settings().Ref(), "tableName")->Child(1)->Content();
+ auto tableType = GetSetting(wideWrite.Settings().Ref(), "tableType")->Child(1)->Content();
+
+ TStringStream err;
+ auto inputItemType = NCommon::ParseTypeFromYson(tableType, ctx.ProgramBuilder, err);
+ YQL_ENSURE(inputItemType, "Parse type error: " << err.Str());
auto structType = AS_TYPE(TStructType, inputItemType);
values = NarrowFlow(values, *structType, ctx);
values = ctx.ProgramBuilder.Map(values, [&](TRuntimeNode item) {
- return BuildDqWrite(item, table.Name().Value(), ctx);
+ return BuildDqWrite(item, tableName, ctx);
});
return values;
});
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
index 74e69aed08..d82796cd86 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
@@ -221,7 +221,7 @@ private:
}
bool hasNonDeterministicFunctions = false;
- if (const auto status = PeepHoleOptimizeBeforeExec<true>(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
+ if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
return SyncStatus(status);
}
@@ -628,7 +628,7 @@ private:
}
bool hasNonDeterministicFunctions = false;
- if (const auto status = PeepHoleOptimizeBeforeExec<false>(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
+ if (const auto status = PeepHoleOptimizeBeforeExec(optimizedNode, optimizedNode, State_, hasNonDeterministicFunctions, ctx); status.Level != TStatus::Ok) {
return SyncStatus(status);
}
@@ -685,50 +685,100 @@ private:
auto clusterStr = TString{cluster.Value()};
delegatedNode = input->ChildPtr(TYtDqProcessWrite::idx_Input);
- if (const auto status = SubstTables(delegatedNode, State_, false, ctx); status.Level == TStatus::Error) {
- return SyncStatus(status);
+
+ auto server = State_->Gateway->GetClusterServer(clusterStr);
+ YQL_ENSURE(server, "Invalid YT cluster: " << clusterStr);
+
+ NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, clusterStr, tmpTable.Name().StringValue(), tmpFolder);
+ realTable.Append(true);
+ YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId");
+
+ NYT::TNode writerOptions = NYT::TNode::CreateMap();
+ if (auto maxRowWeight = config->MaxRowWeight.Get(clusterStr)) {
+ writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight->GetValue());
+ }
+
+ NYT::TNode outSpec;
+ NYT::TNode type;
+ {
+ auto rowSpec = TYqlRowSpecInfo(tmpTable.RowSpec());
+ NYT::TNode spec;
+ rowSpec.FillCodecNode(spec[YqlRowSpecAttribute]);
+ outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec));
+ type = rowSpec.GetTypeNode();
}
- bool hasNonDeterministicFunctions = false;
- TYtExtraPeepHoleSettings settings;
- settings.CurrentCluster = clusterStr;
- settings.TmpTable = &tmpTable;
- settings.TmpFolder = tmpFolder;
- settings.Config = config;
- if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_,
- hasNonDeterministicFunctions, ctx, settings); status.Level == TStatus::Error) {
- return SyncStatus(status);
+
+ // These settings will be passed to YT peephole callback from DQ
+ auto settings = Build<TCoNameValueTupleList>(ctx, delegatedNode->Pos())
+ .Add()
+ .Name().Value("yt_cluster", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(clusterStr).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_server", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(server).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_table", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_tableName", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(tmpTable.Name().Value()).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_tableType", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(type)).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_writeOptions", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_outSpec", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build()
+ .Build()
+ .Add()
+ .Name().Value("yt_tx", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build()
+ .Build()
+ .Done().Ptr();
+
+ auto atomType = ctx.MakeType<TUnitExprType>();
+
+ for (auto child: settings->Children()) {
+ child->Child(0)->SetTypeAnn(atomType);
+ child->Child(0)->SetState(TExprNode::EState::ConstrComplete);
+ child->Child(1)->SetTypeAnn(atomType);
+ child->Child(1)->SetState(TExprNode::EState::ConstrComplete);
}
delegatedNode = Build<TPull>(ctx, delegatedNode->Pos())
.Input(std::move(delegatedNode))
.BytesLimit()
.Value(TString())
- .Build()
+ .Build()
.RowsLimit()
.Value(0U)
- .Build()
+ .Build()
.FormatDetails()
.Value(ui32(NYson::EYsonFormat::Binary))
- .Build()
- .Settings()
- .Build()
+ .Build()
+ .Settings(settings)
.Format()
.Value(0U)
- .Build()
+ .Build()
.PublicId()
.Value(ToString(State_->Types->TranslateOperationId(input->UniqueId())))
- .Build()
+ .Build()
.Discard()
.Value(ToString(true), TNodeFlags::Default)
- .Build()
+ .Build()
.Origin(input)
- .Done()
- .Ptr();
-
- auto atomType = ctx.MakeType<TUnitExprType>();
+ .Done().Ptr();
for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails,
- TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) {
+ TResOrPullBase::idx_Settings, TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) {
delegatedNode->Child(idx)->SetTypeAnn(atomType);
delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete);
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp
index 923b095417..cadcd548f0 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasource_exec.cpp
@@ -135,7 +135,7 @@ protected:
}
bool hasNonDeterministicFunctions = false;
- if (const auto status = PeepHoleOptimizeBeforeExec<true>(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) {
+ if (const auto status = PeepHoleOptimizeBeforeExec(optimizedInput, optimizedInput, State_, hasNonDeterministicFunctions, ctx); status.Level != IGraphTransformer::TStatus::Ok) {
return SyncStatus(status);
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
index 7f79548d85..c6762e7b1d 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
@@ -143,25 +143,21 @@ private:
.Body<TDqWrite>()
.Input(CloneCompleteFlow(fill.Content().Body().Ptr(), ctx))
.Provider().Value(YtProviderName).Build()
- .Settings<TCoNameValueTupleList>()
- .Add()
- .Name().Value("table").Build()
- .Value(fill.Output().Item(0)).Build()
- .Build()
- .Build()
+ .Settings<TCoNameValueTupleList>().Build()
.Build()
- .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos()))
.Build()
- .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
+ .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, fill.Pos()))
.Build()
- .ColumnHints().Build()
+ .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
.Build()
- .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
+ .ColumnHints().Build()
.Build()
+ .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
+ .Build()
.Second<TYtFill>()
.InitFrom(fill)
.Settings(NYql::AddSetting(fill.Settings().Ref(), EYtSettingType::NoDq, {}, ctx))
- .Build()
+ .Build()
.Done();
}
}
@@ -248,21 +244,17 @@ private:
.Body<TDqWrite>()
.Input(std::move(work))
.Provider().Value(YtProviderName).Build()
- .Settings<TCoNameValueTupleList>()
- .Add()
- .Name().Value("table", TNodeFlags::Default).Build()
- .Value(sort.Output().Item(0)).Build()
- .Build()
- .Build()
+ .Settings<TCoNameValueTupleList>().Build()
.Build()
- .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos()))
.Build()
- .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
+ .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, sort.Pos()))
.Build()
- .ColumnHints().Build()
+ .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
.Build()
- .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
+ .ColumnHints().Build()
.Build()
+ .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
+ .Build()
.Second(std::move(operation))
.Done();
}
@@ -381,21 +373,17 @@ private:
.DataSource<TYtDSource>()
.Category(map.DataSink().Category())
.Cluster(map.DataSink().Cluster())
- .Build()
- .Input(map.Input())
.Build()
- .Settings(std::move(settings))
+ .Input(map.Input())
.Build()
+ .Settings(std::move(settings))
.Build()
.Build()
- .Provider().Value(YtProviderName).Build()
- .Settings<TCoNameValueTupleList>()
- .Add()
- .Name().Value("table", TNodeFlags::Default).Build()
- .Value(map.Output().Item(0)).Build()
- .Build()
.Build()
+ .Provider().Value(YtProviderName).Build()
+ .Settings<TCoNameValueTupleList>().Build()
.Build()
+ .Build()
.Settings(TDqStageSettings{.PartitionMode = ordered ? TDqStageSettings::EPartitionMode::Single : TDqStageSettings::EPartitionMode::Default}.BuildNode(ctx, map.Pos()))
.Done();
@@ -599,27 +587,23 @@ private:
.SortDirections(std::move(sortDirs))
.SortKeySelectorLambda(std::move(sortKeys))
.ListHandlerLambda(std::move(reducer))
- .Build()
- .Provider().Value(YtProviderName).Build()
- .template Settings<TCoNameValueTupleList>()
- .Add()
- .Name().Value("table", TNodeFlags::Default).Build()
- .Value(reduce.Output().Item(0)).Build()
- .Build()
.Build()
+ .Provider().Value(YtProviderName).Build()
+ .template Settings<TCoNameValueTupleList>().Build()
.Build()
- .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos()))
.Build()
- .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
+ .Settings(TDqStageSettings{.PartitionMode = TDqStageSettings::EPartitionMode::Single}.BuildNode(ctx, reduce.Pos()))
.Build()
- .ColumnHints().Build()
+ .Index().Build(ctx.GetIndexAsString(0), TNodeFlags::Default)
.Build()
- .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
+ .ColumnHints().Build()
.Build()
+ .Flags().Add().Build("FallbackOnError", TNodeFlags::Default).Build()
+ .Build()
.template Second<TYtOperation>()
.InitFrom(reduce)
.Settings(NYql::AddSetting(reduce.Settings().Ref(), EYtSettingType::NoDq, {}, ctx))
- .Build()
+ .Build()
.Done();
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index 6bded487cc..d1cd8aa554 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -3,6 +3,7 @@
#include "yql_yt_mkql_compiler.h"
#include "yql_yt_helpers.h"
#include "yql_yt_op_settings.h"
+#include "yql_yt_provider_impl.h"
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/yt/common/yql_configuration.h>
@@ -18,6 +19,7 @@
#include <ydb/library/yql/core/yql_type_helpers.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/utils/log/log.h>
#include <yt/cpp/mapreduce/common/helpers.h>
@@ -667,6 +669,34 @@ public:
NYql::WriteTableReference(writer, YtProviderName, cluster.AsString(), refName.AsString(), true, columns);
}
+ virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& providerParams, TTransformationPipeline* pipeline) override {
+ if (!beforeDqTransforms) {
+ return;
+ }
+
+ auto state = TYtState::TPtr(State_);
+ pipeline->Add(CreateFunctorTransformer([state](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
+ return OptimizeExpr(input, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ if (TYtReadTable::Match(node.Get()) && !node->Head().IsWorld()) {
+ YQL_CLOG(INFO, ProviderYt) << "Peephole-YtTrimWorld";
+ return ctx.ChangeChild(*node, 0, ctx.NewWorld(node->Pos()));
+ }
+ return node;
+ }, ctx, TOptimizeExprSettings{state->Types});
+ }), "YtTrimWorld", TIssuesIds::DEFAULT_ERROR);
+
+ pipeline->Add(CreateSinglePassFunctorTransformer([state, providerParams](TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
+ output = input;
+ auto status = SubstTables(output, state, true, ctx);
+ if (status.Level != IGraphTransformer::TStatus::Error && input != output) {
+ YQL_CLOG(INFO, ProviderYt) << "Peephole-YtSubstTables";
+ }
+ return status;
+ }), "YtSubstTables", TIssuesIds::DEFAULT_ERROR);
+
+ pipeline->Add(CreateYtPeepholeTransformer(TYtState::TPtr(State_), providerParams), "YtPeepHole", TIssuesIds::DEFAULT_ERROR);
+ }
+
private:
TYtState* State_;
};
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp
index ea1babb516..d26bceaf3e 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp
@@ -631,51 +631,35 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
}
}
-template<bool WithWideFlow>
struct TPeepholePipelineConfigurator : public IPipelineConfigurator {
- TPeepholePipelineConfigurator(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings)
- : State_(std::move(state))
- , Settings_(settings)
+ TPeepholePipelineConfigurator(TYtState::TPtr state)
+ : State_(std::move(state))
{}
private:
void AfterCreate(TTransformationPipeline*) const final {}
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
- pipeline->Add(CreateTYtPeepholeTransformer(State_, Settings_), "Peephole");
- if constexpr (WithWideFlow) {
- pipeline->Add(CreateTYtWideFlowTransformer(State_), "WideFlow");
- }
+ pipeline->Add(CreateYtPeepholeTransformer(State_, {}), "Peephole");
+ pipeline->Add(CreateYtWideFlowTransformer(State_), "WideFlow");
}
void AfterOptimize(TTransformationPipeline*) const final {}
const TYtState::TPtr State_;
- const TYtExtraPeepHoleSettings Settings_;
};
-template<bool ForNativeExecution>
IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx,
- const TYtExtraPeepHoleSettings& settings)
+ const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx)
{
- if constexpr (ForNativeExecution) {
- if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx);
- status.Level != IGraphTransformer::TStatus::Ok) {
- return status;
- }
+ if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx);
+ status.Level != IGraphTransformer::TStatus::Ok) {
+ return status;
}
- const TPeepholePipelineConfigurator<ForNativeExecution> wideFlowTransformers(state, settings);
+ const TPeepholePipelineConfigurator wideFlowTransformers(state);
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &wideFlowTransformers;
return PeepHoleOptimizeNode(output, output, ctx, *state->Types, nullptr, hasNonDeterministicFunctions, peepholeSettings);
}
-template
-IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<true>(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings);
-
-template
-IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<false>(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings);
} // NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h
index adb45c6196..770b5a4fc5 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h
@@ -31,9 +31,7 @@ TExprNode::TPtr OptimizeReadWithSettings(const TExprNode::TPtr& node, bool allow
IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr& input, TExprNode::TPtr& output,
const TYtState::TPtr& state, TExprContext& ctx);
-template<bool ForNativeExecution>
IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx,
- const TYtExtraPeepHoleSettings& settings = {});
+ const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx);
} //NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp
index 26c2751593..04c6aaf120 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp
@@ -19,7 +19,7 @@ namespace {
class TYtPeepholeTransformer : public TOptimizeTransformerBase {
public:
- TYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings)
+ TYtPeepholeTransformer(TYtState::TPtr state, const THashMap<TString, TString>& settings)
: TOptimizeTransformerBase(state ? state->Types : nullptr, NLog::EComponent::ProviderYt, {})
, State_(state)
, Settings_(settings)
@@ -29,6 +29,7 @@ public:
AddHandler(0, &TYtDqWideWrite::Match, HNDL(OptimizeYtDqWideWrite));
#undef HNDL
}
+
private:
TMaybeNode<TExprBase> OptimizeLength(TExprBase node, TExprContext& ctx) {
std::optional<size_t> lengthRes;
@@ -59,41 +60,34 @@ private:
}
TMaybeNode<TExprBase> OptimizeYtDqWideWrite(TExprBase node, TExprContext& ctx) {
- if (!Settings_.TmpTable) {
+ if (Settings_.empty()) {
return node;
}
- if (GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "outTable")) {
+ auto cluster = Settings_.at("yt_cluster");
+ auto server = Settings_.at("yt_server");
+ auto table = Settings_.at("yt_table");
+ auto tableName = Settings_.at("yt_tableName");
+ auto tableType = Settings_.at("yt_tableType");
+ auto writeOptions = Settings_.at("yt_writeOptions");
+ auto outSpec = Settings_.at("yt_outSpec");
+ auto tx = Settings_.at("yt_tx");
+
+ // Check that we optimize only single YtDqWideWrite
+ if (auto setting = GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "table")) {
+ YQL_ENSURE(setting->Child(1)->Content() == table);
return node;
}
- auto server = State_->Gateway->GetClusterServer(Settings_.CurrentCluster);
- YQL_ENSURE(server, "Invalid YT cluster: " << Settings_.CurrentCluster);
-
- NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, Settings_.CurrentCluster,
- Settings_.TmpTable->Name().StringValue(), Settings_.TmpFolder);
- realTable.Append(true);
- YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId");
-
- NYT::TNode spec;
- TYqlRowSpecInfo(Settings_.TmpTable->RowSpec()).FillCodecNode(spec[YqlRowSpecAttribute]);
- NYT::TNode outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec));
- NYT::TNode writerOptions = NYT::TNode::CreateMap();
-
- if (Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)) {
- auto maxRowWeight = Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)->GetValue();
- writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight);
- }
-
TMaybeNode<TCoSecureParam> secParams;
if (State_->Configuration->Auth.Get().GetOrElse(TString())) {
- secParams = Build<TCoSecureParam>(ctx, node.Pos()).Name().Build(TString("cluster:default_").append(Settings_.CurrentCluster)).Done();
+ secParams = Build<TCoSecureParam>(ctx, node.Pos()).Name().Build(TString("cluster:default_").append(cluster)).Done();
}
auto settings = Build<TCoNameValueTupleList>(ctx, node.Pos())
.Add()
.Name().Value("table", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build()
+ .Value<TCoAtom>().Value(table).Build()
.Build()
.Add()
.Name().Value("server", TNodeFlags::Default).Build()
@@ -101,23 +95,27 @@ private:
.Build()
.Add()
.Name().Value("outSpec", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build()
+ .Value<TCoAtom>().Value(outSpec).Build()
.Build()
.Add()
.Name().Value("secureParams", TNodeFlags::Default).Build()
.Value(secParams)
.Build()
.Add()
- .Name().Value("tx", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build()
+ .Name().Value("writerOptions", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(writeOptions).Build()
.Build()
.Add()
- .Name().Value("outTable", TNodeFlags::Default).Build()
- .Value(*Settings_.TmpTable)
+ .Name().Value("tx", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(tx).Build()
.Build()
- .Add()
- .Name().Value("writerOptions", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build()
+ .Add() // yt_file gateway specific
+ .Name().Value("tableName", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(tableName).Build()
+ .Build()
+ .Add() // yt_file gateway specific
+ .Name().Value("tableType", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(tableType).Build()
.Build()
.Done().Ptr();
@@ -125,12 +123,12 @@ private:
}
const TYtState::TPtr State_;
- const TYtExtraPeepHoleSettings Settings_;
+ const THashMap<TString, TString> Settings_;
};
}
-THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) {
+THolder<IGraphTransformer> CreateYtPeepholeTransformer(TYtState::TPtr state, const THashMap<TString, TString>& settings) {
return MakeHolder<TYtPeepholeTransformer>(std::move(state), settings);
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h
index db66a94f69..04c63d7438 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h
@@ -13,13 +13,6 @@
namespace NYql {
-struct TYtExtraPeepHoleSettings {
- TString CurrentCluster;
- const NNodes::TYtOutTable* TmpTable = nullptr;
- TString TmpFolder;
- TYtSettings::TConstPtr Config;
-};
-
THolder<IGraphTransformer> CreateYtIODiscoveryTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtEpochTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtIntentDeterminationTransformer(TYtState::TPtr state);
@@ -39,14 +32,14 @@ THolder<IGraphTransformer> CreateYtDataSinkFinalizingTransformer(TYtState::TPtr
THolder<IGraphTransformer> CreateYtLogicalOptProposalTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtPhysicalOptProposalTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtPhysicalFinalizingTransformer(TYtState::TPtr state);
-THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings);
-THolder<IGraphTransformer> CreateTYtWideFlowTransformer(TYtState::TPtr state);
+THolder<IGraphTransformer> CreateYtPeepholeTransformer(TYtState::TPtr state, const THashMap<TString, TString>& settings);
+THolder<IGraphTransformer> CreateYtWideFlowTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtDqHybridTransformer(TYtState::TPtr state, THolder<IGraphTransformer>&& finalizer);
void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children);
TString MakeTableDisplayName(NNodes::TExprBase table, bool isOutput);
-
void ScanForUsedOutputTables(const TExprNode& input, TVector<TString>& usedNodeIds);
TString MakeUsedNodeId(const TString& cluster, const TString& table);
+
} // NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
index 6ae1d3c988..751506a4bb 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_wide_flow.cpp
@@ -249,7 +249,7 @@ private:
}
-THolder<IGraphTransformer> CreateTYtWideFlowTransformer(TYtState::TPtr state) {
+THolder<IGraphTransformer> CreateYtWideFlowTransformer(TYtState::TPtr state) {
return MakeHolder<TYtWideFlowTransformer>(std::move(state));
}
diff --git a/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json
index d9db89603a..518da5d6ea 100644
--- a/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part0/canondata/result.json
@@ -1373,9 +1373,9 @@
],
"test.test[join-mergejoin_force_align3-off-Results]": [
{
- "checksum": "4b243c5d9d02571857aa516815a1f210",
- "size": 251,
- "uri": "https://{canondata_backend}/937458/e5719cd256fe3fd898e8ebe6df280521ffd29040/resource.tar.gz#test.test_join-mergejoin_force_align3-off-Results_/results.txt"
+ "checksum": "f1c5f5e2353f012207188c4e915487df",
+ "size": 284,
+ "uri": "https://{canondata_backend}/1031349/25c52e56e8eeb1d879e04b5b9efcde6fcf2efd65/resource.tar.gz#test.test_join-mergejoin_force_align3-off-Results_/results.txt"
}
],
"test.test[join-mergejoin_force_per_link-off-Analyze]": [
@@ -1582,9 +1582,9 @@
],
"test.test[join-yql-14829_left-off-Results]": [
{
- "checksum": "096f2ba7e9b6515b3fee775695a56a2b",
- "size": 108,
- "uri": "https://{canondata_backend}/937458/e5719cd256fe3fd898e8ebe6df280521ffd29040/resource.tar.gz#test.test_join-yql-14829_left-off-Results_/results.txt"
+ "checksum": "ab98e1c5a018c46596c65b0352abfa13",
+ "size": 117,
+ "uri": "https://{canondata_backend}/1031349/25c52e56e8eeb1d879e04b5b9efcde6fcf2efd65/resource.tar.gz#test.test_join-yql-14829_left-off-Results_/results.txt"
}
],
"test.test[join-yql-6297-off-Analyze]": [
diff --git a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json
index bfd7c58825..33e84c3795 100644
--- a/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part10/canondata/result.json
@@ -1287,9 +1287,9 @@
],
"test.test[join-mergejoin_force_align1-off-Results]": [
{
- "checksum": "d415ed6f342f69a608715115c099eff5",
- "size": 288,
- "uri": "https://{canondata_backend}/1600758/32cfdeb8c6377a2e7e62c6c4adbb95f25af7669b/resource.tar.gz#test.test_join-mergejoin_force_align1-off-Results_/results.txt"
+ "checksum": "14254ebf1a0a6dffad13177f0cfe847f",
+ "size": 331,
+ "uri": "https://{canondata_backend}/1031349/b0c48c4d800c80c9736c376a51869bb4a7f5b31b/resource.tar.gz#test.test_join-mergejoin_force_align1-off-Results_/results.txt"
}
],
"test.test[join-pullup_left--Analyze]": [
@@ -1462,9 +1462,9 @@
],
"test.test[join-yql-14829_leftonly-off-Results]": [
{
- "checksum": "096f2ba7e9b6515b3fee775695a56a2b",
- "size": 108,
- "uri": "https://{canondata_backend}/1600758/32cfdeb8c6377a2e7e62c6c4adbb95f25af7669b/resource.tar.gz#test.test_join-yql-14829_leftonly-off-Results_/results.txt"
+ "checksum": "ab98e1c5a018c46596c65b0352abfa13",
+ "size": 117,
+ "uri": "https://{canondata_backend}/1031349/b0c48c4d800c80c9736c376a51869bb4a7f5b31b/resource.tar.gz#test.test_join-yql-14829_leftonly-off-Results_/results.txt"
}
],
"test.test[join-yql-4275-off-Analyze]": [
diff --git a/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json
index 4147ed487e..d813f3e4b3 100644
--- a/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part15/canondata/result.json
@@ -1350,9 +1350,9 @@
],
"test.test[join-premap_merge_with_remap-off-Results]": [
{
- "checksum": "2be66749732358ddfd9a8ef30ec27518",
- "size": 471,
- "uri": "https://{canondata_backend}/1600758/aad142702907f13e911494c1a7b312bad34f692a/resource.tar.gz#test.test_join-premap_merge_with_remap-off-Results_/results.txt"
+ "checksum": "114e220ddf7ec64e2faedb36603ea6b1",
+ "size": 511,
+ "uri": "https://{canondata_backend}/1942415/46b4dec49c8d2a38bf8ce01066b72bfee9407ffe/resource.tar.gz#test.test_join-premap_merge_with_remap-off-Results_/results.txt"
}
],
"test.test[join-pushdown_filter_over_inner_with_assume_strict-off-Analyze]": [
diff --git a/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json
index 6de618ad86..56aeeb866e 100644
--- a/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part17/canondata/result.json
@@ -1125,9 +1125,9 @@
],
"test.test[join-mapjoin_opt_vs_2xopt-off-Results]": [
{
- "checksum": "1856c162083394044ab481baa6091e01",
- "size": 125,
- "uri": "https://{canondata_backend}/1936273/7c78e1e45ae282daee686c006624daa21a7c6ca6/resource.tar.gz#test.test_join-mapjoin_opt_vs_2xopt-off-Results_/results.txt"
+ "checksum": "5e4a32e9e65c7d5a5f4b7022f1c7404b",
+ "size": 144,
+ "uri": "https://{canondata_backend}/937458/15884872d0218004c3baba7fcce722819604019c/resource.tar.gz#test.test_join-mapjoin_opt_vs_2xopt-off-Results_/results.txt"
}
],
"test.test[join-mapjoin_partial_uniq_keys--Analyze]": [
diff --git a/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json
index 0be68ec43d..f0331340d0 100644
--- a/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part4/canondata/result.json
@@ -1157,9 +1157,9 @@
],
"test.test[join-lookupjoin_with_cache-off-Results]": [
{
- "checksum": "a5a7e398dc30bcb1eb092ef6ee93b65d",
- "size": 264,
- "uri": "https://{canondata_backend}/1900335/a5a16b7313d07b162a608c1abeab1e68e6175117/resource.tar.gz#test.test_join-lookupjoin_with_cache-off-Results_/results.txt"
+ "checksum": "4a5a14ca40382e201b3d29c142c0bbdc",
+ "size": 377,
+ "uri": "https://{canondata_backend}/1942415/2c7b7e06b3aea4ad2e9b20b79598f6f99d82c3f5/resource.tar.gz#test.test_join-lookupjoin_with_cache-off-Results_/results.txt"
}
],
"test.test[join-mapjoin_dup_key--Analyze]": [
diff --git a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json
index f0821ccab3..8fbdab9ad8 100644
--- a/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part6/canondata/result.json
@@ -1158,9 +1158,9 @@
],
"test.test[join-anyjoin_common_nodata_keys-off-Results]": [
{
- "checksum": "21d3c1e962a2f1041937d4db4d1b690a",
- "size": 453,
- "uri": "https://{canondata_backend}/1871002/b59ed2ad938015ca28be6d459030014e4b6ff1ea/resource.tar.gz#test.test_join-anyjoin_common_nodata_keys-off-Results_/results.txt"
+ "checksum": "6572671713355db88373ea3e154424d3",
+ "size": 529,
+ "uri": "https://{canondata_backend}/1936273/edfdc418bcbd1386a4a94dc31aee088a78a549f4/resource.tar.gz#test.test_join-anyjoin_common_nodata_keys-off-Results_/results.txt"
}
],
"test.test[join-equi_join_three_simple--Analyze]": [
@@ -1452,9 +1452,9 @@
],
"test.test[join-mapjoin_with_anonymous-off-Results]": [
{
- "checksum": "f739adcd538e5ea310062e8784476dd9",
- "size": 408,
- "uri": "https://{canondata_backend}/1924537/40c66e62107c2a9e3733dec809479087bdd8f6d6/resource.tar.gz#test.test_join-mapjoin_with_anonymous-off-Results_/results.txt"
+ "checksum": "b06c92f700fb33d968deedcec51c32f3",
+ "size": 440,
+ "uri": "https://{canondata_backend}/1936273/edfdc418bcbd1386a4a94dc31aee088a78a549f4/resource.tar.gz#test.test_join-mapjoin_with_anonymous-off-Results_/results.txt"
}
],
"test.test[join-premap_map_inner--Analyze]": [
@@ -1561,9 +1561,9 @@
],
"test.test[join-yql-8125-off-Results]": [
{
- "checksum": "660387db59c42b1e4e85f016a6d434db",
- "size": 157,
- "uri": "https://{canondata_backend}/1871002/b59ed2ad938015ca28be6d459030014e4b6ff1ea/resource.tar.gz#test.test_join-yql-8125-off-Results_/results.txt"
+ "checksum": "3bdb5911a299f9ea9446297b8d1fbb9c",
+ "size": 394,
+ "uri": "https://{canondata_backend}/1936273/edfdc418bcbd1386a4a94dc31aee088a78a549f4/resource.tar.gz#test.test_join-yql-8125-off-Results_/results.txt"
}
],
"test.test[key_filter-contains-default.txt-Analyze]": [
diff --git a/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json b/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json
index 42c9dadd82..d04f000e7a 100644
--- a/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json
+++ b/ydb/library/yql/tests/sql/dq_file/part9/canondata/result.json
@@ -1224,9 +1224,9 @@
],
"test.test[join-mergejoin_force_align2-off-Results]": [
{
- "checksum": "6a1f85c59016769b85dd5ce2d5b686c9",
- "size": 300,
- "uri": "https://{canondata_backend}/1936273/1ba42e2c47cd3429011228159c1fdf43dd1881b7/resource.tar.gz#test.test_join-mergejoin_force_align2-off-Results_/results.txt"
+ "checksum": "360d932ebffae959c40b57a204bb60a5",
+ "size": 339,
+ "uri": "https://{canondata_backend}/1599023/753be0e0c576efbfbb51452c05363488f2aa1c64/resource.tar.gz#test.test_join-mergejoin_force_align2-off-Results_/results.txt"
}
],
"test.test[join-nopushdown_filter_with_depends_on-off-Analyze]": [
diff --git a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp
index 0239e00f6c..ebf67df741 100644
--- a/ydb/library/yql/tools/yqlrun/http/yql_server.cpp
+++ b/ydb/library/yql/tools/yqlrun/http/yql_server.cpp
@@ -104,7 +104,7 @@ public:
}
void AfterOptimize(TTransformationPipeline* pipeline) const final {
- pipeline->Add(CreateTYtWideFlowTransformer(nullptr), "WideFlow");
+ pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow");
pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
}
};
diff --git a/ydb/library/yql/tools/yqlrun/yqlrun.cpp b/ydb/library/yql/tools/yqlrun/yqlrun.cpp
index e56437e8ca..3fce3989cd 100644
--- a/ydb/library/yql/tools/yqlrun/yqlrun.cpp
+++ b/ydb/library/yql/tools/yqlrun/yqlrun.cpp
@@ -251,7 +251,7 @@ public:
}
void AfterOptimize(TTransformationPipeline* pipeline) const final {
- pipeline->Add(CreateTYtWideFlowTransformer(nullptr), "WideFlow");
+ pipeline->Add(CreateYtWideFlowTransformer(nullptr), "WideFlow");
pipeline->Add(MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
}
};