summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2022-10-19 20:30:43 +0300
committeraneporada <[email protected]>2022-10-19 20:30:43 +0300
commit026c0fd128c65b45d7db1e993dad9d6bb6475dfc (patch)
treedf89f1d9af6e1010bd8fa67b3e34b74bb0557c1f
parentc5533b2142bf51a05a75c9b22b0e899b93520829 (diff)
Fix S3 insert with userschema
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.cpp56
-rw-r--r--ydb/library/yql/core/yql_expr_type_annotation.h1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp36
3 files changed, 90 insertions, 3 deletions
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 6c0df996492..1a286d143fb 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -1955,6 +1955,62 @@ bool EnsureValidSettings(const TExprNode& node,
return true;
}
+
+bool EnsureValidUserSchemaSetting(const TExprNode& node, TExprContext& ctx) {
+ if (!EnsureTupleMinSize(node, 2, ctx)) {
+ return false;
+ }
+
+ if (!EnsureTupleMaxSize(node, 3, ctx)) {
+ return false;
+ }
+
+ if (!EnsureAtom(node.Head(), ctx)) {
+ return false;
+ }
+
+ if (node.Head().Content() != "userschema") {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()), TStringBuilder() << "Expecting userschema, but got '" << node.Head().Content() << "'"));
+ return false;
+ }
+
+ if (!EnsureTypeWithStructType(*node.Child(1), ctx)) {
+ return false;
+ }
+
+ if (node.ChildrenSize() == 3) {
+ if (!EnsureTupleOfAtoms(*node.Child(2), ctx)) {
+ return false;
+ }
+ const TStructExprType* s = node.Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+ THashSet<TStringBuf> items;
+ for (auto child : node.Child(2)->ChildrenList()) {
+ if (!items.insert(child->Content()).second) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()),
+ TStringBuilder() << "Invalid positional userschema: got duplicated field '" << child->Content() << "'"));
+ return false;
+
+ }
+ if (!s->FindItem(child->Content())) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()),
+ TStringBuilder() << "Invalid positional userschema: field '" << child->Content() << "'"
+ << " is not found in type " << *(const TTypeAnnotationNode*)s));
+ return false;
+ }
+ }
+
+ if (node.Child(2)->ChildrenSize() < s->GetSize()) {
+ ctx.AddError(TIssue(ctx.GetPosition(node.Head().Pos()),
+ TStringBuilder() << "Invalid positional userschema of size " << node.Child(2)->ChildrenSize()
+ << " with struct type of size " << s->GetSize()));
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
TSettingNodeValidator RequireSingleValueSettings(const TSettingNodeValidator& validator) {
return [validator](TStringBuf name, const TExprNode& setting, TExprContext& ctx) {
if (setting.ChildrenSize() != 2) {
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 4bad3cfdc3f..4392a4b5504 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -72,6 +72,7 @@ bool EnsureValidSettings(const TExprNode& node,
const THashSet<TStringBuf>& supportedSettings,
const TSettingNodeValidator& validator,
TExprContext& ctx);
+bool EnsureValidUserSchemaSetting(const TExprNode& node, TExprContext& ctx);
TSettingNodeValidator RequireSingleValueSettings(const TSettingNodeValidator& validator);
bool EnsureLambda(const TExprNode& node, TExprContext& ctx);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index b607e5f8cf5..0753addfa5c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -2,6 +2,7 @@
#include "yql_s3_provider_impl.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
@@ -49,11 +50,36 @@ private:
return TStatus::Error;
}
- if (!TS3Target::Match(input->Child(TS3WriteObject::idx_Target))) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3WriteObject::idx_Target)->Pos()), "Expected S3 target."));
+ auto source = input->Child(TS3WriteObject::idx_Input);
+ if (!EnsureListType(*source, ctx)) {
return TStatus::Error;
}
+ const TTypeAnnotationNode* sourceType = source->GetTypeAnn()->Cast<TListExprType>()->GetItemType();
+ if (!EnsureStructType(source->Pos(), *sourceType, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto target = input->Child(TS3WriteObject::idx_Target);
+ if (!TS3Target::Match(target)) {
+ ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
+ return TStatus::Error;
+ }
+
+ TS3Target tgt(target);
+ if (auto settings = tgt.Settings()) {
+ if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
+ const TTypeAnnotationNode* targetType = userschema->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ if (!IsSameAnnotation(*targetType, *sourceType)) {
+ ctx.AddError(TIssue(ctx.GetPosition(source->Pos()),
+ TStringBuilder() << "Type mismatch between schema type: " << *targetType
+ << " and actual data type: " << *sourceType << ", diff is: "
+ << GetTypeDiff(*targetType, *sourceType)));
+ return TStatus::Error;
+ }
+ }
+ }
+
input->SetTypeAnn(ctx.MakeType<TWorldExprType>());
return TStatus::Ok;
}
@@ -121,10 +147,14 @@ private:
return true;
}
+ if (name == "userschema") {
+ return EnsureValidUserSchemaSetting(setting, ctx);
+ }
+
return true;
};
- if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode"}, validator, ctx)) {
+ if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema"}, validator, ctx)) {
return TStatus::Error;
}
}