about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: auzhegov <auzhegov@yandex-team.com> 2023-09-19 13:47:26 +0300
committer: auzhegov <auzhegov@yandex-team.com> 2023-09-19 14:03:55 +0300
commit: 20c0ade0ce4e770d6eebf5527494d5a6a64d814a (patch)
tree: f69db6113fc0910ff007f15a294e6f4dc42a180e
parent: 59a0eeda034730f9c30e3ad76780756d7bc8f8eb (diff)
download: ydb-20c0ade0ce4e770d6eebf5527494d5a6a64d814a.tar.gz
Support block reader for partitioned datasets
-rw-r--r-- ydb/library/yql/providers/common/mkql/parser.cpp 57
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp 9
2 files changed, 53 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/common/mkql/parser.cpp b/ydb/library/yql/providers/common/mkql/parser.cpp
index 2d4ac590d8e..6dd57a930d6 100644
--- a/ydb/library/yql/providers/common/mkql/parser.cpp
+++ b/ydb/library/yql/providers/common/mkql/parser.cpp
@@ -157,19 +157,53 @@ TRuntimeNode BuildParseCall(
const auto* finalItemStructType = static_cast<TStructType*>(finalItemType);
if (useBlocks) {
- return ctx.ProgramBuilder.ExpandMap(ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) {
- MKQL_ENSURE(!extraColumnsByPathIndex && metadataColumns.empty(), "TODO");
-
- TRuntimeNode::TList fields;
+ return ctx.ProgramBuilder.BlockExpandChunked(ctx.ProgramBuilder.ExpandMap(
+ ctx.ProgramBuilder.ToFlow(input), [&](TRuntimeNode item) {
+ auto parsedData = (extraColumnsByPathIndex || !metadataColumns.empty())
+ ? ctx.ProgramBuilder.Nth(item, 0)
+ : item;
+
+ TMaybe<TRuntimeNode> extra;
+ if (extraColumnsByPathIndex) {
+ auto pathInd = ctx.ProgramBuilder.Nth(item, 1);
+ auto extraNode = ctx.ProgramBuilder.Lookup(
+ ctx.ProgramBuilder.ToIndexDict(*extraColumnsByPathIndex), pathInd);
+ extra = ctx.ProgramBuilder.Unwrap(
+ extraNode,
+ ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(
+ "Failed to lookup path index"),
+ pos.File,
+ pos.Row,
+ pos.Column);
+ }
- for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
- TStringBuf name = finalItemStructType->GetMemberName(i);
- fields.push_back(ctx.ProgramBuilder.Member(item, name));
- }
+ auto blockLengthName =
+ ctx.ProgramBuilder.Member(parsedData, BlockLengthColumnName);
+ TRuntimeNode::TList fields;
+ fields.reserve(finalItemStructType->GetMembersCount());
+
+ for (ui32 i = 0; i < finalItemStructType->GetMembersCount(); ++i) {
+ TStringBuf name = finalItemStructType->GetMemberName(i);
+ const auto metadataIter = metadataColumns.find(TString(name));
+ if (metadataIter != metadataColumns.end()) {
+ fields.push_back(ctx.ProgramBuilder.ReplicateScalar(
+ ctx.ProgramBuilder.AsScalar(
+ ctx.ProgramBuilder.Nth(item, metadataIter->second)),
+ blockLengthName));
+ } else if (parseItemStructType->FindMemberIndex(name).Defined()) {
+ fields.push_back(ctx.ProgramBuilder.Member(parsedData, name));
+ } else {
+ MKQL_ENSURE(extra, "Column " << name << " wasn't found");
+ fields.push_back(ctx.ProgramBuilder.ReplicateScalar(
+ ctx.ProgramBuilder.AsScalar(
+ ctx.ProgramBuilder.Member(*extra, name)),
+ blockLengthName));
+ }
+ }
- fields.push_back(ctx.ProgramBuilder.Member(item, BlockLengthColumnName));
- return fields;
- });
+ fields.push_back(blockLengthName);
+ return fields;
+ }));
}
if (!compression.empty()) {
@@ -204,7 +238,6 @@ TRuntimeNode BuildParseCall(
if (extraColumnsByPathIndex || !metadataColumns.empty()) {
auto data = ctx.ProgramBuilder.Nth(item, 0);
- TMaybe<TRuntimeNode> pathInd;
res.emplace_back(parseLambda(data));
if (extraColumnsByPathIndex) {
res.emplace_back(ctx.ProgramBuilder.Nth(item, res.size()));
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 565666eff38..c7d147e11ff 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -361,9 +361,16 @@ public:
const TTypeAnnotationNode* itemType = nullptr;
if (input->Content() == TS3ArrowSettings::CallableName()) {
+ std::unordered_set<TString> extraColumnNames(extraColumnsType->GetSize());
+ for (const auto& extraColumn : extraColumnsType->GetItems()) {
+ extraColumnNames.insert(TString{extraColumn->GetName()});
+ }
+
TVector<const TItemExprType*> blockRowTypeItems;
for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
- blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
+ if (!extraColumnNames.contains(TString{x->GetName()})) {
+ blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(x->GetName(), ctx.MakeType<TBlockExprType>(x->GetItemType())));
+ }
}
blockRowTypeItems.push_back(ctx.MakeType<TItemExprType>(BlockLengthColumnName, ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64))));