summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <[email protected]>2022-09-14 13:12:18 +0300
committeraneporada <[email protected]>2022-09-14 13:12:18 +0300
commit6b586028e16e710686153609795466829c08c470 (patch)
tree487c09751686854ac91cd4f33e7e7284856e853a
parentd6e8190159c9b88668658afdd7c796133f8f69ba (diff)
[] Limit parallel list operations
-rw-r--r--ydb/library/yql/providers/s3/provider/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_list.cpp43
2 files changed, 43 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.txt
index 1c97a77c087..f3605fe34b6 100644
--- a/ydb/library/yql/providers/s3/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/provider/CMakeLists.txt
@@ -46,6 +46,7 @@ target_link_libraries(providers-s3-provider PUBLIC
providers-s3-proto
providers-s3-range_helpers
library-yql-utils
+ yql-utils-threading
)
target_sources(providers-s3-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
index 8d4b72500b1..bc88fe8234d 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/threading/async_semaphore.h>
#include <ydb/library/yql/utils/url_builder.h>
#include <ydb/library/yql/utils/yql_panic.h>
@@ -278,11 +279,51 @@ private:
const ui64 MaxFilesPerQuery;
};
+class TS3ParallelLimitedLister : public IS3Lister {
+public:
+ explicit TS3ParallelLimitedLister(const IS3Lister::TPtr& lister, size_t maxParallelOps = 1)
+ : Lister(lister)
+ , Semaphore(TAsyncSemaphore::Make(maxParallelOps))
+ {}
+
+private:
+ TFuture<TListResult> DoList(const TString& token, const TString& url, const TString& pattern, const TMaybe<TString>& pathPrefix) {
+ auto promise = NewPromise<TListResult>();
+ auto future = promise.GetFuture();
+ auto acquired = Semaphore->AcquireAsync();
+ acquired.Subscribe([lister = Lister, promise, token, url, pattern, pathPrefix](const auto& f) {
+ auto lock = std::make_shared<TAsyncSemaphore::TAutoRelease>(f.GetValue()->MakeAutoRelease());
+ TFuture<TListResult> listFuture = pathPrefix.Defined() ?
+ lister->ListRegex(token, url, pattern, *pathPrefix) :
+ lister->List(token, url, pattern);
+ listFuture.Subscribe([promise, lock](const auto& f) mutable {
+ try {
+ promise.SetValue(f.GetValue());
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ }
+ });
+ });
+ return future;
+ }
+
+ TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override {
+ return DoList(token, url, pattern, {});
+ }
+
+ TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override {
+ return DoList(token, url, pattern, pathPrefix);
+ }
+
+ const IS3Lister::TPtr Lister;
+ const TAsyncSemaphore::TPtr Semaphore;
+};
}
IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery) {
- return IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery));
+ auto lister = IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery));
+ return IS3Lister::TPtr(new TS3ParallelLimitedLister(lister, 1));
}
}