aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-10-06 18:55:00 +0300
committerhcpp <hcpp@ydb.tech>2023-10-06 22:18:32 +0300
commit67ea49de7a21d7bbb8836363af40226716c892c0 (patch)
treee136c4a26f4a6906f975374e2994a3976849377b
parent7b785a08695ae1448a64ea5b12ff0d6f3c0c1eb1 (diff)
downloadydb-67ea49de7a21d7bbb8836363af40226716c892c0.tar.gz
remove validation logic copy-paste
-rw-r--r--ydb/core/external_sources/object_storage.cpp33
-rw-r--r--ydb/core/external_sources/object_storage.h6
-rw-r--r--ydb/core/fq/libs/control_plane_storage/request_validators.cpp243
-rw-r--r--ydb/core/fq/libs/control_plane_storage/request_validators.h28
4 files changed, 34 insertions, 276 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index 1ed10637e4e..6b7c0f2800a 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -1,4 +1,5 @@
#include "external_source.h"
+#include "object_storage.h"
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -106,8 +107,8 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
}
-private:
- static NYql::TIssues Validate(const NKikimrExternalSources::TSchema& schema, const NKikimrExternalSources::TObjectStorage& objectStorage) {
+ template<typename TScheme, typename TObjectStorage>
+ static NYql::TIssues Validate(const TScheme& schema, const TObjectStorage& objectStorage, size_t pathsLimit = 50000) {
NYql::TIssues issues;
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting()));
if (objectStorage.projection_size() || objectStorage.partitioned_by_size()) {
@@ -122,7 +123,7 @@ private:
}
projectionStr = projection.ToJsonPretty();
}
- issues.AddIssues(ValidateProjection(schema, projectionStr, partitionedBy));
+ issues.AddIssues(ValidateProjection(schema, projectionStr, partitionedBy, pathsLimit));
} catch (...) {
issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, CurrentExceptionMessage()));
}
@@ -164,7 +165,7 @@ private:
return issues;
}
- static NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting) {
+ static NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false) {
NYql::TIssues issues;
TSet<TString> conflictingKeys;
for (const auto& [key, value]: formatSetting) {
@@ -205,10 +206,15 @@ private:
conflictingKeys.insert("data.timestamp.format");
continue;
}
+
+ if (matchAllSettings) {
+ issues.AddIssue(MakeErrorIssue(Ydb::StatusIds::BAD_REQUEST, "unknown format setting " + key));
+ }
}
return issues;
}
+private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
"MICROSECONDS"sv,
@@ -248,7 +254,8 @@ private:
return issue;
}
- static NYql::TIssues ValidateProjectionColumns(const NKikimrExternalSources::TSchema& schema, const TVector<TString>& partitionedBy) {
+ template<typename TScheme>
+ static NYql::TIssues ValidateProjectionColumns(const TScheme& schema, const TVector<TString>& partitionedBy) {
NYql::TIssues issues;
TMap<TString, Ydb::Type> types;
for (const auto& column: schema.column()) {
@@ -356,8 +363,9 @@ private:
return ValidateProjectionType(columnType, columnName, availableTypes);
}
- static NYql::TIssues ValidateProjection(const NKikimrExternalSources::TSchema& schema, const TString& projection, const TVector<TString>& partitionedBy) {
- auto generator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema)); // an exception is thrown if an error occurs
+ template<typename TScheme>
+ static NYql::TIssues ValidateProjection(const TScheme& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit) {
+ auto generator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy, GetDataSlotColumns(schema), pathsLimit); // an exception is thrown if an error occurs
TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns;
for (const auto& column: generator->GetConfig().Rules) {
projectionColumns[column.Name] = column.Type;
@@ -385,7 +393,8 @@ private:
return issues;
}
- static TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const NKikimrExternalSources::TSchema& schema) {
+ template<typename TSchema>
+ static TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const TSchema& schema) {
TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns;
for (const auto& column: schema.column()) {
if (column.has_type()) {
@@ -405,4 +414,12 @@ IExternalSource::TPtr CreateObjectStorageExternalSource() {
return MakeIntrusive<TObjectStorageExternalSource>();
}
+NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
+ return TObjectStorageExternalSource::Validate(schema, objectStorage, pathsLimit);
+}
+
+NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) {
+ return TObjectStorageExternalSource::ValidateDateFormatSetting(formatSetting, matchAllSettings);
+}
+
}
diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h
index 22bd9f6d765..f06e2ce3aa7 100644
--- a/ydb/core/external_sources/object_storage.h
+++ b/ydb/core/external_sources/object_storage.h
@@ -2,8 +2,14 @@
#include "external_source.h"
+#include <ydb/public/api/protos/draft/fq.pb.h>
+
namespace NKikimr::NExternalSource {
IExternalSource::TPtr CreateObjectStorageExternalSource();
+NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);
+
+NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);
+
}
diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
index eea91a2b5c3..f2016a07ddf 100644
--- a/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/request_validators.cpp
@@ -2,113 +2,6 @@
namespace NFq {
-namespace {
-
-NYql::TIssues ValidateProjectionType(const NYdb::TType& columnType, const TString& columnName, const std::vector<NYdb::TType>& availableTypes) {
- return FindIf(availableTypes, [&columnType](const auto& availableType) { return NYdb::TypesEqual(availableType, columnType); }) == availableTypes.end()
- ? NYql::TIssues{MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Column \"" << columnName << "\" from projection does not support " << columnType.ToString() << " type")}
- : NYql::TIssues{};
-}
-
-NYql::TIssues ValidateIntegerProjectionType(const NYdb::TType& columnType, const TString& columnName) {
- static const std::vector<NYdb::TType> availableTypes {
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::String)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Int32)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Uint32)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Int64)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Uint64)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Utf8)
- .Build()
- };
- return ValidateProjectionType(columnType, columnName, availableTypes);
-}
-
-NYql::TIssues ValidateEnumProjectionType(const NYdb::TType& columnType, const TString& columnName) {
- static const std::vector<NYdb::TType> availableTypes {
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::String)
- .Build()
- };
- return ValidateProjectionType(columnType, columnName, availableTypes);
-}
-
-NYql::TIssues ValidateCommonProjectionType(const NYdb::TType& columnType, const TString& columnName) {
- static const std::vector<NYdb::TType> availableTypes {
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::String)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Int64)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Utf8)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Int32)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Uint32)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Uint64)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Date)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Datetime)
- .Build()
- };
- return ValidateProjectionType(columnType, columnName, availableTypes);
-}
-
-NYql::TIssues ValidateDateProjectionType(const NYdb::TType& columnType, const TString& columnName) {
- static const std::vector<NYdb::TType> availableTypes {
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::String)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Utf8)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Uint32)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Date)
- .Build(),
- NYdb::TTypeBuilder{}
- .Primitive(NYdb::EPrimitiveType::Datetime)
- .Build()
- };
- return ValidateProjectionType(columnType, columnName, availableTypes);
-}
-
-TMap<TString, NYql::NUdf::EDataSlot> GetDataSlotColumns(const FederatedQuery::Schema& schema) {
- TMap<TString, NYql::NUdf::EDataSlot> dataSlotColumns;
- for (const auto& column: schema.column()) {
- if (column.has_type()) {
- const auto& type = column.type();
- if (type.has_type_id()) {
- dataSlotColumns[column.name()] = NYql::NUdf::GetDataSlot(type.type_id());
- }
- }
- }
- return dataSlotColumns;
-}
-
-}
-
template <typename TConnection>
void ValidateGenericConnectionSetting(
const TConnection& connection,
@@ -234,142 +127,6 @@ NYql::TIssues ValidateConnectionSetting(
return issues;
}
-NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings) {
- NYql::TIssues issues;
- TSet<TString> conflictingKeys;
- for (const auto& [key, value]: formatSetting) {
- if (key == "data.datetime.format_name"sv) {
- if (!IsValidDateTimeFormatName(value)) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.datetime.format_name " + value));
- }
- if (conflictingKeys.contains("data.datetime.format")) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together"));
- }
- conflictingKeys.insert("data.datetime.format_name");
- continue;
- }
-
- if (key == "data.datetime.format"sv) {
- if (conflictingKeys.contains("data.datetime.format_name")) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.datetime.format_name and data.datetime.format together"));
- }
- conflictingKeys.insert("data.datetime.format");
- continue;
- }
-
- if (key == "data.timestamp.format_name"sv) {
- if (!IsValidTimestampFormatName(value)) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.timestamp.format_name " + value));
- }
- if (conflictingKeys.contains("data.timestamp.format")) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together"));
- }
- conflictingKeys.insert("data.timestamp.format_name");
- continue;
- }
-
- if (key == "data.timestamp.format"sv) {
- if (conflictingKeys.contains("data.timestamp.format_name")) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "Don't use data.timestamp.format_name and data.timestamp.format together"));
- }
- conflictingKeys.insert("data.timestamp.format");
- continue;
- }
-
- if (matchAllSettings) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key));
- }
- }
- return issues;
-}
-
-
-NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) {
- NYql::TIssues issues;
- TSet<TString> conflictingKeys;
- issues.AddIssues(ValidateDateFormatSetting(formatSetting));
- for (const auto& [key, value]: formatSetting) {
- if (key == "file_pattern"sv) {
- continue;
- }
-
- if (key == "data.interval.unit"sv) {
- if (!IsValidIntervalUnit(value)) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.interval.unit " + value));
- }
- continue;
- }
-
- if (IsIn({ "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv}, key)) {
- continue;
- }
-
- if (key == "csv_delimiter"sv) {
- if (format != "csv_with_names"sv) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names"));
- }
- if (value.size() != 1) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should contain only one character"));
- }
- continue;
- }
-
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key));
- }
- return issues;
-}
-
-NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, const TVector<TString>& partitionedBy) {
- NYql::TIssues issues;
- TMap<TString, Ydb::Type> types;
- for (const auto& column: schema.column()) {
- types[column.name()] = column.type();
- }
- for (const auto& parititonedColumn: partitionedBy) {
- auto it = types.find(parititonedColumn);
- if (it == types.end()) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Column " << parititonedColumn << " from partitioned_by does not exist in the scheme. Please add such a column to your scheme"));
- continue;
- }
- NYdb::TType columnType{it->second};
- issues.AddIssues(ValidateCommonProjectionType(columnType, parititonedColumn));
- }
- return issues;
-}
-
-NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit) {
- auto generator = NYql::NPathGenerator::CreatePathGenerator(
- projection,
- partitionedBy,
- GetDataSlotColumns(schema),
- pathsLimit); // an exception is thrown if an error occurs
- TMap<TString, NYql::NPathGenerator::IPathGenerator::EType> projectionColumns;
- for (const auto& column: generator->GetConfig().Rules) {
- projectionColumns[column.Name] = column.Type;
- }
- NYql::TIssues issues;
- for (const auto& column: schema.column()) {
- auto it = projectionColumns.find(column.name());
- if (it != projectionColumns.end()) {
- switch (it->second) {
- case NYql::NPathGenerator::IPathGenerator::EType::INTEGER:
- issues.AddIssues(ValidateIntegerProjectionType(NYdb::TType{column.type()}, column.name()));
- break;
- case NYql::NPathGenerator::IPathGenerator::EType::ENUM:
- issues.AddIssues(ValidateEnumProjectionType(NYdb::TType{column.type()}, column.name()));
- break;
- case NYql::NPathGenerator::IPathGenerator::EType::DATE:
- issues.AddIssues(ValidateDateProjectionType(NYdb::TType{column.type()}, column.name()));
- break;
- case NYql::NPathGenerator::IPathGenerator::EType::UNDEFINED:
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, TStringBuilder{} << "Column \"" << column.name() << "\" from projection has undefined generator type"));
- break;
- }
- }
- }
- return issues;
-}
-
NYql::TIssues ValidateEntityName(const TString& name) {
NYql::TIssues issues;
diff --git a/ydb/core/fq/libs/control_plane_storage/request_validators.h b/ydb/core/fq/libs/control_plane_storage/request_validators.h
index add489ae065..d28023daa6a 100644
--- a/ydb/core/fq/libs/control_plane_storage/request_validators.h
+++ b/ydb/core/fq/libs/control_plane_storage/request_validators.h
@@ -2,6 +2,7 @@
#include "util.h"
+#include <ydb/core/external_sources/object_storage.h>
#include <ydb/core/fq/libs/config/yq_issue.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -77,12 +78,6 @@ NYql::TIssues ValidateQuery(const T& ev, size_t maxSize)
return issues;
}
-NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting);
-
-
-NYql::TIssues ValidateDateFormatSetting(const google::protobuf::Map<TString, TString>& formatSetting, bool matchAllSettings = false);
-NYql::TIssues ValidateProjectionColumns(const FederatedQuery::Schema& schema, const TVector<TString>& partitionedBy);
-NYql::TIssues ValidateProjection(const FederatedQuery::Schema& schema, const TString& projection, const TVector<TString>& partitionedBy, size_t pathsLimit);
NYql::TIssues ValidateEntityName(const TString& name);
template<typename T>
@@ -114,7 +109,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
if (!dataStreams.has_schema()) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "data streams with empty schema is forbidden"));
}
- issues.AddIssues(ValidateDateFormatSetting(dataStreams.format_setting(), true));
+ issues.AddIssues(NKikimr::NExternalSource::ValidateDateFormatSetting(dataStreams.format_setting(), true));
break;
}
case FederatedQuery::BindingSetting::BINDING_NOT_SET: {
@@ -125,24 +120,7 @@ NYql::TIssues ValidateBinding(const T& ev, size_t maxSize, const TSet<FederatedQ
case FederatedQuery::BindingSetting::kObjectStorage:
const FederatedQuery::ObjectStorageBinding objectStorage = setting.object_storage();
for (const auto& subset: objectStorage.subset()) {
- issues.AddIssues(ValidateFormatSetting(subset.format(), subset.format_setting()));
- if (subset.projection_size() || subset.partitioned_by_size()) {
- try {
- TVector<TString> partitionedBy{subset.partitioned_by().begin(), subset.partitioned_by().end()};
- issues.AddIssues(ValidateProjectionColumns(subset.schema(), partitionedBy));
- TString projectionStr;
- if (subset.projection_size()) {
- NSc::TValue projection;
- for (const auto& [key, value]: subset.projection()) {
- projection[key] = value;
- }
- projectionStr = projection.ToJsonPretty();
- }
- issues.AddIssues(ValidateProjection(subset.schema(), projectionStr, partitionedBy, pathsLimit));
- } catch (...) {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,CurrentExceptionMessage()));
- }
- }
+ issues.AddIssues(NKikimr::NExternalSource::Validate(subset.schema(), subset, pathsLimit));
}
break;
}