diff options
author | aneporada <[email protected]> | 2022-09-14 13:12:18 +0300 |
---|---|---|
committer | aneporada <[email protected]> | 2022-09-14 13:12:18 +0300 |
commit | 6b586028e16e710686153609795466829c08c470 (patch) | |
tree | 487c09751686854ac91cd4f33e7e7284856e853a | |
parent | d6e8190159c9b88668658afdd7c796133f8f69ba (diff) |
[] Limit parallel list operations
-rw-r--r-- | ydb/library/yql/providers/s3/provider/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_list.cpp | 43 |
2 files changed, 43 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.txt index 1c97a77c087..f3605fe34b6 100644 --- a/ydb/library/yql/providers/s3/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/provider/CMakeLists.txt @@ -46,6 +46,7 @@ target_link_libraries(providers-s3-provider PUBLIC providers-s3-proto providers-s3-range_helpers library-yql-utils + yql-utils-threading ) target_sources(providers-s3-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp index 8d4b72500b1..bc88fe8234d 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> #include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/threading/async_semaphore.h> #include <ydb/library/yql/utils/url_builder.h> #include <ydb/library/yql/utils/yql_panic.h> @@ -278,11 +279,51 @@ private: const ui64 MaxFilesPerQuery; }; +class TS3ParallelLimitedLister : public IS3Lister { +public: + explicit TS3ParallelLimitedLister(const IS3Lister::TPtr& lister, size_t maxParallelOps = 1) + : Lister(lister) + , Semaphore(TAsyncSemaphore::Make(maxParallelOps)) + {} + +private: + TFuture<TListResult> DoList(const TString& token, const TString& url, const TString& pattern, const TMaybe<TString>& pathPrefix) { + auto promise = NewPromise<TListResult>(); + auto future = promise.GetFuture(); + auto acquired = Semaphore->AcquireAsync(); + acquired.Subscribe([lister = Lister, promise, token, url, pattern, pathPrefix](const auto& f) { + auto lock = std::make_shared<TAsyncSemaphore::TAutoRelease>(f.GetValue()->MakeAutoRelease()); + TFuture<TListResult> listFuture = pathPrefix.Defined() ? + lister->ListRegex(token, url, pattern, *pathPrefix) : + lister->List(token, url, pattern); + listFuture.Subscribe([promise, lock](const auto& f) mutable { + try { + promise.SetValue(f.GetValue()); + } catch (...) { + promise.SetException(std::current_exception()); + } + }); + }); + return future; + } + + TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override { + return DoList(token, url, pattern, {}); + } + + TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override { + return DoList(token, url, pattern, pathPrefix); + } + + const IS3Lister::TPtr Lister; + const TAsyncSemaphore::TPtr Semaphore; +}; } IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery) { - return IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery)); + auto lister = IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery)); + return IS3Lister::TPtr(new TS3ParallelLimitedLister(lister, 1)); } } |