diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-04-03 22:59:30 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-04-03 22:59:30 +0300 |
commit | d332a860c0bf1820f449964d1e0f01731aa15d63 (patch) | |
tree | 0ec62670bf8d3c00633815de9d0a7a8c7ad55340 | |
parent | 2c8dffd1a8e3b4bd1671b6159bcfd0c27ef92147 (diff) | |
download | ydb-d332a860c0bf1820f449964d1e0f01731aa15d63.tar.gz |
Concurrent directory lister
Initial working draft
11 files changed, 842 insertions, 214 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 26c76850471..2860d5ce3e3 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -53,7 +53,7 @@ int curlTrace(CURL *handle, curl_infotype type, default: return 0; } - + Cerr << sb; return 0; @@ -88,11 +88,32 @@ public: DELETE }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, size_t sizeLimit = 0, size_t bodySize = 0, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) - : Headers(headers), Method(method), Offset(offset), SizeLimit(sizeLimit), BodySize(bodySize), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes), Config(config), ErrorBuffer(static_cast<size_t>(CURL_ERROR_SIZE), '\0'), DnsCache(dnsCache), Url(url) - { - InitHandles(); - Counter->Inc(); + TEasyCurl( + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, + const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, + TString url, + IHTTPGateway::THeaders headers, + EMethod method, + size_t offset = 0ULL, + size_t sizeLimit = 0, + size_t bodySize = 0, + const TCurlInitConfig& config = TCurlInitConfig(), + TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) + : Headers(headers) + , Method(method) + , Offset(offset) + , SizeLimit(sizeLimit) + , BodySize(bodySize) + , Counter(counter) + , DownloadedBytes(downloadedBytes) + , UploadedBytes(uploadedBytes) + , Config(config) + , ErrorBuffer(static_cast<size_t>(CURL_ERROR_SIZE), '\0') + , DnsCache(dnsCache) + , Url(url) { + InitHandles(); + Counter->Inc(); } virtual ~TEasyCurl() { @@ -124,6 +145,8 @@ public: // does nothing if CURLOPT_VERBOSE is not set to 1 curl_easy_setopt(Handle, CURLOPT_DEBUGFUNCTION, curlTrace); + // We can do this because we are using async DNS resolver (c-ares). https://curl.se/libcurl/c/CURLOPT_NOSIGNAL.html + curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L); // for local debug only // will print tokens in HTTP headers @@ -146,8 +169,14 @@ public: } if (!Headers.empty()) { - CurlHeaders = std::accumulate(Headers.cbegin(), Headers.cend(), CurlHeaders, - std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2))); + CurlHeaders = std::accumulate( + Headers.cbegin(), + Headers.cend(), + CurlHeaders, + std::bind( + &curl_slist_append, + std::placeholders::_1, + std::bind(&TString::c_str, std::placeholders::_2))); curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, CurlHeaders); } @@ -254,15 +283,69 @@ public: using TPtr = std::shared_ptr<TEasyCurlBuffer>; using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; - TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, sizeLimit, data.size(), std::move(config), std::move(dnsCache)), Data(std::move(data)), Input(Data), Output(Buffer), HeaderOutput(Header), RetryState(std::move(retryState)) - { + TEasyCurlBuffer( + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, + const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, + TString url, + EMethod method, + TString data, + IHTTPGateway::THeaders headers, + size_t offset, + size_t sizeLimit, + IHTTPGateway::TOnResult callback, + IRetryPolicy<long>::IRetryState::TPtr retryState, + const TCurlInitConfig& config = TCurlInitConfig(), + TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) + : TEasyCurl( + counter, + downloadedBytes, + uploadededBytes, + url, + headers, + method, + offset, + sizeLimit, + data.size(), + std::move(config), + std::move(dnsCache)) + , Data(std::move(data)) + , Input(Data) + , Output(Buffer) + , HeaderOutput(Header) + , RetryState(std::move(retryState)) { Output.Reserve(sizeLimit); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t sizeLimit, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { - return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, sizeLimit, std::move(callback), std::move(retryState), std::move(config), std::move(dnsCache)); + static TPtr Make( + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, + const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, + TString url, + EMethod method, + TString data, + IHTTPGateway::THeaders headers, + size_t offset, + size_t sizeLimit, + IHTTPGateway::TOnResult callback, + IRetryPolicy<long>::IRetryState::TPtr retryState, + const TCurlInitConfig& config = TCurlInitConfig(), + TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { + return std::make_shared<TEasyCurlBuffer>( + counter, + downloadedBytes, + uploadededBytes, + std::move(url), + method, + std::move(data), + std::move(headers), + offset, + sizeLimit, + std::move(callback), + std::move(retryState), + std::move(config), + std::move(dnsCache)); } // return true if callback successfully added to this work @@ -712,7 +795,6 @@ private: } Allocated.erase(it); } - } if (easy) { easy->Done(result, httpResponseCode); @@ -724,7 +806,7 @@ private: { const std::unique_lock lock(Sync); - for (const auto& item : Allocated) { + for (auto& item : Allocated) { works.emplace(std::move(item.second)); } @@ -970,7 +1052,7 @@ IHTTPGateway::THeaders IHTTPGateway::MakeYcHeaders(const TString& requestId, con if (!contentType.empty()) { result.push_back(TString("Content-Type:") + contentType); } - + return result; } diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 00459ad4eaa..3c425c3ece8 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -385,6 +385,7 @@ message TS3GatewayConfig { optional uint64 MaxInflightListsPerQuery = 7; optional uint64 MaxDirectoriesAndFilesPerQuery = 8; optional uint64 MinDesiredDirectoriesOfFilesPerQuery = 9; + optional bool AllowConcurrentListings = 11 [default = false]; repeated TAttr DefaultSettings = 100; } diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index 085fd9f971f..5331b838d2b 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -7,8 +7,6 @@ #include <ydb/library/yql/utils/url_builder.h> #include <ydb/library/yql/utils/yql_panic.h> -#include <contrib/libs/re2/re2/re2.h> - #ifdef THROW #undef THROW #endif @@ -39,9 +37,13 @@ using TPathFilter = std::function<bool(const TString& path, std::vector<TString>& matchedGlobs)>; using TEarlyStopChecker = std::function<bool(const TString& path)>; -std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex) { - auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options()); - YQL_ENSURE(re->ok()); +std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex, const TSharedListingContextPtr& sharedCtx) { + std::shared_ptr<RE2> re; + if (sharedCtx) { + sharedCtx->GetOrCreate(regex); + } else { + re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options()); + } const size_t numGroups = re->NumberOfCapturingGroups(); YQL_CLOG(DEBUG, ProviderS3) @@ -80,7 +82,7 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex) return std::make_pair(std::move(filter), std::move(checker)); } -std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& pattern) { +std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& pattern, const TSharedListingContextPtr& sharedCtx) { auto regexPatternPrefix = pattern.substr(0, NS3::GetFirstWildcardPos(pattern)); if (regexPatternPrefix == pattern) { // just match for equality @@ -98,15 +100,15 @@ std::pair<TPathFilter, TEarlyStopChecker> MakeFilterWildcard(const TString& patt YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '" << regex << "' from original pattern '" << pattern << "'"; - return MakeFilterRegexp(regex); + return MakeFilterRegexp(regex, sharedCtx); } -std::pair<TPathFilter, TEarlyStopChecker> MakeFilter(const TString& pattern, ES3PatternType patternType) { +std::pair<TPathFilter, TEarlyStopChecker> MakeFilter(const TString& pattern, ES3PatternType patternType, const TSharedListingContextPtr& sharedCtx) { switch (patternType) { case ES3PatternType::Wildcard: - return MakeFilterWildcard(pattern); + return MakeFilterWildcard(pattern, sharedCtx); case ES3PatternType::Regexp: - return MakeFilterRegexp(pattern); + return MakeFilterRegexp(pattern, sharedCtx); default: ythrow yexception() << "Unknown 'patternType': " << int(patternType); } @@ -165,7 +167,8 @@ public: TLocalS3Lister(const TListingRequest& listingRequest, const TMaybe<TString>& delimiter) : ListingRequest(listingRequest) { Y_ENSURE(!delimiter.Defined(), "delimiter is not supported for local files"); - Filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType).first; + Filter = + MakeFilter(listingRequest.Pattern, listingRequest.PatternType, nullptr).first; } TFuture<TListResult> Next() override { @@ -215,6 +218,7 @@ private: class TS3Lister : public IS3Lister { public: struct TListingContext { + TSharedListingContextPtr SharedCtx; // Filter const TPathFilter Filter; const TEarlyStopChecker EarlyStopChecker; @@ -236,15 +240,17 @@ public: IHTTPGateway::TPtr httpGateway, const TListingRequest& listingRequest, const TMaybe<TString>& delimiter, - size_t maxFilesPerQuery = 1000) + size_t maxFilesPerQuery = 1000, + TSharedListingContextPtr sharedCtx = nullptr) : MaxFilesPerQuery(maxFilesPerQuery) { Y_ENSURE( listingRequest.Url.substr(0, 7) != "file://", "This lister does not support reading local files"); - auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType); + auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType, sharedCtx); auto ctx = TListingContext{ + std::move(sharedCtx), std::move(filter), std::move(checker), NewPromise<TListResult>(), @@ -295,13 +301,24 @@ private: ythrow yexception() << "Gateway disappeared"; } + + auto sharedCtx = ctx.SharedCtx; auto retryPolicy = ctx.RetryPolicy; + auto callback = CallbackFactoryMethod(std::move(ctx)); gateway->Download( urlBuilder.Build(), headers, 0U, 0U, - CallbackFactoryMethod(std::move(ctx)), + [sharedCtx = std::move(sharedCtx), + cb = std::move(callback)](IHTTPGateway::TResult&& result) { + if (sharedCtx) { + sharedCtx->SubmitCallbackProcessing( + [cb, &result]() { cb(std::move(result)); }); + } else { + cb(std::move(result)); + } + }, /*data=*/"", retryPolicy); } @@ -355,18 +372,19 @@ private: << ": got truncated flag, will continue"; auto newCtx = TListingContext{ - ctx.Filter, - ctx.EarlyStopChecker, - NewPromise<TListResult>(), - NewPromise<TMaybe<TListingContext>>(), - std::make_shared<TListEntries>(), - ctx.GatewayWeak, - GetHTTPDefaultRetryPolicy(), - CreateGuidAsString(), - ctx.ListingRequest, - ctx.Delimiter, - parsedResponse.ContinuationToken, - parsedResponse.MaxKeys}; + ctx.SharedCtx, + ctx.Filter, + ctx.EarlyStopChecker, + NewPromise<TListResult>(), + NewPromise<TMaybe<TListingContext>>(), + std::make_shared<TListEntries>(), + ctx.GatewayWeak, + GetHTTPDefaultRetryPolicy(), + CreateGuidAsString(), + ctx.ListingRequest, + ctx.Delimiter, + parsedResponse.ContinuationToken, + parsedResponse.MaxKeys}; ctx.NextRequestPromise.SetValue(TMaybe<TListingContext>(newCtx)); } else { @@ -416,15 +434,74 @@ private: TFuture<TMaybe<TListingContext>> NextRequestCtx; }; +class TS3ParallelLimitedListerFactory : public IS3ListerFactory { +public: + using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>; + + explicit TS3ParallelLimitedListerFactory( + size_t maxParallelOps = 1, TSharedListingContextPtr sharedCtx = nullptr) + : SharedCtx(std::move(sharedCtx)) + , Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { } + + TFuture<NS3Lister::IS3Lister::TPtr> Make( + const IHTTPGateway::TPtr& httpGateway, + const NS3Lister::TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + bool allowLocalFiles) override { + auto acquired = Semaphore->AcquireAsync(); + return acquired.Apply( + [ctx = SharedCtx, httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) { + return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{ + NS3Lister::MakeS3Lister( + httpGateway, listingRequest, delimiter, allowLocalFiles, ctx), + std::make_unique<TAsyncSemaphore::TAutoRelease>( + f.GetValue()->MakeAutoRelease())}); + }); + } + +private: + class TListerLockReleaseWrapper : public NS3Lister::IS3Lister { + public: + using TLockPtr = std::unique_ptr<TAsyncSemaphore::TAutoRelease>; + + TListerLockReleaseWrapper(NS3Lister::IS3Lister::TPtr listerPtr, TLockPtr lock) + : ListerPtr(std::move(listerPtr)) + , Lock(std::move(lock)) { + if (ListerPtr == nullptr) { + Lock.reset(); + } + } + + TFuture<NS3Lister::TListResult> Next() override { return ListerPtr->Next(); } + bool HasNext() override { + auto hasNext = ListerPtr->HasNext(); + if (!hasNext) { + Lock.reset(); + } + return ListerPtr->HasNext(); + } + + private: + NS3Lister::IS3Lister::TPtr ListerPtr; + TLockPtr Lock; + }; + +private: + TSharedListingContextPtr SharedCtx; + const TAsyncSemaphore::TPtr Semaphore; +}; + } // namespace IS3Lister::TPtr MakeS3Lister( const IHTTPGateway::TPtr& httpGateway, const TListingRequest& listingRequest, const TMaybe<TString>& delimiter, - bool allowLocalFiles) { + bool allowLocalFiles, + TSharedListingContextPtr sharedCtx) { if (listingRequest.Url.substr(0, 7) != "file://") { - return std::make_shared<TS3Lister>(httpGateway, listingRequest, delimiter); + return std::make_shared<TS3Lister>( + httpGateway, listingRequest, delimiter, 1000, std::move(sharedCtx)); } if (!allowLocalFiles) { @@ -434,4 +511,14 @@ IS3Lister::TPtr MakeS3Lister( return std::make_shared<TLocalS3Lister>(listingRequest, delimiter); } +IS3ListerFactory::TPtr MakeS3ListerFactory( + size_t maxParallelOps, + size_t callbackThreadCount, + size_t callbackPerThreadQueueSize, + size_t regexpCacheSize) { + auto sharedCtx = std::make_shared<TSharedListingContext>( + callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize); + return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps); +} + } // namespace NYql::NS3Lister diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index d617568edff..94c8e86eacd 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -1,8 +1,12 @@ #pragma once +#include <library/cpp/cache/cache.h> #include <library/cpp/threading/future/future.h> +#include <util/thread/pool.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <contrib/libs/re2/re2/re2.h> + #include <memory> #include <variant> #include <vector> @@ -37,6 +41,51 @@ enum class ES3PatternType { Regexp }; +class TSharedListingContext { +public: + TSharedListingContext( + size_t callbackThreadCount, size_t callbackPerThreadQueueSize, size_t regexpCacheSize) + : RegexpCache(regexpCacheSize) { + CallbackProcessingPool.Start(callbackThreadCount, callbackPerThreadQueueSize); + } + + template<typename F> + void SubmitCallbackProcessing(F&& f) { + if (!CallbackProcessingPool.AddFunc(std::forward<F>(f))) { + f(); + } + } + + std::shared_ptr<RE2> GetOrCreate(const TString& regexp) { + if (auto it = Get(regexp); it != nullptr) { + return it; + } else { + auto re = std::make_shared<RE2>(re2::StringPiece(regexp), RE2::Options()); + Y_ENSURE(re->ok()); + auto wLock = TWriteGuard{RWLock}; + RegexpCache.Insert(regexp, re); + return re; + } + } + std::shared_ptr<RE2> Get(const TString& regexp) { + auto lock = TReadGuard{RWLock}; + if (auto it = RegexpCache.Find(regexp); it != RegexpCache.End()) { + return *it; + } else { + return nullptr; + } + } + + ~TSharedListingContext() { CallbackProcessingPool.Stop(); } + +private: + TThreadPool CallbackProcessingPool; + TLRUCache<TString, std::shared_ptr<RE2>> RegexpCache; + TRWMutex RWLock; +}; + +using TSharedListingContextPtr = std::shared_ptr<TSharedListingContext>; + struct TObjectListEntry { TString Path; ui64 Size = 0; @@ -92,7 +141,27 @@ IS3Lister::TPtr MakeS3Lister( const IHTTPGateway::TPtr& httpGateway, const TListingRequest& listingRequest, const TMaybe<TString>& delimiter, - bool allowLocalFiles); + bool allowLocalFiles, + TSharedListingContextPtr sharedCtx = nullptr); + +class IS3ListerFactory { +public: + using TPtr = std::shared_ptr<IS3ListerFactory>; + + virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make( + const IHTTPGateway::TPtr& httpGateway, + const NS3Lister::TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + bool allowLocalFiles) = 0; + + virtual ~IS3ListerFactory() = default; +}; + +IS3ListerFactory::TPtr MakeS3ListerFactory( + size_t maxParallelOps, + size_t callbackThreadCount, + size_t callbackPerThreadQueueSize, + size_t regexpCacheSize); } // namespace NS3Lister } // namespace NYql diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h index 992572bcc26..b462249515d 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h @@ -72,6 +72,6 @@ struct IPathGenerator { using TPathGeneratorPtr = std::shared_ptr<const IPathGenerator>; -TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 2000); +TPathGeneratorPtr CreatePathGenerator(const TString& projection, const std::vector<TString>& partitionedBy, const TMap<TString, NUdf::EDataSlot>& columns = {}, size_t pathsLimit = 1'000'000); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 9dec3e3059d..1d52ad54243 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -4,6 +4,7 @@ #include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h> #include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> @@ -57,7 +58,7 @@ using namespace NPathGenerator; struct TListRequest { NS3Lister::TListingRequest S3Request; TString FilePattern; - ES3ListingOptions Options = ES3ListingOptions::NoOptions; + TS3ListingOptions Options; TVector<IPathGenerator::TColumnWithValue> ColumnValues; }; @@ -79,14 +80,16 @@ public: TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway) : State_(std::move(state)) , ListerFactory_( - MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery)) + NS3Lister::MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery, 1, 100, 100)) , ListingStrategy_(MakeS3ListingStrategy( gateway, ListerFactory_, State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxDirectoriesAndFilesPerQuery, State_->Configuration->MinDesiredDirectoriesOfFilesPerQuery, - State_->Configuration->AllowLocalFiles)) { } + State_->Configuration->MaxInflightListsPerQuery, + State_->Configuration->AllowLocalFiles)) { + } void Rewind() final { PendingRequests_.clear(); @@ -459,18 +462,12 @@ private: if (needsListingOnActors) { TString pathPattern; NS3Lister::ES3PatternVariant pathPatternVariant; - if (requests[0].Options == ES3ListingOptions::UnPartitionedDataset) { - pathPattern = requests[0].S3Request.Pattern; - pathPatternVariant = NS3Lister::ES3PatternVariant::PathPattern; - } else if (requests[0].Options == ES3ListingOptions::PartitionedDataset) { + if (requests[0].Options.IsPartitionedDataset) { pathPattern = requests[0].FilePattern; pathPatternVariant = NS3Lister::ES3PatternVariant::FilePattern; } else { - ctx.AddError(TIssue( - ctx.GetPosition(read.Pos()), - TStringBuilder() - << "Unknown listing option " << int(requests[0].Options))); - return TStatus::Error; + pathPattern = requests[0].S3Request.Pattern; + pathPatternVariant = NS3Lister::ES3PatternVariant::PathPattern; } settings.push_back(ctx.NewList( @@ -597,7 +594,12 @@ private: .Prefix = dir.Path}}; auto future = ListingStrategy_->List( - req.S3Request, ES3ListingOptions::UnPartitionedDataset); + req.S3Request, + TS3ListingOptions{ + .IsPartitionedDataset = false, + .IsConcurrentListing = + State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse( + State_->Configuration->AllowConcurrentListings)}); RequestsByNode_[source.Raw()].push_back(req); PendingRequests_[req] = future; @@ -728,9 +730,13 @@ private: // each path in CONCAT() can generate multiple list requests for explicit partitioning TVector<TListRequest> reqs; + auto isConcurrentListingEnabled = + State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse( + State_->Configuration->AllowConcurrentListings); auto req = TListRequest{ .S3Request{.Url = url, .Token = tokenStr}, - .FilePattern = effectiveFilePattern}; + .FilePattern = effectiveFilePattern, + .Options{.IsConcurrentListing = isConcurrentListingEnabled}}; if (partitionedBy.empty()) { if (path.empty()) { @@ -752,7 +758,7 @@ private: req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard; req.S3Request.Prefix = req.S3Request.Pattern.substr( 0, NS3::GetFirstWildcardPos(req.S3Request.Pattern)); - req.Options = ES3ListingOptions::UnPartitionedDataset; + req.Options.IsPartitionedDataset = false; reqs.push_back(req); } else { if (NS3::HasWildcards(path)) { @@ -784,7 +790,7 @@ private: generated << '/' << NS3::RegexFromWildcards(effectiveFilePattern); req.S3Request.Pattern = generated; req.S3Request.PatternType = NS3Lister::ES3PatternType::Regexp; - req.Options = ES3ListingOptions::PartitionedDataset; + req.Options.IsPartitionedDataset = true; reqs.push_back(req); } else { for (auto& rule : config.Generator->GetRules()) { @@ -795,7 +801,7 @@ private: req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard; req.S3Request.Prefix = req.S3Request.Pattern.substr( 0, NS3::GetFirstWildcardPos(req.S3Request.Pattern)); - req.Options = ES3ListingOptions::PartitionedDataset; + req.Options.IsPartitionedDataset = true; reqs.push_back(req); } } @@ -830,7 +836,7 @@ private: } const TS3State::TPtr State_; - const IS3ListerFactory::TPtr ListerFactory_; + const NS3Lister::IS3ListerFactory::TPtr ListerFactory_; const IS3ListingStrategy::TPtr ListingStrategy_; TPendingRequests PendingRequests_; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp index 33c3470831e..fd2fec6fd65 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -17,22 +17,10 @@ #include <utility> namespace NYql { - -IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option) { - switch (option) { - case ES3ListingOptions::NoOptions: - stream << "[NoOptions]"; - break; - case ES3ListingOptions::PartitionedDataset: - stream << "[PartitionedDataset]"; - break; - case ES3ListingOptions::UnPartitionedDataset: - stream << "[UnPartitionedDataset]"; - break; - default: - ythrow yexception() << "Undefined option: " << int(option); - } - return stream; +IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& options) { + return stream << "TS3ListingOptions{.IsPartitionedDataset=" + << options.IsPartitionedDataset + << ",.IsConcurrentListing=" << options.IsConcurrentListing << "}"; } namespace { @@ -56,7 +44,7 @@ bool IsRecoverableIssue(const TIssues& issues) { class TCollectingS3ListingStrategy : public IS3ListingStrategy { public: using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>( - const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options)>; + const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options)>; TCollectingS3ListingStrategy(size_t limit, TListerFactoryMethod&& listerFactoryMethod, TString collectingName) : Limit(limit) @@ -65,9 +53,9 @@ public: TFuture<NS3Lister::TListResult> List( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) override { + const TS3ListingOptions& options) override { Y_UNUSED(options); - YQL_CLOG(DEBUG, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName + YQL_CLOG(TRACE, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName << "] Going to list " << listingRequest; auto futureLister = ListerFactoryMethod(listingRequest, options); return futureLister.Apply([this, listingRequest, options]( @@ -86,7 +74,7 @@ private: return [&state, limit, name = std::move(collectingName)](NS3Lister::TListEntries&& chunkEntries) { auto& stateEntries = std::get<NS3Lister::TListEntries>(state); if (stateEntries.Size() + chunkEntries.Size() > limit) { - YQL_CLOG(DEBUG, ProviderS3) + YQL_CLOG(TRACE, ProviderS3) << "[TCollectingS3ListingStrategy:" << name << "] Collected " << stateEntries.Size() + chunkEntries.Size() @@ -95,8 +83,7 @@ private: return EAggregationAction::Stop; } YQL_CLOG(TRACE, ProviderS3) - << "[TCollectingS3ListingStrategy:" << name - << "] Collected " + << "[TCollectingS3ListingStrategy:" << name << "] Collected " << stateEntries.Size() + chunkEntries.Size() << " entries. Listing limit " << limit << " Listing continues. "; stateEntries += std::move(chunkEntries); @@ -115,7 +102,7 @@ private: return EAggregationAction::Stop; } TFuture<NS3Lister::TListResult> DoListCallback( - NS3Lister::IS3Lister::TPtr lister, ES3ListingOptions options) const { + NS3Lister::IS3Lister::TPtr lister, TS3ListingOptions options) const { Y_UNUSED(options); return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>( std::move(lister), @@ -139,7 +126,7 @@ private: class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy { public: TFlatFileS3ListingStrategy( - const IS3ListerFactory::TPtr& listerFactory, + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t limit, bool allowLocalFiles) @@ -147,7 +134,7 @@ public: limit, [allowLocalFiles, httpGateway, listerFactory]( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) { + TS3ListingOptions options) { Y_UNUSED(options); return listerFactory->Make( httpGateway, listingRequest, Nothing(), allowLocalFiles); @@ -158,7 +145,7 @@ public: class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy { public: TDirectoryS3ListingStrategy( - const IS3ListerFactory::TPtr& listerFactory, + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t limit, bool allowLocalFiles) @@ -166,7 +153,7 @@ public: limit, [allowLocalFiles, httpGateway, listerFactory]( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) { + TS3ListingOptions options) { Y_UNUSED(options); return listerFactory->Make( httpGateway, listingRequest, "/", allowLocalFiles); @@ -183,7 +170,7 @@ public: public: TStrategyListIterator( NS3Lister::TListingRequest listingRequest, - ES3ListingOptions options, + TS3ListingOptions options, TStrategyContainerPtr strategies) : ListingRequest(std::move(listingRequest)) , Options(options) @@ -206,7 +193,7 @@ public: private: const NS3Lister::TListingRequest ListingRequest; - const ES3ListingOptions Options; + const TS3ListingOptions Options; const TStrategyContainerPtr Strategies; TStrategyContainer::const_iterator Iterator; const TStrategyContainer::const_iterator End; @@ -224,7 +211,7 @@ public: TFuture<NS3Lister::TListResult> List( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) override { + const TS3ListingOptions& options) override { auto strategyListIterator = std::make_unique<TStrategyListIterator>(listingRequest, options, Strategies); @@ -280,7 +267,7 @@ private: class TConditionalS3ListingStrategy : public IS3ListingStrategy { public: - using TStrategyCondition = std::function<bool(ES3ListingOptions options)>; + using TStrategyCondition = std::function<bool(const TS3ListingOptions& options)>; using TPair = std::pair<TStrategyCondition, std::shared_ptr<IS3ListingStrategy>>; TConditionalS3ListingStrategy(std::initializer_list<TPair> list) @@ -288,7 +275,7 @@ public: TFuture<NS3Lister::TListResult> List( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) override { + const TS3ListingOptions& options) override { auto strategyIt = std::find_if( Strategies.begin(), Strategies.end(), [options](const TPair& record) { auto [condition, strategy] = record; @@ -314,7 +301,7 @@ public: PartitionedDirectoryResolverIterator( NS3Lister::TListingRequest defaultParams, const TString& basePrefix, - ES3ListingOptions options, + TS3ListingOptions options, TDirectoryS3ListingStrategy directoryListingStrategy) : DefaultParams(std::move(defaultParams)) , Options(options) @@ -390,7 +377,7 @@ public: private: const NS3Lister::TListingRequest DefaultParams; - const ES3ListingOptions Options; + const TS3ListingOptions Options; std::shared_ptr<std::deque<TString>> DirectoryPrefixQueue; std::vector<NS3Lister::TObjectListEntry> Objects; TDirectoryS3ListingStrategy DirectoryListingStrategy; @@ -399,7 +386,7 @@ public: }; TPartitionedDatasetS3ListingStrategy( - const IS3ListerFactory::TPtr& listerFactory, + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t limit, bool allowLocalFiles) @@ -407,7 +394,7 @@ public: limit, [listerFactory, httpGateway, limit, allowLocalFiles]( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) { + TS3ListingOptions options) { auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( new PartitionedDirectoryResolverIterator{ listingRequest, @@ -428,16 +415,16 @@ public: TBFSDirectoryResolverIterator( NS3Lister::TListingRequest defaultParams, TListingRequestFactory listingRequestFactory, - ES3ListingOptions options, + TS3ListingOptions options, std::deque<TString> initialPathPrefixes, TDirectoryS3ListingStrategy directoryListingStrategy, size_t minParallelism, size_t limit) : DefaultParams(std::move(defaultParams)) , Options(options) + , DirectoryListingStrategy(std::move(directoryListingStrategy)) , ListingRequestFactory(std::move(listingRequestFactory)) , DirectoryPrefixQueue(std::move(initialPathPrefixes)) - , DirectoryListingStrategy(std::move(directoryListingStrategy)) , MinParallelism(minParallelism) , Limit(limit) { } @@ -528,12 +515,14 @@ private: private: const NS3Lister::TListingRequest DefaultParams; - const ES3ListingOptions Options; + const TS3ListingOptions Options; + TDirectoryS3ListingStrategy DirectoryListingStrategy; TListingRequestFactory ListingRequestFactory; + std::deque<TString> DirectoryPrefixQueue; std::vector<NS3Lister::TObjectListEntry> Objects; - TDirectoryS3ListingStrategy DirectoryListingStrategy; TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk; + bool First = true; size_t ReturnedSize = 0; const size_t MinParallelism; @@ -543,7 +532,7 @@ private: class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { public: TUnPartitionedDatasetS3ListingStrategy( - const IS3ListerFactory::TPtr& listerFactory, + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t limit, size_t minParallelism, @@ -552,7 +541,7 @@ public: limit, [listerFactory, httpGateway, limit, minParallelism, allowLocalFiles]( const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) { + TS3ListingOptions options) { auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( new TBFSDirectoryResolverIterator{ listingRequest, @@ -577,96 +566,500 @@ public: "TUnPartitionedDatasetS3ListingStrategy") {} }; - -class TS3ParallelLimitedListerFactory : public IS3ListerFactory { +// When data is collected result will be emitted (occurs once per iterator); +class TConcurrentBFSDirectoryResolverIterator : public NS3Lister::IS3Lister { public: - using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>; - - explicit TS3ParallelLimitedListerFactory(size_t maxParallelOps = 1) - : Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { } + using TListingRequestFactory = std::function<NS3Lister::TListingRequest( + const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>; - TFuture<NS3Lister::IS3Lister::TPtr> Make( - const IHTTPGateway::TPtr& httpGateway, - const NS3Lister::TListingRequest& listingRequest, - const TMaybe<TString>& delimiter, - bool allowLocalFiles) override { - auto acquired = Semaphore->AcquireAsync(); - return acquired.Apply( - [httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) { - return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{ - NS3Lister::MakeS3Lister( - httpGateway, listingRequest, delimiter, allowLocalFiles), - std::make_unique<TAsyncSemaphore::TAutoRelease>( - f.GetValue()->MakeAutoRelease())}); - }); - } + struct TSharedState { + using TLimitExceededStateModificator = + std::function<void(TSharedState& state, const TString& pathPrefix)>; + using TDirectoryToListMatcher = + std::function<bool(const NS3Lister::TDirectoryListEntry& entry)>; + using TEarlyStopMatcher = std::function<bool(const TSharedState& state)>; -private: - class TListerLockReleaseWrapper : public NS3Lister::IS3Lister { + // Initial params + const NS3Lister::TListingRequest DefaultParams; + const TS3ListingOptions Options; + TDirectoryS3ListingStrategy DirectoryListingStrategy; + const TListingRequestFactory ListingRequestFactory; + const TLimitExceededStateModificator LimitExceededStateModificator; + const TDirectoryToListMatcher DirectoryToListMatcher; + const TEarlyStopMatcher EarlyStopMatcher; + // Mutable state + std::mutex StateLock; + std::deque<TString> DirectoryPrefixQueue; + std::list<TString> InProgressPaths; + std::vector<TIssues> Issues; + std::vector<NS3Lister::TObjectListEntry> Objects; + std::vector<NS3Lister::TDirectoryListEntry> Directories; + std::vector<TFuture<NS3Lister::TListResult>> NextDirectoryListeningChunk; + // CurrentListing + TPromise<NS3Lister::TListResult> CurrentPromise; + bool IsListingFinished = false; + // Configuration + const size_t Limit = 1; + const size_t MaxParallelOps = 1; + // + std::weak_ptr<TSharedState> This; public: - using TLockPtr = std::unique_ptr<TAsyncSemaphore::TAutoRelease>; + static void ListingCallback( + const std::weak_ptr<TSharedState>& stateWeakPtr, + const TFuture<NS3Lister::TListResult>& future, + const TString& sourcePath + ) { + auto state = stateWeakPtr.lock(); + if (!state) { + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] No state" << sourcePath; + return; + } + YQL_CLOG(TRACE, ProviderS3) << "ListingCallback before lock"; + auto lock = std::lock_guard(state->StateLock); + + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] Got new listing result. Collected entries: " + << state->InProgressPaths.size() + state->DirectoryPrefixQueue.size() + + state->Objects.size() + state->Directories.size(); - TListerLockReleaseWrapper(NS3Lister::IS3Lister::TPtr listerPtr, TLockPtr lock) - : ListerPtr(std::move(listerPtr)) - , Lock(std::move(lock)) { - if (ListerPtr == nullptr) { - Lock.reset(); + try { + state->RemovePathFromInProgress(sourcePath); + + if (state->IsListingFinished) { + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] Listing finished - discarding results of listing for " + << sourcePath; + return; + } + + auto& nextChunk = future.GetValue(); + state->AddChunkToState(nextChunk, sourcePath); + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] Added listing to state "; + while (state->TryScheduleNextListing()) { + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] Scheduled new listing"; + } + } catch (std::exception& e) { + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] An exception has happened - saving an issue"; + state->Issues.emplace_back(TIssues{TIssue{e.what()}}); + state->IsListingFinished = true; } + + state->TrySetPromise(); + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] Callback end"; + } + void RemovePathFromInProgress(const TString& path) { + Y_VERIFY(!InProgressPaths.empty()); + auto sizeBeforeRemoval = InProgressPaths.size(); + auto pos = std::find(InProgressPaths.begin(), InProgressPaths.end(), path); + Y_VERIFY(pos != InProgressPaths.end()); + InProgressPaths.erase(pos); + Y_VERIFY(sizeBeforeRemoval == InProgressPaths.size() + 1); } + void AddChunkToState( + const NS3Lister::TListResult& nextBatch, + const TString& sourcePath) { + + if (std::holds_alternative<TIssues>(nextBatch)) { + IsListingFinished = true; + Issues.push_back(std::get<TIssues>(nextBatch)); + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Issue"; + return; + } - TFuture<NS3Lister::TListResult> Next() override { return ListerPtr->Next(); } - bool HasNext() override { - auto hasNext = ListerPtr->HasNext(); - if (!hasNext) { - Lock.reset(); + auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch); + + auto currentListingTotalSize = InProgressPaths.size() + + DirectoryPrefixQueue.size() + Objects.size() + + Directories.size() + listingResult.Size(); + + // Process new listing result + if (currentListingTotalSize > Limit) { + IsListingFinished = true; + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Limit"; + LimitExceededStateModificator(*this, sourcePath); + return; + } + + Objects.insert( + Objects.end(), + std::make_move_iterator(listingResult.Objects.begin()), + std::make_move_iterator(listingResult.Objects.end())); + + for (auto& directoryEntry : listingResult.Directories) { + if (DirectoryToListMatcher(directoryEntry)) { + DirectoryPrefixQueue.push_back(directoryEntry.Path); + } else { + Directories.push_back(directoryEntry); + } + } + + if ((DirectoryPrefixQueue.empty() && InProgressPaths.empty()) || + EarlyStopMatcher(*this)) { + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] AddToState listing is finished due to MinParallelism"; + IsListingFinished = true; } - return ListerPtr->HasNext(); } - private: - NS3Lister::IS3Lister::TPtr ListerPtr; - TLockPtr Lock; + void TrySetPromise() { + if (!IsListingFinished) { + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] TrySetPromise listing not finished"; + return; + } + SetPromise(); + } + void SetPromise() { + Y_ENSURE(IsListingFinished); + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise going to set promise"; + NS3Lister::TListResult res; + if (!Issues.empty()) { + auto result = TIssues{}; + for (auto& issues : Issues) { + result.AddIssues(issues); + } + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise before set 1"; + res = std::move(result); + } else { + // TODO: add verification + auto result = NS3Lister::TListEntries{.Objects = Objects}; + for (auto& directoryPrefix : DirectoryPrefixQueue) { + result.Directories.push_back({.Path = directoryPrefix}); + } + for (auto& directoryPrefix: InProgressPaths) { + result.Directories.push_back({.Path = directoryPrefix}); + } + for (auto& directoryEntry : Directories) { + result.Directories.push_back(directoryEntry); + } + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise before set 2"; + res = std::move(result); + } + + CurrentPromise.SetValue(res); + YQL_CLOG(DEBUG, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise promise was set"; + } + + bool TryScheduleNextListing() { + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] TryScheduleNextListing next listing"; + if (IsListingFinished) { + return false; + } + if (InProgressPaths.size() >= MaxParallelOps) { + return false; + } + if (DirectoryPrefixQueue.empty()) { + return false; + } + ScheduleNextListing(); + return true; + } + void ScheduleNextListing() { + Y_VERIFY(!DirectoryPrefixQueue.empty()); + auto prefix = DirectoryPrefixQueue.front(); + DirectoryPrefixQueue.pop_front(); + InProgressPaths.push_back(prefix); + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] ScheduleNextListing next listing " << prefix; + const auto& listingRequest = ListingRequestFactory(DefaultParams, prefix); + + DirectoryListingStrategy.List(listingRequest, Options) + .Subscribe( + [prefix, self = This](const TFuture<NS3Lister::TListResult>& future) { + ListingCallback(self, future, prefix); + }); + } }; + using TSharedStatePtr = std::shared_ptr<TSharedState>; + static TSharedStatePtr MakeState( + NS3Lister::TListingRequest defaultParams, + TListingRequestFactory listingRequestFactory, + TSharedState::TLimitExceededStateModificator limitExceededStateModificator, + TSharedState::TDirectoryToListMatcher directoryToListMatcher, + TSharedState::TEarlyStopMatcher earlyStopMatcher, + TS3ListingOptions options, + std::deque<TString> initialPathPrefixes, + TDirectoryS3ListingStrategy directoryListingStrategy, + size_t limit, + size_t maxParallelOps) { + auto res = TSharedStatePtr(new TSharedState{ + .DefaultParams = (std::move(defaultParams)), + .Options = (options), + .DirectoryListingStrategy = (std::move(directoryListingStrategy)), + .ListingRequestFactory = (std::move(listingRequestFactory)), + .LimitExceededStateModificator = (std::move(limitExceededStateModificator)), + .DirectoryToListMatcher = (std::move(directoryToListMatcher)), + .EarlyStopMatcher = (std::move(earlyStopMatcher)), + .DirectoryPrefixQueue = (std::move(initialPathPrefixes)), + .CurrentPromise = NewPromise<NS3Lister::TListResult>(), + .Limit = (limit), + .MaxParallelOps = maxParallelOps}); + res->This = res; + return res; + } + + TConcurrentBFSDirectoryResolverIterator( + NS3Lister::TListingRequest defaultParams, + TListingRequestFactory listingRequestFactory, + TSharedState::TLimitExceededStateModificator limitExceededStateModificator, + TSharedState::TDirectoryToListMatcher directoryToListMatcher, + TSharedState::TEarlyStopMatcher earlyStopMatcher, + TS3ListingOptions options, + std::deque<TString> initialPathPrefixes, + TDirectoryS3ListingStrategy directoryListingStrategy, + size_t limit, + size_t maxParallelOps) + : State(MakeState( + std::move(defaultParams), + std::move(listingRequestFactory), + std::move(limitExceededStateModificator), + std::move(directoryToListMatcher), + std::move(earlyStopMatcher), + options, + std::move(initialPathPrefixes), + std::move(directoryListingStrategy), + limit, + maxParallelOps)) { } + + TFuture<NS3Lister::TListResult> Next() override { + if (!First) { + return MakeFuture(NS3Lister::TListResult{TIssues{ + TIssue{"This iterator should be finished after first iteration"}}}); + } + + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next before lock"; + auto lock = std::lock_guard{State->StateLock}; + + First = false; + if (State->DirectoryPrefixQueue.empty()) { + return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}}); + } + + if (!State->IsListingFinished) { + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next listing is not finished "; + while (State->TryScheduleNextListing()); + } else { + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next listing is finished - reading result "; + State->SetPromise(); + } + + return State->CurrentPromise; + } + + bool HasNext() override { + YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] HasNext"; + return (First & !State->DirectoryPrefixQueue.empty()); + } private: - const TAsyncSemaphore::TPtr Semaphore; + std::shared_ptr<TSharedState> State; + bool First = true; +}; + +class TConcurrentUnPartitionedDatasetS3ListingStrategy : + public TCollectingS3ListingStrategy { +public: + TConcurrentUnPartitionedDatasetS3ListingStrategy( + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IHTTPGateway::TPtr& httpGateway, + size_t limit, + size_t minParallelism, + size_t maxParallelOps, + bool allowLocalFiles) + : TCollectingS3ListingStrategy( + limit, + [listerFactory, httpGateway, limit, minParallelism, allowLocalFiles, maxParallelOps]( + const NS3Lister::TListingRequest& listingRequest, + TS3ListingOptions options) { + auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + new TConcurrentBFSDirectoryResolverIterator{ + listingRequest, + [](const NS3Lister::TListingRequest& defaultParams, + const TString& pathPrefix) { + NS3Lister::TListingRequest request(defaultParams); + request.Prefix = pathPrefix; + return request; + }, + [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state, + const TString& pathPrefix) { + state.DirectoryPrefixQueue.push_back(pathPrefix); + }, + [](const NS3Lister::TDirectoryListEntry& entry) -> bool { + Y_UNUSED(entry); + return true; + }, + [minParallelism](const TConcurrentBFSDirectoryResolverIterator::TSharedState& + state) -> bool { + auto currentListedSize = state.InProgressPaths.size() + + state.DirectoryPrefixQueue.size() + + state.Objects.size() + + state.Directories.size(); + return currentListedSize > minParallelism; + }, + options, + std::deque<TString>{ + (!listingRequest.Prefix.empty()) + ? listingRequest.Prefix + : listingRequest.Pattern.substr( + 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))}, + TDirectoryS3ListingStrategy{ + listerFactory, httpGateway, limit, allowLocalFiles}, + limit, + maxParallelOps}); + return MakeFuture(std::move(ptr)); + }, + "TConcurrentUnPartitionedDatasetS3ListingStrategy") { } }; +class TConcurrentPartitionedDatasetS3ListingStrategy : + public TCollectingS3ListingStrategy { +public: + TConcurrentPartitionedDatasetS3ListingStrategy( + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IHTTPGateway::TPtr& httpGateway, + size_t limit, + size_t maxParallelOps, + bool allowLocalFiles) + : TCollectingS3ListingStrategy( + limit, + [listerFactory, httpGateway, limit, allowLocalFiles, maxParallelOps]( + const NS3Lister::TListingRequest& listingRequest, + TS3ListingOptions options) { + auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + new TConcurrentBFSDirectoryResolverIterator{ + listingRequest, + [](const NS3Lister::TListingRequest& defaultParams, + const TString& pathPrefix) { + NS3Lister::TListingRequest request(defaultParams); + request.Prefix = pathPrefix; + return request; + }, + [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state, + const TString& pathPrefix) { + Y_UNUSED(pathPrefix); + state.Issues.push_back(TIssues{MakeLimitExceededIssue()}); + }, + [](const NS3Lister::TDirectoryListEntry& entry) -> bool { + return !entry.MatchedRegexp; + }, + [](const TConcurrentBFSDirectoryResolverIterator::TSharedState& + state) -> bool { + Y_UNUSED(state); + return false; + }, + options, + std::deque<TString>{ + (!listingRequest.Prefix.empty()) + ? listingRequest.Prefix + : listingRequest.Pattern.substr( + 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))}, + TDirectoryS3ListingStrategy{ + listerFactory, httpGateway, limit, allowLocalFiles}, + limit, + maxParallelOps}); + return MakeFuture(std::move(ptr)); + }, + "TConcurrentUnPartitionedDatasetS3ListingStrategy") { } +}; + + + } // namespace -IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps) { - return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps); -} +class TLoggingS3ListingStrategy : public IS3ListingStrategy { +public: + explicit TLoggingS3ListingStrategy(IS3ListingStrategy::TPtr lister) + : Lister(std::move(lister)) { } + + TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& listingRequest, + const TS3ListingOptions& options) override { + YQL_CLOG(INFO, ProviderS3) << "[TLoggingS3ListingStrategy] Going to list request " + << listingRequest << " with options " << options; + return Lister->List(listingRequest, options) + .Apply([start = TInstant::Now()](const TFuture<NS3Lister::TListResult>& future) { + auto duration = TInstant::Now() - start; + std::visit( + TOverloaded{ + [duration](const NS3Lister::TListEntries& entries) { + YQL_CLOG(INFO, ProviderS3) + << "[TLoggingS3ListingStrategy] Listing took " << duration + << " and ended with " << entries.Size() << " entries"; + }, + [duration](const TIssues& issues) { + YQL_CLOG(INFO, ProviderS3) + << "[TLoggingS3ListingStrategy] Listing took " << duration + << " and ended with " << issues.Size() << " issues"; + }}, + future.GetValueSync()); + + + return future; + }); + } + +private: + IS3ListingStrategy::TPtr Lister; +}; IS3ListingStrategy::TPtr MakeS3ListingStrategy( const IHTTPGateway::TPtr& httpGateway, - const IS3ListerFactory::TPtr& listerFactory, + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, ui64 maxFilesPerQueryFiles, ui64 maxFilesPerQueryDirectory, ui64 minDesiredDirectoriesOfFilesPerQuery, + size_t maxParallelOps, bool allowLocalFiles) { - return std::make_shared<TCompositeS3ListingStrategy>( - std::vector<std::shared_ptr<IS3ListingStrategy>>{ - std::make_shared<TFlatFileS3ListingStrategy>( - listerFactory, httpGateway, maxFilesPerQueryFiles, allowLocalFiles), - std::make_shared<TConditionalS3ListingStrategy>( - std::initializer_list<TConditionalS3ListingStrategy::TPair>{ - {[](ES3ListingOptions options) { - return options == ES3ListingOptions::PartitionedDataset; - }, - std::make_shared<TPartitionedDatasetS3ListingStrategy>( - listerFactory, httpGateway, maxFilesPerQueryDirectory, allowLocalFiles)}, - {[](ES3ListingOptions options) { - return options == ES3ListingOptions::UnPartitionedDataset; - }, - std::make_shared<TUnPartitionedDatasetS3ListingStrategy>( - listerFactory, - httpGateway, - maxFilesPerQueryDirectory, - minDesiredDirectoriesOfFilesPerQuery, - allowLocalFiles)}, - })}); + return std::make_shared<TLoggingS3ListingStrategy>( + std::make_shared<TCompositeS3ListingStrategy>( + std::vector<std::shared_ptr<IS3ListingStrategy>>{ + std::make_shared<TFlatFileS3ListingStrategy>( + listerFactory, httpGateway, maxFilesPerQueryFiles, allowLocalFiles), + std::make_shared<TConditionalS3ListingStrategy>( + std::initializer_list<TConditionalS3ListingStrategy::TPair>{ + {[](const TS3ListingOptions& options) { + return options.IsPartitionedDataset && + !options.IsConcurrentListing; + }, + std::make_shared<TPartitionedDatasetS3ListingStrategy>( + listerFactory, + httpGateway, + maxFilesPerQueryDirectory, + allowLocalFiles)}, + {[](const TS3ListingOptions& options) { + return options.IsPartitionedDataset && + options.IsConcurrentListing; + }, + std::make_shared<TConcurrentPartitionedDatasetS3ListingStrategy>( + listerFactory, + httpGateway, + maxFilesPerQueryDirectory, + maxParallelOps, + allowLocalFiles)}, + {[](const TS3ListingOptions& options) { + return !options.IsPartitionedDataset && + !options.IsConcurrentListing; + }, + std::make_shared<TUnPartitionedDatasetS3ListingStrategy>( + listerFactory, + httpGateway, + maxFilesPerQueryDirectory, + minDesiredDirectoriesOfFilesPerQuery, + allowLocalFiles)}, + {[](const TS3ListingOptions& options) { + return !options.IsPartitionedDataset && + options.IsConcurrentListing; + }, + std::make_shared<TConcurrentUnPartitionedDatasetS3ListingStrategy>( + listerFactory, + httpGateway, + maxFilesPerQueryDirectory, + minDesiredDirectoriesOfFilesPerQuery, + maxParallelOps, + allowLocalFiles)}})})); } - } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h index 4e3ef8627b4..e5c1416867f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h @@ -11,45 +11,30 @@ namespace NYql { -class IS3ListerFactory { -public: - using TPtr = std::shared_ptr<IS3ListerFactory>; - - virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make( - const IHTTPGateway::TPtr& httpGateway, - const NS3Lister::TListingRequest& listingRequest, - const TMaybe<TString>& delimiter, - bool allowLocalFiles) = 0; - - virtual ~IS3ListerFactory() = default; -}; - -IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps); - -enum class ES3ListingOptions : ui8 { - NoOptions = 0, - UnPartitionedDataset = 1, - PartitionedDataset = 2 +struct TS3ListingOptions { + bool IsPartitionedDataset = false; + bool IsConcurrentListing = false; }; -IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option); +IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& options); class IS3ListingStrategy { public: using TPtr = std::shared_ptr<IS3ListingStrategy>; virtual NThreading::TFuture<NS3Lister::TListResult> List( - const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) = 0; + const NS3Lister::TListingRequest& listingRequest, const TS3ListingOptions& options) = 0; virtual ~IS3ListingStrategy() = default; }; IS3ListingStrategy::TPtr MakeS3ListingStrategy( const IHTTPGateway::TPtr& httpGateway, - const IS3ListerFactory::TPtr& listerFactory, + const NS3Lister::IS3ListerFactory::TPtr& listerFactory, ui64 maxFilesPerQueryFiles, ui64 maxFilesPerQueryDirectory, ui64 minDesiredDirectoriesOfFilesPerQuery, + size_t maxParallelOps, bool allowLocalFiles); } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp index dfc929ef2a8..5df8e25272f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp @@ -6,7 +6,7 @@ namespace NYql { class TMockS3Lister : public NS3Lister::IS3Lister { public: - TMockS3Lister(std::vector<NS3Lister::TListResult> batches) + explicit TMockS3Lister(std::vector<NS3Lister::TListResult> batches) : Batches(std::move(batches)) { } TFuture<NS3Lister::TListResult> Next() override { @@ -29,7 +29,7 @@ private: class TMockS3ExceptionLister : public NS3Lister::IS3Lister { public: - TMockS3ExceptionLister(TString exceptionMessage) + explicit TMockS3ExceptionLister(TString exceptionMessage) : ExceptionMessage(std::move(exceptionMessage)) { } TFuture<NS3Lister::TListResult> Next() override { @@ -97,9 +97,10 @@ void UnitAssertListResultEquals( Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) { auto strategy = TCollectingS3ListingStrategy{ 10, - [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) { + [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) { UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); - UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false); + UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false); return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{ NS3Lister::TListEntries{ @@ -118,7 +119,7 @@ Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) { "TTest"}; auto actualResultFuture = strategy.List( - NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, ES3ListingOptions::NoOptions); + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{}); auto expectedResult = NS3Lister::TListResult{NS3Lister::TListEntries{ .Objects = std::vector<NS3Lister::TObjectListEntry>{ NS3Lister::TObjectListEntry{ @@ -136,9 +137,10 @@ Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) { Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) { auto strategy = TCollectingS3ListingStrategy{ 1, - [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) { + [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) { UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); - UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false); + UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false); return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{ NS3Lister::TListEntries{ @@ -157,8 +159,7 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) { "TTest"}; auto actualResultFuture = strategy.List( - NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, - ES3ListingOptions::NoOptions); + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{}); auto expectedResult = NS3Lister::TListResult{TIssues{MakeLimitExceededIssue()}}; const auto& actualResult = actualResultFuture.GetValue(); UnitAssertListResultEquals(expectedResult, actualResult); @@ -167,10 +168,10 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) { Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) { auto strategy = TCollectingS3ListingStrategy{ 1, - [](const NS3Lister::TListingRequest& listingRequest, - ES3ListingOptions options) { + [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) { UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); - UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false); + UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false); return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{ NS3Lister::TListEntries{ @@ -184,8 +185,7 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) { "TTest"}; auto actualResultFuture = strategy.List( - NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, - ES3ListingOptions::NoOptions); + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{}); auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("TEST_ISSUE")}}; const auto& actualResult = actualResultFuture.GetValue(); UnitAssertListResultEquals(expectedResult, actualResult); @@ -194,17 +194,17 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) { Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) { auto strategy = TCollectingS3ListingStrategy{ 10, - [](const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) { + [](const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options) { UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); - UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + UNIT_ASSERT_VALUES_EQUAL(options.IsPartitionedDataset, false); + UNIT_ASSERT_VALUES_EQUAL(options.IsConcurrentListing, false); return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( std::make_shared<TMockS3ExceptionLister>("EXCEPTION MESSAGE"))); }, "TTest"}; auto actualResultFuture = strategy.List( - NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, - ES3ListingOptions::NoOptions); + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{}); UNIT_ASSERT(actualResultFuture.HasValue()); auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("EXCEPTION MESSAGE")}}; const auto& actualResult = actualResultFuture.GetValue(); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 14ccf509cfb..a089e9385ed 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -20,6 +20,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, ArrowRowGroupReordering); REGISTER_SETTING(*this, UseBlocksSource); REGISTER_SETTING(*this, AtomicUploadCommit); + REGISTER_SETTING(*this, UseConcurrentDirectoryLister); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { @@ -54,6 +55,8 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB; MaxInflightListsPerQuery = config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1; + AllowConcurrentListings = + config.HasAllowConcurrentListings() ? config.GetAllowConcurrentListings() : false; TVector<TString> clusters(Reserve(config.ClusterMappingSize())); for (auto& cluster: config.GetClusterMapping()) { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 155d32e6c8e..d936f88a314 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -19,9 +19,10 @@ struct TS3Settings { NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000. NCommon::TConfSetting<bool, false> ArrowThreadPool; NCommon::TConfSetting<ui64, false> ArrowParallelRowGroupCount; // Number of parquet row groups to read in parallel, min == 1 - NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK - NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode - NCommon::TConfSetting<bool, false> AtomicUploadCommit; // Commit each file independently, w/o transaction semantic over all files + NCommon::TConfSetting<bool, false> ArrowRowGroupReordering; // Allow to push rows from file in any order, default false, but usually it is OK + NCommon::TConfSetting<bool, false> UseBlocksSource; // Use blocks source (if exists) for scalar MKQL mode + NCommon::TConfSetting<bool, false> AtomicUploadCommit; // Commit each file independently, w/o transaction semantic over all files + NCommon::TConfSetting<bool, false> UseConcurrentDirectoryLister; }; struct TS3ClusterSettings { @@ -52,6 +53,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher ui64 MaxReadSizePerQuery; ui64 MaxInflightListsPerQuery; bool AllowLocalFiles; + bool AllowConcurrentListings; }; } // NYql |