aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2023-09-07 17:18:03 +0300
committeruzhas <uzhas@ydb.tech>2023-09-07 17:34:26 +0300
commit34c1795da760c434a94267f4b22838d7004124b6 (patch)
tree707b2554adb546422c546187f5260f77ac695b82
parentae0c396f695ac3e034855b0ffb84a6234c03f9a8 (diff)
downloadydb-34c1795da760c434a94267f4b22838d7004124b6.tar.gz
enable more insert tests for v2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp100
1 files changed, 59 insertions, 41 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index cabe474f270..17e2d41fd5c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -17,6 +17,19 @@ using namespace NNodes;
namespace {
+TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
+ if (partBy) {
+ auto children = partBy->ChildrenList();
+ children.erase(children.cbegin());
+ return children;
+ }
+
+ return {};
+}
+}
+
+namespace {
+
class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
public:
TS3DataSinkTypeAnnotationTransformer(TS3State::TPtr state)
@@ -104,12 +117,14 @@ private:
return TStatus::Error;
}
+ const auto structType = sourceType->Cast<TStructExprType>();
auto target = input->Child(TS3Insert::idx_Target);
if (!TS3Target::Match(target)) {
ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target."));
return TStatus::Error;
}
+ TExprNode::TListType keys;
TS3Target tgt(target);
if (auto settings = tgt.Settings()) {
if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) {
@@ -122,25 +137,25 @@ private:
return TStatus::Error;
}
}
+ auto partBy = GetSetting(settings.Cast().Ref(), "partitionedby"sv);
+ keys = GetPartitionKeys(partBy);
}
const auto format = tgt.Format();
- const bool isSingleRowPerFileFormat = IsIn({TStringBuf("raw"), TStringBuf("json_list")}, format);
- const TTypeAnnotationNode* listItemType = ctx.MakeType<TDataExprType>(EDataSlot::String);
- if (!isSingleRowPerFileFormat) {
- listItemType = ctx.MakeType<TOptionalExprType>(listItemType);
+ auto baseTargeType = AnnotateTargetBase(format, keys, structType, ctx);
+ if (!baseTargeType) {
+ return TStatus::Error;
}
- input->SetTypeAnn(
- ctx.MakeType<TTupleExprType>(
- TTypeAnnotationNode::TListType{
- ctx.MakeType<TListExprType>(
- listItemType
- )
- }
- )
- );
+ auto t = ctx.MakeType<TTupleExprType>(
+ TTypeAnnotationNode::TListType{
+ ctx.MakeType<TListExprType>(
+ baseTargeType
+ )
+ });
+
+ input->SetTypeAnn(t);
return TStatus::Ok;
}
@@ -337,53 +352,56 @@ private:
}
const auto structType = itemType->Cast<TStructExprType>();
- const auto keysCount = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenSize();
+ const auto keys = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenList();
+ const TCoAtom format(input->Child(TS3SinkOutput::idx_Format));
+
+ auto baseTargetType = AnnotateTargetBase(format, keys, structType, ctx);
+ if (!baseTargetType) {
+ return TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(baseTargetType));
+ return TStatus::Ok;
+ }
- const auto format = input->Child(TS3SinkOutput::idx_Format);
- const bool isSingleRowPerFileFormat = format->IsAtom({"raw","json_list"}); // One string from ClikhouseUdf ClickHouseClient.SerializeFormat results in one S3 file => no delimiters between files.
+private:
+ const TTypeAnnotationNode* AnnotateTargetBase(TCoAtom format, const TExprNode::TListType& keys, const TStructExprType* structType, TExprContext& ctx) {
+ const bool isSingleRowPerFileFormat = IsIn({TStringBuf("raw"), TStringBuf("json_list")}, format);
+ auto keysCount = keys.size();
if (keysCount) {
if (isSingleRowPerFileFormat) {
- ctx.AddError(TIssue(ctx.GetPosition(format->Pos()), TStringBuilder() << "Partitioned isn't supported for " << format->Content() << " output format."));
- return TStatus::Error;
+ ctx.AddError(TIssue(ctx.GetPosition(format.Pos()), TStringBuilder() << "Partitioned isn't supported for " << (TStringBuf)format << " output format."));
+ return nullptr;
}
+
for (auto i = 0U; i < keysCount; ++i) {
- const auto key = input->Child(TS3SinkOutput::idx_KeyColumns)->Child(i);
+ const auto key = keys[i];
if (const auto keyType = structType->FindItemType(key->Content())) {
if (!EnsureDataType(key->Pos(), *keyType, ctx)) {
- return TStatus::Error;
+ return nullptr;
}
} else {
ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), "Missed key column."));
- return TStatus::Error;
+ return nullptr;
}
-
- TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8));
- itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String));
- input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TTupleExprType>(itemTypes)));
}
- } else {
- if (format->IsAtom("raw")) {
- if (const auto width = structType->GetSize(); width > 1U) {
- ctx.AddError(TIssue(ctx.GetPosition(format->Pos()), TStringBuilder() << "Expected single column for " << format->Content() << " output format, but got " << width));
- return TStatus::Error;
- }
- if (!EnsureDataType(format->Pos(), *structType->GetItems().front()->GetItemType(), ctx)) {
- return TStatus::Error;
- }
- }
+ TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8));
+ itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String));
- if (isSingleRowPerFileFormat) {
- input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
- } else {
- input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))));
- }
+ return ctx.MakeType<TTupleExprType>(itemTypes);
}
- return TStatus::Ok;
+ const TTypeAnnotationNode* listItemType = ctx.MakeType<TDataExprType>(EDataSlot::String);
+ if (!isSingleRowPerFileFormat) {
+ return ctx.MakeType<TOptionalExprType>(listItemType);
+ }
+
+ return listItemType;
}
+private:
const TS3State::TPtr State_;
};