diff options
author | aneporada <aneporada@ydb.tech> | 2022-08-04 09:06:18 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2022-08-04 09:06:18 +0300 |
commit | 91ead42844761e5f9bccab147a24e097b54b562e (patch) | |
tree | 8f88d9352b995c656c85fe98a52a2104e1a9d907 | |
parent | 3305b730bd7faada7c1134b7296b099098deb897 (diff) | |
download | ydb-91ead42844761e5f9bccab147a24e097b54b562e.tar.gz |
[] Store file list as single AST node per partition
9 files changed, 321 insertions, 113 deletions
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index c72c3b79e9..5b5c5dde1a 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -30,8 +30,8 @@ "Base": "TExprBase", "Match": {"Type": "Tuple"}, "Children": [ - {"Index": 0, "Name": "Path", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Size", "Type": "TCoAtom"}, + {"Index": 0, "Name": "Data", "Type": "TCoString"}, + {"Index": 1, "Name": "IsText", "Type": "TCoBool"}, {"Index": 2, "Name": "ExtraColumns", "Type": "TExprBase"} ] }, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index 4c04fe5a37..f92fba98b4 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -2,6 +2,7 @@ #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> +#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> @@ -15,6 +16,24 @@ using namespace NNodes; namespace { +bool ValidateS3PackedPaths(TPositionHandle pos, TStringBuf blob, bool isTextEncoded, TExprContext& ctx) { + using namespace NYql::NS3Details; + try { + TPathList paths; + UnpackPathsList(blob, isTextEncoded, paths); + for (size_t i = 0; i < paths.size(); ++i) { + if (std::get<0>(paths[i]).empty()) { + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Expected non-empty path (index " << i << ")")); + return false; + } + } + } catch (const std::exception& ex) { + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Failed to parse packed paths: " << ex.what())); + return false; + } + return true; +} + bool ValidateS3Paths(const TExprNode& node, const TStructExprType*& extraColumnsType, TExprContext& ctx) { if (!EnsureTupleMinSize(node, 1, ctx)) { return false; @@ -26,21 +45,19 @@ bool ValidateS3Paths(const TExprNode& node, const TStructExprType*& extraColumns return false; } - if (!EnsureAtom(*path->Child(TS3Path::idx_Path), ctx) || !EnsureAtom(*path->Child(TS3Path::idx_Size), ctx)) - { + auto pathAndSizeList = path->Child(TS3Path::idx_Data); + if (!TCoString::Match(pathAndSizeList)) { + ctx.AddError(TIssue(ctx.GetPosition(pathAndSizeList->Pos()), "Expected String literal for Data")); return false; } - if (path->Child(TS3Path::idx_Path)->Content().empty()) { - ctx.AddError(TIssue(ctx.GetPosition(path->Child(TS3Path::idx_Path)->Pos()), "Expected non-empty path")); + const TExprNode* isTextEncoded = path->Child(TS3Path::idx_IsText); + if (!TCoBool::Match(isTextEncoded)) { + ctx.AddError(TIssue(ctx.GetPosition(isTextEncoded->Pos()), "Expected Bool literal for IsText")); return false; } - ui64 size = 0; - auto sizeStr = path->Child(TS3Path::idx_Size)->Content(); - if (!TryFromString(sizeStr, size)) { - ctx.AddError(TIssue(ctx.GetPosition(path->Child(TS3Path::idx_Size)->Pos()), - TStringBuilder() << "Expected number as S3 object size, got: '" << sizeStr << "'")); + if (!ValidateS3PackedPaths(pathAndSizeList->Pos(), pathAndSizeList->Head().Content(), FromString<bool>(isTextEncoded->Head().Content()), ctx)) { return false; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index df02d07030..a361d0ef6b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -11,6 +11,7 @@ #include <ydb/library/yql/providers/s3/proto/sink.pb.h> #include <ydb/library/yql/providers/s3/proto/source.pb.h> #include <ydb/library/yql/providers/s3/range_helpers/file_tree_builder.h> +#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> #include <ydb/library/yql/utils/log/log.h> namespace NYql { @@ -55,12 +56,15 @@ public: if (const TMaybeNode<TDqSource> source = &node) { cluster = source.Cast().DataSource().Cast<TS3DataSource>().Cluster().Value(); const auto settings = source.Cast().Settings().Cast<TS3SourceSettingsBase>(); - parts.reserve(settings.Paths().Size()); - for (auto i = 0u; i < settings.Paths().Size(); ++i) - parts.emplace_back(1U, - std::pair( - settings.Paths().Item(i).Path().StringValue(), - FromString<ui64>(settings.Paths().Item(i).Size().Value()))); + for (auto i = 0u; i < settings.Paths().Size(); ++i) { + const auto& path = settings.Paths().Item(i); + TPathList paths; + UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), paths); + parts.reserve(parts.size() + paths.size()); + for (auto& p : paths) { + parts.emplace_back(1U, std::pair(std::get<0>(p), std::get<1>(p))); + } + } } if (maxPartitions && parts.size() > maxPartitions) { @@ -127,17 +131,33 @@ public: .Seal().Build() ); - TExprNodeList extraColumns; + TExprNodeList extraColumnsExtents; for (size_t i = 0; i < s3ReadObject.Object().Paths().Size(); ++i) { - extraColumns.push_back(s3ReadObject.Object().Paths().Item(i).ExtraColumns().Ptr()); + auto batch = s3ReadObject.Object().Paths().Item(i); + TStringBuf packed = batch.Data().Literal().Value(); + bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value()); + + TPathList paths; + UnpackPathsList(packed, isTextEncoded, paths); + + extraColumnsExtents.push_back( + ctx.Builder(batch.ExtraColumns().Pos()) + .Callable("Replicate") + .Add(0, batch.ExtraColumns().Ptr()) + .Callable(1, "Uint64") + .Atom(0, ToString(paths.size()), TNodeFlags::Default) + .Seal() + .Seal() + .Build() + ); } - YQL_ENSURE(!extraColumns.empty()); - if (extraColumns.front()->GetTypeAnn()->Cast<TStructExprType>()->GetSize()) { + YQL_ENSURE(!extraColumnsExtents.empty()); + if (s3ReadObject.Object().Paths().Item(0).ExtraColumns().Ref().GetTypeAnn()->Cast<TStructExprType>()->GetSize()) { settings.push_back( ctx.Builder(s3ReadObject.Object().Pos()) .List() .Atom(0, "extraColumns") - .Add(1, ctx.NewCallable(s3ReadObject.Object().Pos(), "AsList", std::move(extraColumns))) + .Add(1, ctx.NewCallable(s3ReadObject.Object().Pos(), "OrderedExtend", std::move(extraColumnsExtents))) .Seal() .Build() ); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 2abc4c4b16..dc0a7fcc59 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> +#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_opt_utils.h> #include <ydb/library/yql/utils/log/log.h> @@ -97,9 +98,11 @@ public: TPendingRequests pendingRequests; TNodeMap<TVector<TListRequest>> requestsByNode; + TNodeMap<TGeneratedColumnsConfig> genColumnsByNode; pendingRequests.swap(PendingRequests_); requestsByNode.swap(RequestsByNode_); + genColumnsByNode.swap(GenColumnsByNode_); TNodeOnNodeOwnedMap replaces; size_t count = 0; @@ -127,6 +130,12 @@ public: } } + TMap<TMaybe<std::vector<TString>>, NS3Details::TPathList> pathsByMatchedGlobs; + const TGeneratedColumnsConfig* generatedColumnsConfig = nullptr; + if (auto it = genColumnsByNode.find(node); it != genColumnsByNode.end()) { + generatedColumnsConfig = &it->second; + } + for (auto& req : requests) { auto it = pendingRequests.find(req); YQL_ENSURE(it != pendingRequests.end()); @@ -150,48 +159,16 @@ public: } return TStatus::Error; } - for (auto& entry : listEntries) { + for (auto& entry : listEntries) { if (entry.Size > fileSizeLimit) { ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName)); return TStatus::Error; } - TExprNodeList extraColumnsAsStructArgs; - if (auto confIt = GenColumnsByNode_.find(node); confIt != GenColumnsByNode_.end()) { - const TGeneratedColumnsConfig& config = confIt->second; - YQL_ENSURE(config.Columns.size() <= entry.MatchedGlobs.size()); - YQL_ENSURE(config.SchemaTypeNode); - - for (size_t i = 0; i < config.Columns.size(); ++i) { - auto& col = config.Columns[i]; - extraColumnsAsStructArgs.push_back( - ctx.Builder(object.Pos()) - .List() - .Atom(0, col) - .Callable(1, "Data") - .Callable(0, "StructMemberType") - .Add(0, config.SchemaTypeNode) - .Atom(1, col) - .Seal() - .Atom(1, entry.MatchedGlobs[i]) - .Seal() - .Seal() - .Build() - ); - } - } - - pathNodes.emplace_back( - ctx.Builder(object.Pos()) - .List() - .Atom(0, entry.Path) - .Atom(1, ToString(entry.Size), TNodeFlags::Default) - .Add(2, ctx.NewCallable(object.Pos(), "AsStruct", std::move(extraColumnsAsStructArgs))) - .Seal() - .Build() - ); + auto& pathList = pathsByMatchedGlobs[generatedColumnsConfig ? entry.MatchedGlobs : TMaybe<std::vector<TString>>{}]; + pathList.emplace_back(entry.Path, entry.Size); ++count; readSize += entry.Size; } @@ -200,13 +177,60 @@ public: totalSize += readSize; } + for (const auto& [matchedGlobs, pathList] : pathsByMatchedGlobs) { + TExprNodeList extraColumnsAsStructArgs; + if (generatedColumnsConfig) { + YQL_ENSURE(matchedGlobs.Defined()); + YQL_ENSURE(generatedColumnsConfig->Columns.size() <= matchedGlobs->size()); + YQL_ENSURE(generatedColumnsConfig->SchemaTypeNode); + + for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) { + auto& col = generatedColumnsConfig->Columns[i]; + extraColumnsAsStructArgs.push_back( + ctx.Builder(object.Pos()) + .List() + .Atom(0, col) + .Callable(1, "Data") + .Callable(0, "StructMemberType") + .Add(0, generatedColumnsConfig->SchemaTypeNode) + .Atom(1, col) + .Seal() + .Atom(1, (*matchedGlobs)[i]) + .Seal() + .Seal() + .Build() + ); + } + } + + auto extraColumns = ctx.NewCallable(object.Pos(), "AsStruct", std::move(extraColumnsAsStructArgs)); + + TString packedPaths; + bool isTextFormat; + NS3Details::PackPathsList(pathList, packedPaths, isTextFormat); + + pathNodes.emplace_back( + Build<TS3Path>(ctx, object.Pos()) + .Data<TCoString>() + .Literal() + .Build(packedPaths) + .Build() + .IsText<TCoBool>() + .Literal() + .Build(ToString(isTextFormat)) + .Build() + .ExtraColumns(extraColumns) + .Done().Ptr() + ); + } + auto settings = read.Ref().Child(4)->ChildrenList(); auto userSchema = ExtractSchema(settings); TExprNode::TPtr s3Object; s3Object = Build<TS3Object>(ctx, object.Pos()) - .Paths(ctx.NewList(object.Pos(), std::move(pathNodes))) - .Format(ExtractFormat(settings)) - .Settings(ctx.NewList(object.Pos(), std::move(settings))) + .Paths(ctx.NewList(object.Pos(), std::move(pathNodes))) + .Format(ExtractFormat(settings)) + .Settings(ctx.NewList(object.Pos(), std::move(settings))) .Done().Ptr(); replaces.emplace(node, userSchema.back() ? diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index 1ffb82a4b0..c9c7f35294 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -1,6 +1,7 @@ #include "yql_s3_provider_impl.h" #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> +#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> @@ -18,6 +19,8 @@ using namespace NNodes; namespace { +using namespace NYql::NS3Details; + void RebuildPredicateForPruning(const TExprNode::TPtr& pred, const TExprNode& arg, const TStructExprType& extraType, TExprNode::TPtr& prunedPred, TExprNode::TPtr& extraPred, TExprContext& ctx) { @@ -112,27 +115,10 @@ TCoFlatMapBase CalculatePrunedPaths(TCoFlatMapBase flatMap, TExprContext& ctx, T TExprNode::TPtr filteredPathList; if (extraPred) { auto source = flatMap.Input().Cast<TDqSourceWrap>().Input().Cast<TS3SourceSettingsBase>(); - TExprNodeList pathList; - for (size_t i = 0; i < source.Paths().Size(); ++i) { - pathList.push_back( - ctx.Builder(source.Paths().Item(i).Pos()) - .List() - .Callable(0, "String") - .Add(0, source.Paths().Item(i).Path().Ptr()) - .Seal() - .Callable(1, "Uint64") - .Add(0, source.Paths().Item(i).Size().Ptr()) - .Seal() - .Add(2, source.Paths().Item(i).ExtraColumns().Ptr()) - .Seal() - .Build() - ); - } - filteredPathList = ctx.Builder(pred->Pos()) .Callable("EvaluateExpr") .Callable(0, "OrderedFilter") - .Add(0, ctx.NewCallable(pred->Pos(), "AsList", std::move(pathList))) + .Add(0, ctx.NewCallable(pred->Pos(), "AsList", source.Paths().Ref().ChildrenList())) .Lambda(1) .Param("item") .Apply(ctx.NewLambda(extraPred->Pos(), flatMap.Lambda().Args().Ptr(), std::move(extraPred))) @@ -183,6 +169,7 @@ public: AddHandler(0, &TCoFlatMapBase::Match, HNDL(TryPrunePaths)); AddHandler(0, &TDqSourceWrap::Match, HNDL(ApplyPrunedPath)); AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqSource)); + AddHandler(0, &TDqSourceWrap::Match, HNDL(MergeS3Paths)); #undef HNDL } @@ -211,50 +198,54 @@ public: return node; } - auto extraColumnsSetting = GetSetting(dqSource.Settings().Ref(), "extraColumns"); - YQL_ENSURE(extraColumnsSetting); - auto extraColumns = extraColumnsSetting->ChildPtr(1); - YQL_ENSURE(extraColumns->IsCallable("AsList"), "extraColumns should have literal value"); - + const size_t beforeEntries = maybeS3SourceSettings.Cast().Paths().Size(); auto prunedPaths = prunedPathSetting->ChildPtr(1); if (prunedPaths->IsCallable("List")) { - YQL_CLOG(INFO, ProviderS3) << "S3 Paths completely pruned: " << extraColumns->ChildrenSize() << " paths"; + YQL_CLOG(INFO, ProviderS3) << "S3 Paths completely pruned: " << beforeEntries << " entries"; return ctx.NewCallable(node.Pos(), "List", { ExpandType(node.Pos(), *node.Ref().GetTypeAnn(), ctx) }); } YQL_ENSURE(prunedPaths->IsCallable("AsList"), "prunedPaths should have literal value"); - YQL_ENSURE(prunedPaths->ChildrenSize() <= extraColumns->ChildrenSize()); YQL_ENSURE(prunedPaths->ChildrenSize() > 0); + const size_t afterEntries = prunedPaths->ChildrenSize(); auto newSettings = ReplaceSetting(dqSource.Settings().Ref(), prunedPathSetting->Pos(), "prunedPaths", nullptr, ctx); - if (prunedPaths->ChildrenSize() == extraColumns->ChildrenSize()) { - YQL_CLOG(INFO, ProviderS3) << "No S3 paths are pruned: " << extraColumns->ChildrenSize() << " paths"; + if (beforeEntries == afterEntries) { + YQL_CLOG(INFO, ProviderS3) << "No S3 paths are pruned: " << afterEntries << " entries"; return Build<TDqSourceWrap>(ctx, dqSource.Pos()) .InitFrom(dqSource) .Settings(newSettings) .Done(); } - YQL_CLOG(INFO, ProviderS3) << "Pruning S3 Paths: " << extraColumns->ChildrenSize() << " -> " << prunedPaths->ChildrenSize(); - TExprNodeList newPaths; - TExprNodeList newExtraColumns; + YQL_CLOG(INFO, ProviderS3) << "Pruning S3 Paths: " << beforeEntries << " -> " << afterEntries << " entries"; + TExprNodeList newExtraColumnsExtents; for (auto& entry : prunedPaths->ChildrenList()) { - auto path = entry->ChildPtr(0); - auto size = entry->ChildPtr(1); - auto extra = entry->ChildPtr(2); + TS3Path batch(entry); + TStringBuf packed = batch.Data().Literal().Value(); + bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value()); - YQL_ENSURE(path->IsCallable("String")); - YQL_ENSURE(size->IsCallable("Uint64")); - YQL_ENSURE(extra->IsCallable("AsStruct")); + TPathList paths; + UnpackPathsList(packed, isTextEncoded, paths); - newPaths.push_back(ctx.NewList(entry->Pos(), { path->HeadPtr(), size->HeadPtr(), extra })); - newExtraColumns.push_back(extra); + newExtraColumnsExtents.push_back( + ctx.Builder(batch.ExtraColumns().Pos()) + .Callable("Replicate") + .Add(0, batch.ExtraColumns().Ptr()) + .Callable(1, "Uint64") + .Atom(0, ToString(paths.size()), TNodeFlags::Default) + .Seal() + .Seal() + .Build() + ); } - newSettings = ReplaceSetting(*newSettings, newSettings->Pos(), "extraColumns", ctx.NewCallable(newSettings->Pos(), "AsList", std::move(newExtraColumns)), ctx); + newSettings = ReplaceSetting(*newSettings, newSettings->Pos(), "extraColumns", + ctx.NewCallable(newSettings->Pos(), "OrderedExtend", std::move(newExtraColumnsExtents)), ctx); auto oldSrc = dqSource.Input().Cast<TS3SourceSettingsBase>(); - auto newSrc = ctx.ChangeChild(dqSource.Input().Ref(), TS3SourceSettingsBase::idx_Paths, ctx.NewList(oldSrc.Paths().Pos(), std::move(newPaths))); + auto newSrc = ctx.ChangeChild(dqSource.Input().Ref(), TS3SourceSettingsBase::idx_Paths, + ctx.NewList(oldSrc.Paths().Pos(), prunedPaths->ChildrenList())); return Build<TDqSourceWrap>(ctx, dqSource.Pos()) .InitFrom(dqSource) @@ -337,30 +328,43 @@ public: if (extraTypeItems.size() < extraType->GetSize()) { auto originalPaths = maybeS3SourceSettings.Cast().Paths().Ptr(); auto originalExtra = extraColumnsSetting->TailPtr(); - YQL_ENSURE(originalExtra->IsCallable("AsList")); YQL_ENSURE(originalPaths->IsList()); - YQL_ENSURE(originalPaths->ChildrenSize() == originalExtra->ChildrenSize()); TExprNodeList newPathItems; - TExprNodeList newExtraColumnsItems; + TExprNodeList newExtraColumnsExtents; - for (const auto& path : maybeS3SourceSettings.Cast().Paths()) { - auto extra = path.ExtraColumns(); + for (const auto& batch : maybeS3SourceSettings.Cast().Paths()) { + auto extra = batch.ExtraColumns(); YQL_ENSURE(TCoAsStruct::Match(extra.Raw())); TExprNodeList children = extra.Ref().ChildrenList(); EraseIf(children, [&](const TExprNode::TPtr& child) { return !extractMembers.contains(child->Head().Content()); }); auto newStruct = ctx.ChangeChildren(extra.Ref(), std::move(children)); - newExtraColumnsItems.push_back(newStruct); - newPathItems.push_back(ctx.ChangeChild(path.Ref(), TS3Path::idx_ExtraColumns, std::move(newStruct))); + + TStringBuf packed = batch.Data().Literal().Value(); + bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value()); + + TPathList paths; + UnpackPathsList(packed, isTextEncoded, paths); + + newExtraColumnsExtents.push_back( + ctx.Builder(batch.ExtraColumns().Pos()) + .Callable("Replicate") + .Add(0, newStruct) + .Callable(1, "Uint64") + .Atom(0, ToString(paths.size()), TNodeFlags::Default) + .Seal() + .Seal() + .Build() + ); + newPathItems.push_back(ctx.ChangeChild(batch.Ref(), TS3Path::idx_ExtraColumns, std::move(newStruct))); } newPaths = ctx.ChangeChildren(maybeS3SourceSettings.Cast().Paths().Ref(), std::move(newPathItems)); - TExprNode::TPtr newExtra = ctx.ChangeChildren(*originalExtra, std::move(newExtraColumnsItems)); + TExprNode::TPtr newExtra = ctx.ChangeChildren(*originalExtra, std::move(newExtraColumnsExtents)); newSettings = TExprBase(extraTypeItems.empty() ? RemoveSetting(settings.Cast().Ref(), "extraColumns", ctx) : ReplaceSetting(settings.Cast().Ref(), extraColumnsSetting->Pos(), "extraColumns", newExtra, ctx)); } } - } const TStructExprType* outputRowType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); @@ -403,6 +407,110 @@ public: .Done(); } + TMaybeNode<TExprBase> MergeS3Paths(TExprBase node, TExprContext& ctx) const { + const TDqSourceWrap dqSource = node.Cast<TDqSourceWrap>(); + if (dqSource.DataSource().Category() != S3ProviderName) { + return node; + } + + const auto& maybeS3SourceSettings = dqSource.Input().Maybe<TS3SourceSettingsBase>(); + if (!maybeS3SourceSettings) { + return node; + } + + TMaybeNode<TExprBase> settings = dqSource.Settings(); + if (settings) { + if (auto prunedPaths = GetSetting(settings.Cast().Ref(), "prunedPaths")) { + if (prunedPaths->ChildrenSize() > 1) { + // pruning in progress + return node; + } + } + } + + TNodeMap<TExprNodeList> s3PathsByExtra; + bool needMerge = false; + for (const auto& batch : maybeS3SourceSettings.Cast().Paths()) { + auto extra = batch.ExtraColumns(); + auto& group = s3PathsByExtra[extra.Raw()]; + group.emplace_back(batch.Ptr()); + needMerge = needMerge || group.size() > 1; + } + + if (!needMerge) { + return node; + } + + TExprNodeList newPathItems; + TExprNodeList newExtraColumnsExtents; + + for (const auto& origBatch : maybeS3SourceSettings.Cast().Paths()) { + auto extra = origBatch.ExtraColumns(); + auto& group = s3PathsByExtra[extra.Raw()]; + if (group.empty()) { + continue; + } + + TPathList paths; + for (auto& item : group) { + TS3Path batch(item); + TStringBuf packed = batch.Data().Literal().Value(); + bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value()); + UnpackPathsList(packed, isTextEncoded, paths); + } + + bool isTextEncoded; + TString packedPaths; + PackPathsList(paths, packedPaths, isTextEncoded); + + newPathItems.emplace_back( + Build<TS3Path>(ctx, origBatch.Pos()) + .Data<TCoString>() + .Literal() + .Build(packedPaths) + .Build() + .IsText<TCoBool>() + .Literal() + .Build(ToString(isTextEncoded)) + .Build() + .ExtraColumns(extra) + .Done().Ptr() + ); + + newExtraColumnsExtents.push_back( + ctx.Builder(extra.Pos()) + .Callable("Replicate") + .Add(0, extra.Ptr()) + .Callable(1, "Uint64") + .Atom(0, ToString(paths.size()), TNodeFlags::Default) + .Seal() + .Seal() + .Build() + ); + + group.clear(); + } + + TMaybeNode<TExprBase> newSettings = settings; + if (settings) { + if (auto extraColumnsSetting = GetSetting(settings.Cast().Ref(), "extraColumns")) { + TPositionHandle pos = extraColumnsSetting->Pos(); + auto newExtra = ctx.NewCallable(pos, "OrderedExtend", std::move(newExtraColumnsExtents)); + newSettings = TExprBase(ReplaceSetting(settings.Cast().Ref(), pos, "extraColumns", newExtra, ctx)); + } + } + + YQL_CLOG(INFO, ProviderS3) << "Merge S3 paths with same extra columns in DqSource over " << maybeS3SourceSettings.Cast().CallableName(); + auto sourceSettings = ctx.ChangeChild(maybeS3SourceSettings.Cast().Ref(), TS3SourceSettingsBase::idx_Paths, + ctx.NewList(maybeS3SourceSettings.Cast().Paths().Pos(), std::move(newPathItems))); + + return Build<TDqSourceWrap>(ctx, dqSource.Pos()) + .InitFrom(dqSource) + .Input(sourceSettings) + .Settings(newSettings) + .Done(); + } + private: const TS3State::TPtr State_; }; diff --git a/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt b/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt index 7e22842e68..1e83f7763c 100644 --- a/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(providers-s3-range_helpers PUBLIC providers-common-provider providers-s3-proto library-yql-utils + cpp-protobuf-util ) target_sources(providers-s3-range_helpers PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/range_helpers/file_tree_builder.cpp diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp index 4813dfa12a..4e5d9b6163 100644 --- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp +++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp @@ -1,8 +1,14 @@ #include "path_list_reader.h" +#include <ydb/library/yql/providers/s3/range_helpers/file_tree_builder.h> + #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/utils/yql_panic.h> +#include <library/cpp/protobuf/util/pb_io.h> +#include <google/protobuf/text_format.h> + +#include <util/stream/mem.h> #include <util/stream/str.h> namespace NYql::NS3Details { @@ -35,13 +41,13 @@ void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TStri // Modern way if (range.PathsSize()) { TString buf; - BuildPathsFromTree(range.GetPaths(), paths, buf); - return; + return BuildPathsFromTree(range.GetPaths(), paths, buf); } std::unordered_map<TString, size_t> map(sourceDesc.GetDeprecatedPath().size()); - for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i) + for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i) { map.emplace(sourceDesc.GetDeprecatedPath().Get(i).GetPath(), sourceDesc.GetDeprecatedPath().Get(i).GetSize()); + } for (auto i = 0; i < range.GetDeprecatedPath().size(); ++i) { const auto& path = range.GetDeprecatedPath().Get(i); @@ -56,4 +62,33 @@ void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TStri } } +void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded) { + TFileTreeBuilder builder; + for (auto& item : paths) { + builder.AddPath(std::get<0>(item), std::get<1>(item)); + } + NS3::TRange range; + builder.Save(&range); + + isTextEncoded = range.GetPaths().size() < 100; + if (isTextEncoded) { + google::protobuf::TextFormat::PrintToString(range, &packed); + } else { + packed = range.SerializeAsString(); + } +} + +void UnpackPathsList(TStringBuf packed, bool isTextEncoded, TPathList& paths) { + NS3::TRange range; + TMemoryInput inputStream(packed); + if (isTextEncoded) { + ParseFromTextFormat(inputStream, range); + } else { + range.ParseFromArcadiaStream(&inputStream); + } + + TString buf; + BuildPathsFromTree(range.GetPaths(), paths, buf); +} + } // namespace NYql::NS3Details diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h index 9be8c7814b..8f67336878 100644 --- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h +++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h @@ -1,6 +1,5 @@ #pragma once #include <ydb/library/yql/providers/s3/proto/source.pb.h> -#include <ydb/library/yql/providers/s3/proto/range.pb.h> #include <util/generic/hash.h> #include <util/generic/string.h> @@ -15,4 +14,7 @@ using TPathList = std::vector<TPath>; void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, TPathList& paths, ui64& startPathIndex); +void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded); +void UnpackPathsList(TStringBuf packed, bool isTextEncoded, TPathList& paths); + } // namespace NYql::NS3Details diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp index a54135e656..a736e505d5 100644 --- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp +++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp @@ -1,6 +1,7 @@ #include "path_list_reader.h" #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/s3/proto/range.pb.h> #include <library/cpp/testing/unittest/registar.h> |