about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: a-romanov <Anton.Romanov@ydb.tech> 2022-09-02 16:57:53 +0300
committer: a-romanov <Anton.Romanov@ydb.tech> 2022-09-02 16:57:53 +0300
commit: cec129ee50c9e30e6b0525bc555fbf9b3c413181 (patch)
tree: f7b2493586df4a394ad8fd3eeba3945da24e0b99
parent: a1b93ec7e3c82d7cd92c2898ae6dbdd744eb0769 (diff)
download: ydb-cec129ee50c9e30e6b0525bc555fbf9b3c413181.tar.gz
Correct check setting for write to S3.
-rw-r--r-- ydb/library/yql/providers/common/provider/yql_provider.cpp 48
-rw-r--r-- ydb/library/yql/providers/common/provider/yql_provider.h 8
-rw-r--r-- ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp 4
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 60
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp 11
5 files changed, 96 insertions, 35 deletions
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index aea10aaaa88..dcfd8fa1459 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -22,7 +22,7 @@ namespace NCommon {
using namespace NNodes;
namespace {
- constexpr std::array<std::string_view, 8> Formats = {
+ constexpr std::array<std::string_view, 8> FormatsForInput = {
"csv_with_names"sv,
"tsv_with_names"sv,
"json_list"sv,
@@ -32,7 +32,16 @@ namespace {
"json_each_row"sv,
"parquet"sv
};
- constexpr std::array<std::string_view, 6> Compressions = {
+ constexpr std::array<std::string_view, 7> FormatsForOutput = {
+ "csv_with_names"sv,
+ "tsv_with_names"sv,
+ "json_list"sv,
+ "json"sv,
+ "raw"sv,
+ "json_each_row"sv,
+ "parquet"sv
+ };
+ constexpr std::array<std::string_view, 6> CompressionsForInput = {
"gzip"sv,
"zstd"sv,
"lz4"sv,
@@ -40,6 +49,9 @@ namespace {
"bzip2"sv,
"xz"sv
};
+ constexpr std::array<std::string_view, 1> CompressionsForOutput = {
+ "lz4"sv
+ };
constexpr std::array<std::string_view, 10> IntervalUnits = {
"MICROSECONDS"sv,
"MILLISECONDS"sv,
@@ -1089,25 +1101,43 @@ void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<
writer.OnEndMap();
}
-bool ValidateCompression(TStringBuf compression, TExprContext& ctx) {
- if (compression.empty() || IsIn(Compressions, compression)) {
+bool ValidateCompressionForInput(std::string_view compression, TExprContext& ctx) {
+ if (compression.empty() || IsIn(CompressionsForInput, compression)) {
return true;
}
ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
- << ". Use one of: " << JoinSeq(", ", Compressions)));
+ << ". Use one of: " << JoinSeq(", ", CompressionsForInput)));
+ return false;
+}
+
+bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ctx) {
+ if (compression.empty() || IsIn(CompressionsForOutput, compression)) {
+ return true;
+ }
+ ctx.AddError(TIssue(TStringBuilder() << "Unknown compression: " << compression
+ << ". Use one of: " << JoinSeq(", ", CompressionsForOutput)));
+ return false;
+}
+
+bool ValidateFormatForInput(std::string_view format, TExprContext& ctx) {
+ if (format.empty() || IsIn(FormatsForInput, format)) {
+ return true;
+ }
+ ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
+ << ". Use one of: " << JoinSeq(", ", FormatsForInput)));
return false;
}
-bool ValidateFormat(TStringBuf format, TExprContext& ctx) {
- if (format.empty() || IsIn(Formats, format)) {
+bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx) {
+ if (format.empty() || IsIn(FormatsForOutput, format)) {
return true;
}
ctx.AddError(TIssue(TStringBuilder() << "Unknown format: " << format
- << ". Use one of: " << JoinSeq(", ", Formats)));
+ << ". Use one of: " << JoinSeq(", ", FormatsForOutput)));
return false;
}
-bool ValidateIntervalUnit(TStringBuf unit, TExprContext& ctx) {
+bool ValidateIntervalUnit(std::string_view unit, TExprContext& ctx) {
if (unit.empty() || IsIn(IntervalUnits, unit)) {
return true;
}
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.h b/ydb/library/yql/providers/common/provider/yql_provider.h
index a8e22dbf2b3..8d7d700f1bc 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.h
+++ b/ydb/library/yql/providers/common/provider/yql_provider.h
@@ -123,11 +123,13 @@ double GetDataReplicationFactor(const TExprNode& lambda, TExprContext& ctx);
void WriteStatistics(NYson::TYsonWriter& writer, bool totalOnly, const THashMap<ui32, TOperationStatistics>& statistics);
-bool ValidateCompression(TStringBuf compression, TExprContext& ctx);
+bool ValidateCompressionForInput(std::string_view compression, TExprContext& ctx);
+bool ValidateCompressionForOutput(std::string_view compression, TExprContext& ctx);
-bool ValidateFormat(TStringBuf format, TExprContext& ctx);
+bool ValidateFormatForInput(std::string_view format, TExprContext& ctx);
+bool ValidateFormatForOutput(std::string_view format, TExprContext& ctx);
-bool ValidateIntervalUnit(TStringBuf format, TExprContext& ctx);
+bool ValidateIntervalUnit(std::string_view format, TExprContext& ctx);
} // namespace NCommon
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
index f5f075f5d9e..8fa309a590d 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_datasource_type_ann.cpp
@@ -99,11 +99,11 @@ public:
return TStatus::Error;
}
- if (!NCommon::ValidateFormat(read.Format().Ref().Content(), ctx)) {
+ if (!NCommon::ValidateFormatForInput(read.Format().Ref().Content(), ctx)) {
return TStatus::Error;
}
- if (!NCommon::ValidateCompression(read.Compression().Ref().Content(), ctx)) {
+ if (!NCommon::ValidateCompressionForInput(read.Compression().Ref().Content(), ctx)) {
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 69537156537..f0e572ea9eb 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -73,30 +73,62 @@ private:
return TStatus::Error;
}
- TString normalized = NS3::NormalizePath(ToString(path));
- if (normalized == "/") {
+ if (const auto& normalized = NS3::NormalizePath(ToString(path)); normalized == "/") {
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Unable to write to root directory"));
return TStatus::Error;
- }
-
- if (normalized != path) {
+ } else if (normalized != path) {
output = ctx.ChangeChild(*input, TS3Target::idx_Path, ctx.NewAtom(input->Child(TS3Target::idx_Path)->Pos(), normalized));
return TStatus::Repeat;
}
- if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormat(input->Child(TS3Target::idx_Format)->Content(), ctx)) {
+ if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormatForOutput(input->Child(TS3Target::idx_Format)->Content(), ctx)) {
return TStatus::Error;
}
- if (input->ChildrenSize() > TS3Target::idx_Settings && !EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx)) {
- return TStatus::Error;
- }
-/* TODO
- const auto compression = GetCompression(*input->Child(TS3Target::idx_Settings));
- if (!NCommon::ValidateCompression(compression, ctx)) {
- return TStatus::Error;
+ if (input->ChildrenSize() > TS3Target::idx_Settings) {
+ if (!EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx))
+ return TStatus::Error;
+
+ const auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) {
+ if (name == "compression") {
+ const auto& value = setting.Tail();
+ if (!EnsureAtom(value, ctx)) {
+ return false;
+ }
+
+ return NCommon::ValidateCompressionForOutput(value.Content(), ctx);
+ }
+
+ if (name == "partitionedby") {
+ if (setting.ChildrenSize() < 2) {
+ ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()), "Expected at least one column in partitioned_by setting"));
+ return false;
+ }
+
+ std::unordered_set<std::string_view> uniqs(setting.ChildrenSize());
+ for (size_t i = 1; i < setting.ChildrenSize(); ++i) {
+ const auto& column = setting.Child(i);
+ if (!EnsureAtom(*column, ctx)) {
+ return false;
+ }
+ if (!uniqs.emplace(column->Content()).second) {
+ ctx.AddError(TIssue(ctx.GetPosition(column->Pos()),
+ TStringBuilder() << "Duplicate partitioned_by column '" << column->Content() << "'"));
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ return true;
+ };
+
+ if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode"}, validator, ctx)) {
+ return TStatus::Error;
+ }
}
-*/
+
input->SetTypeAnn(ctx.MakeType<TUnitExprType>());
return TStatus::Ok;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index df10744867b..30b34f8f09f 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -134,7 +134,7 @@ public:
}
if (!EnsureAtom(*input->Child(TS3ParseSettings::idx_Format), ctx) ||
- !NCommon::ValidateFormat(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx))
+ !NCommon::ValidateFormatForInput(input->Child(TS3ParseSettings::idx_Format)->Content(), ctx))
{
return TStatus::Error;
}
@@ -250,11 +250,8 @@ public:
return TStatus::Error;
}
- auto format = input->Child(TS3Object::idx_Format)->Content();
-
- if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) ||
- !NCommon::ValidateFormat(format, ctx))
- {
+ const auto format = input->Child(TS3Object::idx_Format)->Content();
+ if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) || !NCommon::ValidateFormatForInput(format, ctx)) {
return TStatus::Error;
}
@@ -284,7 +281,7 @@ public:
}
compression = value.Head().Content();
}
- return NCommon::ValidateCompression(compression, ctx);
+ return NCommon::ValidateCompressionForInput(compression, ctx);
}
if (name == "partitionedby") {
havePartitionedBy = true;