summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- ydb/core/external_sources/object_storage.cpp 17
-rw-r--r-- ydb/core/external_sources/object_storage_ut.cpp 9
-rw-r--r-- ydb/library/yql/providers/s3/common/util.cpp 16
-rw-r--r-- ydb/library/yql/providers/s3/common/util.h 3
-rw-r--r-- ydb/library/yql/providers/s3/common/ya.make 1
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 6
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp 5
-rw-r--r-- ydb/tests/fq/s3/test_s3_0.py 53
8 files changed, 110 insertions, 0 deletions
diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp
index ecf72445a11..1cca2267a09 100644
--- a/ydb/core/external_sources/object_storage.cpp
+++ b/ydb/core/external_sources/object_storage.cpp
@@ -150,6 +150,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
const bool hasPartitioning = objectStorage.projection_size() || objectStorage.partitioned_by_size();
issues.AddIssues(ValidateFormatSetting(objectStorage.format(), objectStorage.format_setting(), location, hasPartitioning));
+ issues.AddIssues(ValidateSchema(schema));
issues.AddIssues(ValidateJsonListFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
issues.AddIssues(ValidateRawFormat(objectStorage.format(), schema, objectStorage.partitioned_by()));
if (hasPartitioning) {
@@ -269,6 +270,22 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
template<typename TScheme>
+ static NYql::TIssues ValidateSchema(const TScheme& schema) {
+     // Rejects every column declared as Optional<Optional<...>>: one BAD_REQUEST
+     // issue is added per offending column, naming the column and its type.
+     // Returns an empty issue list when no such column is found.
+     NYql::TIssues issues;
+     for (const auto& column : schema.column()) {
+         // Bind by reference: plain `auto` would copy the type message for each column.
+         const auto& type = column.type();
+         if (!type.has_optional_type() || !type.optional_type().item().has_optional_type()) {
+             continue;
+         }
+         issues.AddIssue(MakeErrorIssue(
+             Ydb::StatusIds::BAD_REQUEST,
+             TStringBuilder{} << "Double optional types are not supported (you have '"
+                 << column.name() << " " << NYdb::TType(type).ToString() << "' field)"));
+     }
+
+     return issues;
+ }
+
+ template<typename TScheme>
static NYql::TIssues ValidateJsonListFormat(const TString& format, const TScheme& schema, const google::protobuf::RepeatedPtrField<TString>& partitionedBy) {
NYql::TIssues issues;
if (format != "json_list"sv) {
diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp
index d5bc4a655d3..129ad8febd7 100644
--- a/ydb/core/external_sources/object_storage_ut.cpp
+++ b/ydb/core/external_sources/object_storage_ut.cpp
@@ -55,6 +55,15 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) {
UNIT_ASSERT_EXCEPTION_CONTAINS(source->Pack(schema, general), NExternalSource::TExternalSourceException, "Date, Timestamp and Interval types are not allowed in json_list format");
}
+ Y_UNIT_TEST(FailedOptionalTypeValidation) {
+     // Pack() must throw when the schema has a column typed Optional<Optional<Int32>>.
+     NKikimrExternalSources::TSchema schema;
+     NKikimrExternalSources::TGeneral general;
+
+     // Build the nested optional type step by step: outer optional -> inner optional -> INT32.
+     auto* outerOptional = schema.add_column()->mutable_type()->mutable_optional_type();
+     auto* innerOptional = outerOptional->mutable_item()->mutable_optional_type();
+     innerOptional->mutable_item()->set_type_id(Ydb::Type::INT32);
+
+     auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
+     UNIT_ASSERT_EXCEPTION_CONTAINS(
+         source->Pack(schema, general),
+         NExternalSource::TExternalSourceException,
+         "Double optional types are not supported");
+ }
+
Y_UNIT_TEST(WildcardsValidation) {
auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000, nullptr, false, false);
NKikimrExternalSources::TSchema schema;
diff --git a/ydb/library/yql/providers/s3/common/util.cpp b/ydb/library/yql/providers/s3/common/util.cpp
index 59404018874..074b116cc8c 100644
--- a/ydb/library/yql/providers/s3/common/util.cpp
+++ b/ydb/library/yql/providers/s3/common/util.cpp
@@ -48,4 +48,20 @@ TString UrlEscapeRet(const TStringBuf from) {
return to;
}
+bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx) {
+    // Scans the struct members and stops at the first one whose type is an
+    // Optional wrapping another Optional: an error is added to ctx and false
+    // is returned. Returns true when no member has such a type.
+    for (const TItemExprType* column : schemaStructRowType->GetItems()) {
+        const TTypeAnnotationNode* columnType = column->GetItemType();
+        if (columnType->GetKind() != ETypeAnnotationKind::Optional) {
+            continue;
+        }
+
+        const TTypeAnnotationNode* innerType = columnType->Cast<TOptionalExprType>()->GetItemType();
+        if (innerType->GetKind() != ETypeAnnotationKind::Optional) {
+            continue;
+        }
+
+        // The message reports the full declared type of the member, not the unwrapped one.
+        ctx.AddError(TIssue(TStringBuilder() << "Double optional types are not supported (you have '"
+            << column->GetName() << " " << FormatType(column->GetItemType()) << "' field)"));
+        return false;
+    }
+    return true;
+}
+
}
diff --git a/ydb/library/yql/providers/s3/common/util.h b/ydb/library/yql/providers/s3/common/util.h
index 8767742e31f..d364e971078 100644
--- a/ydb/library/yql/providers/s3/common/util.h
+++ b/ydb/library/yql/providers/s3/common/util.h
@@ -2,6 +2,7 @@
#include <util/string/builder.h>
#include <yql/essentials/public/issue/yql_issue.h>
+#include <yql/essentials/ast/yql_expr.h>
namespace NYql::NS3Util {
@@ -12,4 +13,6 @@ TIssues AddParentIssue(const TStringBuilder& prefix, TIssues&& issues);
// '#', '?'
TString UrlEscapeRet(const TStringBuf from);
+bool ValidateS3ReadWriteSchema(const TStructExprType* schemaStructRowType, TExprContext& ctx);
+
}
diff --git a/ydb/library/yql/providers/s3/common/ya.make b/ydb/library/yql/providers/s3/common/ya.make
index 4927e3b10d6..058d0ca92fd 100644
--- a/ydb/library/yql/providers/s3/common/ya.make
+++ b/ydb/library/yql/providers/s3/common/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
ydb/library/yql/providers/s3/events
yql/essentials/public/issue
yql/essentials/public/issue/protos
+ yql/essentials/ast
)
IF (CLANG AND NOT WITH_VALGRIND)
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index ecda21b9281..350431400b7 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -2,6 +2,7 @@
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
#include <yql/essentials/core/yql_opt_utils.h>
+#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <yql/essentials/providers/common/provider/yql_provider.h>
@@ -26,6 +27,7 @@ TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
return {};
}
+
}
namespace {
@@ -74,6 +76,10 @@ private:
return TStatus::Error;
}
+ if (!NS3Util::ValidateS3ReadWriteSchema(sourceType->Cast<TStructExprType>(), ctx)) {
+ return TStatus::Error;
+ }
+
auto target = input->Child(TS3WriteObject::idx_Target);
if (!TS3Target::Match(target)) {
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 6bfb9b64134..a94de088fc6 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -1,6 +1,7 @@
#include "yql_s3_provider_impl.h"
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
@@ -491,6 +492,10 @@ public:
auto format = s3Object.Format().Ref().Content();
const TStructExprType* structRowType = rowType->Cast<TStructExprType>();
+ if (!NS3Util::ValidateS3ReadWriteSchema(structRowType, ctx)) {
+ return TStatus::Error;
+ }
+
THashSet<TStringBuf> columns;
for (const TItemExprType* item : structRowType->GetItems()) {
columns.emplace(item->GetName());
diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py
index e5ea35c0abc..1e527706db2 100644
--- a/ydb/tests/fq/s3/test_s3_0.py
+++ b/ydb/tests/fq/s3/test_s3_0.py
@@ -1114,3 +1114,56 @@ Pear,15,33'''
client.abort_query(query_id)
client.wait_query(query_id)
+
+ @yq_v2
+ @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+ def test_double_optional_types_validation(self, kikimr, s3, client, unique_prefix):  # a SELECT and an INSERT that use an `Int32??` column must both end in FAILED with the "Double optional" issue
+ resource = boto3.resource(
+ "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+ )
+
+ bucket = resource.Bucket("fbucket")
+ bucket.create(ACL='public-read')
+ bucket.objects.all().delete()  # remove any objects already present in the bucket
+
+ s3_client = boto3.client(
+ "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+ )
+
+ fruits = '''Fruit,Price,Weight
+Banana,3,100
+Apple,2,22
+Pear,15,33'''
+ s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.csv', ContentType='text/plain')  # upload the CSV fixture that the SELECT below reads
+
+ kikimr.control_plane.wait_bootstrap(1)
+ storage_connection_name = unique_prefix + "fruitbucket"
+ client.create_storage_connection(storage_connection_name, "fbucket")
+
+ sql = f'''
+ SELECT *
+ FROM `{storage_connection_name}`.`fruits.csv`
+ WITH (format='csv_with_names', SCHEMA (
+ Name Int32??,
+ ));
+ '''  # read case: the declared schema contains a column of type Int32??
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.FAILED)  # the read query is expected to fail
+ issues = str(client.describe_query(query_id).result.query.issue)
+
+ assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues
+
+ sql = f'''
+ INSERT INTO `{storage_connection_name}`.`insert/`
+ WITH
+ (
+ FORMAT="csv_with_names"
+ )
+ SELECT CAST(42 AS Int32??) as Weight;'''  # write case: the inserted column is cast to Int32??
+
+ query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+ client.wait_query_status(query_id, fq.QueryMeta.FAILED)  # the write query is expected to fail as well
+ issues = str(client.describe_query(query_id).result.query.issue)
+
+ assert "Double optional types are not supported" in issues, "Incorrect issues: " + issues