aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2022-08-04 09:06:18 +0300
committeraneporada <aneporada@ydb.tech>2022-08-04 09:06:18 +0300
commit91ead42844761e5f9bccab147a24e097b54b562e (patch)
tree8f88d9352b995c656c85fe98a52a2104e1a9d907
parent3305b730bd7faada7c1134b7296b099098deb897 (diff)
downloadydb-91ead42844761e5f9bccab147a24e097b54b562e.tar.gz
[] Store file list as single AST node per partition
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json4
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp35
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp42
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp100
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp206
-rw-r--r--ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp41
-rw-r--r--ydb/library/yql/providers/s3/range_helpers/path_list_reader.h4
-rw-r--r--ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp1
9 files changed, 321 insertions, 113 deletions
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index c72c3b79e9..5b5c5dde1a 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -30,8 +30,8 @@
"Base": "TExprBase",
"Match": {"Type": "Tuple"},
"Children": [
- {"Index": 0, "Name": "Path", "Type": "TCoAtom"},
- {"Index": 1, "Name": "Size", "Type": "TCoAtom"},
+ {"Index": 0, "Name": "Data", "Type": "TCoString"},
+ {"Index": 1, "Name": "IsText", "Type": "TCoBool"},
{"Index": 2, "Name": "ExtraColumns", "Type": "TExprBase"}
]
},
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index 4c04fe5a37..f92fba98b4 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -2,6 +2,7 @@
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -15,6 +16,24 @@ using namespace NNodes;
namespace {
+bool ValidateS3PackedPaths(TPositionHandle pos, TStringBuf blob, bool isTextEncoded, TExprContext& ctx) {
+ using namespace NYql::NS3Details;
+ try {
+ TPathList paths;
+ UnpackPathsList(blob, isTextEncoded, paths);
+ for (size_t i = 0; i < paths.size(); ++i) {
+ if (std::get<0>(paths[i]).empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Expected non-empty path (index " << i << ")"));
+ return false;
+ }
+ }
+ } catch (const std::exception& ex) {
+ ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Failed to parse packed paths: " << ex.what()));
+ return false;
+ }
+ return true;
+}
+
bool ValidateS3Paths(const TExprNode& node, const TStructExprType*& extraColumnsType, TExprContext& ctx) {
if (!EnsureTupleMinSize(node, 1, ctx)) {
return false;
@@ -26,21 +45,19 @@ bool ValidateS3Paths(const TExprNode& node, const TStructExprType*& extraColumns
return false;
}
- if (!EnsureAtom(*path->Child(TS3Path::idx_Path), ctx) || !EnsureAtom(*path->Child(TS3Path::idx_Size), ctx))
- {
+ auto pathAndSizeList = path->Child(TS3Path::idx_Data);
+ if (!TCoString::Match(pathAndSizeList)) {
+ ctx.AddError(TIssue(ctx.GetPosition(pathAndSizeList->Pos()), "Expected String literal for Data"));
return false;
}
- if (path->Child(TS3Path::idx_Path)->Content().empty()) {
- ctx.AddError(TIssue(ctx.GetPosition(path->Child(TS3Path::idx_Path)->Pos()), "Expected non-empty path"));
+ const TExprNode* isTextEncoded = path->Child(TS3Path::idx_IsText);
+ if (!TCoBool::Match(isTextEncoded)) {
+ ctx.AddError(TIssue(ctx.GetPosition(isTextEncoded->Pos()), "Expected Bool literal for IsText"));
return false;
}
- ui64 size = 0;
- auto sizeStr = path->Child(TS3Path::idx_Size)->Content();
- if (!TryFromString(sizeStr, size)) {
- ctx.AddError(TIssue(ctx.GetPosition(path->Child(TS3Path::idx_Size)->Pos()),
- TStringBuilder() << "Expected number as S3 object size, got: '" << sizeStr << "'"));
+ if (!ValidateS3PackedPaths(pathAndSizeList->Pos(), pathAndSizeList->Head().Content(), FromString<bool>(isTextEncoded->Head().Content()), ctx)) {
return false;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index df02d07030..a361d0ef6b 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -11,6 +11,7 @@
#include <ydb/library/yql/providers/s3/proto/sink.pb.h>
#include <ydb/library/yql/providers/s3/proto/source.pb.h>
#include <ydb/library/yql/providers/s3/range_helpers/file_tree_builder.h>
+#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
#include <ydb/library/yql/utils/log/log.h>
namespace NYql {
@@ -55,12 +56,15 @@ public:
if (const TMaybeNode<TDqSource> source = &node) {
cluster = source.Cast().DataSource().Cast<TS3DataSource>().Cluster().Value();
const auto settings = source.Cast().Settings().Cast<TS3SourceSettingsBase>();
- parts.reserve(settings.Paths().Size());
- for (auto i = 0u; i < settings.Paths().Size(); ++i)
- parts.emplace_back(1U,
- std::pair(
- settings.Paths().Item(i).Path().StringValue(),
- FromString<ui64>(settings.Paths().Item(i).Size().Value())));
+ for (auto i = 0u; i < settings.Paths().Size(); ++i) {
+ const auto& path = settings.Paths().Item(i);
+ TPathList paths;
+ UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), paths);
+ parts.reserve(parts.size() + paths.size());
+ for (auto& p : paths) {
+ parts.emplace_back(1U, std::pair(std::get<0>(p), std::get<1>(p)));
+ }
+ }
}
if (maxPartitions && parts.size() > maxPartitions) {
@@ -127,17 +131,33 @@ public:
.Seal().Build()
);
- TExprNodeList extraColumns;
+ TExprNodeList extraColumnsExtents;
for (size_t i = 0; i < s3ReadObject.Object().Paths().Size(); ++i) {
- extraColumns.push_back(s3ReadObject.Object().Paths().Item(i).ExtraColumns().Ptr());
+ auto batch = s3ReadObject.Object().Paths().Item(i);
+ TStringBuf packed = batch.Data().Literal().Value();
+ bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value());
+
+ TPathList paths;
+ UnpackPathsList(packed, isTextEncoded, paths);
+
+ extraColumnsExtents.push_back(
+ ctx.Builder(batch.ExtraColumns().Pos())
+ .Callable("Replicate")
+ .Add(0, batch.ExtraColumns().Ptr())
+ .Callable(1, "Uint64")
+ .Atom(0, ToString(paths.size()), TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Build()
+ );
}
- YQL_ENSURE(!extraColumns.empty());
- if (extraColumns.front()->GetTypeAnn()->Cast<TStructExprType>()->GetSize()) {
+ YQL_ENSURE(!extraColumnsExtents.empty());
+ if (s3ReadObject.Object().Paths().Item(0).ExtraColumns().Ref().GetTypeAnn()->Cast<TStructExprType>()->GetSize()) {
settings.push_back(
ctx.Builder(s3ReadObject.Object().Pos())
.List()
.Atom(0, "extraColumns")
- .Add(1, ctx.NewCallable(s3ReadObject.Object().Pos(), "AsList", std::move(extraColumns)))
+ .Add(1, ctx.NewCallable(s3ReadObject.Object().Pos(), "OrderedExtend", std::move(extraColumnsExtents)))
.Seal()
.Build()
);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 2abc4c4b16..dc0a7fcc59 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
+#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -97,9 +98,11 @@ public:
TPendingRequests pendingRequests;
TNodeMap<TVector<TListRequest>> requestsByNode;
+ TNodeMap<TGeneratedColumnsConfig> genColumnsByNode;
pendingRequests.swap(PendingRequests_);
requestsByNode.swap(RequestsByNode_);
+ genColumnsByNode.swap(GenColumnsByNode_);
TNodeOnNodeOwnedMap replaces;
size_t count = 0;
@@ -127,6 +130,12 @@ public:
}
}
+ TMap<TMaybe<std::vector<TString>>, NS3Details::TPathList> pathsByMatchedGlobs;
+ const TGeneratedColumnsConfig* generatedColumnsConfig = nullptr;
+ if (auto it = genColumnsByNode.find(node); it != genColumnsByNode.end()) {
+ generatedColumnsConfig = &it->second;
+ }
+
for (auto& req : requests) {
auto it = pendingRequests.find(req);
YQL_ENSURE(it != pendingRequests.end());
@@ -150,48 +159,16 @@ public:
}
return TStatus::Error;
}
- for (auto& entry : listEntries) {
+ for (auto& entry : listEntries) {
if (entry.Size > fileSizeLimit) {
ctx.AddError(TIssue(ctx.GetPosition(object.Pos()),
TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
return TStatus::Error;
}
- TExprNodeList extraColumnsAsStructArgs;
- if (auto confIt = GenColumnsByNode_.find(node); confIt != GenColumnsByNode_.end()) {
- const TGeneratedColumnsConfig& config = confIt->second;
- YQL_ENSURE(config.Columns.size() <= entry.MatchedGlobs.size());
- YQL_ENSURE(config.SchemaTypeNode);
-
- for (size_t i = 0; i < config.Columns.size(); ++i) {
- auto& col = config.Columns[i];
- extraColumnsAsStructArgs.push_back(
- ctx.Builder(object.Pos())
- .List()
- .Atom(0, col)
- .Callable(1, "Data")
- .Callable(0, "StructMemberType")
- .Add(0, config.SchemaTypeNode)
- .Atom(1, col)
- .Seal()
- .Atom(1, entry.MatchedGlobs[i])
- .Seal()
- .Seal()
- .Build()
- );
- }
- }
-
- pathNodes.emplace_back(
- ctx.Builder(object.Pos())
- .List()
- .Atom(0, entry.Path)
- .Atom(1, ToString(entry.Size), TNodeFlags::Default)
- .Add(2, ctx.NewCallable(object.Pos(), "AsStruct", std::move(extraColumnsAsStructArgs)))
- .Seal()
- .Build()
- );
+ auto& pathList = pathsByMatchedGlobs[generatedColumnsConfig ? entry.MatchedGlobs : TMaybe<std::vector<TString>>{}];
+ pathList.emplace_back(entry.Path, entry.Size);
++count;
readSize += entry.Size;
}
@@ -200,13 +177,60 @@ public:
totalSize += readSize;
}
+ for (const auto& [matchedGlobs, pathList] : pathsByMatchedGlobs) {
+ TExprNodeList extraColumnsAsStructArgs;
+ if (generatedColumnsConfig) {
+ YQL_ENSURE(matchedGlobs.Defined());
+ YQL_ENSURE(generatedColumnsConfig->Columns.size() <= matchedGlobs->size());
+ YQL_ENSURE(generatedColumnsConfig->SchemaTypeNode);
+
+ for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) {
+ auto& col = generatedColumnsConfig->Columns[i];
+ extraColumnsAsStructArgs.push_back(
+ ctx.Builder(object.Pos())
+ .List()
+ .Atom(0, col)
+ .Callable(1, "Data")
+ .Callable(0, "StructMemberType")
+ .Add(0, generatedColumnsConfig->SchemaTypeNode)
+ .Atom(1, col)
+ .Seal()
+ .Atom(1, (*matchedGlobs)[i])
+ .Seal()
+ .Seal()
+ .Build()
+ );
+ }
+ }
+
+ auto extraColumns = ctx.NewCallable(object.Pos(), "AsStruct", std::move(extraColumnsAsStructArgs));
+
+ TString packedPaths;
+ bool isTextFormat;
+ NS3Details::PackPathsList(pathList, packedPaths, isTextFormat);
+
+ pathNodes.emplace_back(
+ Build<TS3Path>(ctx, object.Pos())
+ .Data<TCoString>()
+ .Literal()
+ .Build(packedPaths)
+ .Build()
+ .IsText<TCoBool>()
+ .Literal()
+ .Build(ToString(isTextFormat))
+ .Build()
+ .ExtraColumns(extraColumns)
+ .Done().Ptr()
+ );
+ }
+
auto settings = read.Ref().Child(4)->ChildrenList();
auto userSchema = ExtractSchema(settings);
TExprNode::TPtr s3Object;
s3Object = Build<TS3Object>(ctx, object.Pos())
- .Paths(ctx.NewList(object.Pos(), std::move(pathNodes)))
- .Format(ExtractFormat(settings))
- .Settings(ctx.NewList(object.Pos(), std::move(settings)))
+ .Paths(ctx.NewList(object.Pos(), std::move(pathNodes)))
+ .Format(ExtractFormat(settings))
+ .Settings(ctx.NewList(object.Pos(), std::move(settings)))
.Done().Ptr();
replaces.emplace(node, userSchema.back() ?
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index 1ffb82a4b0..c9c7f35294 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -1,6 +1,7 @@
#include "yql_s3_provider_impl.h"
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
@@ -18,6 +19,8 @@ using namespace NNodes;
namespace {
+using namespace NYql::NS3Details;
+
void RebuildPredicateForPruning(const TExprNode::TPtr& pred, const TExprNode& arg, const TStructExprType& extraType,
TExprNode::TPtr& prunedPred, TExprNode::TPtr& extraPred, TExprContext& ctx)
{
@@ -112,27 +115,10 @@ TCoFlatMapBase CalculatePrunedPaths(TCoFlatMapBase flatMap, TExprContext& ctx, T
TExprNode::TPtr filteredPathList;
if (extraPred) {
auto source = flatMap.Input().Cast<TDqSourceWrap>().Input().Cast<TS3SourceSettingsBase>();
- TExprNodeList pathList;
- for (size_t i = 0; i < source.Paths().Size(); ++i) {
- pathList.push_back(
- ctx.Builder(source.Paths().Item(i).Pos())
- .List()
- .Callable(0, "String")
- .Add(0, source.Paths().Item(i).Path().Ptr())
- .Seal()
- .Callable(1, "Uint64")
- .Add(0, source.Paths().Item(i).Size().Ptr())
- .Seal()
- .Add(2, source.Paths().Item(i).ExtraColumns().Ptr())
- .Seal()
- .Build()
- );
- }
-
filteredPathList = ctx.Builder(pred->Pos())
.Callable("EvaluateExpr")
.Callable(0, "OrderedFilter")
- .Add(0, ctx.NewCallable(pred->Pos(), "AsList", std::move(pathList)))
+ .Add(0, ctx.NewCallable(pred->Pos(), "AsList", source.Paths().Ref().ChildrenList()))
.Lambda(1)
.Param("item")
.Apply(ctx.NewLambda(extraPred->Pos(), flatMap.Lambda().Args().Ptr(), std::move(extraPred)))
@@ -183,6 +169,7 @@ public:
AddHandler(0, &TCoFlatMapBase::Match, HNDL(TryPrunePaths));
AddHandler(0, &TDqSourceWrap::Match, HNDL(ApplyPrunedPath));
AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqSource));
+ AddHandler(0, &TDqSourceWrap::Match, HNDL(MergeS3Paths));
#undef HNDL
}
@@ -211,50 +198,54 @@ public:
return node;
}
- auto extraColumnsSetting = GetSetting(dqSource.Settings().Ref(), "extraColumns");
- YQL_ENSURE(extraColumnsSetting);
- auto extraColumns = extraColumnsSetting->ChildPtr(1);
- YQL_ENSURE(extraColumns->IsCallable("AsList"), "extraColumns should have literal value");
-
+ const size_t beforeEntries = maybeS3SourceSettings.Cast().Paths().Size();
auto prunedPaths = prunedPathSetting->ChildPtr(1);
if (prunedPaths->IsCallable("List")) {
- YQL_CLOG(INFO, ProviderS3) << "S3 Paths completely pruned: " << extraColumns->ChildrenSize() << " paths";
+ YQL_CLOG(INFO, ProviderS3) << "S3 Paths completely pruned: " << beforeEntries << " entries";
return ctx.NewCallable(node.Pos(), "List", { ExpandType(node.Pos(), *node.Ref().GetTypeAnn(), ctx) });
}
YQL_ENSURE(prunedPaths->IsCallable("AsList"), "prunedPaths should have literal value");
- YQL_ENSURE(prunedPaths->ChildrenSize() <= extraColumns->ChildrenSize());
YQL_ENSURE(prunedPaths->ChildrenSize() > 0);
+ const size_t afterEntries = prunedPaths->ChildrenSize();
auto newSettings = ReplaceSetting(dqSource.Settings().Ref(), prunedPathSetting->Pos(), "prunedPaths", nullptr, ctx);
- if (prunedPaths->ChildrenSize() == extraColumns->ChildrenSize()) {
- YQL_CLOG(INFO, ProviderS3) << "No S3 paths are pruned: " << extraColumns->ChildrenSize() << " paths";
+ if (beforeEntries == afterEntries) {
+ YQL_CLOG(INFO, ProviderS3) << "No S3 paths are pruned: " << afterEntries << " entries";
return Build<TDqSourceWrap>(ctx, dqSource.Pos())
.InitFrom(dqSource)
.Settings(newSettings)
.Done();
}
- YQL_CLOG(INFO, ProviderS3) << "Pruning S3 Paths: " << extraColumns->ChildrenSize() << " -> " << prunedPaths->ChildrenSize();
- TExprNodeList newPaths;
- TExprNodeList newExtraColumns;
+ YQL_CLOG(INFO, ProviderS3) << "Pruning S3 Paths: " << beforeEntries << " -> " << afterEntries << " entries";
+ TExprNodeList newExtraColumnsExtents;
for (auto& entry : prunedPaths->ChildrenList()) {
- auto path = entry->ChildPtr(0);
- auto size = entry->ChildPtr(1);
- auto extra = entry->ChildPtr(2);
+ TS3Path batch(entry);
+ TStringBuf packed = batch.Data().Literal().Value();
+ bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value());
- YQL_ENSURE(path->IsCallable("String"));
- YQL_ENSURE(size->IsCallable("Uint64"));
- YQL_ENSURE(extra->IsCallable("AsStruct"));
+ TPathList paths;
+ UnpackPathsList(packed, isTextEncoded, paths);
- newPaths.push_back(ctx.NewList(entry->Pos(), { path->HeadPtr(), size->HeadPtr(), extra }));
- newExtraColumns.push_back(extra);
+ newExtraColumnsExtents.push_back(
+ ctx.Builder(batch.ExtraColumns().Pos())
+ .Callable("Replicate")
+ .Add(0, batch.ExtraColumns().Ptr())
+ .Callable(1, "Uint64")
+ .Atom(0, ToString(paths.size()), TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Build()
+ );
}
- newSettings = ReplaceSetting(*newSettings, newSettings->Pos(), "extraColumns", ctx.NewCallable(newSettings->Pos(), "AsList", std::move(newExtraColumns)), ctx);
+ newSettings = ReplaceSetting(*newSettings, newSettings->Pos(), "extraColumns",
+ ctx.NewCallable(newSettings->Pos(), "OrderedExtend", std::move(newExtraColumnsExtents)), ctx);
auto oldSrc = dqSource.Input().Cast<TS3SourceSettingsBase>();
- auto newSrc = ctx.ChangeChild(dqSource.Input().Ref(), TS3SourceSettingsBase::idx_Paths, ctx.NewList(oldSrc.Paths().Pos(), std::move(newPaths)));
+ auto newSrc = ctx.ChangeChild(dqSource.Input().Ref(), TS3SourceSettingsBase::idx_Paths,
+ ctx.NewList(oldSrc.Paths().Pos(), prunedPaths->ChildrenList()));
return Build<TDqSourceWrap>(ctx, dqSource.Pos())
.InitFrom(dqSource)
@@ -337,30 +328,43 @@ public:
if (extraTypeItems.size() < extraType->GetSize()) {
auto originalPaths = maybeS3SourceSettings.Cast().Paths().Ptr();
auto originalExtra = extraColumnsSetting->TailPtr();
- YQL_ENSURE(originalExtra->IsCallable("AsList"));
YQL_ENSURE(originalPaths->IsList());
- YQL_ENSURE(originalPaths->ChildrenSize() == originalExtra->ChildrenSize());
TExprNodeList newPathItems;
- TExprNodeList newExtraColumnsItems;
+ TExprNodeList newExtraColumnsExtents;
- for (const auto& path : maybeS3SourceSettings.Cast().Paths()) {
- auto extra = path.ExtraColumns();
+ for (const auto& batch : maybeS3SourceSettings.Cast().Paths()) {
+ auto extra = batch.ExtraColumns();
YQL_ENSURE(TCoAsStruct::Match(extra.Raw()));
TExprNodeList children = extra.Ref().ChildrenList();
EraseIf(children, [&](const TExprNode::TPtr& child) { return !extractMembers.contains(child->Head().Content()); });
auto newStruct = ctx.ChangeChildren(extra.Ref(), std::move(children));
- newExtraColumnsItems.push_back(newStruct);
- newPathItems.push_back(ctx.ChangeChild(path.Ref(), TS3Path::idx_ExtraColumns, std::move(newStruct)));
+
+ TStringBuf packed = batch.Data().Literal().Value();
+ bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value());
+
+ TPathList paths;
+ UnpackPathsList(packed, isTextEncoded, paths);
+
+ newExtraColumnsExtents.push_back(
+ ctx.Builder(batch.ExtraColumns().Pos())
+ .Callable("Replicate")
+ .Add(0, newStruct)
+ .Callable(1, "Uint64")
+ .Atom(0, ToString(paths.size()), TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Build()
+ );
+ newPathItems.push_back(ctx.ChangeChild(batch.Ref(), TS3Path::idx_ExtraColumns, std::move(newStruct)));
}
newPaths = ctx.ChangeChildren(maybeS3SourceSettings.Cast().Paths().Ref(), std::move(newPathItems));
- TExprNode::TPtr newExtra = ctx.ChangeChildren(*originalExtra, std::move(newExtraColumnsItems));
+ TExprNode::TPtr newExtra = ctx.ChangeChildren(*originalExtra, std::move(newExtraColumnsExtents));
newSettings = TExprBase(extraTypeItems.empty() ? RemoveSetting(settings.Cast().Ref(), "extraColumns", ctx) :
ReplaceSetting(settings.Cast().Ref(), extraColumnsSetting->Pos(), "extraColumns", newExtra, ctx));
}
}
-
}
const TStructExprType* outputRowType = node.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
@@ -403,6 +407,110 @@ public:
.Done();
}
+ TMaybeNode<TExprBase> MergeS3Paths(TExprBase node, TExprContext& ctx) const {
+ const TDqSourceWrap dqSource = node.Cast<TDqSourceWrap>();
+ if (dqSource.DataSource().Category() != S3ProviderName) {
+ return node;
+ }
+
+ const auto& maybeS3SourceSettings = dqSource.Input().Maybe<TS3SourceSettingsBase>();
+ if (!maybeS3SourceSettings) {
+ return node;
+ }
+
+ TMaybeNode<TExprBase> settings = dqSource.Settings();
+ if (settings) {
+ if (auto prunedPaths = GetSetting(settings.Cast().Ref(), "prunedPaths")) {
+ if (prunedPaths->ChildrenSize() > 1) {
+ // pruning in progress
+ return node;
+ }
+ }
+ }
+
+ TNodeMap<TExprNodeList> s3PathsByExtra;
+ bool needMerge = false;
+ for (const auto& batch : maybeS3SourceSettings.Cast().Paths()) {
+ auto extra = batch.ExtraColumns();
+ auto& group = s3PathsByExtra[extra.Raw()];
+ group.emplace_back(batch.Ptr());
+ needMerge = needMerge || group.size() > 1;
+ }
+
+ if (!needMerge) {
+ return node;
+ }
+
+ TExprNodeList newPathItems;
+ TExprNodeList newExtraColumnsExtents;
+
+ for (const auto& origBatch : maybeS3SourceSettings.Cast().Paths()) {
+ auto extra = origBatch.ExtraColumns();
+ auto& group = s3PathsByExtra[extra.Raw()];
+ if (group.empty()) {
+ continue;
+ }
+
+ TPathList paths;
+ for (auto& item : group) {
+ TS3Path batch(item);
+ TStringBuf packed = batch.Data().Literal().Value();
+ bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value());
+ UnpackPathsList(packed, isTextEncoded, paths);
+ }
+
+ bool isTextEncoded;
+ TString packedPaths;
+ PackPathsList(paths, packedPaths, isTextEncoded);
+
+ newPathItems.emplace_back(
+ Build<TS3Path>(ctx, origBatch.Pos())
+ .Data<TCoString>()
+ .Literal()
+ .Build(packedPaths)
+ .Build()
+ .IsText<TCoBool>()
+ .Literal()
+ .Build(ToString(isTextEncoded))
+ .Build()
+ .ExtraColumns(extra)
+ .Done().Ptr()
+ );
+
+ newExtraColumnsExtents.push_back(
+ ctx.Builder(extra.Pos())
+ .Callable("Replicate")
+ .Add(0, extra.Ptr())
+ .Callable(1, "Uint64")
+ .Atom(0, ToString(paths.size()), TNodeFlags::Default)
+ .Seal()
+ .Seal()
+ .Build()
+ );
+
+ group.clear();
+ }
+
+ TMaybeNode<TExprBase> newSettings = settings;
+ if (settings) {
+ if (auto extraColumnsSetting = GetSetting(settings.Cast().Ref(), "extraColumns")) {
+ TPositionHandle pos = extraColumnsSetting->Pos();
+ auto newExtra = ctx.NewCallable(pos, "OrderedExtend", std::move(newExtraColumnsExtents));
+ newSettings = TExprBase(ReplaceSetting(settings.Cast().Ref(), pos, "extraColumns", newExtra, ctx));
+ }
+ }
+
+ YQL_CLOG(INFO, ProviderS3) << "Merge S3 paths with same extra columns in DqSource over " << maybeS3SourceSettings.Cast().CallableName();
+ auto sourceSettings = ctx.ChangeChild(maybeS3SourceSettings.Cast().Ref(), TS3SourceSettingsBase::idx_Paths,
+ ctx.NewList(maybeS3SourceSettings.Cast().Paths().Pos(), std::move(newPathItems)));
+
+ return Build<TDqSourceWrap>(ctx, dqSource.Pos())
+ .InitFrom(dqSource)
+ .Input(sourceSettings)
+ .Settings(newSettings)
+ .Done();
+ }
+
private:
const TS3State::TPtr State_;
};
diff --git a/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt b/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt
index 7e22842e68..1e83f7763c 100644
--- a/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/range_helpers/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(providers-s3-range_helpers PUBLIC
providers-common-provider
providers-s3-proto
library-yql-utils
+ cpp-protobuf-util
)
target_sources(providers-s3-range_helpers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/range_helpers/file_tree_builder.cpp
diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp
index 4813dfa12a..4e5d9b6163 100644
--- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp
+++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp
@@ -1,8 +1,14 @@
#include "path_list_reader.h"
+#include <ydb/library/yql/providers/s3/range_helpers/file_tree_builder.h>
+
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/utils/yql_panic.h>
+#include <library/cpp/protobuf/util/pb_io.h>
+#include <google/protobuf/text_format.h>
+
+#include <util/stream/mem.h>
#include <util/stream/str.h>
namespace NYql::NS3Details {
@@ -35,13 +41,13 @@ void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TStri
// Modern way
if (range.PathsSize()) {
TString buf;
- BuildPathsFromTree(range.GetPaths(), paths, buf);
- return;
+ return BuildPathsFromTree(range.GetPaths(), paths, buf);
}
std::unordered_map<TString, size_t> map(sourceDesc.GetDeprecatedPath().size());
- for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i)
+ for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i) {
map.emplace(sourceDesc.GetDeprecatedPath().Get(i).GetPath(), sourceDesc.GetDeprecatedPath().Get(i).GetSize());
+ }
for (auto i = 0; i < range.GetDeprecatedPath().size(); ++i) {
const auto& path = range.GetDeprecatedPath().Get(i);
@@ -56,4 +62,33 @@ void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TStri
}
}
+void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded) {
+ TFileTreeBuilder builder;
+ for (auto& item : paths) {
+ builder.AddPath(std::get<0>(item), std::get<1>(item));
+ }
+ NS3::TRange range;
+ builder.Save(&range);
+
+ isTextEncoded = range.GetPaths().size() < 100;
+ if (isTextEncoded) {
+ google::protobuf::TextFormat::PrintToString(range, &packed);
+ } else {
+ packed = range.SerializeAsString();
+ }
+}
+
+void UnpackPathsList(TStringBuf packed, bool isTextEncoded, TPathList& paths) {
+ NS3::TRange range;
+ TMemoryInput inputStream(packed);
+ if (isTextEncoded) {
+ ParseFromTextFormat(inputStream, range);
+ } else {
+ range.ParseFromArcadiaStream(&inputStream);
+ }
+
+ TString buf;
+ BuildPathsFromTree(range.GetPaths(), paths, buf);
+}
+
} // namespace NYql::NS3Details
diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h
index 9be8c7814b..8f67336878 100644
--- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h
+++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h
@@ -1,6 +1,5 @@
#pragma once
#include <ydb/library/yql/providers/s3/proto/source.pb.h>
-#include <ydb/library/yql/providers/s3/proto/range.pb.h>
#include <util/generic/hash.h>
#include <util/generic/string.h>
@@ -15,4 +14,7 @@ using TPathList = std::vector<TPath>;
void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, TPathList& paths, ui64& startPathIndex);
+void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded);
+void UnpackPathsList(TStringBuf packed, bool isTextEncoded, TPathList& paths);
+
} // namespace NYql::NS3Details
diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp
index a54135e656..a736e505d5 100644
--- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp
+++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp
@@ -1,6 +1,7 @@
#include "path_list_reader.h"
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/s3/proto/range.pb.h>
#include <library/cpp/testing/unittest/registar.h>