diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-09-19 13:47:26 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-09-19 14:03:55 +0300 |
commit | 20c0ade0ce4e770d6eebf5527494d5a6a64d814a (patch) | |
tree | f69db6113fc0910ff007f15a294e6f4dc42a180e | |
parent | 59a0eeda034730f9c30e3ad76780756d7bc8f8eb (diff) | |
download | ydb-20c0ade0ce4e770d6eebf5527494d5a6a64d814a.tar.gz |
Support block reader for partitioned datasets
-rw-r--r-- | ydb/library/yql/providers/common/mkql/parser.cpp | 57 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp | 9 |
2 files changed, 53 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp index 2d4ac590d8e..6dd57a930d6 100644 --- a/ydb/library/yql/providers/common/mkql/parser.cpp +++ b/ydb/library/yql/providers/common/mkql/parser.cpp @@ -157,19 +157,53 @@ TRuntimeNode BuildParseCall( const auto* finalItemStructType = static_cast<TStructType*>(finalItemType); if (useBlocks) { - return ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) { - MKQL_ENSURE(!extraColumnsByPathIndex && metadataColumns.empty(), "TODO"); - - TRuntimeNode::TList fields; + return ctx.ProgramBuilder.BlockExpandChunked(ctx.ProgramBuilder.ExpandMap( + ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) { + auto parsedData = (extraColumnsByPathIndex || !metadataColumns.empty()) + ? ctx.ProgramBuilder.Nth(item, 0) + : item; + + TMaybe<TRuntimeNode> extra; + if (extraColumnsByPathIndex) { + auto pathInd = ctx.ProgramBuilder.Nth(item, 1); + auto extraNode = ctx.ProgramBuilder.Lookup( + ctx.ProgramBuilder.ToIndexDict(*extraColumnsByPathIndex), pathInd); + extra = ctx.ProgramBuilder.Unwrap( + extraNode, + ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>( + "Failed to lookup path index"), + pos.File, + pos.Row, + pos.Column); + } - for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) { - TStringBuf name = finalItemStructType->GetMemberName(i); - fields.push_back(ctx.ProgramBuilder.Member(item, name)); - } + auto blockLengthName = + ctx.ProgramBuilder.Member(parsedData, BlockLengthColumnName); + TRuntimeNode::TList fields; + fields.reserve(finalItemStructType->GetMembersCount()); + + for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) { + TStringBuf name = finalItemStructType->GetMemberName(i); + const auto metadataIter = metadataColumns.find(TString(name)); + if (metadataIter != metadataColumns.end()) { + fields.push_back(ctx.ProgramBuilder.ReplicateScalar( + ctx.ProgramBuilder.AsScalar( + ctx.ProgramBuilder.Nth(item, metadataIter->second)), + blockLengthName)); + } else if (parseItemStructType->FindMemberIndex(name).Defined()) { + fields.push_back(ctx.ProgramBuilder.Member(parsedData, name)); + } else { + MKQL_ENSURE(extra, "Column " << name << " wasn't found"); + fields.push_back(ctx.ProgramBuilder.ReplicateScalar( + ctx.ProgramBuilder.AsScalar( + ctx.ProgramBuilder.Member(*extra, name)), + blockLengthName)); + } + } - fields.push_back(ctx.ProgramBuilder.Member(item, BlockLengthColumnName)); - return fields; - }); + fields.push_back(blockLengthName); + return fields; + })); } if (!compression.empty()) { @@ -204,7 +238,6 @@ TRuntimeNode BuildParseCall( if (extraColumnsByPathIndex || !metadataColumns.empty()) { auto data = ctx.ProgramBuilder.Nth(item, 0); - TMaybe<TRuntimeNode> pathInd; res.emplace_back(parseLambda(data)); if (extraColumnsByPathIndex) { res.emplace_back(ctx.ProgramBuilder.Nth(item, res.size())); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 565666eff38..c7d147e11ff 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -361,9 +361,16 @@ public: const TTypeAnnotationNode* itemType = nullptr; if (input->Content() == TS3ArrowSettings::CallableName()) { + std::unordered_set<TString> extraColumnNames(extraColumnsType->GetSize()); + for (const auto& extraColumn : extraColumnsType->GetItems()) { + extraColumnNames.insert(TString{extraColumn->GetName()}); + } + TVector<const TItemExprType*> blockRowTypeItems; for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) { - blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType()))); + if (!extraColumnNames.contains(TString{x->GetName()})) { + blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType()))); + } } blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)))); |