diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-26 01:37:02 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-02-26 01:37:02 +0300 |
commit | 8f43f510228637bc8b5810e6c19cf53e0c4a3a1c (patch) | |
tree | cafa384d021549ae12c387d9043684a036da2e8f | |
parent | 368743f37b3e87f5c016cf2f899c3e570786d36f (diff) | |
download | ydb-8f43f510228637bc8b5810e6c19cf53e0c4a3a1c.tar.gz |
YQ-893 Validate compression's naming (snake_case) in s3 and pq
Validate compressions
ref:4d7c25ed20bd1cfeaee91c4df3da630fb2f83a6a
5 files changed, 65 insertions, 2 deletions
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp index bfdc26f5f8..b095e38d5c 100644 --- a/ydb/library/yql/providers/common/mkql/parser.cpp +++ b/ydb/library/yql/providers/common/mkql/parser.cpp @@ -34,7 +34,7 @@ std::array<TString, 2U> GetSettings(const TExprNode& settings) { writer.OpenMap(); child.Tail().ForEachChild([&writer, &compression](const TExprNode& pair) { if (pair.Head().IsAtom("compression") && pair.Tail().IsCallable({"String", "Utf8"})) - if (const auto& comp = pair.Tail().Head().Content(); !comp.empty() && std::isupper(comp.front())) + if (const auto& comp = pair.Tail().Head().Content(); !comp.empty()) compression = comp; else { writer.WriteKey(pair.Head().Content()); @@ -79,7 +79,29 @@ std::array<TString, 2U> GetSettings(const TExprNode& settings) { } return {TString(), TString()}; } + +TString ResolveUDFNameByCompression(std::string_view input) { + if (input == "gzip"sv) { + return "Gzip"; + } + if (input == "zstd"sv) { + return "Zstd"; + } + if (input == "lz4"sv) { + return "Lz4"; + } + if (input == "brotli"sv) { + return "Brotli"; + } + if (input == "bzip2"sv) { + return "Bzip2"; + } + if (input == "xz"sv) { + return "Xz"; + } + THROW yexception() << "Invalid compression: " << input; } +} // namespace TRuntimeNode BuildParseCall( TRuntimeNode input, @@ -91,7 +113,7 @@ TRuntimeNode BuildParseCall( { if (!compression.empty()) { input = ctx.ProgramBuilder.Map(input, [&ctx, &compression](TRuntimeNode item) { - return ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf(std::string("Decompress.") += compression), {item}); + return ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf(std::string("Decompress.") += ResolveUDFNameByCompression(compression)), {item}); }); } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index aa782ee93f..4d2ee60c37 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -11,8 +11,10 @@ #include <ydb/library/yql/minikql/mkql_program_builder.h> #include <util/folder/path.h> +#include <util/generic/is_in.h> #include <util/generic/utility.h> + namespace NYql { namespace NCommon { @@ -1047,5 +1049,24 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap< writer.OnEndMap(); } +bool ValidateCompression(TStringBuf compression, TExprContext& ctx) { + if (compression.empty() || + IsIn({ + "gzip"sv, + "zstd"sv, + "lz4"sv, + "brotli"sv, + "bzip2"sv, + "xz"sv + }, compression)) + { + return true; + } + ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression + << ". Use one of: gzip, zstd, lz4, brotli, bzip2, xz")); + return false; +} + + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h index 9e80cfd135..7163fe247c 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.h +++ b/ydb/library/yql/providers/common/provider/yql_provider.h @@ -120,5 +120,7 @@ double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx); void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics); +bool ValidateCompression(TStringBuf compression, TExprContext& ctx); + } // namespace NCommon } // namespace NYql diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp index 1f884336ec..cc0c8f4242 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp @@ -85,6 +85,10 @@ public: return TStatus::Error; } + if (!NCommon::ValidateCompression(read.Compression().Ref().Content(), ctx)) { + return TStatus::Error; + } + TPqTopic topic = read.Topic(); if (!EnsureCallable(topic.Ref(), ctx)) { return TStatus::Error; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 810942ee56..f0597bad33 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -15,6 +15,16 @@ using namespace NNodes; namespace { +TStringBuf GetCompression(const TExprNode& settings) { + for (auto i = 0U; i < settings.ChildrenSize(); ++i) { + const auto& child = *settings.Child(i); + if (child.Head().IsAtom("compression") && child.Tail().IsCallable({"String", "Utf8"})) + if (const auto& comp = child.Tail().Head().Content(); !comp.empty()) + return comp; + } + return ""; +} + class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { public: TS3DataSourceTypeAnnotationTransformer(TS3State::TPtr state) @@ -122,6 +132,10 @@ public: return TStatus::Error; } + if (const auto& compression = GetCompression(*input->Child(TS3Object::idx_Settings)); !NCommon::ValidateCompression(compression, ctx)) { + return TStatus::Error; + } + input->SetTypeAnn(ctx.MakeType<TUnitExprType>()); return TStatus::Ok; } |