diff options
author | a-romanov <[email protected]> | 2022-09-20 16:48:52 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-09-20 16:48:52 +0300 |
commit | bb790e406c8aee86ffcba10e3bdf0ba0094c8f77 (patch) | |
tree | 63866ca809442a851a8371ca80f33f33b957da18 | |
parent | d0052234ac49197133b2a0249c1e95cea8adca59 (diff) |
Add extensions.
6 files changed, 48 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index f8e8e74b8ee..32f22a76baa 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -371,6 +371,7 @@ public: IRandomProvider* randomProvider, const TString& url, const TString& path, + const TString& extension, const std::vector<TString>& keys, const size_t memoryLimit, const size_t maxFileSize, @@ -387,6 +388,7 @@ public: , Callbacks(callbacks) , Url(url) , Path(path) + , Extension(extension) , Keys(keys) , MemoryLimit(memoryLimit) , MaxFileSize(maxFileSize) @@ -444,7 +446,7 @@ private: const auto& key = MakePartitionKey(v); const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>()); if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) { - auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName(), MaxFileSize, Compression, RetryPolicy); + auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName() + Extension, MaxFileSize, Compression, RetryPolicy); ins.first->second.emplace_back(fileWrite.get()); RegisterWithSameMailbox(fileWrite.release()); } @@ -465,7 +467,7 @@ private: auto statusCode = result->Get()->StatusCode; if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) { - + // add err code analysis here if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") { @@ -530,6 +532,7 @@ private: const TString Url; const TString Path; + const TString Extension; const std::vector<TString> Keys; const size_t MemoryLimit; @@ -566,6 +569,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), params.GetPath(), + params.HasExtension() ? params.GetExtension() : "", std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB, params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB, diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 97242d2f588..e6d3a84fbf5 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -134,7 +134,8 @@ "Children": [ {"Index": 0, "Name": "Path", "Type": "TCoAtom"}, {"Index": 1, "Name": "Settings", "Type": "TExprList"}, - {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"} + {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}, + {"Index": 3, "Name": "Extension", "Type": "TCoAtom"} ] }, { diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto index f18ca717e83..7ee6610cd25 100644 --- a/ydb/library/yql/providers/s3/proto/sink.proto +++ b/ydb/library/yql/providers/s3/proto/sink.proto @@ -11,4 +11,5 @@ message TSink { optional string Compression = 5; optional uint64 MemoryLimit = 6; optional uint64 MaxFileSize = 7; + optional string Extension = 8; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 927269d61cc..b607e5f8cf5 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -134,7 +134,7 @@ private: } TStatus HandleSink(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 3, ctx)) { + if (!EnsureArgsCount(*input, 4, ctx)) { return TStatus::Error; } input->SetTypeAnn(ctx.MakeType<TVoidExprType>()); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index c5521aa798c..d13849455b3 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -291,6 +291,7 @@ public: sinkDesc.SetUrl(connect.Url); sinkDesc.SetToken(settings.Token().Name().StringValue()); sinkDesc.SetPath(settings.Path().StringValue()); + sinkDesc.SetExtension(settings.Extension().StringValue()); for (const auto& key : GetKeys(settings.Settings().Ref())) sinkDesc.MutableKeys()->Add(TString(key->Content())); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index c2e5047fd28..7558866a283 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -45,6 +45,36 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { return {}; } +TString GetExtension(const std::string_view& format, const std::string_view& compression) { + static const std::unordered_map<std::string_view, std::string_view> formatsMap = { + {"csv_with_names"sv, "csv"sv}, + {"tsv_with_names"sv, "tsv"sv}, + {"raw"sv, "bin"sv}, + {"json_list"sv, "json"sv}, + {"json_each_row"sv, "json"sv}, + {"parquet"sv, "parquet"sv} + }; + + static const std::unordered_map<std::string_view, std::string_view> compressionsMap = { + {"gzip"sv, "gz"sv}, + {"zstd"sv, "zst"sv}, + {"lz4"sv, "lz4"sv}, + {"bzip2"sv, "bz2"sv}, + {"brotli"sv, "br"sv}, + {"xz"sv, "xz"sv} + }; + + TStringBuilder extension; + if (const auto it = formatsMap.find(format); formatsMap.cend() != it) { + extension << '.' << it->second; + } + + if (const auto it = compressionsMap.find(compression); compressionsMap.cend() != it) { + extension << '.' << it->second; + } + return extension; +} + class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state) @@ -67,7 +97,10 @@ public: auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos()); if (partBy) sinkSettingsBuilder.Add(std::move(partBy)); - if (auto compression = GetCompression(write.Target().Settings().Ref())) + + auto compression = GetCompression(write.Target().Settings().Ref()); + const auto& extension = GetExtension(write.Target().Format().Value(), compression ? compression->Tail().Content() : ""sv); + if (compression) sinkSettingsBuilder.Add(std::move(compression)); if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { @@ -98,6 +131,7 @@ public: .Token<TCoSecureParam>() .Name().Build(token) .Build() + .Extension().Value(extension).Build() .Build() .Build() .Build() @@ -145,6 +179,7 @@ public: .Token<TCoSecureParam>() .Name().Build(token) .Build() + .Extension().Value(extension).Build() .Build() .Build() .Build() @@ -177,6 +212,7 @@ public: .Token<TCoSecureParam>() .Name().Build(token) .Build() + .Extension().Value(extension).Build() .Build() .Done(); |