diff options
| field | value | date |
|---|---|---|
| author | aneporada <[email protected]> | 2022-10-19 20:30:43 +0300 |
| committer | aneporada <[email protected]> | 2022-10-19 20:30:43 +0300 |
| commit | 026c0fd128c65b45d7db1e993dad9d6bb6475dfc (patch) | |
| tree | df89f1d9af6e1010bd8fa67b3e34b74bb0557c1f | |
| parent | c5533b2142bf51a05a75c9b22b0e899b93520829 (diff) | |
Fix S3 insert with userschema
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.cpp | 56 |
| -rw-r--r-- | ydb/library/yql/core/yql_expr_type_annotation.h | 1 |
| -rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp | 36 |
3 files changed, 90 insertions, 3 deletions
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index 6c0df996492..1a286d143fb 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -1955,6 +1955,62 @@ bool EnsureValidSettings(const TExprNode& node, return true; } + +bool EnsureValidUserSchemaSetting(const TExprNode& node, TExprContext& ctx) { + if (!EnsureTupleMinSize(node, 2, ctx)) { + return false; + } + + if (!EnsureTupleMaxSize(node, 3, ctx)) { + return false; + } + + if (!EnsureAtom(node.Head(), ctx)) { + return false; + } + + if (node.Head().Content() != "userschema") { + ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()), TStringBuilder() << "Expecting userschema, but got '" << node.Head().Content() << "'")); + return false; + } + + if (!EnsureTypeWithStructType(*node.Child(1), ctx)) { + return false; + } + + if (node.ChildrenSize() == 3) { + if (!EnsureTupleOfAtoms(*node.Child(2), ctx)) { + return false; + } + const TStructExprType* s = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>(); + THashSet<TStringBuf> items; + for (auto child : node.Child(2)->ChildrenList()) { + if (!items.insert(child->Content()).second) { + ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()), + TStringBuilder() << "Invalid positional userschema: got duplicated field '" << child->Content() << "'")); + return false; + + } + if (!s->FindItem(child->Content())) { + ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()), + TStringBuilder() << "Invalid positional userschema: field '" << child->Content() << "'" + << " is not found in type " << *(const TTypeAnnotationNode*)s)); + return false; + } + } + + if (node.Child(2)->ChildrenSize() < s->GetSize()) { + ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()), + TStringBuilder() << "Invalid positional userschema of size " << node.Child(2)->ChildrenSize() + << " with struct type of size " << s->GetSize())); 
+ return false; + } + } + + return true; +} + + TSettingNodeValidator RequireSingleValueSettings(const TSettingNodeValidator& validator) { return [validator](TStringBuf name, const TExprNode& setting, TExprContext& ctx) { if (setting.ChildrenSize() != 2) { diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index 4bad3cfdc3f..4392a4b5504 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -72,6 +72,7 @@ bool EnsureValidSettings(const TExprNode& node, const THashSet<TStringBuf>& supportedSettings, const TSettingNodeValidator& validator, TExprContext& ctx); +bool EnsureValidUserSchemaSetting(const TExprNode& node, TExprContext& ctx); TSettingNodeValidator RequireSingleValueSettings(const TSettingNodeValidator& validator); bool EnsureLambda(const TExprNode& node, TExprContext& ctx); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index b607e5f8cf5..0753addfa5c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -2,6 +2,7 @@ #include "yql_s3_provider_impl.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> @@ -49,11 +50,36 @@ private: return TStatus::Error; } - if (!TS3Target::Match(input->Child(TS3WriteObject::idx_Target))) { - ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3WriteObject::idx_Target)->Pos()), "Expected S3 target.")); + auto source = input->Child(TS3WriteObject::idx_Input); + if (!EnsureListType(*source, ctx)) { return TStatus::Error; } + const TTypeAnnotationNode* sourceType = 
source->GetTypeAnn()->Cast<TListExprType>()->GetItemType(); + if (!EnsureStructType(source->Pos(), *sourceType, ctx)) { + return TStatus::Error; + } + + auto target = input->Child(TS3WriteObject::idx_Target); + if (!TS3Target::Match(target)) { + ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target.")); + return TStatus::Error; + } + + TS3Target tgt(target); + if (auto settings = tgt.Settings()) { + if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) { + const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + if (!IsSameAnnotation(*targetType, *sourceType)) { + ctx.AddError(TIssue(ctx.GetPosition(source->Pos()), + TStringBuilder() << "Type mismatch between schema type: " << *targetType + << " and actual data type: " << *sourceType << ", diff is: " + << GetTypeDiff(*targetType, *sourceType))); + return TStatus::Error; + } + } + } + input->SetTypeAnn(ctx.MakeType<TWorldExprType>()); return TStatus::Ok; } @@ -121,10 +147,14 @@ private: return true; } + if (name == "userschema") { + return EnsureValidUserSchemaSetting(setting, ctx); + } + return true; }; - if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode"}, validator, ctx)) { + if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema"}, validator, ctx)) { return TStatus::Error; } } |
