aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-04-03 22:59:30 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-04-03 22:59:30 +0300
commitd332a860c0bf1820f449964d1e0f01731aa15d63 (patch)
tree0ec62670bf8d3c00633815de9d0a7a8c7ad55340
parent2c8dffd1a8e3b4bd1671b6159bcfd0c27ef92147 (diff)
downloadydb-d332a860c0bf1820f449964d1e0f01731aa15d63.tar.gz
Concurrent directory lister
Initial working draft
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp114
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto1
-rw-r--r--ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp143
-rw-r--r--ydb/library/yql/providers/s3/object_listers/yql_s3_list.h71
-rw-r--r--ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp42
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp607
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h29
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp36
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h8
11 files changed, 842 insertions, 214 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 26c76850471..2860d5ce3e3 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -53,7 +53,7 @@ int curlTrace(CURL *handle, curl_infotype type,
default:
return 0;
}
-
+
Cerr << sb;
return 0;
@@ -88,11 +88,32 @@ public:
DELETE
};
- TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t sizeLimit = 0, size_t bodySize = 0, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
- : Headers(headers), Method(method), Offset(offset), SizeLimit(sizeLimit), BodySize(bodySize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), ErrorBuffer(static_cast<size_t>(CURL_ERROR_SIZE), '\0'), DnsCache(dnsCache), Url(url)
- {
- InitHandles();
- Counter->Inc();
+ TEasyCurl(
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes,
+ TString url,
+ IHTTPGateway::THeaders headers,
+ EMethod method,
+ size_t offset = 0ULL,
+ size_t sizeLimit = 0,
+ size_t bodySize = 0,
+ const TCurlInitConfig& config = TCurlInitConfig(),
+ TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
+ : Headers(headers)
+ , Method(method)
+ , Offset(offset)
+ , SizeLimit(sizeLimit)
+ , BodySize(bodySize)
+ , Counter(counter)
+ , DownloadedBytes(downloadedBytes)
+ , UploadedBytes(uploadedBytes)
+ , Config(config)
+ , ErrorBuffer(static_cast<size_t>(CURL_ERROR_SIZE), '\0')
+ , DnsCache(dnsCache)
+ , Url(url) {
+ InitHandles();
+ Counter->Inc();
}
virtual ~TEasyCurl() {
@@ -124,6 +145,8 @@ public:
// does nothing if CURLOPT_VERBOSE is not set to 1
curl_easy_setopt(Handle, CURLOPT_DEBUGFUNCTION, curlTrace);
+ // We can do this because we are using async DNS resolver (c-ares). https://curl.se/libcurl/c/CURLOPT_NOSIGNAL.html
+ curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L);
// for local debug only
// will print tokens in HTTP headers
@@ -146,8 +169,14 @@ public:
}
if (!Headers.empty()) {
- CurlHeaders = std::accumulate(Headers.cbegin(), Headers.cend(), CurlHeaders,
- std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2)));
+ CurlHeaders = std::accumulate(
+ Headers.cbegin(),
+ Headers.cend(),
+ CurlHeaders,
+ std::bind(
+ &curl_slist_append,
+ std::placeholders::_1,
+ std::bind(&TString::c_str, std::placeholders::_2)));
curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, CurlHeaders);
}
@@ -254,15 +283,69 @@ public:
using TPtr = std::shared_ptr<TEasyCurlBuffer>;
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
- TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, sizeLimit, data.size(), std::move(config), std::move(dnsCache)), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState))
- {
+ TEasyCurlBuffer(
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
+ TString url,
+ EMethod method,
+ TString data,
+ IHTTPGateway::THeaders headers,
+ size_t offset,
+ size_t sizeLimit,
+ IHTTPGateway::TOnResult callback,
+ IRetryPolicy<long>::IRetryState::TPtr retryState,
+ const TCurlInitConfig& config = TCurlInitConfig(),
+ TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
+ : TEasyCurl(
+ counter,
+ downloadedBytes,
+ uploadededBytes,
+ url,
+ headers,
+ method,
+ offset,
+ sizeLimit,
+ data.size(),
+ std::move(config),
+ std::move(dnsCache))
+ , Data(std::move(data))
+ , Input(Data)
+ , Output(Buffer)
+ , HeaderOutput(Header)
+ , RetryState(std::move(retryState)) {
Output.Reserve(sizeLimit);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) {
- return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config), std::move(dnsCache));
+ static TPtr Make(
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
+ TString url,
+ EMethod method,
+ TString data,
+ IHTTPGateway::THeaders headers,
+ size_t offset,
+ size_t sizeLimit,
+ IHTTPGateway::TOnResult callback,
+ IRetryPolicy<long>::IRetryState::TPtr retryState,
+ const TCurlInitConfig& config = TCurlInitConfig(),
+ TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) {
+ return std::make_shared<TEasyCurlBuffer>(
+ counter,
+ downloadedBytes,
+ uploadededBytes,
+ std::move(url),
+ method,
+ std::move(data),
+ std::move(headers),
+ offset,
+ sizeLimit,
+ std::move(callback),
+ std::move(retryState),
+ std::move(config),
+ std::move(dnsCache));
}
// return true if callback successfully added to this work
@@ -712,7 +795,6 @@ private:
}
Allocated.erase(it);
}
-
}
if (easy) {
easy->Done(result, httpResponseCode);
@@ -724,7 +806,7 @@ private:
{
const std::unique_lock lock(Sync);
- for (const auto& item : Allocated) {
+ for (auto& item : Allocated) {
works.emplace(std::move(item.second));
}
@@ -970,7 +1052,7 @@ IHTTPGateway::THeaders IHTTPGateway::MakeYcHeaders(const TString& requestId, con
if (!contentType.empty()) {
result.push_back(TString("Content-Type:") + contentType);
}
-
+
return result;
}
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 00459ad4eaa..3c425c3ece8 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -385,6 +385,7 @@ message TS3GatewayConfig {
optional uint64 MaxInflightListsPerQuery = 7;
optional uint64 MaxDirectoriesAndFilesPerQuery = 8;
optional uint64 MinDesiredDirectoriesOfFilesPerQuery = 9;
+ optional bool AllowConcurrentListings = 11 [default = false];
repeated TAttr DefaultSettings = 100;
}
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
index 085fd9f971f..5331b838d2b 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
@@ -7,8 +7,6 @@
#include <ydb/library/yql/utils/url_builder.h>
#include <ydb/library/yql/utils/yql_panic.h>
-#include <contrib/libs/re2/re2/re2.h>
-
#ifdef THROW
#undef THROW
#endif
@@ -39,9 +37,13 @@ using TPathFilter =
std::function<bool(const TString& path, std::vector<TString>& matchedGlobs)>;
using TEarlyStopChecker = std::function<bool(const TString& path)>;
-std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex) {
- auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
- YQL_ENSURE(re->ok());
+std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex, const TSharedListingContextPtr& sharedCtx) {
+ std::shared_ptr<RE2> re;
+ if (sharedCtx) {
+ sharedCtx->GetOrCreate(regex);
+ } else {
+ re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
+ }
const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(DEBUG, ProviderS3)
@@ -80,7 +82,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex)
return std::make_pair(std::move(filter), std::move(checker));
}
-std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& pattern) {
+std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& pattern, const TSharedListingContextPtr& sharedCtx) {
auto regexPatternPrefix = pattern.substr(0, NS3::GetFirstWildcardPos(pattern));
if (regexPatternPrefix == pattern) {
// just match for equality
@@ -98,15 +100,15 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& patt
YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '"
<< regex << "' from original pattern '" << pattern << "'";
- return MakeFilterRegexp(regex);
+ return MakeFilterRegexp(regex, sharedCtx);
}
-std::pair<TPathFilter, TEarlyStopChecker> MakeFilter(const TString& pattern, ES3PatternType patternType) {
+std::pair<TPathFilter, TEarlyStopChecker> MakeFilter(const TString& pattern, ES3PatternType patternType, const TSharedListingContextPtr& sharedCtx) {
switch (patternType) {
case ES3PatternType::Wildcard:
- return MakeFilterWildcard(pattern);
+ return MakeFilterWildcard(pattern, sharedCtx);
case ES3PatternType::Regexp:
- return MakeFilterRegexp(pattern);
+ return MakeFilterRegexp(pattern, sharedCtx);
default:
ythrow yexception() << "Unknown 'patternType': " << int(patternType);
}
@@ -165,7 +167,8 @@ public:
TLocalS3Lister(const TListingRequest& listingRequest, const TMaybe<TString>& delimiter)
: ListingRequest(listingRequest) {
Y_ENSURE(!delimiter.Defined(), "delimiter is not supported for local files");
- Filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType).first;
+ Filter =
+ MakeFilter(listingRequest.Pattern, listingRequest.PatternType, nullptr).first;
}
TFuture<TListResult> Next() override {
@@ -215,6 +218,7 @@ private:
class TS3Lister : public IS3Lister {
public:
struct TListingContext {
+ TSharedListingContextPtr SharedCtx;
// Filter
const TPathFilter Filter;
const TEarlyStopChecker EarlyStopChecker;
@@ -236,15 +240,17 @@ public:
IHTTPGateway::TPtr httpGateway,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
- size_t maxFilesPerQuery = 1000)
+ size_t maxFilesPerQuery = 1000,
+ TSharedListingContextPtr sharedCtx = nullptr)
: MaxFilesPerQuery(maxFilesPerQuery) {
Y_ENSURE(
listingRequest.Url.substr(0, 7) != "file://",
"This lister does not support reading local files");
- auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType);
+ auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType, sharedCtx);
auto ctx = TListingContext{
+ std::move(sharedCtx),
std::move(filter),
std::move(checker),
NewPromise<TListResult>(),
@@ -295,13 +301,24 @@ private:
ythrow yexception() << "Gateway disappeared";
}
+
+ auto sharedCtx = ctx.SharedCtx;
auto retryPolicy = ctx.RetryPolicy;
+ auto callback = CallbackFactoryMethod(std::move(ctx));
gateway->Download(
urlBuilder.Build(),
headers,
0U,
0U,
- CallbackFactoryMethod(std::move(ctx)),
+ [sharedCtx = std::move(sharedCtx),
+ cb = std::move(callback)](IHTTPGateway::TResult&& result) {
+ if (sharedCtx) {
+ sharedCtx->SubmitCallbackProcessing(
+ [cb, &result]() { cb(std::move(result)); });
+ } else {
+ cb(std::move(result));
+ }
+ },
/*data=*/"",
retryPolicy);
}
@@ -355,18 +372,19 @@ private:
<< ": got truncated flag, will continue";
auto newCtx = TListingContext{
- ctx.Filter,
- ctx.EarlyStopChecker,
- NewPromise<TListResult>(),
- NewPromise<TMaybe<TListingContext>>(),
- std::make_shared<TListEntries>(),
- ctx.GatewayWeak,
- GetHTTPDefaultRetryPolicy(),
- CreateGuidAsString(),
- ctx.ListingRequest,
- ctx.Delimiter,
- parsedResponse.ContinuationToken,
- parsedResponse.MaxKeys};
+ ctx.SharedCtx,
+ ctx.Filter,
+ ctx.EarlyStopChecker,
+ NewPromise<TListResult>(),
+ NewPromise<TMaybe<TListingContext>>(),
+ std::make_shared<TListEntries>(),
+ ctx.GatewayWeak,
+ GetHTTPDefaultRetryPolicy(),
+ CreateGuidAsString(),
+ ctx.ListingRequest,
+ ctx.Delimiter,
+ parsedResponse.ContinuationToken,
+ parsedResponse.MaxKeys};
ctx.NextRequestPromise.SetValue(TMaybe<TListingContext>(newCtx));
} else {
@@ -416,15 +434,74 @@ private:
TFuture<TMaybe<TListingContext>> NextRequestCtx;
};
+class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
+public:
+ using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;
+
+ explicit TS3ParallelLimitedListerFactory(
+ size_t maxParallelOps = 1, TSharedListingContextPtr sharedCtx = nullptr)
+ : SharedCtx(std::move(sharedCtx))
+ , Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
+
+ TFuture<NS3Lister::IS3Lister::TPtr> Make(
+ const IHTTPGateway::TPtr& httpGateway,
+ const NS3Lister::TListingRequest& listingRequest,
+ const TMaybe<TString>& delimiter,
+ bool allowLocalFiles) override {
+ auto acquired = Semaphore->AcquireAsync();
+ return acquired.Apply(
+ [ctx = SharedCtx, httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) {
+ return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
+ NS3Lister::MakeS3Lister(
+ httpGateway, listingRequest, delimiter, allowLocalFiles, ctx),
+ std::make_unique<TAsyncSemaphore::TAutoRelease>(
+ f.GetValue()->MakeAutoRelease())});
+ });
+ }
+
+private:
+ class TListerLockReleaseWrapper : public NS3Lister::IS3Lister {
+ public:
+ using TLockPtr = std::unique_ptr<TAsyncSemaphore::TAutoRelease>;
+
+ TListerLockReleaseWrapper(NS3Lister::IS3Lister::TPtr listerPtr, TLockPtr lock)
+ : ListerPtr(std::move(listerPtr))
+ , Lock(std::move(lock)) {
+ if (ListerPtr == nullptr) {
+ Lock.reset();
+ }
+ }
+
+ TFuture<NS3Lister::TListResult> Next() override { return ListerPtr->Next(); }
+ bool HasNext() override {
+ auto hasNext = ListerPtr->HasNext();
+ if (!hasNext) {
+ Lock.reset();
+ }
+ return ListerPtr->HasNext();
+ }
+
+ private:
+ NS3Lister::IS3Lister::TPtr ListerPtr;
+ TLockPtr Lock;
+ };
+
+private:
+ TSharedListingContextPtr SharedCtx;
+ const TAsyncSemaphore::TPtr Semaphore;
+};
+
} // namespace
IS3Lister::TPtr MakeS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
- bool allowLocalFiles) {
+ bool allowLocalFiles,
+ TSharedListingContextPtr sharedCtx) {
if (listingRequest.Url.substr(0, 7) != "file://") {
- return std::make_shared<TS3Lister>(httpGateway, listingRequest, delimiter);
+ return std::make_shared<TS3Lister>(
+ httpGateway, listingRequest, delimiter, 1000, std::move(sharedCtx));
}
if (!allowLocalFiles) {
@@ -434,4 +511,14 @@ IS3Lister::TPtr MakeS3Lister(
return std::make_shared<TLocalS3Lister>(listingRequest, delimiter);
}
+IS3ListerFactory::TPtr MakeS3ListerFactory(
+ size_t maxParallelOps,
+ size_t callbackThreadCount,
+ size_t callbackPerThreadQueueSize,
+ size_t regexpCacheSize) {
+ auto sharedCtx = std::make_shared<TSharedListingContext>(
+ callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
+ return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps);
+}
+
} // namespace NYql::NS3Lister
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
index d617568edff..94c8e86eacd 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
@@ -1,8 +1,12 @@
#pragma once
+#include <library/cpp/cache/cache.h>
#include <library/cpp/threading/future/future.h>
+#include <util/thread/pool.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <contrib/libs/re2/re2/re2.h>
+
#include <memory>
#include <variant>
#include <vector>
@@ -37,6 +41,51 @@ enum class ES3PatternType {
Regexp
};
+class TSharedListingContext {
+public:
+ TSharedListingContext(
+ size_t callbackThreadCount, size_t callbackPerThreadQueueSize, size_t regexpCacheSize)
+ : RegexpCache(regexpCacheSize) {
+ CallbackProcessingPool.Start(callbackThreadCount, callbackPerThreadQueueSize);
+ }
+
+ template<typename F>
+ void SubmitCallbackProcessing(F&& f) {
+ if (!CallbackProcessingPool.AddFunc(std::forward<F>(f))) {
+ f();
+ }
+ }
+
+ std::shared_ptr<RE2> GetOrCreate(const TString& regexp) {
+ if (auto it = Get(regexp); it != nullptr) {
+ return it;
+ } else {
+ auto re = std::make_shared<RE2>(re2::StringPiece(regexp), RE2::Options());
+ Y_ENSURE(re->ok());
+ auto wLock = TWriteGuard{RWLock};
+ RegexpCache.Insert(regexp, re);
+ return re;
+ }
+ }
+ std::shared_ptr<RE2> Get(const TString& regexp) {
+ auto lock = TReadGuard{RWLock};
+ if (auto it = RegexpCache.Find(regexp); it != RegexpCache.End()) {
+ return *it;
+ } else {
+ return nullptr;
+ }
+ }
+
+ ~TSharedListingContext() { CallbackProcessingPool.Stop(); }
+
+private:
+ TThreadPool CallbackProcessingPool;
+ TLRUCache<TString, std::shared_ptr<RE2>> RegexpCache;
+ TRWMutex RWLock;
+};
+
+using TSharedListingContextPtr = std::shared_ptr<TSharedListingContext>;
+
struct TObjectListEntry {
TString Path;
ui64 Size = 0;
@@ -92,7 +141,27 @@ IS3Lister::TPtr MakeS3Lister(
const IHTTPGateway::TPtr& httpGateway,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
- bool allowLocalFiles);
+ bool allowLocalFiles,
+ TSharedListingContextPtr sharedCtx = nullptr);
+
+class IS3ListerFactory {
+public:
+ using TPtr = std::shared_ptr<IS3ListerFactory>;
+
+ virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make(
+ const IHTTPGateway::TPtr& httpGateway,
+ const NS3Lister::TListingRequest& listingRequest,
+ const TMaybe<TString>& delimiter,
+ bool allowLocalFiles) = 0;
+
+ virtual ~IS3ListerFactory() = default;
+};
+
+IS3ListerFactory::TPtr MakeS3ListerFactory(
+ size_t maxParallelOps,
+ size_t callbackThreadCount,
+ size_t callbackPerThreadQueueSize,
+ size_t regexpCacheSize);
} // namespace NS3Lister
} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
index 992572bcc26..b462249515d 100644
--- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
+++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h
@@ -72,6 +72,6 @@ struct IPathGenerator {
using TPathGeneratorPtr = std::shared_ptr<const IPathGenerator>;
-TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 2000);
+TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 1'000'000);
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 9dec3e3059d..1d52ad54243 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -4,6 +4,7 @@
#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
@@ -57,7 +58,7 @@ using namespace NPathGenerator;
struct TListRequest {
NS3Lister::TListingRequest S3Request;
TString FilePattern;
- ES3ListingOptions Options = ES3ListingOptions::NoOptions;
+ TS3ListingOptions Options;
TVector<IPathGenerator::TColumnWithValue> ColumnValues;
};
@@ -79,14 +80,16 @@ public:
TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway)
: State_(std::move(state))
, ListerFactory_(
- MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery))
+ NS3Lister::MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery, 1, 100, 100))
, ListingStrategy_(MakeS3ListingStrategy(
gateway,
ListerFactory_,
State_->Configuration->MaxDiscoveryFilesPerQuery,
State_->Configuration->MaxDirectoriesAndFilesPerQuery,
State_->Configuration->MinDesiredDirectoriesOfFilesPerQuery,
- State_->Configuration->AllowLocalFiles)) { }
+ State_->Configuration->MaxInflightListsPerQuery,
+ State_->Configuration->AllowLocalFiles)) {
+ }
void Rewind() final {
PendingRequests_.clear();
@@ -459,18 +462,12 @@ private:
if (needsListingOnActors) {
TString pathPattern;
NS3Lister::ES3PatternVariant pathPatternVariant;
- if (requests[0].Options == ES3ListingOptions::UnPartitionedDataset) {
- pathPattern = requests[0].S3Request.Pattern;
- pathPatternVariant = NS3Lister::ES3PatternVariant::PathPattern;
- } else if (requests[0].Options == ES3ListingOptions::PartitionedDataset) {
+ if (requests[0].Options.IsPartitionedDataset) {
pathPattern = requests[0].FilePattern;
pathPatternVariant = NS3Lister::ES3PatternVariant::FilePattern;
} else {
- ctx.AddError(TIssue(
- ctx.GetPosition(read.Pos()),
- TStringBuilder()
- << "Unknown listing option " << int(requests[0].Options)));
- return TStatus::Error;
+ pathPattern = requests[0].S3Request.Pattern;
+ pathPatternVariant = NS3Lister::ES3PatternVariant::PathPattern;
}
settings.push_back(ctx.NewList(
@@ -597,7 +594,12 @@ private:
.Prefix = dir.Path}};
auto future = ListingStrategy_->List(
- req.S3Request, ES3ListingOptions::UnPartitionedDataset);
+ req.S3Request,
+ TS3ListingOptions{
+ .IsPartitionedDataset = false,
+ .IsConcurrentListing =
+ State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse(
+ State_->Configuration->AllowConcurrentListings)});
RequestsByNode_[source.Raw()].push_back(req);
PendingRequests_[req] = future;
@@ -728,9 +730,13 @@ private:
// each path in CONCAT() can generate multiple list requests for explicit partitioning
TVector<TListRequest> reqs;
+ auto isConcurrentListingEnabled =
+ State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse(
+ State_->Configuration->AllowConcurrentListings);
auto req = TListRequest{
.S3Request{.Url = url, .Token = tokenStr},
- .FilePattern = effectiveFilePattern};
+ .FilePattern = effectiveFilePattern,
+ .Options{.IsConcurrentListing = isConcurrentListingEnabled}};
if (partitionedBy.empty()) {
if (path.empty()) {
@@ -752,7 +758,7 @@ private:
req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard;
req.S3Request.Prefix = req.S3Request.Pattern.substr(
0, NS3::GetFirstWildcardPos(req.S3Request.Pattern));
- req.Options = ES3ListingOptions::UnPartitionedDataset;
+ req.Options.IsPartitionedDataset = false;
reqs.push_back(req);
} else {
if (NS3::HasWildcards(path)) {
@@ -784,7 +790,7 @@ private:
generated << '/' << NS3::RegexFromWildcards(effectiveFilePattern);
req.S3Request.Pattern = generated;
req.S3Request.PatternType = NS3Lister::ES3PatternType::Regexp;
- req.Options = ES3ListingOptions::PartitionedDataset;
+ req.Options.IsPartitionedDataset = true;
reqs.push_back(req);
} else {
for (auto& rule : config.Generator->GetRules()) {
@@ -795,7 +801,7 @@ private:
req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard;
req.S3Request.Prefix = req.S3Request.Pattern.substr(
0, NS3::GetFirstWildcardPos(req.S3Request.Pattern));
- req.Options = ES3ListingOptions::PartitionedDataset;
+ req.Options.IsPartitionedDataset = true;
reqs.push_back(req);
}
}
@@ -830,7 +836,7 @@ private:
}
const TS3State::TPtr State_;
- const IS3ListerFactory::TPtr ListerFactory_;
+ const NS3Lister::IS3ListerFactory::TPtr ListerFactory_;
const IS3ListingStrategy::TPtr ListingStrategy_;
TPendingRequests PendingRequests_;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
index 33c3470831e..fd2fec6fd65 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
@@ -17,22 +17,10 @@
#include <utility>
namespace NYql {
-
-IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option) {
- switch (option) {
- case ES3ListingOptions::NoOptions:
- stream << "[NoOptions]";
- break;
- case ES3ListingOptions::PartitionedDataset:
- stream << "[PartitionedDataset]";
- break;
- case ES3ListingOptions::UnPartitionedDataset:
- stream << "[UnPartitionedDataset]";
- break;
- default:
- ythrow yexception() << "Undefined option: " << int(option);
- }
- return stream;
+IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& options) {
+ return stream << "TS3ListingOptions{.IsPartitionedDataset="
+ << options.IsPartitionedDataset
+ << ",.IsConcurrentListing=" << options.IsConcurrentListing << "}";
}
namespace {
@@ -56,7 +44,7 @@ bool IsRecoverableIssue(const TIssues& issues) {
class TCollectingS3ListingStrategy : public IS3ListingStrategy {
public:
using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>(
- const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options)>;
+ const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options)>;
TCollectingS3ListingStrategy(size_t limit, TListerFactoryMethod&& listerFactoryMethod, TString collectingName)
: Limit(limit)
@@ -65,9 +53,9 @@ public:
TFuture<NS3Lister::TListResult> List(
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) override {
+ const TS3ListingOptions& options) override {
Y_UNUSED(options);
- YQL_CLOG(DEBUG, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName
+ YQL_CLOG(TRACE, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName
<< "] Going to list " << listingRequest;
auto futureLister = ListerFactoryMethod(listingRequest, options);
return futureLister.Apply([this, listingRequest, options](
@@ -86,7 +74,7 @@ private:
return [&state, limit, name = std::move(collectingName)](NS3Lister::TListEntries&& chunkEntries) {
auto& stateEntries = std::get<NS3Lister::TListEntries>(state);
if (stateEntries.Size() + chunkEntries.Size() > limit) {
- YQL_CLOG(DEBUG, ProviderS3)
+ YQL_CLOG(TRACE, ProviderS3)
<< "[TCollectingS3ListingStrategy:" << name
<< "] Collected "
<< stateEntries.Size() + chunkEntries.Size()
@@ -95,8 +83,7 @@ private:
return EAggregationAction::Stop;
}
YQL_CLOG(TRACE, ProviderS3)
- << "[TCollectingS3ListingStrategy:" << name
- << "] Collected "
+ << "[TCollectingS3ListingStrategy:" << name << "] Collected "
<< stateEntries.Size() + chunkEntries.Size() << " entries. Listing limit "
<< limit << " Listing continues. ";
stateEntries += std::move(chunkEntries);
@@ -115,7 +102,7 @@ private:
return EAggregationAction::Stop;
}
TFuture<NS3Lister::TListResult> DoListCallback(
- NS3Lister::IS3Lister::TPtr lister, ES3ListingOptions options) const {
+ NS3Lister::IS3Lister::TPtr lister, TS3ListingOptions options) const {
Y_UNUSED(options);
return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>(
std::move(lister),
@@ -139,7 +126,7 @@ private:
class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
TFlatFileS3ListingStrategy(
- const IS3ListerFactory::TPtr& listerFactory,
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t limit,
bool allowLocalFiles)
@@ -147,7 +134,7 @@ public:
limit,
[allowLocalFiles, httpGateway, listerFactory](
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, Nothing(), allowLocalFiles);
@@ -158,7 +145,7 @@ public:
class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
TDirectoryS3ListingStrategy(
- const IS3ListerFactory::TPtr& listerFactory,
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t limit,
bool allowLocalFiles)
@@ -166,7 +153,7 @@ public:
limit,
[allowLocalFiles, httpGateway, listerFactory](
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
httpGateway, listingRequest, "/", allowLocalFiles);
@@ -183,7 +170,7 @@ public:
public:
TStrategyListIterator(
NS3Lister::TListingRequest listingRequest,
- ES3ListingOptions options,
+ TS3ListingOptions options,
TStrategyContainerPtr strategies)
: ListingRequest(std::move(listingRequest))
, Options(options)
@@ -206,7 +193,7 @@ public:
private:
const NS3Lister::TListingRequest ListingRequest;
- const ES3ListingOptions Options;
+ const TS3ListingOptions Options;
const TStrategyContainerPtr Strategies;
TStrategyContainer::const_iterator Iterator;
const TStrategyContainer::const_iterator End;
@@ -224,7 +211,7 @@ public:
TFuture<NS3Lister::TListResult> List(
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) override {
+ const TS3ListingOptions& options) override {
auto strategyListIterator =
std::make_unique<TStrategyListIterator>(listingRequest, options, Strategies);
@@ -280,7 +267,7 @@ private:
class TConditionalS3ListingStrategy : public IS3ListingStrategy {
public:
- using TStrategyCondition = std::function<bool(ES3ListingOptions options)>;
+ using TStrategyCondition = std::function<bool(const TS3ListingOptions& options)>;
using TPair = std::pair<TStrategyCondition, std::shared_ptr<IS3ListingStrategy>>;
TConditionalS3ListingStrategy(std::initializer_list<TPair> list)
@@ -288,7 +275,7 @@ public:
TFuture<NS3Lister::TListResult> List(
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) override {
+ const TS3ListingOptions& options) override {
auto strategyIt = std::find_if(
Strategies.begin(), Strategies.end(), [options](const TPair& record) {
auto [condition, strategy] = record;
@@ -314,7 +301,7 @@ public:
PartitionedDirectoryResolverIterator(
NS3Lister::TListingRequest defaultParams,
const TString& basePrefix,
- ES3ListingOptions options,
+ TS3ListingOptions options,
TDirectoryS3ListingStrategy directoryListingStrategy)
: DefaultParams(std::move(defaultParams))
, Options(options)
@@ -390,7 +377,7 @@ public:
private:
const NS3Lister::TListingRequest DefaultParams;
- const ES3ListingOptions Options;
+ const TS3ListingOptions Options;
std::shared_ptr<std::deque<TString>> DirectoryPrefixQueue;
std::vector<NS3Lister::TObjectListEntry> Objects;
TDirectoryS3ListingStrategy DirectoryListingStrategy;
@@ -399,7 +386,7 @@ public:
};
TPartitionedDatasetS3ListingStrategy(
- const IS3ListerFactory::TPtr& listerFactory,
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t limit,
bool allowLocalFiles)
@@ -407,7 +394,7 @@ public:
limit,
[listerFactory, httpGateway, limit, allowLocalFiles](
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ TS3ListingOptions options) {
auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
new PartitionedDirectoryResolverIterator{
listingRequest,
@@ -428,16 +415,16 @@ public:
TBFSDirectoryResolverIterator(
NS3Lister::TListingRequest defaultParams,
TListingRequestFactory listingRequestFactory,
- ES3ListingOptions options,
+ TS3ListingOptions options,
std::deque<TString> initialPathPrefixes,
TDirectoryS3ListingStrategy directoryListingStrategy,
size_t minParallelism,
size_t limit)
: DefaultParams(std::move(defaultParams))
, Options(options)
+ , DirectoryListingStrategy(std::move(directoryListingStrategy))
, ListingRequestFactory(std::move(listingRequestFactory))
, DirectoryPrefixQueue(std::move(initialPathPrefixes))
- , DirectoryListingStrategy(std::move(directoryListingStrategy))
, MinParallelism(minParallelism)
, Limit(limit) { }
@@ -528,12 +515,14 @@ private:
private:
const NS3Lister::TListingRequest DefaultParams;
- const ES3ListingOptions Options;
+ const TS3ListingOptions Options;
+ TDirectoryS3ListingStrategy DirectoryListingStrategy;
TListingRequestFactory ListingRequestFactory;
+
std::deque<TString> DirectoryPrefixQueue;
std::vector<NS3Lister::TObjectListEntry> Objects;
- TDirectoryS3ListingStrategy DirectoryListingStrategy;
TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk;
+
bool First = true;
size_t ReturnedSize = 0;
const size_t MinParallelism;
@@ -543,7 +532,7 @@ private:
class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
TUnPartitionedDatasetS3ListingStrategy(
- const IS3ListerFactory::TPtr& listerFactory,
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t limit,
size_t minParallelism,
@@ -552,7 +541,7 @@ public:
limit,
[listerFactory, httpGateway, limit, minParallelism, allowLocalFiles](
const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ TS3ListingOptions options) {
auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
new TBFSDirectoryResolverIterator{
listingRequest,
@@ -577,96 +566,500 @@ public:
"TUnPartitionedDatasetS3ListingStrategy") {}
};
-
-class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
+// When data is collected result will be emitted (occurs once per iterator);
+class TConcurrentBFSDirectoryResolverIterator : public NS3Lister::IS3Lister {
public:
- using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;
-
- explicit TS3ParallelLimitedListerFactory(size_t maxParallelOps = 1)
- : Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
+ using TListingRequestFactory = std::function<NS3Lister::TListingRequest(
+ const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>;
- TFuture<NS3Lister::IS3Lister::TPtr> Make(
- const IHTTPGateway::TPtr& httpGateway,
- const NS3Lister::TListingRequest& listingRequest,
- const TMaybe<TString>& delimiter,
- bool allowLocalFiles) override {
- auto acquired = Semaphore->AcquireAsync();
- return acquired.Apply(
- [httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) {
- return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
- NS3Lister::MakeS3Lister(
- httpGateway, listingRequest, delimiter, allowLocalFiles),
- std::make_unique<TAsyncSemaphore::TAutoRelease>(
- f.GetValue()->MakeAutoRelease())});
- });
- }
+ struct TSharedState {
+ using TLimitExceededStateModificator =
+ std::function<void(TSharedState& state, const TString& pathPrefix)>;
+ using TDirectoryToListMatcher =
+ std::function<bool(const NS3Lister::TDirectoryListEntry& entry)>;
+ using TEarlyStopMatcher = std::function<bool(const TSharedState& state)>;
-private:
- class TListerLockReleaseWrapper : public NS3Lister::IS3Lister {
+ // Initial params
+ const NS3Lister::TListingRequest DefaultParams;
+ const TS3ListingOptions Options;
+ TDirectoryS3ListingStrategy DirectoryListingStrategy;
+ const TListingRequestFactory ListingRequestFactory;
+ const TLimitExceededStateModificator LimitExceededStateModificator;
+ const TDirectoryToListMatcher DirectoryToListMatcher;
+ const TEarlyStopMatcher EarlyStopMatcher;
+ // Mutable state
+ std::mutex StateLock;
+ std::deque<TString> DirectoryPrefixQueue;
+ std::list<TString> InProgressPaths;
+ std::vector<TIssues> Issues;
+ std::vector<NS3Lister::TObjectListEntry> Objects;
+ std::vector<NS3Lister::TDirectoryListEntry> Directories;
+ std::vector<TFuture<NS3Lister::TListResult>> NextDirectoryListeningChunk;
+ // CurrentListing
+ TPromise<NS3Lister::TListResult> CurrentPromise;
+ bool IsListingFinished = false;
+ // Configuration
+ const size_t Limit = 1;
+ const size_t MaxParallelOps = 1;
+ //
+ std::weak_ptr<TSharedState> This;
public:
- using TLockPtr = std::unique_ptr<TAsyncSemaphore::TAutoRelease>;
+ static void ListingCallback(
+ const std::weak_ptr<TSharedState>& stateWeakPtr,
+ const TFuture<NS3Lister::TListResult>& future,
+ const TString& sourcePath
+ ) {
+ auto state = stateWeakPtr.lock();
+ if (!state) {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] No state" << sourcePath;
+ return;
+ }
+ YQL_CLOG(TRACE, ProviderS3) << "ListingCallback before lock";
+ auto lock = std::lock_guard(state->StateLock);
+
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] Got new listing result. Collected entries: "
+ << state->InProgressPaths.size() + state->DirectoryPrefixQueue.size() +
+ state->Objects.size() + state->Directories.size();
- TListerLockReleaseWrapper(NS3Lister::IS3Lister::TPtr listerPtr, TLockPtr lock)
- : ListerPtr(std::move(listerPtr))
- , Lock(std::move(lock)) {
- if (ListerPtr == nullptr) {
- Lock.reset();
+ try {
+ state->RemovePathFromInProgress(sourcePath);
+
+ if (state->IsListingFinished) {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] Listing finished - discarding results of listing for "
+ << sourcePath;
+ return;
+ }
+
+ auto& nextChunk = future.GetValue();
+ state->AddChunkToState(nextChunk, sourcePath);
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] Added listing to state ";
+ while (state->TryScheduleNextListing()) {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] Scheduled new listing";
+ }
+ } catch (std::exception& e) {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] An exception has happened - saving an issue";
+ state->Issues.emplace_back(TIssues{TIssue{e.what()}});
+ state->IsListingFinished = true;
}
+
+ state->TrySetPromise();
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] Callback end";
+ }
+ void RemovePathFromInProgress(const TString& path) {
+ Y_VERIFY(!InProgressPaths.empty());
+ auto sizeBeforeRemoval = InProgressPaths.size();
+ auto pos = std::find(InProgressPaths.begin(), InProgressPaths.end(), path);
+ Y_VERIFY(pos != InProgressPaths.end());
+ InProgressPaths.erase(pos);
+ Y_VERIFY(sizeBeforeRemoval == InProgressPaths.size() + 1);
}
+ void AddChunkToState(
+ const NS3Lister::TListResult& nextBatch,
+ const TString& sourcePath) {
+
+ if (std::holds_alternative<TIssues>(nextBatch)) {
+ IsListingFinished = true;
+ Issues.push_back(std::get<TIssues>(nextBatch));
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Issue";
+ return;
+ }
- TFuture<NS3Lister::TListResult> Next() override { return ListerPtr->Next(); }
- bool HasNext() override {
- auto hasNext = ListerPtr->HasNext();
- if (!hasNext) {
- Lock.reset();
+ auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch);
+
+ auto currentListingTotalSize = InProgressPaths.size() +
+ DirectoryPrefixQueue.size() + Objects.size() +
+ Directories.size() + listingResult.Size();
+
+ // Process new listing result
+ if (currentListingTotalSize > Limit) {
+ IsListingFinished = true;
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Limit";
+ LimitExceededStateModificator(*this, sourcePath);
+ return;
+ }
+
+ Objects.insert(
+ Objects.end(),
+ std::make_move_iterator(listingResult.Objects.begin()),
+ std::make_move_iterator(listingResult.Objects.end()));
+
+ for (auto& directoryEntry : listingResult.Directories) {
+ if (DirectoryToListMatcher(directoryEntry)) {
+ DirectoryPrefixQueue.push_back(directoryEntry.Path);
+ } else {
+ Directories.push_back(directoryEntry);
+ }
+ }
+
+ if ((DirectoryPrefixQueue.empty() && InProgressPaths.empty()) ||
+ EarlyStopMatcher(*this)) {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] AddToState listing is finished due to MinParallelism";
+ IsListingFinished = true;
}
- return ListerPtr->HasNext();
}
- private:
- NS3Lister::IS3Lister::TPtr ListerPtr;
- TLockPtr Lock;
+ void TrySetPromise() {
+ if (!IsListingFinished) {
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] TrySetPromise listing not finished";
+ return;
+ }
+ SetPromise();
+ }
+ void SetPromise() {
+ Y_ENSURE(IsListingFinished);
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise going to set promise";
+ NS3Lister::TListResult res;
+ if (!Issues.empty()) {
+ auto result = TIssues{};
+ for (auto& issues : Issues) {
+ result.AddIssues(issues);
+ }
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise before set 1";
+ res = std::move(result);
+ } else {
+ // TODO: add verification
+ auto result = NS3Lister::TListEntries{.Objects = Objects};
+ for (auto& directoryPrefix : DirectoryPrefixQueue) {
+ result.Directories.push_back({.Path = directoryPrefix});
+ }
+ for (auto& directoryPrefix: InProgressPaths) {
+ result.Directories.push_back({.Path = directoryPrefix});
+ }
+ for (auto& directoryEntry : Directories) {
+ result.Directories.push_back(directoryEntry);
+ }
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise before set 2";
+ res = std::move(result);
+ }
+
+ CurrentPromise.SetValue(res);
+ YQL_CLOG(DEBUG, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise promise was set";
+ }
+
+ bool TryScheduleNextListing() {
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] TryScheduleNextListing next listing";
+ if (IsListingFinished) {
+ return false;
+ }
+ if (InProgressPaths.size() >= MaxParallelOps) {
+ return false;
+ }
+ if (DirectoryPrefixQueue.empty()) {
+ return false;
+ }
+ ScheduleNextListing();
+ return true;
+ }
+ void ScheduleNextListing() {
+ Y_VERIFY(!DirectoryPrefixQueue.empty());
+ auto prefix = DirectoryPrefixQueue.front();
+ DirectoryPrefixQueue.pop_front();
+ InProgressPaths.push_back(prefix);
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] ScheduleNextListing next listing " << prefix;
+ const auto& listingRequest = ListingRequestFactory(DefaultParams, prefix);
+
+ DirectoryListingStrategy.List(listingRequest, Options)
+ .Subscribe(
+ [prefix, self = This](const TFuture<NS3Lister::TListResult>& future) {
+ ListingCallback(self, future, prefix);
+ });
+ }
};
+ using TSharedStatePtr = std::shared_ptr<TSharedState>;
+ static TSharedStatePtr MakeState(
+ NS3Lister::TListingRequest defaultParams,
+ TListingRequestFactory listingRequestFactory,
+ TSharedState::TLimitExceededStateModificator limitExceededStateModificator,
+ TSharedState::TDirectoryToListMatcher directoryToListMatcher,
+ TSharedState::TEarlyStopMatcher earlyStopMatcher,
+ TS3ListingOptions options,
+ std::deque<TString> initialPathPrefixes,
+ TDirectoryS3ListingStrategy directoryListingStrategy,
+ size_t limit,
+ size_t maxParallelOps) {
+ auto res = TSharedStatePtr(new TSharedState{
+ .DefaultParams = (std::move(defaultParams)),
+ .Options = (options),
+ .DirectoryListingStrategy = (std::move(directoryListingStrategy)),
+ .ListingRequestFactory = (std::move(listingRequestFactory)),
+ .LimitExceededStateModificator = (std::move(limitExceededStateModificator)),
+ .DirectoryToListMatcher = (std::move(directoryToListMatcher)),
+ .EarlyStopMatcher = (std::move(earlyStopMatcher)),
+ .DirectoryPrefixQueue = (std::move(initialPathPrefixes)),
+ .CurrentPromise = NewPromise<NS3Lister::TListResult>(),
+ .Limit = (limit),
+ .MaxParallelOps = maxParallelOps});
+ res->This = res;
+ return res;
+ }
+
+ TConcurrentBFSDirectoryResolverIterator(
+ NS3Lister::TListingRequest defaultParams,
+ TListingRequestFactory listingRequestFactory,
+ TSharedState::TLimitExceededStateModificator limitExceededStateModificator,
+ TSharedState::TDirectoryToListMatcher directoryToListMatcher,
+ TSharedState::TEarlyStopMatcher earlyStopMatcher,
+ TS3ListingOptions options,
+ std::deque<TString> initialPathPrefixes,
+ TDirectoryS3ListingStrategy directoryListingStrategy,
+ size_t limit,
+ size_t maxParallelOps)
+ : State(MakeState(
+ std::move(defaultParams),
+ std::move(listingRequestFactory),
+ std::move(limitExceededStateModificator),
+ std::move(directoryToListMatcher),
+ std::move(earlyStopMatcher),
+ options,
+ std::move(initialPathPrefixes),
+ std::move(directoryListingStrategy),
+ limit,
+ maxParallelOps)) { }
+
+ TFuture<NS3Lister::TListResult> Next() override {
+ if (!First) {
+ return MakeFuture(NS3Lister::TListResult{TIssues{
+ TIssue{"This iterator should be finished after first iteration"}}});
+ }
+
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next before lock";
+ auto lock = std::lock_guard{State->StateLock};
+
+ First = false;
+ if (State->DirectoryPrefixQueue.empty()) {
+ return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}});
+ }
+
+ if (!State->IsListingFinished) {
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next listing is not finished ";
+ while (State->TryScheduleNextListing());
+ } else {
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next listing is finished - reading result ";
+ State->SetPromise();
+ }
+
+ return State->CurrentPromise;
+ }
+
+ bool HasNext() override {
+ YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] HasNext";
+ return (First & !State->DirectoryPrefixQueue.empty());
+ }
private:
- const TAsyncSemaphore::TPtr Semaphore;
+ std::shared_ptr<TSharedState> State;
+ bool First = true;
+};
+
+class TConcurrentUnPartitionedDatasetS3ListingStrategy :
+ public TCollectingS3ListingStrategy {
+public:
+ TConcurrentUnPartitionedDatasetS3ListingStrategy(
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IHTTPGateway::TPtr& httpGateway,
+ size_t limit,
+ size_t minParallelism,
+ size_t maxParallelOps,
+ bool allowLocalFiles)
+ : TCollectingS3ListingStrategy(
+ limit,
+ [listerFactory, httpGateway, limit, minParallelism, allowLocalFiles, maxParallelOps](
+ const NS3Lister::TListingRequest& listingRequest,
+ TS3ListingOptions options) {
+ auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+ new TConcurrentBFSDirectoryResolverIterator{
+ listingRequest,
+ [](const NS3Lister::TListingRequest& defaultParams,
+ const TString& pathPrefix) {
+ NS3Lister::TListingRequest request(defaultParams);
+ request.Prefix = pathPrefix;
+ return request;
+ },
+ [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state,
+ const TString& pathPrefix) {
+ state.DirectoryPrefixQueue.push_back(pathPrefix);
+ },
+ [](const NS3Lister::TDirectoryListEntry& entry) -> bool {
+ Y_UNUSED(entry);
+ return true;
+ },
+ [minParallelism](const TConcurrentBFSDirectoryResolverIterator::TSharedState&
+ state) -> bool {
+ auto currentListedSize = state.InProgressPaths.size() +
+ state.DirectoryPrefixQueue.size() +
+ state.Objects.size() +
+ state.Directories.size();
+ return currentListedSize > minParallelism;
+ },
+ options,
+ std::deque<TString>{
+ (!listingRequest.Prefix.empty())
+ ? listingRequest.Prefix
+ : listingRequest.Pattern.substr(
+ 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
+ TDirectoryS3ListingStrategy{
+ listerFactory, httpGateway, limit, allowLocalFiles},
+ limit,
+ maxParallelOps});
+ return MakeFuture(std::move(ptr));
+ },
+ "TConcurrentUnPartitionedDatasetS3ListingStrategy") { }
};
+class TConcurrentPartitionedDatasetS3ListingStrategy :
+ public TCollectingS3ListingStrategy {
+public:
+ TConcurrentPartitionedDatasetS3ListingStrategy(
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IHTTPGateway::TPtr& httpGateway,
+ size_t limit,
+ size_t maxParallelOps,
+ bool allowLocalFiles)
+ : TCollectingS3ListingStrategy(
+ limit,
+ [listerFactory, httpGateway, limit, allowLocalFiles, maxParallelOps](
+ const NS3Lister::TListingRequest& listingRequest,
+ TS3ListingOptions options) {
+ auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+ new TConcurrentBFSDirectoryResolverIterator{
+ listingRequest,
+ [](const NS3Lister::TListingRequest& defaultParams,
+ const TString& pathPrefix) {
+ NS3Lister::TListingRequest request(defaultParams);
+ request.Prefix = pathPrefix;
+ return request;
+ },
+ [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state,
+ const TString& pathPrefix) {
+ Y_UNUSED(pathPrefix);
+ state.Issues.push_back(TIssues{MakeLimitExceededIssue()});
+ },
+ [](const NS3Lister::TDirectoryListEntry& entry) -> bool {
+ return !entry.MatchedRegexp;
+ },
+ [](const TConcurrentBFSDirectoryResolverIterator::TSharedState&
+ state) -> bool {
+ Y_UNUSED(state);
+ return false;
+ },
+ options,
+ std::deque<TString>{
+ (!listingRequest.Prefix.empty())
+ ? listingRequest.Prefix
+ : listingRequest.Pattern.substr(
+ 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))},
+ TDirectoryS3ListingStrategy{
+ listerFactory, httpGateway, limit, allowLocalFiles},
+ limit,
+ maxParallelOps});
+ return MakeFuture(std::move(ptr));
+ },
+ "TConcurrentUnPartitionedDatasetS3ListingStrategy") { }
+};
+
+
+
} // namespace
-IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps) {
- return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps);
-}
+class TLoggingS3ListingStrategy : public IS3ListingStrategy {
+public:
+ explicit TLoggingS3ListingStrategy(IS3ListingStrategy::TPtr lister)
+ : Lister(std::move(lister)) { }
+
+ TFuture<NS3Lister::TListResult> List(
+ const NS3Lister::TListingRequest& listingRequest,
+ const TS3ListingOptions& options) override {
+ YQL_CLOG(INFO, ProviderS3) << "[TLoggingS3ListingStrategy] Going to list request "
+ << listingRequest << " with options " << options;
+ return Lister->List(listingRequest, options)
+ .Apply([start = TInstant::Now()](const TFuture<NS3Lister::TListResult>& future) {
+ auto duration = TInstant::Now() - start;
+ std::visit(
+ TOverloaded{
+ [duration](const NS3Lister::TListEntries& entries) {
+ YQL_CLOG(INFO, ProviderS3)
+ << "[TLoggingS3ListingStrategy] Listing took " << duration
+ << " and ended with " << entries.Size() << " entries";
+ },
+ [duration](const TIssues& issues) {
+ YQL_CLOG(INFO, ProviderS3)
+ << "[TLoggingS3ListingStrategy] Listing took " << duration
+ << " and ended with " << issues.Size() << " issues";
+ }},
+ future.GetValueSync());
+
+
+ return future;
+ });
+ }
+
+private:
+ IS3ListingStrategy::TPtr Lister;
+};
IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
- const IS3ListerFactory::TPtr& listerFactory,
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
ui64 maxFilesPerQueryFiles,
ui64 maxFilesPerQueryDirectory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
+ size_t maxParallelOps,
bool allowLocalFiles) {
- return std::make_shared<TCompositeS3ListingStrategy>(
- std::vector<std::shared_ptr<IS3ListingStrategy>>{
- std::make_shared<TFlatFileS3ListingStrategy>(
- listerFactory, httpGateway, maxFilesPerQueryFiles, allowLocalFiles),
- std::make_shared<TConditionalS3ListingStrategy>(
- std::initializer_list<TConditionalS3ListingStrategy::TPair>{
- {[](ES3ListingOptions options) {
- return options == ES3ListingOptions::PartitionedDataset;
- },
- std::make_shared<TPartitionedDatasetS3ListingStrategy>(
- listerFactory, httpGateway, maxFilesPerQueryDirectory, allowLocalFiles)},
- {[](ES3ListingOptions options) {
- return options == ES3ListingOptions::UnPartitionedDataset;
- },
- std::make_shared<TUnPartitionedDatasetS3ListingStrategy>(
- listerFactory,
- httpGateway,
- maxFilesPerQueryDirectory,
- minDesiredDirectoriesOfFilesPerQuery,
- allowLocalFiles)},
- })});
+ return std::make_shared<TLoggingS3ListingStrategy>(
+ std::make_shared<TCompositeS3ListingStrategy>(
+ std::vector<std::shared_ptr<IS3ListingStrategy>>{
+ std::make_shared<TFlatFileS3ListingStrategy>(
+ listerFactory, httpGateway, maxFilesPerQueryFiles, allowLocalFiles),
+ std::make_shared<TConditionalS3ListingStrategy>(
+ std::initializer_list<TConditionalS3ListingStrategy::TPair>{
+ {[](const TS3ListingOptions& options) {
+ return options.IsPartitionedDataset &&
+ !options.IsConcurrentListing;
+ },
+ std::make_shared<TPartitionedDatasetS3ListingStrategy>(
+ listerFactory,
+ httpGateway,
+ maxFilesPerQueryDirectory,
+ allowLocalFiles)},
+ {[](const TS3ListingOptions& options) {
+ return options.IsPartitionedDataset &&
+ options.IsConcurrentListing;
+ },
+ std::make_shared<TConcurrentPartitionedDatasetS3ListingStrategy>(
+ listerFactory,
+ httpGateway,
+ maxFilesPerQueryDirectory,
+ maxParallelOps,
+ allowLocalFiles)},
+ {[](const TS3ListingOptions& options) {
+ return !options.IsPartitionedDataset &&
+ !options.IsConcurrentListing;
+ },
+ std::make_shared<TUnPartitionedDatasetS3ListingStrategy>(
+ listerFactory,
+ httpGateway,
+ maxFilesPerQueryDirectory,
+ minDesiredDirectoriesOfFilesPerQuery,
+ allowLocalFiles)},
+ {[](const TS3ListingOptions& options) {
+ return !options.IsPartitionedDataset &&
+ options.IsConcurrentListing;
+ },
+ std::make_shared<TConcurrentUnPartitionedDatasetS3ListingStrategy>(
+ listerFactory,
+ httpGateway,
+ maxFilesPerQueryDirectory,
+ minDesiredDirectoriesOfFilesPerQuery,
+ maxParallelOps,
+ allowLocalFiles)}})}));
}
-
} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h
index 4e3ef8627b4..e5c1416867f 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h
@@ -11,45 +11,30 @@
namespace NYql {
-class IS3ListerFactory {
-public:
- using TPtr = std::shared_ptr<IS3ListerFactory>;
-
- virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make(
- const IHTTPGateway::TPtr& httpGateway,
- const NS3Lister::TListingRequest& listingRequest,
- const TMaybe<TString>& delimiter,
- bool allowLocalFiles) = 0;
-
- virtual ~IS3ListerFactory() = default;
-};
-
-IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps);
-
-enum class ES3ListingOptions : ui8 {
- NoOptions = 0,
- UnPartitionedDataset = 1,
- PartitionedDataset = 2
+struct TS3ListingOptions {
+ bool IsPartitionedDataset = false;
+ bool IsConcurrentListing = false;
};
-IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option);
+IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& options);
class IS3ListingStrategy {
public:
using TPtr = std::shared_ptr<IS3ListingStrategy>;
virtual NThreading::TFuture<NS3Lister::TListResult> List(
- const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) = 0;
+ const NS3Lister::TListingRequest& listingRequest, const TS3ListingOptions& options) = 0;
virtual ~IS3ListingStrategy() = default;
};
IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
- const IS3ListerFactory::TPtr& listerFactory,
+ const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
ui64 maxFilesPerQueryFiles,
ui64 maxFilesPerQueryDirectory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
+ size_t maxParallelOps,
bool allowLocalFiles);
} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
index dfc929ef2a8..5df8e25272f 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
@@ -6,7 +6,7 @@ namespace NYql {
class TMockS3Lister : public NS3Lister::IS3Lister {
public:
- TMockS3Lister(std::vector<NS3Lister::TListResult> batches)
+ explicit TMockS3Lister(std::vector<NS3Lister::TListResult> batches)
: Batches(std::move(batches)) { }
TFuture<NS3Lister::TListResult> Next() override {
@@ -29,7 +29,7 @@ private:
class TMockS3ExceptionLister : public NS3Lister::IS3Lister {
public:
- TMockS3ExceptionLister(TString exceptionMessage)
+ explicit TMockS3ExceptionLister(TString exceptionMessage)
: ExceptionMessage(std::move(exceptionMessage)) { }
TFuture<NS3Lister::TListResult> Next() override {
@@ -97,9 +97,10 @@ void UnitAssertListResultEquals(
Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
auto strategy = TCollectingS3ListingStrategy{
10,
- [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
- UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{
NS3Lister::TListEntries{
@@ -118,7 +119,7 @@ Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
"TTest"};
auto actualResultFuture = strategy.List(
- NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, ES3ListingOptions::NoOptions);
+ NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{});
auto expectedResult = NS3Lister::TListResult{NS3Lister::TListEntries{
.Objects = std::vector<NS3Lister::TObjectListEntry>{
NS3Lister::TObjectListEntry{
@@ -136,9 +137,10 @@ Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
auto strategy = TCollectingS3ListingStrategy{
1,
- [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
- UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{
NS3Lister::TListEntries{
@@ -157,8 +159,7 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
"TTest"};
auto actualResultFuture = strategy.List(
- NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
- ES3ListingOptions::NoOptions);
+ NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{});
auto expectedResult = NS3Lister::TListResult{TIssues{MakeLimitExceededIssue()}};
const auto& actualResult = actualResultFuture.GetValue();
UnitAssertListResultEquals(expectedResult, actualResult);
@@ -167,10 +168,10 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
auto strategy = TCollectingS3ListingStrategy{
1,
- [](const NS3Lister::TListingRequest& listingRequest,
- ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
- UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{
NS3Lister::TListEntries{
@@ -184,8 +185,7 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
"TTest"};
auto actualResultFuture = strategy.List(
- NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
- ES3ListingOptions::NoOptions);
+ NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{});
auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("TEST_ISSUE")}};
const auto& actualResult = actualResultFuture.GetValue();
UnitAssertListResultEquals(expectedResult, actualResult);
@@ -194,17 +194,17 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) {
auto strategy = TCollectingS3ListingStrategy{
10,
- [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) {
+ [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) {
UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
- UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false);
+ UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false);
return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
std::make_shared<TMockS3ExceptionLister>("EXCEPTION MESSAGE")));
},
"TTest"};
auto actualResultFuture = strategy.List(
- NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
- ES3ListingOptions::NoOptions);
+ NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{});
UNIT_ASSERT(actualResultFuture.HasValue());
auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("EXCEPTION MESSAGE")}};
const auto& actualResult = actualResultFuture.GetValue();
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index 14ccf509cfb..a089e9385ed 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -20,6 +20,7 @@ TS3Configuration::TS3Configuration()
REGISTER_SETTING(*this, ArrowRowGroupReordering);
REGISTER_SETTING(*this, UseBlocksSource);
REGISTER_SETTING(*this, AtomicUploadCommit);
+ REGISTER_SETTING(*this, UseConcurrentDirectoryLister);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
@@ -54,6 +55,8 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB;
MaxInflightListsPerQuery =
config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1;
+ AllowConcurrentListings =
+ config.HasAllowConcurrentListings() ? config.GetAllowConcurrentListings() : false;
TVector<TString> clusters(Reserve(config.ClusterMappingSize()));
for (auto& cluster: config.GetClusterMapping()) {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 155d32e6c8e..d936f88a314 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -19,9 +19,10 @@ struct TS3Settings {
NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000.
NCommon::TConfSetting<bool, false> ArrowThreadPool;
NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1
- NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK
- NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode
- NCommon::TConfSetting<bool, false> AtomicUploadCommit; // Commit each file independently, w/o transaction semantic over all files
+ NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK
+ NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode
+ NCommon::TConfSetting<bool, false> AtomicUploadCommit; // Commit each file independently, w/o transaction semantic over all files
+ NCommon::TConfSetting<bool, false> UseConcurrentDirectoryLister;
};
struct TS3ClusterSettings {
@@ -52,6 +53,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher
ui64 MaxReadSizePerQuery;
ui64 MaxInflightListsPerQuery;
bool AllowLocalFiles;
+ bool AllowConcurrentListings;
};
} // NYql