about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author vvvv <vvvv@ydb.tech> 2023-10-06 14:14:45 +0300
committer vvvv <vvvv@ydb.tech> 2023-10-06 15:22:57 +0300
commit e1e4812f5d91f107a24690baceea830a45739062 (patch)
tree 588e030e156fada3811c3ef2187d4c1358620745
parent beef0fafbcd1e3d89f7dfbbc59476cc3c885b179 (diff)
download ydb-e1e4812f5d91f107a24690baceea830a45739062.tar.gz
YQL-16809 fix inplace rewrite
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp 81
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp 17
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_optimize.h 4
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp 82
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h 10
5 files changed, 108 insertions, 86 deletions
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
index 6e3984841e6..617a7197708 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
@@ -670,85 +670,18 @@ private:
const auto tmpFolder = GetTablesTmpFolder(*config);
auto clusterStr = TString{cluster.Value()};
- TMaybeNode<TCoSecureParam> secParams;
- if (State_->Configuration->Auth.Get().GetOrElse(TString())) {
- secParams = Build<TCoSecureParam>(ctx, op.Ref().Pos()).Name().Build(TString("cluster:default_").append(clusterStr)).Done();
- }
-
- VisitExpr(input, [&](const TExprNode::TPtr& node) {
- if (auto maybeWrite = TMaybeNode<TYtDqWideWrite>(node)) {
- auto server = State_->Gateway->GetClusterServer(clusterStr);
- YQL_ENSURE(server, "Invalid YT cluster: " << clusterStr);
-
- NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, clusterStr, tmpTable.Name().StringValue(), tmpFolder);
- realTable.Append(true);
- YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId");
-
- NYT::TNode spec;
- TYqlRowSpecInfo(tmpTable.RowSpec()).FillCodecNode(spec[YqlRowSpecAttribute]);
- NYT::TNode outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec));
- NYT::TNode writerOptions = NYT::TNode::CreateMap();
-
- if (config->MaxRowWeight.Get(clusterStr)) {
- auto maxRowWeight = config->MaxRowWeight.Get(clusterStr)->GetValue();
- writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight);
- }
-
- auto settings = Build<TCoNameValueTupleList>(ctx, node->Pos())
- .Add()
- .Name().Value("table", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build()
- .Build()
- .Add()
- .Name().Value("server", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(server).Build()
- .Build()
- .Add()
- .Name().Value("outSpec", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build()
- .Build()
- .Add()
- .Name().Value("secureParams", TNodeFlags::Default).Build()
- .Value(secParams)
- .Build()
- .Add()
- .Name().Value("tx", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build()
- .Build()
- .Add()
- .Name().Value("outTable", TNodeFlags::Default).Build()
- .Value(tmpTable)
- .Build()
- .Add()
- .Name().Value("writerOptions", TNodeFlags::Default).Build()
- .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build()
- .Build()
- .Done().Ptr();
-
- node->ChildRef(TYtDqWideWrite::idx_Settings) = settings;
-
- auto atomType = ctx.MakeType<TUnitExprType>();
-
- VisitExpr(node->ChildRef(TYtDqWideWrite::idx_Settings), [&atomType](const TExprNode::TPtr& tupleNode) {
- if (tupleNode->GetTypeAnn() == nullptr) {
- tupleNode->SetTypeAnn(atomType);
- tupleNode->SetState(TExprNode::EState::ConstrComplete);
- }
- return true;
- });
-
- return false;
- }
-
- return true;
- });
-
delegatedNode = input->ChildPtr(TYtDqProcessWrite::idx_Input);
if (const auto status = SubstTables(delegatedNode, State_, false, ctx); status.Level == TStatus::Error) {
return SyncStatus(status);
}
bool hasNonDeterministicFunctions = false;
- if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_, hasNonDeterministicFunctions, ctx); status.Level == TStatus::Error) {
+ TYtExtraPeepHoleSettings settings;
+ settings.CurrentCluster = clusterStr;
+ settings.TmpTable = &tmpTable;
+ settings.TmpFolder = tmpFolder;
+ settings.Config = config;
+ if (const auto status = PeepHoleOptimizeBeforeExec<false>(delegatedNode, delegatedNode, State_,
+ hasNonDeterministicFunctions, ctx, settings); status.Level == TStatus::Error) {
return SyncStatus(status);
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp
index a77f046af4f..ea1babb5165 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.cpp
@@ -633,12 +633,15 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
template<bool WithWideFlow>
struct TPeepholePipelineConfigurator : public IPipelineConfigurator {
- TPeepholePipelineConfigurator(TYtState::TPtr state): State_(std::move(state)) {}
+ TPeepholePipelineConfigurator(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings)
+ : State_(std::move(state))
+ , Settings_(settings)
+ {}
private:
void AfterCreate(TTransformationPipeline*) const final {}
void AfterTypeAnnotation(TTransformationPipeline* pipeline) const final {
- pipeline->Add(CreateTYtPeepholeTransformer(State_), "Peephole");
+ pipeline->Add(CreateTYtPeepholeTransformer(State_, Settings_), "Peephole");
if constexpr (WithWideFlow) {
pipeline->Add(CreateTYtWideFlowTransformer(State_), "WideFlow");
}
@@ -647,11 +650,13 @@ private:
void AfterOptimize(TTransformationPipeline*) const final {}
const TYtState::TPtr State_;
+ const TYtExtraPeepHoleSettings Settings_;
};
template<bool ForNativeExecution>
IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx)
+ const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx,
+ const TYtExtraPeepHoleSettings& settings)
{
if constexpr (ForNativeExecution) {
if (const auto status = UpdateTableContentMemoryUsage(input, output, state, ctx);
@@ -660,7 +665,7 @@ IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TEx
}
}
- const TPeepholePipelineConfigurator<ForNativeExecution> wideFlowTransformers(state);
+ const TPeepholePipelineConfigurator<ForNativeExecution> wideFlowTransformers(state, settings);
TPeepholeSettings peepholeSettings;
peepholeSettings.CommonConfig = &wideFlowTransformers;
return PeepHoleOptimizeNode(output, output, ctx, *state->Types, nullptr, hasNonDeterministicFunctions, peepholeSettings);
@@ -668,9 +673,9 @@ IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TEx
template
IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<true>(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx);
+ const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings);
template
IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec<false>(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx);
+ const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx, const TYtExtraPeepHoleSettings& settings);
} // NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h
index b7d3a29e550..adb45c6196b 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_optimize.h
@@ -1,6 +1,7 @@
#pragma once
#include "yql_yt_provider.h"
+#include "yql_yt_provider_impl.h"
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/yt/lib/mkql_helpers/mkql_helpers.h>
@@ -32,6 +33,7 @@ IGraphTransformer::TStatus UpdateTableContentMemoryUsage(const TExprNode::TPtr&
template<bool ForNativeExecution>
IGraphTransformer::TStatus PeepHoleOptimizeBeforeExec(TExprNode::TPtr input, TExprNode::TPtr& output,
- const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx);
+ const TYtState::TPtr& state, bool& hasNonDeterministicFunctions, TExprContext& ctx,
+ const TYtExtraPeepHoleSettings& settings = {});
} //NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp
index d405a27059e..26c27515936 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp
@@ -6,6 +6,10 @@
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/yt/common/yql_configuration.h>
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
+#include <ydb/library/yql/providers/yt/common/yql_names.h>
+
+#include <library/cpp/yson/node/node_io.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
namespace NYql {
@@ -15,11 +19,14 @@ namespace {
class TYtPeepholeTransformer : public TOptimizeTransformerBase {
public:
- TYtPeepholeTransformer(TYtState::TPtr state)
- : TOptimizeTransformerBase(state ? state->Types : nullptr, NLog::EComponent::ProviderYt, {}), State_(state)
+ TYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings)
+ : TOptimizeTransformerBase(state ? state->Types : nullptr, NLog::EComponent::ProviderYt, {})
+ , State_(state)
+ , Settings_(settings)
{
#define HNDL(name) "Peephole-"#name, Hndl(&TYtPeepholeTransformer::name)
AddHandler(0, &TYtLength::Match, HNDL(OptimizeLength));
+ AddHandler(0, &TYtDqWideWrite::Match, HNDL(OptimizeYtDqWideWrite));
#undef HNDL
}
private:
@@ -51,13 +58,80 @@ private:
return node;
}
+ TMaybeNode<TExprBase> OptimizeYtDqWideWrite(TExprBase node, TExprContext& ctx) { // Handler for TYtDqWideWrite nodes: returns the node with its Settings child rebuilt from Settings_ and State_.
+ if (!Settings_.TmpTable) { // No temporary output table was supplied in the extra settings: leave the node as is.
+ return node;
+ }
+ // The settings list built below always contains "outTable", so a node that already carries it is returned unchanged.
+ if (GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "outTable")) {
+ return node;
+ }
+ // Look up the server for the current cluster; YQL_ENSURE fails when the gateway returns none.
+ auto server = State_->Gateway->GetClusterServer(Settings_.CurrentCluster);
+ YQL_ENSURE(server, "Invalid YT cluster: " << Settings_.CurrentCluster);
+ // Ask the gateway for the write path of the temporary table, mark it as append, and require a transaction id on it.
+ NYT::TRichYPath realTable = State_->Gateway->GetWriteTable(State_->SessionId, Settings_.CurrentCluster,
+ Settings_.TmpTable->Name().StringValue(), Settings_.TmpFolder);
+ realTable.Append(true);
+ YQL_ENSURE(realTable.TransactionId_.Defined(), "Expected TransactionId");
+ // outSpec: a map whose YqlIOSpecTables key holds a one-element list with the spec filled from the temporary table's row spec.
+ NYT::TNode spec;
+ TYqlRowSpecInfo(Settings_.TmpTable->RowSpec()).FillCodecNode(spec[YqlRowSpecAttribute]);
+ NYT::TNode outSpec = NYT::TNode::CreateMap()(TString{YqlIOSpecTables}, NYT::TNode::CreateList().Add(spec));
+ NYT::TNode writerOptions = NYT::TNode::CreateMap(); // Starts empty; max_row_weight is added below only when configured.
+ // NOTE(review): Settings_.Config is dereferenced without a null check; the visible caller sets it together with TmpTable.
+ if (Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)) {
+ auto maxRowWeight = Settings_.Config->MaxRowWeight.Get(Settings_.CurrentCluster)->GetValue();
+ writerOptions["max_row_weight"] = static_cast<i64>(maxRowWeight);
+ }
+ // secParams stays empty unless the Auth setting converts to true (presumably: is a non-empty string — TODO confirm).
+ TMaybeNode<TCoSecureParam> secParams;
+ if (State_->Configuration->Auth.Get().GetOrElse(TString())) {
+ secParams = Build<TCoSecureParam>(ctx, node.Pos()).Name().Build(TString("cluster:default_").append(Settings_.CurrentCluster)).Done();
+ }
+ // Assemble the seven name/value tuples that form the new Settings child.
+ auto settings = Build<TCoNameValueTupleList>(ctx, node.Pos())
+ .Add()
+ .Name().Value("table", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(NYT::PathToNode(realTable))).Build()
+ .Build()
+ .Add()
+ .Name().Value("server", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(server).Build()
+ .Build()
+ .Add()
+ .Name().Value("outSpec", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(outSpec)).Build()
+ .Build()
+ .Add()
+ .Name().Value("secureParams", TNodeFlags::Default).Build()
+ .Value(secParams)
+ .Build()
+ .Add()
+ .Name().Value("tx", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(GetGuidAsString(*realTable.TransactionId_), TNodeFlags::Default).Build()
+ .Build()
+ .Add()
+ .Name().Value("outTable", TNodeFlags::Default).Build()
+ .Value(*Settings_.TmpTable)
+ .Build()
+ .Add()
+ .Name().Value("writerOptions", TNodeFlags::Default).Build()
+ .Value<TCoAtom>().Value(NYT::NodeToYsonString(writerOptions)).Build()
+ .Build()
+ .Done().Ptr();
+ // Return what ctx.ChangeChild produces rather than assigning the child on the existing node (presumably a new node — confirm against TExprContext).
+ return ctx.ChangeChild(node.Ref(), TYtDqWideWrite::idx_Settings, std::move(settings));
+ }
+
const TYtState::TPtr State_;
+ const TYtExtraPeepHoleSettings Settings_;
};
}
-THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state) {
- return MakeHolder<TYtPeepholeTransformer>(std::move(state));
+THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings) {
+ return MakeHolder<TYtPeepholeTransformer>(std::move(state), settings);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h
index 1ea47774658..db66a94f691 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider_impl.h
@@ -7,11 +7,19 @@
#include <ydb/library/yql/core/yql_graph_transformer.h>
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
+#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <util/generic/ptr.h>
namespace NYql {
+struct TYtExtraPeepHoleSettings { // Extra inputs handed to the YT peephole transformer in addition to the provider state.
+ TString CurrentCluster; // Cluster name the TYtDqWideWrite handler passes to GetClusterServer and GetWriteTable.
+ const NNodes::TYtOutTable* TmpTable = nullptr; // Temporary output table; nullptr (the default) makes the TYtDqWideWrite handler return nodes unchanged. NOTE(review): raw pointer, presumably non-owning — the pointee must outlive the transformer; confirm at call sites.
+ TString TmpFolder; // Temporary folder passed to GetWriteTable.
+ TYtSettings::TConstPtr Config; // Read for MaxRowWeight; dereferenced without a null check whenever TmpTable is set.
+};
+
THolder<IGraphTransformer> CreateYtIODiscoveryTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtEpochTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtIntentDeterminationTransformer(TYtState::TPtr state);
@@ -31,7 +39,7 @@ THolder<IGraphTransformer> CreateYtDataSinkFinalizingTransformer(TYtState::TPtr
THolder<IGraphTransformer> CreateYtLogicalOptProposalTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtPhysicalOptProposalTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtPhysicalFinalizingTransformer(TYtState::TPtr state);
-THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state);
+THolder<IGraphTransformer> CreateTYtPeepholeTransformer(TYtState::TPtr state, const TYtExtraPeepHoleSettings& settings);
THolder<IGraphTransformer> CreateTYtWideFlowTransformer(TYtState::TPtr state);
THolder<IGraphTransformer> CreateYtDqHybridTransformer(TYtState::TPtr state, THolder<IGraphTransformer>&& finalizer);