diff options
author | hcpp <hcpp@ydb.tech> | 2023-10-06 18:55:00 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-10-06 22:18:32 +0300 |
commit | 67ea49de7a21d7bbb8836363af40226716c892c0 (patch) | |
tree | e136c4a26f4a6906f975374e2994a3976849377b | |
parent | 7b785a08695ae1448a64ea5b12ff0d6f3c0c1eb1 (diff) | |
download | ydb-67ea49de7a21d7bbb8836363af40226716c892c0.tar.gz |
remove validation logic copy-paste
4 files changed, 34 insertions, 276 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 1ed10637e4e..6b7c0f2800a 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -1,4 +1,5 @@ #include "external_source.h" +#include "object_storage.h" #include <ydb/core/protos/external_sources.pb.h> #include <ydb/core/protos/flat_scheme_op.pb.h> @@ -106,8 +107,8 @@ struct TObjectStorageExternalSource : public IExternalSource { } } -private: - static NYql::TIssues Validate(const NKikimrExternalSources::TSchema& schema, const NKikimrExternalSources::TObjectStorage& objectStorage) { + template<typename TScheme, typename TObjectStorage> + static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit = 50000) { NYql::TIssues issues; issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting())); if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) { @@ -122,7 +123,7 @@ private: } projectionStr = projection.ToJsonPretty(); } - issues.AddIssues(ValidateProjection(schema, projectionStr, partitionedBy)); + issues.AddIssues(ValidateProjection(schema, projectionStr, partitionedBy, pathsLimit)); } catch (...) { issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, CurrentExceptionMessage())); } @@ -164,7 +165,7 @@ private: return issues; } - static NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting) { + static NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false) { NYql::TIssues issues; TSet<TString> conflictingKeys; for (const auto& [key, value]: formatSetting) { @@ -205,10 +206,15 @@ private: conflictingKeys.insert("data.timestamp.format"); continue; } + + if (matchAllSettings) { + issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key)); + } } return issues; } +private: static bool IsValidIntervalUnit(const TString& unit) { static constexpr std::array<std::string_view, 7> IntervalUnits = { "MICROSECONDS"sv, @@ -248,7 +254,8 @@ private: return issue; } - static NYql::TIssues ValidateProjectionColumns(const NKikimrExternalSources::TSchema& schema, const TVector<TString>& partitionedBy) { + template<typename TScheme> + static NYql::TIssues ValidateProjectionColumns(const TScheme& schema, const TVector<TString>& partitionedBy) { NYql::TIssues issues; TMap<TString, Ydb::Type> types; for (const auto& column: schema.column()) { @@ -356,8 +363,9 @@ private: return ValidateProjectionType(columnType, columnName, availableTypes); } - static NYql::TIssues ValidateProjection(const NKikimrExternalSources::TSchema& schema, const TString& projection, const TVector<TString>& partitionedBy) { - auto generator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs + template<typename TScheme> + static NYql::TIssues ValidateProjection(const TScheme& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit) { + auto generator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema), pathsLimit); // an exception is thrown if an error occurs TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns; for (const auto& column: generator->GetConfig().Rules) { projectionColumns[column.Name] = column.Type; @@ -385,7 +393,8 @@ private: return issues; } - static TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const NKikimrExternalSources::TSchema& schema) { + template<typename TSchema> + static TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const TSchema& schema) { TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns; for (const auto& column: schema.column()) { if (column.has_type()) { @@ -405,4 +414,12 @@ IExternalSource::TPtr CreateObjectStorageExternalSource() { return MakeIntrusive<TObjectStorageExternalSource>(); } +NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) { + return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit); +} + +NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) { + return TObjectStorageExternalSource::ValidateDateFormatSetting(formatSetting, matchAllSettings); +} + } diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h index 22bd9f6d765..f06e2ce3aa7 100644 --- a/ydb/core/external_sources/object_storage.h +++ b/ydb/core/external_sources/object_storage.h @@ -2,8 +2,14 @@ #include "external_source.h" +#include <ydb/public/api/protos/draft/fq.pb.h> + namespace NKikimr::NExternalSource { IExternalSource::TPtr CreateObjectStorageExternalSource(); +NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit); + +NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false); + } diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp index eea91a2b5c3..f2016a07ddf 100644 --- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp +++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp @@ -2,113 +2,6 @@ namespace NFq { -namespace { - -NYql::TIssues ValidateProjectionType(const NYdb::TType& columnType, const TString& columnName, const std::vector<NYdb::TType>& availableTypes) { - return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) == availableTypes.end() - ? NYql::TIssues{MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Column \"" << columnName << "\" from projection does not support " << columnType.ToString() << " type")} - : NYql::TIssues{}; -} - -NYql::TIssues ValidateIntegerProjectionType(const NYdb::TType& columnType, const TString& columnName) { - static const std::vector<NYdb::TType> availableTypes { - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::String) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Int32) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Uint32) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Int64) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Uint64) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Utf8) - .Build() - }; - return ValidateProjectionType(columnType, columnName, availableTypes); -} - -NYql::TIssues ValidateEnumProjectionType(const NYdb::TType& columnType, const TString& columnName) { - static const std::vector<NYdb::TType> availableTypes { - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::String) - .Build() - }; - return ValidateProjectionType(columnType, columnName, availableTypes); -} - -NYql::TIssues ValidateCommonProjectionType(const NYdb::TType& columnType, const TString& columnName) { - static const std::vector<NYdb::TType> availableTypes { - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::String) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Int64) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Utf8) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Int32) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Uint32) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Uint64) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Date) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Datetime) - .Build() - }; - return ValidateProjectionType(columnType, columnName, availableTypes); -} - -NYql::TIssues ValidateDateProjectionType(const NYdb::TType& columnType, const TString& columnName) { - static const std::vector<NYdb::TType> availableTypes { - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::String) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Utf8) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Uint32) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Date) - .Build(), - NYdb::TTypeBuilder{} - .Primitive(NYdb::EPrimitiveType::Datetime) - .Build() - }; - return ValidateProjectionType(columnType, columnName, availableTypes); -} - -TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const FederatedQuery::Schema& schema) { - TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns; - for (const auto& column: schema.column()) { - if (column.has_type()) { - const auto& type = column.type(); - if (type.has_type_id()) { - dataSlotColumns[column.name()] = NYql::NUdf::GetDataSlot(type.type_id()); - } - } - } - return dataSlotColumns; -} - -} - template <typename TConnection> void ValidateGenericConnectionSetting( const TConnection& connection, @@ -234,142 +127,6 @@ NYql::TIssues ValidateConnectionSetting( return issues; } -NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) { - NYql::TIssues issues; - TSet<TString> conflictingKeys; - for (const auto& [key, value]: formatSetting) { - if (key == "data.datetime.format_name"sv) { - if (!IsValidDateTimeFormatName(value)) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.datetime.format_name " + value)); - } - if (conflictingKeys.contains("data.datetime.format")) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together")); - } - conflictingKeys.insert("data.datetime.format_name"); - continue; - } - - if (key == "data.datetime.format"sv) { - if (conflictingKeys.contains("data.datetime.format_name")) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together")); - } - conflictingKeys.insert("data.datetime.format"); - continue; - } - - if (key == "data.timestamp.format_name"sv) { - if (!IsValidTimestampFormatName(value)) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.timestamp.format_name " + value)); - } - if (conflictingKeys.contains("data.timestamp.format")) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together")); - } - conflictingKeys.insert("data.timestamp.format_name"); - continue; - } - - if (key == "data.timestamp.format"sv) { - if (conflictingKeys.contains("data.timestamp.format_name")) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together")); - } - conflictingKeys.insert("data.timestamp.format"); - continue; - } - - if (matchAllSettings) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key)); - } - } - return issues; -} - - -NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) { - NYql::TIssues issues; - TSet<TString> conflictingKeys; - issues.AddIssues(ValidateDateFormatSetting(formatSetting)); - for (const auto& [key, value]: formatSetting) { - if (key == "file_pattern"sv) { - continue; - } - - if (key == "data.interval.unit"sv) { - if (!IsValidIntervalUnit(value)) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.interval.unit " + value)); - } - continue; - } - - if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) { - continue; - } - - if (key == "csv_delimiter"sv) { - if (format != "csv_with_names"sv) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names")); - } - if (value.size() != 1) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should contain only one character")); - } - continue; - } - - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key)); - } - return issues; -} - -NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, const TVector<TString>& partitionedBy) { - NYql::TIssues issues; - TMap<TString, Ydb::Type> types; - for (const auto& column: schema.column()) { - types[column.name()] = column.type(); - } - for (const auto& parititonedColumn: partitionedBy) { - auto it = types.find(parititonedColumn); - if (it == types.end()) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Column " << parititonedColumn << " from partitioned_by does not exist in the scheme. Please add such a column to your scheme")); - continue; - } - NYdb::TType columnType{it->second}; - issues.AddIssues(ValidateCommonProjectionType(columnType, parititonedColumn)); - } - return issues; -} - -NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit) { - auto generator = NYql::NPathGenerator::CreatePathGenerator( - projection, - partitionedBy, - GetDataSlotColumns(schema), - pathsLimit); // an exception is thrown if an error occurs - TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns; - for (const auto& column: generator->GetConfig().Rules) { - projectionColumns[column.Name] = column.Type; - } - NYql::TIssues issues; - for (const auto& column: schema.column()) { - auto it = projectionColumns.find(column.name()); - if (it != projectionColumns.end()) { - switch (it->second) { - case NYql::NPathGenerator::IPathGenerator::EType::INTEGER: - issues.AddIssues(ValidateIntegerProjectionType(NYdb::TType{column.type()}, column.name())); - break; - case NYql::NPathGenerator::IPathGenerator::EType::ENUM: - issues.AddIssues(ValidateEnumProjectionType(NYdb::TType{column.type()}, column.name())); - break; - case NYql::NPathGenerator::IPathGenerator::EType::DATE: - issues.AddIssues(ValidateDateProjectionType(NYdb::TType{column.type()}, column.name())); - break; - case NYql::NPathGenerator::IPathGenerator::EType::UNDEFINED: - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Column \"" << column.name() << "\" from projection has undefined generator type")); - break; - } - } - } - return issues; -} - NYql::TIssues ValidateEntityName(const TString& name) { NYql::TIssues issues; diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.h b/ydb/core/fq/libs/control_plane_storage/request_validators.h index add489ae065..d28023daa6a 100644 --- a/ydb/core/fq/libs/control_plane_storage/request_validators.h +++ b/ydb/core/fq/libs/control_plane_storage/request_validators.h @@ -2,6 +2,7 @@ #include "util.h" +#include <ydb/core/external_sources/object_storage.h> #include <ydb/core/fq/libs/config/yq_issue.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/library/yql/public/issue/yql_issue.h> @@ -77,12 +78,6 @@ NYql::TIssues ValidateQuery(const T& ev, size_t maxSize) return issues; } -NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting); - - -NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false); -NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, const TVector<TString>& partitionedBy); -NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit); NYql::TIssues ValidateEntityName(const TString& name); template<typename T> @@ -114,7 +109,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ if (!dataStreams.has_schema()) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden")); } - issues.AddIssues(ValidateDateFormatSetting(dataStreams.format_setting(), true)); + issues.AddIssues(NKikimr::NExternalSource::ValidateDateFormatSetting(dataStreams.format_setting(), true)); break; } case FederatedQuery::BindingSetting::BINDING_NOT_SET: { @@ -125,24 +120,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ case FederatedQuery::BindingSetting::kObjectStorage: const FederatedQuery::ObjectStorageBinding objectStorage = setting.object_storage(); for (const auto& subset: objectStorage.subset()) { - issues.AddIssues(ValidateFormatSetting(subset.format(), subset.format_setting())); - if (subset.projection_size() || subset.partitioned_by_size()) { - try { - TVector<TString> partitionedBy{subset.partitioned_by().begin(), subset.partitioned_by().end()}; - issues.AddIssues(ValidateProjectionColumns(subset.schema(), partitionedBy)); - TString projectionStr; - if (subset.projection_size()) { - NSc::TValue projection; - for (const auto& [key, value]: subset.projection()) { - projection[key] = value; - } - projectionStr = projection.ToJsonPretty(); - } - issues.AddIssues(ValidateProjection(subset.schema(), projectionStr, partitionedBy, pathsLimit)); - } catch (...) { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,CurrentExceptionMessage())); - } - } + issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit)); } break; } |