aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2022-08-20 19:51:43 +0300
committeraneporada <aneporada@ydb.tech>2022-08-20 19:51:43 +0300
commitd95f2491195efbc546af6bb237fabaf95bc9953e (patch)
tree3c1ae7fc2aca1a2f289ba8aa01e3601ddc932285
parent75d8b5d5e58445073b0260897a93a78269c3f43f (diff)
downloadydb-d95f2491195efbc546af6bb237fabaf95bc9953e.tar.gz
[] Check for file size / file count limits after S3 paths were pruned
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp38
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp81
2 files changed, 81 insertions, 38 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index db2cd7ecad..c519ccadca 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -110,8 +110,6 @@ public:
genColumnsByNode.swap(GenColumnsByNode_);
TNodeOnNodeOwnedMap replaces;
- size_t count = 0;
- size_t totalSize = 0;
for (auto& [node, requests] : requestsByNode) {
const TS3Read read(node);
const auto& object = read.Arg(2).Ref();
@@ -119,22 +117,6 @@ public:
size_t readSize = 0;
TExprNode::TListType pathNodes;
- TString formatName;
- {
- const auto& settings = *read.Ref().Child(4);
- auto format = GetSetting(settings, "format");
- if (format && format->ChildrenSize() >= 2) {
- formatName = format->Child(1)->Content();
- }
- }
- auto fileSizeLimit = State_->Configuration->FileSizeLimit;
- if (formatName) {
- auto it = State_->Configuration->FormatSizeLimits.find(formatName);
- if (it != State_->Configuration->FormatSizeLimits.end() && fileSizeLimit > it->second) {
- fileSizeLimit = it->second;
- }
- }
-
struct TExtraColumnValue {
TString Name;
TMaybe<NUdf::EDataSlot> Type;
@@ -172,12 +154,6 @@ public:
}
for (auto& entry : listEntries) {
- if (entry.Size > fileSizeLimit) {
- ctx.AddError(TIssue(ctx.GetPosition(object.Pos()),
- TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
- return TStatus::Error;
- }
-
TMaybe<TVector<TExtraColumnValue>> extraValues;
if (generatedColumnsConfig) {
extraValues = TVector<TExtraColumnValue>{};
@@ -205,12 +181,10 @@ public:
auto& pathList = pathsByExtraValues[extraValues];
pathList.emplace_back(entry.Path, entry.Size);
- ++count;
readSize += entry.Size;
}
YQL_CLOG(INFO, ProviderS3) << "Object " << req.Pattern << " has " << listEntries.size() << " items with total size " << readSize;
- totalSize += readSize;
}
for (const auto& [extraValues, pathList] : pathsByExtraValues) {
@@ -317,18 +291,6 @@ public:
.Done().Ptr());
}
- const auto maxFiles = State_->Configuration->MaxFilesPerQuery;
- if (count > maxFiles) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too many objects to read: " << count << ", but limit is " << maxFiles));
- return TStatus::Error;
- }
-
- const auto maxSize = State_->Configuration->MaxReadSizePerQuery;
- if (totalSize > maxSize) {
- ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too large objects to read: " << totalSize << ", but limit is " << maxSize));
- return TStatus::Error;
- }
-
return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
}
private:
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index c9c7f35294..4f79b8fa72 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -173,6 +173,87 @@ public:
#undef HNDL
}
+ TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
+ auto status = TOptimizeTransformerBase::DoTransform(input, output, ctx);
+ if (status != TStatus::Ok) {
+ return status;
+ }
+
+ // check read limits after pruning
+ bool hasErr = false;
+ size_t count = 0;
+ size_t totalSize = 0;
+ VisitExpr(input, [&] (const TExprNode::TPtr& n) {
+ TExprBase node(n);
+ if (auto maybeSource = node.Maybe<TDqSourceWrap>()) {
+ const TDqSourceWrap dqSource = node.Cast<TDqSourceWrap>();
+ if (dqSource.DataSource().Category() == S3ProviderName) {
+ const auto& maybeS3SourceSettings = dqSource.Input().Maybe<TS3SourceSettingsBase>();
+ if (maybeS3SourceSettings && dqSource.Settings()) {
+ TString formatName;
+ {
+ auto format = GetSetting(dqSource.Settings().Ref(), "format");
+ if (format) {
+ formatName = format->Child(1)->Content();
+ }
+ }
+ auto fileSizeLimit = State_->Configuration->FileSizeLimit;
+ if (formatName) {
+ auto it = State_->Configuration->FormatSizeLimits.find(formatName);
+ if (it != State_->Configuration->FormatSizeLimits.end() && fileSizeLimit > it->second) {
+ fileSizeLimit = it->second;
+ }
+ }
+
+ for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) {
+ TStringBuf packed = batch.Data().Literal().Value();
+ bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value());
+
+ TPathList paths;
+ UnpackPathsList(packed, isTextEncoded, paths);
+
+ for (auto& entry : paths) {
+ const TString path = std::get<0>(entry);
+ const size_t size = std::get<1>(entry);
+ if (size > fileSizeLimit) {
+ ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()),
+ TStringBuilder() << "Size of object " << path << " = " << size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName));
+ hasErr = true;
+ return false;
+ }
+ totalSize += size;
+ ++count;
+ }
+ }
+ return false;
+ }
+ }
+ }
+ return !hasErr;
+ });
+
+ if (hasErr) {
+ return TStatus::Error;
+ }
+
+ const auto maxFiles = State_->Configuration->MaxFilesPerQuery;
+ if (count > maxFiles) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too many objects to read: " << count << ", but limit is " << maxFiles));
+ return TStatus::Error;
+ }
+
+ const auto maxSize = State_->Configuration->MaxReadSizePerQuery;
+ if (totalSize > maxSize) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too large objects to read: " << totalSize << ", but limit is " << maxSize));
+ return TStatus::Error;
+ }
+
+ if (count > 0) {
+ YQL_CLOG(INFO, ProviderS3) << "Will read from S3 " << count << " files with total size " << totalSize << " bytes";
+ }
+ return TStatus::Ok;
+ }
+
TMaybeNode<TExprBase> TrimReadWorld(TExprBase node, TExprContext& ctx) const {
const auto& maybeRead = node.Cast<TCoLeft>().Input().Maybe<TS3ReadObject>();
if (!maybeRead) {