summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author auzhegov <[email protected]> 2023-03-20 21:50:26 +0300
committer auzhegov <[email protected]> 2023-03-20 21:50:26 +0300
commit 495d43f6bcb5343263e9f55f1082633313273a49 (patch)
tree 0730f8591b38d46aaaad32206baa35b81cfcaa4e
parent de1da5bba65d071a1fa8a3dde559560c1c4ba8a2 (diff)
Added early stop for exact path listing
Initial version
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 13
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin-x86_64.txt 12
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt 12
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-x86_64.txt 12
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.windows-x86_64.txt 12
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/ya.make 2
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp 76
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_list.h 8
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp 10
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp 44
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp 24
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp 19
12 files changed, 157 insertions, 87 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 03bdd97ed7c..2fc8036bc85 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1545,12 +1545,12 @@ public:
LOG_CORO_W("Download completed for unknown/discarded range [" << readyRange.Offset << "-" << readyRange.Length << "]");
return;
}
-
+
if (it->second.Cookie != event.Cookie) {
LOG_CORO_W("Mistmatched cookie for range [" << readyRange.Offset << "-" << readyRange.Length << "], received " << event.Cookie << ", expected " << it->second.Cookie);
return;
- }
-
+ }
+
it->second.Data = event.Get()->Result.Extract();
ui64 size = it->second.Data.size();
it->second.Ready = true;
@@ -1659,7 +1659,7 @@ public:
CpuTime += GetCpuTimeDelta();
// if reordering is not allowed wait for row groups sequentially
- while (ReadyRowGroups.empty()
+ while (ReadyRowGroups.empty()
|| (!ReadSpec->RowGroupReordering && ReadyRowGroups.top() > readyGroupCount) ) {
ProcessOneEvent();
}
@@ -2790,13 +2790,10 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
ythrow yexception()
<< "'pathpatternvariant' must be configured for directory listing";
}
- auto maybePathPatternVariant =
- NS3Lister::DeserializePatternVariant(pathPatternVariantValue->second);
- if (maybePathPatternVariant.Empty()) {
+ if (!TryFromString(pathPatternVariantValue->second, pathPatternVariant)) {
ythrow yexception()
<< "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second;
}
- pathPatternVariant = *maybePathPatternVariant;
}
ui64 fileSizeLimit = cfg.FileSizeLimit;
if (params.HasFormat()) {
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin-x86_64.txt
index 7188435f830..1288c09a65a 100644
--- a/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin-x86_64.txt
@@ -6,11 +6,18 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(providers-s3-object_listers)
target_link_libraries(providers-s3-object_listers PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
contrib-libs-re2
cpp-xml-document
providers-common-http_gateway
@@ -21,3 +28,8 @@ target_sources(providers-s3-object_listers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
)
+generate_enum_serilization(providers-s3-object_listers
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+ INCLUDE_HEADERS
+ ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt
index 2b11109c923..6aa84460917 100644
--- a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt
@@ -6,12 +6,19 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(providers-s3-object_listers)
target_link_libraries(providers-s3-object_listers PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
contrib-libs-re2
cpp-xml-document
providers-common-http_gateway
@@ -22,3 +29,8 @@ target_sources(providers-s3-object_listers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
)
+generate_enum_serilization(providers-s3-object_listers
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+ INCLUDE_HEADERS
+ ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-x86_64.txt
index 2b11109c923..6aa84460917 100644
--- a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-x86_64.txt
@@ -6,12 +6,19 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(providers-s3-object_listers)
target_link_libraries(providers-s3-object_listers PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
contrib-libs-re2
cpp-xml-document
providers-common-http_gateway
@@ -22,3 +29,8 @@ target_sources(providers-s3-object_listers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
)
+generate_enum_serilization(providers-s3-object_listers
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+ INCLUDE_HEADERS
+ ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.windows-x86_64.txt
index 7188435f830..1288c09a65a 100644
--- a/ydb/library/yql/providers/s3/object_listers/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.windows-x86_64.txt
@@ -6,11 +6,18 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(providers-s3-object_listers)
target_link_libraries(providers-s3-object_listers PUBLIC
contrib-libs-cxxsupp
yutil
+ tools-enum_parser-enum_serialization_runtime
contrib-libs-re2
cpp-xml-document
providers-common-http_gateway
@@ -21,3 +28,8 @@ target_sources(providers-s3-object_listers PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
)
+generate_enum_serilization(providers-s3-object_listers
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+ INCLUDE_HEADERS
+ ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/ya.make b/ydb/library/yql/providers/s3/object_listers/ya.make
index 84d99a9de32..1710fcedf3b 100644
--- a/ydb/library/yql/providers/s3/object_listers/ya.make
+++ b/ydb/library/yql/providers/s3/object_listers/ya.make
@@ -5,6 +5,8 @@ SRCS(
yql_s3_path.cpp
)
+GENERATE_ENUM_SERIALIZATION(yql_s3_list.h)
+
PEERDIR(
contrib/libs/re2
library/cpp/xml/document
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
index 917e55f96c4..493410c599f 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
@@ -23,37 +23,23 @@
namespace NYql::NS3Lister {
-TMaybe<TString> SerializePatternVariant(ES3PatternVariant variant) {
- switch (variant) {
- case ES3PatternVariant::PathPattern:
- return "path_pattern";
- case ES3PatternVariant::FilePattern:
- return "file_pattern";
- default:
- return Nothing();
- }
-}
-
-TMaybe<ES3PatternVariant> DeserializePatternVariant(const TString& variant) {
- if (variant == "path_pattern") {
- return ES3PatternVariant::PathPattern;
- }
-
- if (variant == "file_pattern") {
- return ES3PatternVariant::FilePattern;
- }
-
- return Nothing();
+IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) {
+ return stream << "TListingRequest{.url=" << request.Url
+ << ",.Prefix=" << request.Prefix
+ << ",.Pattern=" << request.Pattern
+ << ",.PatternType=" << request.PatternType
+ << ",.Token=<some token with length " << request.Token.length() << ">}";
}
namespace {
using namespace NThreading;
-using TResultFilter =
+using TPathFilter =
std::function<bool(const TString& path, std::vector<TString>& matchedGlobs)>;
+using TEarlyStopChecker = std::function<bool(const TString& path)>;
-TResultFilter MakeFilterRegexp(const TString& regex) {
+std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex) {
auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
YQL_ENSURE(re->ok());
@@ -70,10 +56,10 @@ TResultFilter MakeFilterRegexp(const TString& regex) {
(*reArgsPtr)[i] = &(*reArgs)[i];
}
- return [groups,
- reArgs,
- reArgsPtr,
- re](const TString& path, std::vector<TString>& matchedGlobs) {
+ auto filter = [groups,
+ reArgs,
+ reArgsPtr,
+ re](const TString& path, std::vector<TString>& matchedGlobs) {
matchedGlobs.clear();
bool matched =
re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size());
@@ -85,16 +71,27 @@ TResultFilter MakeFilterRegexp(const TString& regex) {
}
return matched;
};
+
+ auto checker = [](const TString& path) {
+ Y_UNUSED(path);
+ return false;
+ };
+
+ return std::make_pair(std::move(filter), std::move(checker));
}
-TResultFilter MakeFilterWildcard(const TString& pattern) {
+std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& pattern) {
auto regexPatternPrefix = pattern.substr(0, NS3::GetFirstWildcardPos(pattern));
if (regexPatternPrefix == pattern) {
// just match for equality
- return [pattern](const TString& path, std::vector<TString>& matchedGlobs) {
+ auto filter = [pattern](const TString& path, std::vector<TString>& matchedGlobs) {
matchedGlobs.clear();
return path == pattern;
};
+
+ auto checker = [pattern](const TString& path) { return path > pattern; };
+
+ return std::make_pair(std::move(filter), std::move(checker));
}
const auto regex = NS3::RegexFromWildcards(pattern);
@@ -104,7 +101,7 @@ TResultFilter MakeFilterWildcard(const TString& pattern) {
return MakeFilterRegexp(regex);
}
-TResultFilter MakeFilter(const TString& pattern, ES3PatternType patternType) {
+std::pair<TPathFilter, TEarlyStopChecker> MakeFilter(const TString& pattern, ES3PatternType patternType) {
switch (patternType) {
case ES3PatternType::Wildcard:
return MakeFilterWildcard(pattern);
@@ -168,7 +165,7 @@ public:
TLocalS3Lister(const TListingRequest& listingRequest, const TMaybe<TString>& delimiter)
: ListingRequest(listingRequest) {
Y_ENSURE(!delimiter.Defined(), "delimiter is not supported for local files");
- Filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType);
+ Filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType).first;
}
TFuture<TListResult> Next() override {
@@ -211,7 +208,7 @@ public:
private:
const TListingRequest ListingRequest;
- TResultFilter Filter;
+ TPathFilter Filter;
bool IsFirst = true;
};
@@ -219,7 +216,8 @@ class TS3Lister : public IS3Lister {
public:
struct TListingContext {
// Filter
- const TResultFilter Filter;
+ const TPathFilter Filter;
+ const TEarlyStopChecker EarlyStopChecker;
// Result processing
NThreading::TPromise<TListResult> Promise;
NThreading::TPromise<TMaybe<TListingContext>> NextRequestPromise;
@@ -244,10 +242,11 @@ public:
listingRequest.Url.substr(0, 7) != "file://",
"This lister does not support reading local files");
- auto filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType);
+ auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType);
auto ctx = TListingContext{
- filter,
+ std::move(filter),
+ std::move(checker),
NewPromise<TListResult>(),
NewPromise<TMaybe<TListingContext>>(),
std::make_shared<TListEntries>(),
@@ -323,6 +322,7 @@ private:
<< " entries, got another " << parsedResponse.KeyCount
<< " entries, request id: [" << ctx.RequestId << "]";
+ auto earlyStop = false;
for (const auto& content : parsedResponse.Contents) {
if (content.Path.EndsWith('/')) {
// skip 'directories'
@@ -335,6 +335,9 @@ private:
object.Size = content.Size;
object.MatchedGlobs.swap(matchedGlobs);
}
+ if (ctx.EarlyStopChecker(content.Path)) {
+ earlyStop = true;
+ }
}
for (const auto& prefix : parsedResponse.CommonPrefixes) {
auto& directory = ctx.Output->Directories.emplace_back();
@@ -342,13 +345,14 @@ private:
directory.MatchedRegexp = ctx.Filter(prefix, directory.MatchedGlobs);
}
- if (parsedResponse.IsTruncated) {
+ if (parsedResponse.IsTruncated && !earlyStop) {
YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url
<< ctx.ListingRequest.Prefix
<< ": got truncated flag, will continue";
auto newCtx = TListingContext{
ctx.Filter,
+ ctx.EarlyStopChecker,
NewPromise<TListResult>(),
NewPromise<TMaybe<TListingContext>>(),
std::make_shared<TListEntries>(),
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
index 51344819493..d617568edff 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
@@ -23,10 +23,6 @@ namespace NS3Lister {
enum class ES3PatternVariant { FilePattern, PathPattern };
-TMaybe<TString> SerializePatternVariant(ES3PatternVariant variant);
-
-TMaybe<ES3PatternVariant> DeserializePatternVariant(const TString& variant);
-
enum class ES3PatternType {
/**
* Pattern may include following wildcard expressions:
@@ -85,6 +81,8 @@ struct TListingRequest {
TString Prefix;
};
+IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request);
+
class IS3Lister : public TIterator<NThreading::TFuture<TListResult>> {
public:
using TPtr = std::shared_ptr<IS3Lister>;
@@ -97,4 +95,4 @@ IS3Lister::TPtr MakeS3Lister(
bool allowLocalFiles);
} // namespace NS3Lister
-} // namespace NYql \ No newline at end of file
+} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 08025dd1f43..56e2b38c606 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -470,14 +470,6 @@ private:
<< "Unknown listing option " << int(requests[0].Options)));
return TStatus::Error;
}
- auto maybeSerializedVariant = NS3Lister::SerializePatternVariant(pathPatternVariant);
- if (!maybeSerializedVariant.Defined()) {
- ctx.AddError(TIssue(
- ctx.GetPosition(read.Pos()),
- TStringBuilder() << "Couldn't serialize path pattern variant "
- << int(pathPatternVariant)));
- return TStatus::Error;
- }
settings.push_back(ctx.NewList(
settingsPos,
@@ -489,7 +481,7 @@ private:
settingsPos,
{
ctx.NewAtom(settingsPos, "pathpatternvariant"),
- ctx.NewAtom(settingsPos, *maybeSerializedVariant),
+ ctx.NewAtom(settingsPos, ToString(pathPatternVariant)),
}));
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
index 930ecd8199b..33c3470831e 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
@@ -58,14 +58,17 @@ public:
using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>(
const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options)>;
- TCollectingS3ListingStrategy(size_t limit, TListerFactoryMethod&& listerFactoryMethod)
+ TCollectingS3ListingStrategy(size_t limit, TListerFactoryMethod&& listerFactoryMethod, TString collectingName)
: Limit(limit)
- , ListerFactoryMethod(std::move(listerFactoryMethod)) { }
+ , ListerFactoryMethod(std::move(listerFactoryMethod))
+ , CollectingName(std::move(collectingName)) { }
TFuture<NS3Lister::TListResult> List(
const NS3Lister::TListingRequest& listingRequest,
ES3ListingOptions options) override {
Y_UNUSED(options);
+ YQL_CLOG(DEBUG, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName
+ << "] Going to list " << listingRequest;
auto futureLister = ListerFactoryMethod(listingRequest, options);
return futureLister.Apply([this, listingRequest, options](
const TFuture<NS3Lister::IS3Lister::TPtr>& lister) {
@@ -79,17 +82,23 @@ public:
private:
static inline auto MakeNewListingChunkHandler(
- NS3Lister::TListResult& state, size_t limit) {
- return [&state, limit](NS3Lister::TListEntries&& chunkEntries) {
+ NS3Lister::TListResult& state, size_t limit, TString collectingName) {
+ return [&state, limit, name = std::move(collectingName)](NS3Lister::TListEntries&& chunkEntries) {
auto& stateEntries = std::get<NS3Lister::TListEntries>(state);
if (stateEntries.Size() + chunkEntries.Size() > limit) {
- YQL_CLOG(INFO, ProviderS3)
- << "[TCollectingS3ListingStrategy] Collected "
+ YQL_CLOG(DEBUG, ProviderS3)
+ << "[TCollectingS3ListingStrategy:" << name
+ << "] Collected "
<< stateEntries.Size() + chunkEntries.Size()
<< " object paths which is more than limit " << limit;
state = TIssues{MakeLimitExceededIssue()};
return EAggregationAction::Stop;
}
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TCollectingS3ListingStrategy:" << name
+ << "] Collected "
+ << stateEntries.Size() + chunkEntries.Size() << " entries. Listing limit "
+ << limit << " Listing continues. ";
stateEntries += std::move(chunkEntries);
return EAggregationAction::Proceed;
};
@@ -111,10 +120,10 @@ private:
return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>(
std::move(lister),
NS3Lister::TListResult{},
- [limit = Limit](NS3Lister::TListResult& state, NS3Lister::TListResult&& chunk) {
+ [limit = Limit, name = CollectingName](NS3Lister::TListResult& state, NS3Lister::TListResult&& chunk) {
return std::visit(
TOverloaded{
- std::move(MakeNewListingChunkHandler(state, limit)),
+ std::move(MakeNewListingChunkHandler(state, limit, std::move(name))),
std::move(MakeIssuesHandler(state))},
std::move(chunk));
},
@@ -124,6 +133,7 @@ private:
private:
const size_t Limit;
const TListerFactoryMethod ListerFactoryMethod;
+ const TString CollectingName;
};
class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy {
@@ -141,7 +151,8 @@ public:
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, Nothing(), allowLocalFiles);
- }) { }
+ },
+ "TFlatFileS3ListingStrategy") { }
};
class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy {
@@ -159,7 +170,8 @@ public:
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, "/", allowLocalFiles);
- }) { }
+ },
+ "TDirectoryS3ListingStrategy") { }
};
class TCompositeS3ListingStrategy : public IS3ListingStrategy {
@@ -404,7 +416,8 @@ public:
TDirectoryS3ListingStrategy{
listerFactory, httpGateway, limit, allowLocalFiles}});
return MakeFuture(std::move(ptr));
- }) { }
+ },
+ "TPartitionedDatasetS3ListingStrategy") { }
};
class TBFSDirectoryResolverIterator : public NS3Lister::IS3Lister {
@@ -440,6 +453,10 @@ public:
.Apply(
[this, sourcePrefix](const TFuture<NS3Lister::TListResult>& future)
-> NS3Lister::TListResult {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TBFSDirectoryResolverIterator] Got new listing result. Collected entries: "
+ << ReturnedSize + DirectoryPrefixQueue.size();
+
try {
auto& nextBatch = future.GetValue();
if (std::holds_alternative<TIssues>(nextBatch)) {
@@ -452,6 +469,7 @@ public:
auto currentListingTotalSize = ReturnedSize +
DirectoryPrefixQueue.size() +
listingResult.Size();
+
if (currentListingTotalSize > Limit) {
// Stop listing
result.Directories.push_back({.Path = sourcePrefix});
@@ -480,6 +498,7 @@ public:
DirectoryPrefixQueue.clear();
}
}
+
ReturnedSize += result.Size();
return NS3Lister::TListResult{result};
} catch (std::exception& e) {
@@ -554,7 +573,8 @@ public:
minParallelism,
limit});
return MakeFuture(std::move(ptr));
- }) { }
+ },
+ "TUnPartitionedDatasetS3ListingStrategy") {}
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
index b03bf0e0cc9..dfc929ef2a8 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
@@ -97,8 +97,7 @@ void UnitAssertListResultEquals(
Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
auto strategy = TCollectingS3ListingStrategy{
10,
- [](const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
@@ -115,11 +114,11 @@ Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
.Path = "a/b",
.Size = 15,
}}}})));
- }};
+ },
+ "TTest"};
auto actualResultFuture = strategy.List(
- NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
- ES3ListingOptions::NoOptions);
+ NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, ES3ListingOptions::NoOptions);
auto expectedResult = NS3Lister::TListResult{NS3Lister::TListEntries{
.Objects = std::vector<NS3Lister::TObjectListEntry>{
NS3Lister::TObjectListEntry{
@@ -137,8 +136,7 @@ Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
auto strategy = TCollectingS3ListingStrategy{
1,
- [](const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
@@ -155,7 +153,8 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
.Path = "a/b",
.Size = 15,
}}}})));
- }};
+ },
+ "TTest"};
auto actualResultFuture = strategy.List(
NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
@@ -181,7 +180,8 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
.Size = 10,
}}},
TIssues{TIssue("TEST_ISSUE")}})));
- }};
+ },
+ "TTest"};
auto actualResultFuture = strategy.List(
NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
@@ -194,13 +194,13 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) {
auto strategy = TCollectingS3ListingStrategy{
10,
- [](const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
std::make_shared<TMockS3ExceptionLister>("EXCEPTION MESSAGE")));
- }};
+ },
+ "TTest"};
auto actualResultFuture = strategy.List(
NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 4a66190e238..c196900cf14 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -39,11 +39,20 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
FileSizeLimit = config.HasFileSizeLimit() ? config.GetFileSizeLimit() : 2_GB;
BlockFileSizeLimit = config.HasBlockFileSizeLimit() ? config.GetBlockFileSizeLimit() : 50_GB;
MaxFilesPerQuery = config.HasMaxFilesPerQuery() ? config.GetMaxFilesPerQuery() : 7000;
- MaxDiscoveryFilesPerQuery = config.HasMaxDiscoveryFilesPerQuery() ? config.GetMaxDiscoveryFilesPerQuery() : 9000;
- MaxDirectoriesAndFilesPerQuery = config.HasMaxDirectoriesAndFilesPerQuery() ? config.GetMaxDirectoriesAndFilesPerQuery() : 9000;
- MinDesiredDirectoriesOfFilesPerQuery = config.HasMinDesiredDirectoriesOfFilesPerQuery() ? config.GetMinDesiredDirectoriesOfFilesPerQuery() : 100;
- MaxReadSizePerQuery = config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB;
- MaxInflightListsPerQuery = config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1;
+ MaxDiscoveryFilesPerQuery = config.HasMaxDiscoveryFilesPerQuery()
+ ? config.GetMaxDiscoveryFilesPerQuery()
+ : 9000;
+ MaxDirectoriesAndFilesPerQuery = config.HasMaxDirectoriesAndFilesPerQuery()
+ ? config.GetMaxDirectoriesAndFilesPerQuery()
+ : 9000;
+ MinDesiredDirectoriesOfFilesPerQuery =
+ config.HasMinDesiredDirectoriesOfFilesPerQuery()
+ ? config.GetMinDesiredDirectoriesOfFilesPerQuery()
+ : 100;
+ MaxReadSizePerQuery =
+ config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB;
+ MaxInflightListsPerQuery =
+ config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1;
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {