summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <[email protected]>2023-09-07 11:12:21 +0300
committeruzhas <[email protected]>2023-09-07 11:37:49 +0300
commite62db3a873b031ef58121619e96a9dc1818893a4 (patch)
tree683aadf1e228e6aeb376ba0cdc96f5314949e37e
parentd97ab11bfc5749b3a623406c1d93dcea58998019 (diff)
simplify code
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp275
1 files changed, 7 insertions, 268 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index b07c6972e20..4ad5bfff7c8 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -17,42 +17,32 @@ namespace {
using namespace NNodes;
using namespace NDq;
-TExprNode::TPtr FindChild(const TExprNode& settings, TStringBuf name) {
- for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
- if (settings.Child(i)->Head().IsAtom(name)) {
- return settings.ChildPtr(i);
- }
- }
-
- return {};
-}
-
TExprNode::TPtr GetPartitionBy(const TExprNode& settings) {
- return FindChild(settings, "partitionedby"sv);
+ return GetSetting(settings, "partitionedby"sv);
}
TExprNode::TPtr GetCompression(const TExprNode& settings) {
- return FindChild(settings, "compression"sv);
+ return GetSetting(settings, "compression"sv);
}
TExprNode::TPtr GetCsvDelimiter(const TExprNode& settings) {
- return FindChild(settings, "csvdelimiter"sv);
+ return GetSetting(settings, "csvdelimiter"sv);
}
TExprNode::TPtr GetDateTimeFormatName(const TExprNode& settings) {
- return FindChild(settings, "data.datetime.formatname"sv);
+ return GetSetting(settings, "data.datetime.formatname"sv);
}
TExprNode::TPtr GetDateTimeFormat(const TExprNode& settings) {
- return FindChild(settings, "data.datetime.format"sv);
+ return GetSetting(settings, "data.datetime.format"sv);
}
TExprNode::TPtr GetTimestampFormatName(const TExprNode& settings) {
- return FindChild(settings, "data.timestamp.formatname"sv);
+ return GetSetting(settings, "data.timestamp.formatname"sv);
}
TExprNode::TPtr GetTimestampFormat(const TExprNode& settings) {
- return FindChild(settings, "data.timestamp.format"sv);
+ return GetSetting(settings, "data.timestamp.format"sv);
}
TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
@@ -390,257 +380,6 @@ public:
}
}
- TMaybeNode<TExprBase> S3WriteObject1(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
- const auto& write = node.Cast<TS3WriteObject>();
- const auto& targetNode = write.Target();
- const auto& cluster = write.DataSink().Cluster().StringValue();
- const auto token = "cluster:default_" + cluster;
- const auto& settings = write.Target().Settings().Ref();
- auto partBy = GetPartitionBy(settings);
- auto keys = GetPartitionKeys(partBy);
-
- auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos());
- if (partBy)
- sinkSettingsBuilder.Add(std::move(partBy));
-
- auto compression = GetCompression(settings);
- const auto& extension = GetExtension(write.Target().Format().Value(), compression ? compression->Tail().Content() : ""sv);
- if (compression)
- sinkSettingsBuilder.Add(std::move(compression));
-
- auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos());
- if (auto csvDelimiter = GetCsvDelimiter(settings)) {
- sinkOutputSettingsBuilder.Add(std::move(csvDelimiter));
- }
-
- bool hasDateTimeFormat = false;
- bool hasDateTimeFormatName = false;
- bool hasTimestampFormat = false;
- bool hasTimestampFormatName = false;
- if (auto dateTimeFormatName = GetDateTimeFormatName(settings)) {
- sinkOutputSettingsBuilder.Add(std::move(dateTimeFormatName));
- hasDateTimeFormatName = true;
- }
-
- if (auto dateTimeFormat = GetDateTimeFormat(settings)) {
- sinkOutputSettingsBuilder.Add(std::move(dateTimeFormat));
- hasDateTimeFormat = true;
- }
-
- if (auto timestampFormatName = GetTimestampFormatName(settings)) {
- sinkOutputSettingsBuilder.Add(std::move(timestampFormatName));
- hasTimestampFormatName = true;
- }
-
- if (auto timestampFormat = GetTimestampFormat(settings)) {
- sinkOutputSettingsBuilder.Add(std::move(timestampFormat));
- hasTimestampFormat = true;
- }
-
- if (!hasDateTimeFormat && !hasDateTimeFormatName) {
- TExprNode::TListType pair;
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.datetime.formatname"));
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX"));
- sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
- }
-
- if (!hasTimestampFormat && !hasTimestampFormatName) {
- TExprNode::TListType pair;
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.timestamp.formatname"));
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX"));
- sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
- }
-
- const TStringBuf format = targetNode.Format();
- if (format != "raw" && format != "json_list") { // multipart
- {
- TExprNode::TListType pair;
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "multipart"));
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "true"));
- sinkSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
- }
- {
- TExprNode::TListType pair;
- pair.push_back(ctx.NewAtom(targetNode.Pos(), "file_size_limit"));
- size_t fileSize = 50_MB;
- if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) {
- fileSize = *maxObjectSize;
- }
- pair.push_back(ctx.NewAtom(targetNode.Pos(), ToString(fileSize)));
- sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
- }
- }
-
- if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
- YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
- return keys.empty() ?
- Build<TDqQuery>(ctx, write.Pos())
- .World(write.World())
- .SinkStages()
- .Add<TDqStage>()
- .Inputs().Build()
- .Program<TCoLambda>()
- .Args({})
- .Body<TS3SinkOutput>()
- .Input<TCoToFlow>()
- .Input(write.Input())
- .Build()
- .Format(write.Target().Format())
- .KeyColumns().Build()
- .Settings(sinkOutputSettingsBuilder.Done())
- .Build()
- .Build()
- .Outputs<TDqStageOutputsList>()
- .Add<TDqSink>()
- .DataSink(write.DataSink())
- .Index().Value("0").Build()
- .Settings<TS3SinkSettings>()
- .Path(write.Target().Path())
- .Settings(sinkSettingsBuilder.Done())
- .Token<TCoSecureParam>()
- .Name().Build(token)
- .Build()
- .Extension().Value(extension).Build()
- .Build()
- .Build()
- .Build()
- .Settings().Build()
- .Build()
- .Build()
- .Done():
- Build<TDqQuery>(ctx, write.Pos())
- .World(write.World())
- .SinkStages()
- .Add<TDqStage>()
- .Inputs()
- .Add<TDqCnHashShuffle>()
- .Output<TDqOutput>()
- .Stage<TDqStage>()
- .Inputs().Build()
- .Program<TCoLambda>()
- .Args({})
- .Body<TCoToFlow>()
- .Input(write.Input())
- .Build()
- .Build()
- .Settings().Build()
- .Build()
- .Index().Value("0", TNodeFlags::Default).Build()
- .Build()
- .KeyColumns().Add(keys).Build()
- .Build()
- .Build()
- .Program<TCoLambda>()
- .Args({"in"})
- .Body<TS3SinkOutput>()
- .Input("in")
- .Format(write.Target().Format())
- .KeyColumns().Add(keys).Build()
- .Settings(sinkOutputSettingsBuilder.Done())
- .Build()
- .Build()
- .Outputs<TDqStageOutputsList>()
- .Add<TDqSink>()
- .DataSink(write.DataSink())
- .Index().Value("0", TNodeFlags::Default).Build()
- .Settings<TS3SinkSettings>()
- .Path(write.Target().Path())
- .Settings(sinkSettingsBuilder.Done())
- .Token<TCoSecureParam>()
- .Name().Build(token)
- .Build()
- .Extension().Value(extension).Build()
- .Build()
- .Build()
- .Build()
- .Settings().Build()
- .Build()
- .Build()
- .Done();
- }
-
- if (!TDqCnUnionAll::Match(write.Input().Raw())) {
- return node;
- }
-
- const TParentsMap* parentsMap = getParents();
- const auto dqUnion = write.Input().Cast<TDqCnUnionAll>();
- if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) {
- return node;
- }
-
- YQL_CLOG(INFO, ProviderS3) << "Rewrite S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as sink.";
-
- const auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
-
- const auto sink = Build<TDqSink>(ctx, write.Pos())
- .DataSink(write.DataSink())
- .Index(dqUnion.Output().Index())
- .Settings<TS3SinkSettings>()
- .Path(write.Target().Path())
- .Settings(sinkSettingsBuilder.Done())
- .Token<TCoSecureParam>()
- .Name().Build(token)
- .Build()
- .Extension().Value(extension).Build()
- .Build()
- .Done();
-
- auto outputsBuilder = Build<TDqStageOutputsList>(ctx, targetNode.Pos());
- if (inputStage.Outputs() && keys.empty()) {
- outputsBuilder.InitFrom(inputStage.Outputs().Cast());
- }
- outputsBuilder.Add(sink);
-
- if (keys.empty()) {
- const auto outputBuilder = Build<TS3SinkOutput>(ctx, targetNode.Pos())
- .Input(inputStage.Program().Body().Ptr())
- .Format(write.Target().Format())
- .KeyColumns().Add(std::move(keys)).Build()
- .Settings(sinkOutputSettingsBuilder.Done())
- .Done();
-
- return Build<TDqQuery>(ctx, write.Pos())
- .World(write.World())
- .SinkStages()
- .Add<TDqStage>()
- .InitFrom(inputStage)
- .Program(ctx.DeepCopyLambda(inputStage.Program().Ref(), outputBuilder.Ptr()))
- .Outputs(outputsBuilder.Done())
- .Build()
- .Build()
- .Done();
- } else {
- return Build<TDqQuery>(ctx, write.Pos())
- .World(write.World())
- .SinkStages()
- .Add<TDqStage>()
- .Inputs()
- .Add<TDqCnHashShuffle>()
- .Output<TDqOutput>()
- .Stage(inputStage)
- .Index(dqUnion.Output().Index())
- .Build()
- .KeyColumns().Add(keys).Build()
- .Build()
- .Build()
- .Program<TCoLambda>()
- .Args({"in"})
- .Body<TS3SinkOutput>()
- .Input("in")
- .Format(write.Target().Format())
- .KeyColumns().Add(std::move(keys)).Build()
- .Settings(sinkOutputSettingsBuilder.Done())
- .Build()
- .Build()
- .Settings().Build()
- .Outputs(outputsBuilder.Done())
- .Build()
- .Build()
- .Done();
- }
- }
-
private:
const TS3State::TPtr State_;
};