diff options
author | uzhas <uzhas@ydb.tech> | 2023-09-07 17:18:03 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2023-09-07 17:34:26 +0300 |
commit | 34c1795da760c434a94267f4b22838d7004124b6 (patch) | |
tree | 707b2554adb546422c546187f5260f77ac695b82 | |
parent | ae0c396f695ac3e034855b0ffb84a6234c03f9a8 (diff) | |
download | ydb-34c1795da760c434a94267f4b22838d7004124b6.tar.gz |
enable more insert tests for v2
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp | 100 |
1 files changed, 59 insertions, 41 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index cabe474f270..17e2d41fd5c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -17,6 +17,19 @@ using namespace NNodes; namespace { +TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { + if (partBy) { + auto children = partBy->ChildrenList(); + children.erase(children.cbegin()); + return children; + } + + return {}; +} +} + +namespace { + class TS3DataSinkTypeAnnotationTransformer : public TVisitorTransformerBase { public: TS3DataSinkTypeAnnotationTransformer(TS3State::TPtr state) @@ -104,12 +117,14 @@ private: return TStatus::Error; } + const auto structType = sourceType->Cast<TStructExprType>(); auto target = input->Child(TS3Insert::idx_Target); if (!TS3Target::Match(target)) { ctx.AddError(TIssue(ctx.GetPosition(target->Pos()), "Expected S3 target.")); return TStatus::Error; } + TExprNode::TListType keys; TS3Target tgt(target); if (auto settings = tgt.Settings()) { if (auto userschema = GetSetting(settings.Cast().Ref(), "userschema")) { @@ -122,25 +137,25 @@ private: return TStatus::Error; } } + auto partBy = GetSetting(settings.Cast().Ref(), "partitionedby"sv); + keys = GetPartitionKeys(partBy); } const auto format = tgt.Format(); - const bool isSingleRowPerFileFormat = IsIn({TStringBuf("raw"), TStringBuf("json_list")}, format); - const TTypeAnnotationNode* listItemType = ctx.MakeType<TDataExprType>(EDataSlot::String); - if (!isSingleRowPerFileFormat) { - listItemType = ctx.MakeType<TOptionalExprType>(listItemType); + auto baseTargeType = AnnotateTargetBase(format, keys, structType, ctx); + if (!baseTargeType) { + return TStatus::Error; } - input->SetTypeAnn( - ctx.MakeType<TTupleExprType>( - TTypeAnnotationNode::TListType{ - ctx.MakeType<TListExprType>( - listItemType - ) - } - ) - ); + auto t = ctx.MakeType<TTupleExprType>( + TTypeAnnotationNode::TListType{ + ctx.MakeType<TListExprType>( + baseTargeType + ) + }); + + input->SetTypeAnn(t); return TStatus::Ok; } @@ -337,53 +352,56 @@ private: } const auto structType = itemType->Cast<TStructExprType>(); - const auto keysCount = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenSize(); + const auto keys = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenList(); + const TCoAtom format(input->Child(TS3SinkOutput::idx_Format)); + + auto baseTargetType = AnnotateTargetBase(format, keys, structType, ctx); + if (!baseTargetType) { + return TStatus::Error; + } + + input->SetTypeAnn(ctx.MakeType<TFlowExprType>(baseTargetType)); + return TStatus::Ok; + } - const auto format = input->Child(TS3SinkOutput::idx_Format); - const bool isSingleRowPerFileFormat = format->IsAtom({"raw","json_list"}); // One string from ClikhouseUdf ClickHouseClient.SerializeFormat results in one S3 file => no delimiters between files. +private: + const TTypeAnnotationNode* AnnotateTargetBase(TCoAtom format, const TExprNode::TListType& keys, const TStructExprType* structType, TExprContext& ctx) { + const bool isSingleRowPerFileFormat = IsIn({TStringBuf("raw"), TStringBuf("json_list")}, format); + auto keysCount = keys.size(); if (keysCount) { if (isSingleRowPerFileFormat) { - ctx.AddError(TIssue(ctx.GetPosition(format->Pos()), TStringBuilder() << "Partitioned isn't supported for " << format->Content() << " output format.")); - return TStatus::Error; + ctx.AddError(TIssue(ctx.GetPosition(format.Pos()), TStringBuilder() << "Partitioned isn't supported for " << (TStringBuf)format << " output format.")); + return nullptr; } + for (auto i = 0U; i < keysCount; ++i) { - const auto key = input->Child(TS3SinkOutput::idx_KeyColumns)->Child(i); + const auto key = keys[i]; if (const auto keyType = structType->FindItemType(key->Content())) { if (!EnsureDataType(key->Pos(), *keyType, ctx)) { - return TStatus::Error; + return nullptr; } } else { ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), "Missed key column.")); - return TStatus::Error; + return nullptr; } - - TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8)); - itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)); - input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TTupleExprType>(itemTypes))); } - } else { - if (format->IsAtom("raw")) { - if (const auto width = structType->GetSize(); width > 1U) { - ctx.AddError(TIssue(ctx.GetPosition(format->Pos()), TStringBuilder() << "Expected single column for " << format->Content() << " output format, but got " << width)); - return TStatus::Error; - } - if (!EnsureDataType(format->Pos(), *structType->GetItems().front()->GetItemType(), ctx)) { - return TStatus::Error; - } - } + TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8)); + itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)); - if (isSingleRowPerFileFormat) { - input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); - } else { - input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)))); - } + return ctx.MakeType<TTupleExprType>(itemTypes); } - return TStatus::Ok; + const TTypeAnnotationNode* listItemType = ctx.MakeType<TDataExprType>(EDataSlot::String); + if (!isSingleRowPerFileFormat) { + return ctx.MakeType<TOptionalExprType>(listItemType); + } + + return listItemType; } +private: const TS3State::TPtr State_; }; |