diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-02 16:57:53 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-02 16:57:53 +0300 |
commit | cec129ee50c9e30e6b0525bc555fbf9b3c413181 (patch) | |
tree | f7b2493586df4a394ad8fd3eeba3945da24e0b99 | |
parent | a1b93ec7e3c82d7cd92c2898ae6dbdd744eb0769 (diff) | |
download | ydb-cec129ee50c9e30e6b0525bc555fbf9b3c413181.tar.gz |
Correct check setting for write to S3.
5 files changed, 96 insertions, 35 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index aea10aaaa88..dcfd8fa1459 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -22,7 +22,7 @@ namespace NCommon { using namespace NNodes; namespace { - constexpr std::array<std::string_view, 8> Formats = { + constexpr std::array<std::string_view, 8> FormatsForInput = { "csv_with_names"sv, "tsv_with_names"sv, "json_list"sv, @@ -32,7 +32,16 @@ namespace { "json_each_row"sv, "parquet"sv }; - constexpr std::array<std::string_view, 6> Compressions = { + constexpr std::array<std::string_view, 7> FormatsForOutput = { + "csv_with_names"sv, + "tsv_with_names"sv, + "json_list"sv, + "json"sv, + "raw"sv, + "json_each_row"sv, + "parquet"sv + }; + constexpr std::array<std::string_view, 6> CompressionsForInput = { "gzip"sv, "zstd"sv, "lz4"sv, @@ -40,6 +49,9 @@ namespace { "bzip2"sv, "xz"sv }; + constexpr std::array<std::string_view, 1> CompressionsForOutput = { + "lz4"sv + }; constexpr std::array<std::string_view, 10> IntervalUnits = { "MICROSECONDS"sv, "MILLISECONDS"sv, @@ -1089,25 +1101,43 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap< writer.OnEndMap(); } -bool ValidateCompression(TStringBuf compression, TExprContext& ctx) { - if (compression.empty() || IsIn(Compressions, compression)) { +bool ValidateCompressionForInput(std::string_view compression, TExprContext& ctx) { + if (compression.empty() || IsIn(CompressionsForInput, compression)) { return true; } ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression - << ". Use one of: " << JoinSeq(", ", Compressions))); + << ". Use one of: " << JoinSeq(", ", CompressionsForInput))); + return false; +} + +bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ctx) { + if (compression.empty() || IsIn(CompressionsForOutput, compression)) { + return true; + } + ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression + << ". Use one of: " << JoinSeq(", ", CompressionsForOutput))); + return false; +} + +bool ValidateFormatForInput(std::string_view format, TExprContext& ctx) { + if (format.empty() || IsIn(FormatsForInput, format)) { + return true; + } + ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format + << ". Use one of: " << JoinSeq(", ", FormatsForInput))); return false; } -bool ValidateFormat(TStringBuf format, TExprContext& ctx) { - if (format.empty() || IsIn(Formats, format)) { +bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) { + if (format.empty() || IsIn(FormatsForOutput, format)) { return true; } ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format - << ". Use one of: " << JoinSeq(", ", Formats))); + << ". Use one of: " << JoinSeq(", ", FormatsForOutput))); return false; } -bool ValidateIntervalUnit(TStringBuf unit, TExprContext& ctx) { +bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) { if (unit.empty() || IsIn(IntervalUnits, unit)) { return true; } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index a8e22dbf2b3..8d7d700f1bc 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -123,11 +123,13 @@ double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx); void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics); -bool ValidateCompression(TStringBuf compression, TExprContext& ctx); +bool ValidateCompressionForInput(std::string_view compression, TExprContext& ctx); +bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ctx); -bool ValidateFormat(TStringBuf format, TExprContext& ctx); +bool ValidateFormatForInput(std::string_view format, TExprContext& ctx); +bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx); -bool ValidateIntervalUnit(TStringBuf format, TExprContext& ctx); +bool ValidateIntervalUnit(std::string_view format, TExprContext& ctx); } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index f5f075f5d9e..8fa309a590d 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -99,11 +99,11 @@ public: return TStatus::Error; } - if (!NCommon::ValidateFormat(read.Format().Ref().Content(), ctx)) { + if (!NCommon::ValidateFormatForInput(read.Format().Ref().Content(), ctx)) { return TStatus::Error; } - if (!NCommon::ValidateCompression(read.Compression().Ref().Content(), ctx)) { + if (!NCommon::ValidateCompressionForInput(read.Compression().Ref().Content(), ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 69537156537..f0e572ea9eb 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -73,30 +73,62 @@ private: return TStatus::Error; } - TString normalized = NS3::NormalizePath(ToString(path)); - if (normalized == "/") { + if (const auto& normalized = NS3::NormalizePath(ToString(path)); normalized == "/") { ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Unable to write to root directory")); return TStatus::Error; - } - - if (normalized != path) { + } else if (normalized != path) { output = ctx.ChangeChild(*input, TS3Target::idx_Path, ctx.NewAtom(input->Child(TS3Target::idx_Path)->Pos(), normalized)); return TStatus::Repeat; } - if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormat(input->Child(TS3Target::idx_Format)->Content(), ctx)) { + if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormatForOutput(input->Child(TS3Target::idx_Format)->Content(), ctx)) { return TStatus::Error; } - if (input->ChildrenSize() > TS3Target::idx_Settings && !EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx)) { - return TStatus::Error; - } -/* TODO - const auto compression = GetCompression(*input->Child(TS3Target::idx_Settings)); - if (!NCommon::ValidateCompression(compression, ctx)) { - return TStatus::Error; + if (input->ChildrenSize() > TS3Target::idx_Settings) { + if (!EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx)) + return TStatus::Error; + + const auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) { + if (name == "compression") { + const auto& value = setting.Tail(); + if (!EnsureAtom(value, ctx)) { + return false; + } + + return NCommon::ValidateCompressionForOutput(value.Content(), ctx); + } + + if (name == "partitionedby") { + if (setting.ChildrenSize() < 2) { + ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()), "Expected at least one column in partitioned_by setting")); + return false; + } + + std::unordered_set<std::string_view> uniqs(setting.ChildrenSize()); + for (size_t i = 1; i < setting.ChildrenSize(); ++i) { + const auto& column = setting.Child(i); + if (!EnsureAtom(*column, ctx)) { + return false; + } + if (!uniqs.emplace(column->Content()).second) { + ctx.AddError(TIssue(ctx.GetPosition(column->Pos()), + TStringBuilder() << "Duplicate partitioned_by column '" << column->Content() << "'")); + return false; + } + } + + return true; + } + + return true; + }; + + if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode"}, validator, ctx)) { + return TStatus::Error; + } } -*/ + input->SetTypeAnn(ctx.MakeType<TUnitExprType>()); return TStatus::Ok; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index df10744867b..30b34f8f09f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -134,7 +134,7 @@ public: } if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) || - !NCommon::ValidateFormat(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx)) + !NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx)) { return TStatus::Error; } @@ -250,11 +250,8 @@ public: return TStatus::Error; } - auto format = input->Child(TS3Object::idx_Format)->Content(); - - if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || - !NCommon::ValidateFormat(format, ctx)) - { + const auto format = input->Child(TS3Object::idx_Format)->Content(); + if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, ctx)) { return TStatus::Error; } @@ -284,7 +281,7 @@ public: } compression = value.Head().Content(); } - return NCommon::ValidateCompression(compression, ctx); + return NCommon::ValidateCompressionForInput(compression, ctx); } if (name == "partitionedby") { havePartitionedBy = true; |