diff options
author | aneporada <aneporada@ydb.tech> | 2022-08-20 19:51:43 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2022-08-20 19:51:43 +0300 |
commit | d95f2491195efbc546af6bb237fabaf95bc9953e (patch) | |
tree | 3c1ae7fc2aca1a2f289ba8aa01e3601ddc932285 | |
parent | 75d8b5d5e58445073b0260897a93a78269c3f43f (diff) | |
download | ydb-d95f2491195efbc546af6bb237fabaf95bc9953e.tar.gz |
[] Check for file size / file count limits after S3 paths were pruned
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp | 38 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp | 81 |
2 files changed, 81 insertions, 38 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index db2cd7ecad..c519ccadca 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -110,8 +110,6 @@ public: genColumnsByNode.swap(GenColumnsByNode_); TNodeOnNodeOwnedMap replaces; - size_t count = 0; - size_t totalSize = 0; for (auto& [node, requests] : requestsByNode) { const TS3Read read(node); const auto& object = read.Arg(2).Ref(); @@ -119,22 +117,6 @@ public: size_t readSize = 0; TExprNode::TListType pathNodes; - TString formatName; - { - const auto& settings = *read.Ref().Child(4); - auto format = GetSetting(settings, "format"); - if (format && format->ChildrenSize() >= 2) { - formatName = format->Child(1)->Content(); - } - } - auto fileSizeLimit = State_->Configuration->FileSizeLimit; - if (formatName) { - auto it = State_->Configuration->FormatSizeLimits.find(formatName); - if (it != State_->Configuration->FormatSizeLimits.end() && fileSizeLimit > it->second) { - fileSizeLimit = it->second; - } - } - struct TExtraColumnValue { TString Name; TMaybe<NUdf::EDataSlot> Type; @@ -172,12 +154,6 @@ public: } for (auto& entry : listEntries) { - if (entry.Size > fileSizeLimit) { - ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), - TStringBuilder() << "Size of object " << entry.Path << " = " << entry.Size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName)); - return TStatus::Error; - } - TMaybe<TVector<TExtraColumnValue>> extraValues; if (generatedColumnsConfig) { extraValues = TVector<TExtraColumnValue>{}; @@ -205,12 +181,10 @@ public: auto& pathList = pathsByExtraValues[extraValues]; pathList.emplace_back(entry.Path, entry.Size); - ++count; readSize += entry.Size; } YQL_CLOG(INFO, ProviderS3) << "Object " << req.Pattern << " has " << listEntries.size() << " items with total size " << readSize; - totalSize += readSize; } for (const auto& [extraValues, pathList] : pathsByExtraValues) { @@ -317,18 +291,6 @@ public: .Done().Ptr()); } - const auto maxFiles = State_->Configuration->MaxFilesPerQuery; - if (count > maxFiles) { - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too many objects to read: " << count << ", but limit is " << maxFiles)); - return TStatus::Error; - } - - const auto maxSize = State_->Configuration->MaxReadSizePerQuery; - if (totalSize > maxSize) { - ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too large objects to read: " << totalSize << ", but limit is " << maxSize)); - return TStatus::Error; - } - return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); } private: diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index c9c7f35294..4f79b8fa72 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -173,6 +173,87 @@ public: #undef HNDL } + TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { + auto status = TOptimizeTransformerBase::DoTransform(input, output, ctx); + if (status != TStatus::Ok) { + return status; + } + + // check read limits after pruning + bool hasErr = false; + size_t count = 0; + size_t totalSize = 0; + VisitExpr(input, [&] (const TExprNode::TPtr& n) { + TExprBase node(n); + if (auto maybeSource = node.Maybe<TDqSourceWrap>()) { + const TDqSourceWrap dqSource = node.Cast<TDqSourceWrap>(); + if (dqSource.DataSource().Category() == S3ProviderName) { + const auto& maybeS3SourceSettings = dqSource.Input().Maybe<TS3SourceSettingsBase>(); + if (maybeS3SourceSettings && dqSource.Settings()) { + TString formatName; + { + auto format = GetSetting(dqSource.Settings().Ref(), "format"); + if (format) { + formatName = format->Child(1)->Content(); + } + } + auto fileSizeLimit = State_->Configuration->FileSizeLimit; + if (formatName) { + auto it = State_->Configuration->FormatSizeLimits.find(formatName); + if (it != State_->Configuration->FormatSizeLimits.end() && fileSizeLimit > it->second) { + fileSizeLimit = it->second; + } + } + + for (const TS3Path& batch : maybeS3SourceSettings.Cast().Paths()) { + TStringBuf packed = batch.Data().Literal().Value(); + bool isTextEncoded = FromString<bool>(batch.IsText().Literal().Value()); + + TPathList paths; + UnpackPathsList(packed, isTextEncoded, paths); + + for (auto& entry : paths) { + const TString path = std::get<0>(entry); + const size_t size = std::get<1>(entry); + if (size > fileSizeLimit) { + ctx.AddError(TIssue(ctx.GetPosition(batch.Pos()), + TStringBuilder() << "Size of object " << path << " = " << size << " and exceeds limit = " << fileSizeLimit << " specified for format " << formatName)); + hasErr = true; + return false; + } + totalSize += size; + ++count; + } + } + return false; + } + } + } + return !hasErr; + }); + + if (hasErr) { + return TStatus::Error; + } + + const auto maxFiles = State_->Configuration->MaxFilesPerQuery; + if (count > maxFiles) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too many objects to read: " << count << ", but limit is " << maxFiles)); + return TStatus::Error; + } + + const auto maxSize = State_->Configuration->MaxReadSizePerQuery; + if (totalSize > maxSize) { + ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too large objects to read: " << totalSize << ", but limit is " << maxSize)); + return TStatus::Error; + } + + if (count > 0) { + YQL_CLOG(INFO, ProviderS3) << "Will read from S3 " << count << " files with total size " << totalSize << " bytes"; + } + return TStatus::Ok; + } + TMaybeNode<TExprBase> TrimReadWorld(TExprBase node, TExprContext& ctx) const { const auto& maybeRead = node.Cast<TCoLeft>().Input().Maybe<TS3ReadObject>(); if (!maybeRead) { |