summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-09-20 16:48:52 +0300
committera-romanov <[email protected]>2022-09-20 16:48:52 +0300
commitbb790e406c8aee86ffcba10e3bdf0ba0094c8f77 (patch)
tree63866ca809442a851a8371ca80f33f33b957da18
parentd0052234ac49197133b2a0249c1e95cea8adca59 (diff)
Add extensions.
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp8
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json3
-rw-r--r--ydb/library/yql/providers/s3/proto/sink.proto1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp38
6 files changed, 48 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index f8e8e74b8ee..32f22a76baa 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -371,6 +371,7 @@ public:
IRandomProvider* randomProvider,
const TString& url,
const TString& path,
+ const TString& extension,
const std::vector<TString>& keys,
const size_t memoryLimit,
const size_t maxFileSize,
@@ -387,6 +388,7 @@ public:
, Callbacks(callbacks)
, Url(url)
, Path(path)
+ , Extension(extension)
, Keys(keys)
, MemoryLimit(memoryLimit)
, MaxFileSize(maxFileSize)
@@ -444,7 +446,7 @@ private:
const auto& key = MakePartitionKey(v);
const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName(), MaxFileSize, Compression, RetryPolicy);
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName() + Extension, MaxFileSize, Compression, RetryPolicy);
ins.first->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
@@ -465,7 +467,7 @@ private:
auto statusCode = result->Get()->StatusCode;
if (statusCode == NYql::NDqProto::StatusIds::UNSPECIFIED) {
-
+
// add err code analysis here
if (result->Get()->S3ErrorCode == "BucketMaxSizeExceeded") {
@@ -530,6 +532,7 @@ private:
const TString Url;
const TString Path;
+ const TString Extension;
const std::vector<TString> Keys;
const size_t MemoryLimit;
@@ -566,6 +569,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
credentialsProviderFactory->CreateProvider(),
randomProvider, params.GetUrl(),
params.GetPath(),
+ params.HasExtension() ? params.GetExtension() : "",
std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()),
params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB,
params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB,
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 97242d2f588..e6d3a84fbf5 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -134,7 +134,8 @@
"Children": [
{"Index": 0, "Name": "Path", "Type": "TCoAtom"},
{"Index": 1, "Name": "Settings", "Type": "TExprList"},
- {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}
+ {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"},
+ {"Index": 3, "Name": "Extension", "Type": "TCoAtom"}
]
},
{
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
index f18ca717e83..7ee6610cd25 100644
--- a/ydb/library/yql/providers/s3/proto/sink.proto
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -11,4 +11,5 @@ message TSink {
optional string Compression = 5;
optional uint64 MemoryLimit = 6;
optional uint64 MaxFileSize = 7;
+ optional string Extension = 8;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 927269d61cc..b607e5f8cf5 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -134,7 +134,7 @@ private:
}
TStatus HandleSink(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureArgsCount(*input, 3, ctx)) {
+ if (!EnsureArgsCount(*input, 4, ctx)) {
return TStatus::Error;
}
input->SetTypeAnn(ctx.MakeType<TVoidExprType>());
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index c5521aa798c..d13849455b3 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -291,6 +291,7 @@ public:
sinkDesc.SetUrl(connect.Url);
sinkDesc.SetToken(settings.Token().Name().StringValue());
sinkDesc.SetPath(settings.Path().StringValue());
+ sinkDesc.SetExtension(settings.Extension().StringValue());
for (const auto& key : GetKeys(settings.Settings().Ref()))
sinkDesc.MutableKeys()->Add(TString(key->Content()));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index c2e5047fd28..7558866a283 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -45,6 +45,36 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
return {};
}
+TString GetExtension(const std::string_view& format, const std::string_view& compression) {
+ static const std::unordered_map<std::string_view, std::string_view> formatsMap = {
+ {"csv_with_names"sv, "csv"sv},
+ {"tsv_with_names"sv, "tsv"sv},
+ {"raw"sv, "bin"sv},
+ {"json_list"sv, "json"sv},
+ {"json_each_row"sv, "json"sv},
+ {"parquet"sv, "parquet"sv}
+ };
+
+ static const std::unordered_map<std::string_view, std::string_view> compressionsMap = {
+ {"gzip"sv, "gz"sv},
+ {"zstd"sv, "zst"sv},
+ {"lz4"sv, "lz4"sv},
+ {"bzip2"sv, "bz2"sv},
+ {"brotli"sv, "br"sv},
+ {"xz"sv, "xz"sv}
+ };
+
+ TStringBuilder extension;
+ if (const auto it = formatsMap.find(format); formatsMap.cend() != it) {
+ extension << '.' << it->second;
+ }
+
+ if (const auto it = compressionsMap.find(compression); compressionsMap.cend() != it) {
+ extension << '.' << it->second;
+ }
+ return extension;
+}
+
class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state)
@@ -67,7 +97,10 @@ public:
auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos());
if (partBy)
sinkSettingsBuilder.Add(std::move(partBy));
- if (auto compression = GetCompression(write.Target().Settings().Ref()))
+
+ auto compression = GetCompression(write.Target().Settings().Ref());
+ const auto& extension = GetExtension(write.Target().Format().Value(), compression ? compression->Tail().Content() : ""sv);
+ if (compression)
sinkSettingsBuilder.Add(std::move(compression));
if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
@@ -98,6 +131,7 @@ public:
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
+ .Extension().Value(extension).Build()
.Build()
.Build()
.Build()
@@ -145,6 +179,7 @@ public:
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
+ .Extension().Value(extension).Build()
.Build()
.Build()
.Build()
@@ -177,6 +212,7 @@ public:
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
+ .Extension().Value(extension).Build()
.Build()
.Done();