diff options
author | uzhas <[email protected]> | 2023-09-07 11:12:21 +0300 |
---|---|---|
committer | uzhas <[email protected]> | 2023-09-07 11:37:49 +0300 |
commit | e62db3a873b031ef58121619e96a9dc1818893a4 (patch) | |
tree | 683aadf1e228e6aeb376ba0cdc96f5314949e37e | |
parent | d97ab11bfc5749b3a623406c1d93dcea58998019 (diff) |
simplify code
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp | 275 |
1 files changed, 7 insertions, 268 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index b07c6972e20..4ad5bfff7c8 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -17,42 +17,32 @@ namespace { using namespace NNodes; using namespace NDq; -TExprNode::TPtr FindChild(const TExprNode& settings, TStringBuf name) { - for (auto i = 0U; i < settings.ChildrenSize(); ++i) { - if (settings.Child(i)->Head().IsAtom(name)) { - return settings.ChildPtr(i); - } - } - - return {}; -} - TExprNode::TPtr GetPartitionBy(const TExprNode& settings) { - return FindChild(settings, "partitionedby"sv); + return GetSetting(settings, "partitionedby"sv); } TExprNode::TPtr GetCompression(const TExprNode& settings) { - return FindChild(settings, "compression"sv); + return GetSetting(settings, "compression"sv); } TExprNode::TPtr GetCsvDelimiter(const TExprNode& settings) { - return FindChild(settings, "csvdelimiter"sv); + return GetSetting(settings, "csvdelimiter"sv); } TExprNode::TPtr GetDateTimeFormatName(const TExprNode& settings) { - return FindChild(settings, "data.datetime.formatname"sv); + return GetSetting(settings, "data.datetime.formatname"sv); } TExprNode::TPtr GetDateTimeFormat(const TExprNode& settings) { - return FindChild(settings, "data.datetime.format"sv); + return GetSetting(settings, "data.datetime.format"sv); } TExprNode::TPtr GetTimestampFormatName(const TExprNode& settings) { - return FindChild(settings, "data.timestamp.formatname"sv); + return GetSetting(settings, "data.timestamp.formatname"sv); } TExprNode::TPtr GetTimestampFormat(const TExprNode& settings) { - return FindChild(settings, "data.timestamp.format"sv); + return GetSetting(settings, "data.timestamp.format"sv); } TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { @@ -390,257 +380,6 @@ public: } } - TMaybeNode<TExprBase> S3WriteObject1(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const { - const auto& write = node.Cast<TS3WriteObject>(); - const auto& targetNode = write.Target(); - const auto& cluster = write.DataSink().Cluster().StringValue(); - const auto token = "cluster:default_" + cluster; - const auto& settings = write.Target().Settings().Ref(); - auto partBy = GetPartitionBy(settings); - auto keys = GetPartitionKeys(partBy); - - auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos()); - if (partBy) - sinkSettingsBuilder.Add(std::move(partBy)); - - auto compression = GetCompression(settings); - const auto& extension = GetExtension(write.Target().Format().Value(), compression ? compression->Tail().Content() : ""sv); - if (compression) - sinkSettingsBuilder.Add(std::move(compression)); - - auto sinkOutputSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos()); - if (auto csvDelimiter = GetCsvDelimiter(settings)) { - sinkOutputSettingsBuilder.Add(std::move(csvDelimiter)); - } - - bool hasDateTimeFormat = false; - bool hasDateTimeFormatName = false; - bool hasTimestampFormat = false; - bool hasTimestampFormatName = false; - if (auto dateTimeFormatName = GetDateTimeFormatName(settings)) { - sinkOutputSettingsBuilder.Add(std::move(dateTimeFormatName)); - hasDateTimeFormatName = true; - } - - if (auto dateTimeFormat = GetDateTimeFormat(settings)) { - sinkOutputSettingsBuilder.Add(std::move(dateTimeFormat)); - hasDateTimeFormat = true; - } - - if (auto timestampFormatName = GetTimestampFormatName(settings)) { - sinkOutputSettingsBuilder.Add(std::move(timestampFormatName)); - hasTimestampFormatName = true; - } - - if (auto timestampFormat = GetTimestampFormat(settings)) { - sinkOutputSettingsBuilder.Add(std::move(timestampFormat)); - hasTimestampFormat = true; - } - - if (!hasDateTimeFormat && !hasDateTimeFormatName) { - TExprNode::TListType pair; - pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.datetime.formatname")); - pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX")); - sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair))); - } - - if (!hasTimestampFormat && !hasTimestampFormatName) { - TExprNode::TListType pair; - pair.push_back(ctx.NewAtom(targetNode.Pos(), "data.timestamp.formatname")); - pair.push_back(ctx.NewAtom(targetNode.Pos(), "POSIX")); - sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair))); - } - - const TStringBuf format = targetNode.Format(); - if (format != "raw" && format != "json_list") { // multipart - { - TExprNode::TListType pair; - pair.push_back(ctx.NewAtom(targetNode.Pos(), "multipart")); - pair.push_back(ctx.NewAtom(targetNode.Pos(), "true")); - sinkSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair))); - } - { - TExprNode::TListType pair; - pair.push_back(ctx.NewAtom(targetNode.Pos(), "file_size_limit")); - size_t fileSize = 50_MB; - if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) { - fileSize = *maxObjectSize; - } - pair.push_back(ctx.NewAtom(targetNode.Pos(), ToString(fileSize))); - sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair))); - } - } - - if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { - YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink."; - return keys.empty() ? - Build<TDqQuery>(ctx, write.Pos()) - .World(write.World()) - .SinkStages() - .Add<TDqStage>() - .Inputs().Build() - .Program<TCoLambda>() - .Args({}) - .Body<TS3SinkOutput>() - .Input<TCoToFlow>() - .Input(write.Input()) - .Build() - .Format(write.Target().Format()) - .KeyColumns().Build() - .Settings(sinkOutputSettingsBuilder.Done()) - .Build() - .Build() - .Outputs<TDqStageOutputsList>() - .Add<TDqSink>() - .DataSink(write.DataSink()) - .Index().Value("0").Build() - .Settings<TS3SinkSettings>() - .Path(write.Target().Path()) - .Settings(sinkSettingsBuilder.Done()) - .Token<TCoSecureParam>() - .Name().Build(token) - .Build() - .Extension().Value(extension).Build() - .Build() - .Build() - .Build() - .Settings().Build() - .Build() - .Build() - .Done(): - Build<TDqQuery>(ctx, write.Pos()) - .World(write.World()) - .SinkStages() - .Add<TDqStage>() - .Inputs() - .Add<TDqCnHashShuffle>() - .Output<TDqOutput>() - .Stage<TDqStage>() - .Inputs().Build() - .Program<TCoLambda>() - .Args({}) - .Body<TCoToFlow>() - .Input(write.Input()) - .Build() - .Build() - .Settings().Build() - .Build() - .Index().Value("0", TNodeFlags::Default).Build() - .Build() - .KeyColumns().Add(keys).Build() - .Build() - .Build() - .Program<TCoLambda>() - .Args({"in"}) - .Body<TS3SinkOutput>() - .Input("in") - .Format(write.Target().Format()) - .KeyColumns().Add(keys).Build() - .Settings(sinkOutputSettingsBuilder.Done()) - .Build() - .Build() - .Outputs<TDqStageOutputsList>() - .Add<TDqSink>() - .DataSink(write.DataSink()) - .Index().Value("0", TNodeFlags::Default).Build() - .Settings<TS3SinkSettings>() - .Path(write.Target().Path()) - .Settings(sinkSettingsBuilder.Done()) - .Token<TCoSecureParam>() - .Name().Build(token) - .Build() - .Extension().Value(extension).Build() - .Build() - .Build() - .Build() - .Settings().Build() - .Build() - .Build() - .Done(); - } - - if (!TDqCnUnionAll::Match(write.Input().Raw())) { - return node; - } - - const TParentsMap* parentsMap = getParents(); - const auto dqUnion = write.Input().Cast<TDqCnUnionAll>(); - if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) { - return node; - } - - YQL_CLOG(INFO, ProviderS3) << "Rewrite S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as sink."; - - const auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); - - const auto sink = Build<TDqSink>(ctx, write.Pos()) - .DataSink(write.DataSink()) - .Index(dqUnion.Output().Index()) - .Settings<TS3SinkSettings>() - .Path(write.Target().Path()) - .Settings(sinkSettingsBuilder.Done()) - .Token<TCoSecureParam>() - .Name().Build(token) - .Build() - .Extension().Value(extension).Build() - .Build() - .Done(); - - auto outputsBuilder = Build<TDqStageOutputsList>(ctx, targetNode.Pos()); - if (inputStage.Outputs() && keys.empty()) { - outputsBuilder.InitFrom(inputStage.Outputs().Cast()); - } - outputsBuilder.Add(sink); - - if (keys.empty()) { - const auto outputBuilder = Build<TS3SinkOutput>(ctx, targetNode.Pos()) - .Input(inputStage.Program().Body().Ptr()) - .Format(write.Target().Format()) - .KeyColumns().Add(std::move(keys)).Build() - .Settings(sinkOutputSettingsBuilder.Done()) - .Done(); - - return Build<TDqQuery>(ctx, write.Pos()) - .World(write.World()) - .SinkStages() - .Add<TDqStage>() - .InitFrom(inputStage) - .Program(ctx.DeepCopyLambda(inputStage.Program().Ref(), outputBuilder.Ptr())) - .Outputs(outputsBuilder.Done()) - .Build() - .Build() - .Done(); - } else { - return Build<TDqQuery>(ctx, write.Pos()) - .World(write.World()) - .SinkStages() - .Add<TDqStage>() - .Inputs() - .Add<TDqCnHashShuffle>() - .Output<TDqOutput>() - .Stage(inputStage) - .Index(dqUnion.Output().Index()) - .Build() - .KeyColumns().Add(keys).Build() - .Build() - .Build() - .Program<TCoLambda>() - .Args({"in"}) - .Body<TS3SinkOutput>() - .Input("in") - .Format(write.Target().Format()) - .KeyColumns().Add(std::move(keys)).Build() - .Settings(sinkOutputSettingsBuilder.Done()) - .Build() - .Build() - .Settings().Build() - .Outputs(outputsBuilder.Done()) - .Build() - .Build() - .Done(); - } - } - private: const TS3State::TPtr State_; }; |