diff options
| -rw-r--r-- | ydb/core/external_sources/object_storage.cpp | 17 | ||||
| -rw-r--r-- | ydb/core/external_sources/object_storage_ut.cpp | 9 | ||||
| -rw-r--r-- | ydb/library/yql/providers/s3/common/util.cpp | 16 | ||||
| -rw-r--r-- | ydb/library/yql/providers/s3/common/util.h | 3 | ||||
| -rw-r--r-- | ydb/library/yql/providers/s3/common/ya.make | 1 | ||||
| -rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp | 6 | ||||
| -rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp | 5 | ||||
| -rw-r--r-- | ydb/tests/fq/s3/test_s3_0.py | 53 |
8 files changed, 110 insertions, 0 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index ecf72445a11..1cca2267a09 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource { } const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size(); issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning)); + issues.AddIssues(ValidateSchema(schema)); issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by())); issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by())); if (hasPartitioning) { @@ -269,6 +270,22 @@ struct TObjectStorageExternalSource : public IExternalSource { } template<typename TScheme> + static NYql::TIssues ValidateSchema(const TScheme& schema) { + NYql::TIssues issues; + for (const auto& column: schema.column()) { + const auto type = column.type(); + if (type.has_optional_type() && type.optional_type().item().has_optional_type()) { + issues.AddIssue(MakeErrorIssue( + Ydb::StatusIds::BAD_REQUEST, + TStringBuilder{} << "Double optional types are not supported (you have '" + << column.name() << " " << NYdb::TType(column.type()).ToString() << "' field)")); + } + } + + return issues; + } + + template<typename TScheme> static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) { NYql::TIssues issues; if (format != "json_list"sv) { diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp index d5bc4a655d3..129ad8febd7 100644 --- a/ydb/core/external_sources/object_storage_ut.cpp +++ b/ydb/core/external_sources/object_storage_ut.cpp @@ -55,6 +55,15 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) { UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format"); } + Y_UNIT_TEST(FailedOptionalTypeValidation) { + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false); + NKikimrExternalSources::TSchema schema; + NKikimrExternalSources::TGeneral general; + auto newColumn = schema.add_column(); + newColumn->mutable_type()->mutable_optional_type()->mutable_item()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::INT32); + UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Double optional types are not supported"); + } + Y_UNIT_TEST(WildcardsValidation) { auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false); NKikimrExternalSources::TSchema schema; diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp index 59404018874..074b116cc8c 100644 --- a/ydb/library/yql/providers/s3/common/util.cpp +++ b/ydb/library/yql/providers/s3/common/util.cpp @@ -48,4 +48,20 @@ TString UrlEscapeRet(const TStringBuf from) { return to; } +bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx) { + for (const TItemExprType* item : schemaStructRowType->GetItems()) { + const TTypeAnnotationNode* rowType = item->GetItemType(); + if (rowType->GetKind() == ETypeAnnotationKind::Optional) { + rowType = rowType->Cast<TOptionalExprType>()->GetItemType(); + } + + if (rowType->GetKind() == ETypeAnnotationKind::Optional) { + ctx.AddError(TIssue(TStringBuilder() << "Double optional types are not supported (you have '" + << item->GetName() << " " << FormatType(item->GetItemType()) << "' field)")); + return false; + } + } + return true; +} + } diff --git a/ydb/library/yql/providers/s3/common/util.h b/ydb/library/yql/providers/s3/common/util.h index 8767742e31f..d364e971078 100644 --- a/ydb/library/yql/providers/s3/common/util.h +++ b/ydb/library/yql/providers/s3/common/util.h @@ -2,6 +2,7 @@ #include <util/string/builder.h> #include <yql/essentials/public/issue/yql_issue.h> +#include <yql/essentials/ast/yql_expr.h> namespace NYql::NS3Util { @@ -12,4 +13,6 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues); // '#', '?' TString UrlEscapeRet(const TStringBuf from); +bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx); + } diff --git a/ydb/library/yql/providers/s3/common/ya.make b/ydb/library/yql/providers/s3/common/ya.make index 4927e3b10d6..058d0ca92fd 100644 --- a/ydb/library/yql/providers/s3/common/ya.make +++ b/ydb/library/yql/providers/s3/common/ya.make @@ -18,6 +18,7 @@ PEERDIR( ydb/library/yql/providers/s3/events yql/essentials/public/issue yql/essentials/public/issue/protos + yql/essentials/ast ) IF (CLANG AND NOT WITH_VALGRIND) diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index ecda21b9281..350431400b7 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -2,6 +2,7 @@ #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> #include <yql/essentials/core/yql_opt_utils.h> +#include <ydb/library/yql/providers/s3/common/util.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <yql/essentials/providers/common/provider/yql_provider.h> @@ -26,6 +27,7 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { return {}; } + } namespace { @@ -74,6 +76,10 @@ private: return TStatus::Error; } + if (!NS3Util::ValidateS3ReadWriteSchema(sourceType->Cast<TStructExprType>(), ctx)) { + return TStatus::Error; + } + auto target = input->Child(TS3WriteObject::idx_Target); if (!TS3Target::Match(target)) { ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target.")); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 6bfb9b64134..a94de088fc6 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -1,6 +1,7 @@ #include "yql_s3_provider_impl.h" #include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/providers/s3/common/util.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> @@ -491,6 +492,10 @@ public: auto format = s3Object.Format().Ref().Content(); const TStructExprType* structRowType = rowType->Cast<TStructExprType>(); + if (!NS3Util::ValidateS3ReadWriteSchema(structRowType, ctx)) { + return TStatus::Error; + } + THashSet<TStringBuf> columns; for (const TItemExprType* item : structRowType->GetItems()) { columns.emplace(item->GetName()); diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py index e5ea35c0abc..1e527706db2 100644 --- a/ydb/tests/fq/s3/test_s3_0.py +++ b/ydb/tests/fq/s3/test_s3_0.py @@ -1114,3 +1114,56 @@ Pear,15,33''' client.abort_query(query_id) client.wait_query(query_id) + + @yq_v2 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefix): + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''Fruit,Price,Weight +Banana,3,100 +Apple,2,22 +Pear,15,33''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = f''' + SELECT * + FROM `{storage_connection_name}`.`fruits.csv` + WITH (format='csv_with_names', SCHEMA ( + Name Int32??, + )); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + issues = str(client.describe_query(query_id).result.query.issue) + + assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues + + sql = f''' + INSERT INTO `{storage_connection_name}`.`insert/` + WITH + ( + FORMAT="csv_with_names" + ) + SELECT CAST(42 AS Int32??) as Weight;''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + issues = str(client.describe_query(query_id).result.query.issue) + + assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues |
