diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-04-06 15:32:15 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-04-06 15:32:15 +0300 |
commit | eeee0e3bdb0549f53330743aba8290870f53f553 (patch) | |
tree | 3e0d28e2a2f6038c7beadbd593b520599fd526ea | |
parent | a18251173ebfcd62901cddca4ed0c2f9029725c7 (diff) | |
download | ydb-eeee0e3bdb0549f53330743aba8290870f53f553.tar.gz |
Added extra configuration
Added extra configuration
7 files changed, 87 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 3c425c3ece..9c40b776f3 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -386,6 +386,9 @@ message TS3GatewayConfig { optional uint64 MaxDirectoriesAndFilesPerQuery = 8; optional uint64 MinDesiredDirectoriesOfFilesPerQuery = 9; optional bool AllowConcurrentListings = 11 [default = false]; + optional uint64 ListingCallbackThreadCount = 12; + optional uint64 ListingCallbackPerThreadQueueSize = 13; + optional uint64 RegexpCacheSize = 14; repeated TAttr DefaultSettings = 100; } diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index 5331b838d2..81e1f95bf2 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -40,7 +40,7 @@ using TEarlyStopChecker = std::function<bool(const TString& path)>; std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex, const TSharedListingContextPtr& sharedCtx) { std::shared_ptr<RE2> re; if (sharedCtx) { - sharedCtx->GetOrCreate(regex); + re = sharedCtx->GetOrCreateRegexp(regex); } else { re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options()); } @@ -218,7 +218,7 @@ private: class TS3Lister : public IS3Lister { public: struct TListingContext { - TSharedListingContextPtr SharedCtx; + const TSharedListingContextPtr SharedCtx; // Filter const TPathFilter Filter; const TEarlyStopChecker EarlyStopChecker; @@ -237,17 +237,18 @@ public: }; TS3Lister( - IHTTPGateway::TPtr httpGateway, + const IHTTPGateway::TPtr& httpGateway, const TListingRequest& listingRequest, const TMaybe<TString>& delimiter, - size_t maxFilesPerQuery = 1000, - TSharedListingContextPtr sharedCtx = nullptr) + size_t maxFilesPerQuery, + TSharedListingContextPtr sharedCtx) : MaxFilesPerQuery(maxFilesPerQuery) { Y_ENSURE( listingRequest.Url.substr(0, 7) != "file://", "This lister does not support reading local files"); - auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType, sharedCtx); + auto [filter, checker] = + MakeFilter(listingRequest.Pattern, listingRequest.PatternType, sharedCtx); auto ctx = TListingContext{ std::move(sharedCtx), @@ -301,24 +302,28 @@ private: ythrow yexception() << "Gateway disappeared"; } - auto sharedCtx = ctx.SharedCtx; auto retryPolicy = ctx.RetryPolicy; auto callback = CallbackFactoryMethod(std::move(ctx)); + auto httpCallback = [sharedCtx = std::move(sharedCtx), + callback = std::move(callback)]( + IHTTPGateway::TResult&& result) mutable { + if (sharedCtx) { + sharedCtx->SubmitCallbackProcessing( + [callback = std::move(callback), result = std::move(result)]() mutable { + callback(std::move(result)); + }); + } else { + callback(std::move(result)); + } + }; + gateway->Download( urlBuilder.Build(), headers, 0U, 0U, - [sharedCtx = std::move(sharedCtx), - cb = std::move(callback)](IHTTPGateway::TResult&& result) { - if (sharedCtx) { - sharedCtx->SubmitCallbackProcessing( - [cb, &result]() { cb(std::move(result)); }); - } else { - cb(std::move(result)); - } - }, + std::move(httpCallback), /*data=*/"", retryPolicy); } @@ -373,18 +378,18 @@ private: auto newCtx = TListingContext{ ctx.SharedCtx, - ctx.Filter, - ctx.EarlyStopChecker, - NewPromise<TListResult>(), - NewPromise<TMaybe<TListingContext>>(), - std::make_shared<TListEntries>(), - ctx.GatewayWeak, - GetHTTPDefaultRetryPolicy(), - CreateGuidAsString(), - ctx.ListingRequest, - ctx.Delimiter, - parsedResponse.ContinuationToken, - parsedResponse.MaxKeys}; + ctx.Filter, + ctx.EarlyStopChecker, + NewPromise<TListResult>(), + NewPromise<TMaybe<TListingContext>>(), + std::make_shared<TListEntries>(), + ctx.GatewayWeak, + GetHTTPDefaultRetryPolicy(), + CreateGuidAsString(), + ctx.ListingRequest, + ctx.Delimiter, + parsedResponse.ContinuationToken, + parsedResponse.MaxKeys}; ctx.NextRequestPromise.SetValue(TMaybe<TListingContext>(newCtx)); } else { @@ -439,7 +444,7 @@ public: using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>; explicit TS3ParallelLimitedListerFactory( - size_t maxParallelOps = 1, TSharedListingContextPtr sharedCtx = nullptr) + size_t maxParallelOps, TSharedListingContextPtr sharedCtx) : SharedCtx(std::move(sharedCtx)) , Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { } @@ -516,9 +521,12 @@ IS3ListerFactory::TPtr MakeS3ListerFactory( size_t callbackThreadCount, size_t callbackPerThreadQueueSize, size_t regexpCacheSize) { - auto sharedCtx = std::make_shared<TSharedListingContext>( - callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize); - return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps); + std::shared_ptr<TSharedListingContext> sharedCtx = nullptr; + if (callbackThreadCount != 0 || regexpCacheSize != 0) { + sharedCtx = std::make_shared<TSharedListingContext>( + callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize); + } + return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx); } } // namespace NYql::NS3Lister diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index 94c8e86eac..ecd1449c1a 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -45,19 +45,28 @@ class TSharedListingContext { public: TSharedListingContext( size_t callbackThreadCount, size_t callbackPerThreadQueueSize, size_t regexpCacheSize) - : RegexpCache(regexpCacheSize) { - CallbackProcessingPool.Start(callbackThreadCount, callbackPerThreadQueueSize); + : ThreadPoolEnabled(callbackThreadCount != 0) + , RegexpCacheEnabled(regexpCacheSize != 0) + , RegexpCache(regexpCacheSize) { + if (ThreadPoolEnabled) { + CallbackProcessingPool.Start(callbackThreadCount, callbackPerThreadQueueSize); + } } template<typename F> void SubmitCallbackProcessing(F&& f) { - if (!CallbackProcessingPool.AddFunc(std::forward<F>(f))) { - f(); + if (ThreadPoolEnabled && CallbackProcessingPool.AddFunc(std::forward<F>(f))) { + return; } + f(); } - std::shared_ptr<RE2> GetOrCreate(const TString& regexp) { - if (auto it = Get(regexp); it != nullptr) { + std::shared_ptr<RE2> GetOrCreateRegexp(const TString& regexp) { + if (!RegexpCacheEnabled) { + return std::make_shared<RE2>(re2::StringPiece(regexp), RE2::Options()); + } + + if (auto it = GetRegexp(regexp); it != nullptr) { return it; } else { auto re = std::make_shared<RE2>(re2::StringPiece(regexp), RE2::Options()); @@ -67,7 +76,15 @@ public: return re; } } - std::shared_ptr<RE2> Get(const TString& regexp) { + + ~TSharedListingContext() { + if (ThreadPoolEnabled) { + CallbackProcessingPool.Stop(); + } + } + +private: + std::shared_ptr<RE2> GetRegexp(const TString& regexp) { auto lock = TReadGuard{RWLock}; if (auto it = RegexpCache.Find(regexp); it != RegexpCache.End()) { return *it; @@ -76,9 +93,9 @@ public: } } - ~TSharedListingContext() { CallbackProcessingPool.Stop(); } - private: + bool ThreadPoolEnabled = true; + bool RegexpCacheEnabled = true; TThreadPool CallbackProcessingPool; TLRUCache<TString, std::shared_ptr<RE2>> RegexpCache; TRWMutex RWLock; diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp index 1081e1a60e..11653e0196 100644 --- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp +++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp @@ -709,21 +709,21 @@ private: return {.Name=name, .Type=NUdf::EDataSlot::Int64, .Value=ToString(value)}; } - void CheckCastInt32(int64_t value, const TString& column) { + static void CheckCastInt32(int64_t value, const TString& column) { if (std::numeric_limits<int32_t>::min() <= value && value <= std::numeric_limits<int32_t>::max()) { return; } ythrow yexception() << "The value " << value << " is not representable as an int32 type for column " << column; } - void CheckCastUint32(int64_t value, const TString& column) { + static void CheckCastUint32(int64_t value, const TString& column) { if (value >= 0 && value <= std::numeric_limits<uint32_t>::max()) { return; } ythrow yexception() << "The value " << value << " is not representable as an uint32 type for column " << column; } - void CheckCastUint64(int64_t value, const TString& column) { + static void CheckCastUint64(int64_t value, const TString& column) { if (value >= 0) { return; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 1d52ad5424..006ce7669b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -79,8 +79,11 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { public: TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway) : State_(std::move(state)) - , ListerFactory_( - NS3Lister::MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery, 1, 100, 100)) + , ListerFactory_(NS3Lister::MakeS3ListerFactory( + State_->Configuration->MaxInflightListsPerQuery, + State_->Configuration->ListingCallbackThreadCount, + State_->Configuration->ListingCallbackPerThreadQueueSize, + State_->Configuration->RegexpCacheSize)) , ListingStrategy_(MakeS3ListingStrategy( gateway, ListerFactory_, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index a089e9385e..59342d9fb4 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -55,6 +55,13 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB; MaxInflightListsPerQuery = config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1; + ListingCallbackThreadCount = config.HasListingCallbackThreadCount() + ? config.GetListingCallbackThreadCount() + : 1; + ListingCallbackPerThreadQueueSize = config.HasListingCallbackPerThreadQueueSize() + ? config.GetListingCallbackPerThreadQueueSize() + : 100; + RegexpCacheSize = config.HasRegexpCacheSize() ? config.GetRegexpCacheSize() : 100; AllowConcurrentListings = config.HasAllowConcurrentListings() ? config.GetAllowConcurrentListings() : false; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index d936f88a31..d129bdf16f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -52,6 +52,9 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher ui64 MinDesiredDirectoriesOfFilesPerQuery; ui64 MaxReadSizePerQuery; ui64 MaxInflightListsPerQuery; + ui64 ListingCallbackThreadCount; + ui64 ListingCallbackPerThreadQueueSize; + ui64 RegexpCacheSize; bool AllowLocalFiles; bool AllowConcurrentListings; }; |