about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: auzhegov <auzhegov@yandex-team.com> 2023-04-06 15:32:15 +0300
committer: auzhegov <auzhegov@yandex-team.com> 2023-04-06 15:32:15 +0300
commit: eeee0e3bdb0549f53330743aba8290870f53f553 (patch)
tree: 3e0d28e2a2f6038c7beadbd593b520599fd526ea
parent: a18251173ebfcd62901cddca4ed0c2f9029725c7 (diff)
download: ydb-eeee0e3bdb0549f53330743aba8290870f53f553.tar.gz
Added extra configuration
Added extra configuration
-rw-r--r-- ydb/library/yql/providers/common/proto/gateways_config.proto | 3
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp | 72
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_list.h | 35
-rw-r--r-- ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp | 6
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp | 7
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp | 7
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_settings.h | 3
7 files changed, 87 insertions, 46 deletions
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 3c425c3ece..9c40b776f3 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -386,6 +386,9 @@ message TS3GatewayConfig {
optional uint64 MaxDirectoriesAndFilesPerQuery = 8;
optional uint64 MinDesiredDirectoriesOfFilesPerQuery = 9;
optional bool AllowConcurrentListings = 11 [default = false];
+ optional uint64 ListingCallbackThreadCount = 12;
+ optional uint64 ListingCallbackPerThreadQueueSize = 13;
+ optional uint64 RegexpCacheSize = 14;
repeated TAttr DefaultSettings = 100;
}
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
index 5331b838d2..81e1f95bf2 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
@@ -40,7 +40,7 @@ using TEarlyStopChecker = std::function<bool(const TString& path)>;
std::pair<TPathFilter, TEarlyStopChecker> MakeFilterRegexp(const TString& regex, const TSharedListingContextPtr& sharedCtx) {
std::shared_ptr<RE2> re;
if (sharedCtx) {
- sharedCtx->GetOrCreate(regex);
+ re = sharedCtx->GetOrCreateRegexp(regex);
} else {
re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
}
@@ -218,7 +218,7 @@ private:
class TS3Lister : public IS3Lister {
public:
struct TListingContext {
- TSharedListingContextPtr SharedCtx;
+ const TSharedListingContextPtr SharedCtx;
// Filter
const TPathFilter Filter;
const TEarlyStopChecker EarlyStopChecker;
@@ -237,17 +237,18 @@ public:
};
TS3Lister(
- IHTTPGateway::TPtr httpGateway,
+ const IHTTPGateway::TPtr& httpGateway,
const TListingRequest& listingRequest,
const TMaybe<TString>& delimiter,
- size_t maxFilesPerQuery = 1000,
- TSharedListingContextPtr sharedCtx = nullptr)
+ size_t maxFilesPerQuery,
+ TSharedListingContextPtr sharedCtx)
: MaxFilesPerQuery(maxFilesPerQuery) {
Y_ENSURE(
listingRequest.Url.substr(0, 7) != "file://",
"This lister does not support reading local files");
- auto [filter, checker] = MakeFilter(listingRequest.Pattern, listingRequest.PatternType, sharedCtx);
+ auto [filter, checker] =
+ MakeFilter(listingRequest.Pattern, listingRequest.PatternType, sharedCtx);
auto ctx = TListingContext{
std::move(sharedCtx),
@@ -301,24 +302,28 @@ private:
ythrow yexception() << "Gateway disappeared";
}
-
auto sharedCtx = ctx.SharedCtx;
auto retryPolicy = ctx.RetryPolicy;
auto callback = CallbackFactoryMethod(std::move(ctx));
+ auto httpCallback = [sharedCtx = std::move(sharedCtx),
+ callback = std::move(callback)](
+ IHTTPGateway::TResult&& result) mutable {
+ if (sharedCtx) {
+ sharedCtx->SubmitCallbackProcessing(
+ [callback = std::move(callback), result = std::move(result)]() mutable {
+ callback(std::move(result));
+ });
+ } else {
+ callback(std::move(result));
+ }
+ };
+
gateway->Download(
urlBuilder.Build(),
headers,
0U,
0U,
- [sharedCtx = std::move(sharedCtx),
- cb = std::move(callback)](IHTTPGateway::TResult&& result) {
- if (sharedCtx) {
- sharedCtx->SubmitCallbackProcessing(
- [cb, &result]() { cb(std::move(result)); });
- } else {
- cb(std::move(result));
- }
- },
+ std::move(httpCallback),
/*data=*/"",
retryPolicy);
}
@@ -373,18 +378,18 @@ private:
auto newCtx = TListingContext{
ctx.SharedCtx,
- ctx.Filter,
- ctx.EarlyStopChecker,
- NewPromise<TListResult>(),
- NewPromise<TMaybe<TListingContext>>(),
- std::make_shared<TListEntries>(),
- ctx.GatewayWeak,
- GetHTTPDefaultRetryPolicy(),
- CreateGuidAsString(),
- ctx.ListingRequest,
- ctx.Delimiter,
- parsedResponse.ContinuationToken,
- parsedResponse.MaxKeys};
+ ctx.Filter,
+ ctx.EarlyStopChecker,
+ NewPromise<TListResult>(),
+ NewPromise<TMaybe<TListingContext>>(),
+ std::make_shared<TListEntries>(),
+ ctx.GatewayWeak,
+ GetHTTPDefaultRetryPolicy(),
+ CreateGuidAsString(),
+ ctx.ListingRequest,
+ ctx.Delimiter,
+ parsedResponse.ContinuationToken,
+ parsedResponse.MaxKeys};
ctx.NextRequestPromise.SetValue(TMaybe<TListingContext>(newCtx));
} else {
@@ -439,7 +444,7 @@ public:
using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;
explicit TS3ParallelLimitedListerFactory(
- size_t maxParallelOps = 1, TSharedListingContextPtr sharedCtx = nullptr)
+ size_t maxParallelOps, TSharedListingContextPtr sharedCtx)
: SharedCtx(std::move(sharedCtx))
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
@@ -516,9 +521,12 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
size_t callbackThreadCount,
size_t callbackPerThreadQueueSize,
size_t regexpCacheSize) {
- auto sharedCtx = std::make_shared<TSharedListingContext>(
- callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
- return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps);
+ std::shared_ptr<TSharedListingContext> sharedCtx = nullptr;
+ if (callbackThreadCount != 0 || regexpCacheSize != 0) {
+ sharedCtx = std::make_shared<TSharedListingContext>(
+ callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
+ }
+ return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx);
}
} // namespace NYql::NS3Lister
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
index 94c8e86eac..ecd1449c1a 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
@@ -45,19 +45,28 @@ class TSharedListingContext {
public:
TSharedListingContext(
size_t callbackThreadCount, size_t callbackPerThreadQueueSize, size_t regexpCacheSize)
- : RegexpCache(regexpCacheSize) {
- CallbackProcessingPool.Start(callbackThreadCount, callbackPerThreadQueueSize);
+ : ThreadPoolEnabled(callbackThreadCount != 0)
+ , RegexpCacheEnabled(regexpCacheSize != 0)
+ , RegexpCache(regexpCacheSize) {
+ if (ThreadPoolEnabled) {
+ CallbackProcessingPool.Start(callbackThreadCount, callbackPerThreadQueueSize);
+ }
}
template<typename F>
void SubmitCallbackProcessing(F&& f) {
- if (!CallbackProcessingPool.AddFunc(std::forward<F>(f))) {
- f();
+ if (ThreadPoolEnabled && CallbackProcessingPool.AddFunc(std::forward<F>(f))) {
+ return;
}
+ f();
}
- std::shared_ptr<RE2> GetOrCreate(const TString& regexp) {
- if (auto it = Get(regexp); it != nullptr) {
+ std::shared_ptr<RE2> GetOrCreateRegexp(const TString& regexp) {
+ if (!RegexpCacheEnabled) {
+ return std::make_shared<RE2>(re2::StringPiece(regexp), RE2::Options());
+ }
+
+ if (auto it = GetRegexp(regexp); it != nullptr) {
return it;
} else {
auto re = std::make_shared<RE2>(re2::StringPiece(regexp), RE2::Options());
@@ -67,7 +76,15 @@ public:
return re;
}
}
- std::shared_ptr<RE2> Get(const TString& regexp) {
+
+ ~TSharedListingContext() {
+ if (ThreadPoolEnabled) {
+ CallbackProcessingPool.Stop();
+ }
+ }
+
+private:
+ std::shared_ptr<RE2> GetRegexp(const TString& regexp) {
auto lock = TReadGuard{RWLock};
if (auto it = RegexpCache.Find(regexp); it != RegexpCache.End()) {
return *it;
@@ -76,9 +93,9 @@ public:
}
}
- ~TSharedListingContext() { CallbackProcessingPool.Stop(); }
-
private:
+ bool ThreadPoolEnabled = true;
+ bool RegexpCacheEnabled = true;
TThreadPool CallbackProcessingPool;
TLRUCache<TString, std::shared_ptr<RE2>> RegexpCache;
TRWMutex RWLock;
diff --git a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
index 1081e1a60e..11653e0196 100644
--- a/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
+++ b/ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.cpp
@@ -709,21 +709,21 @@ private:
return {.Name=name, .Type=NUdf::EDataSlot::Int64, .Value=ToString(value)};
}
- void CheckCastInt32(int64_t value, const TString& column) {
+ static void CheckCastInt32(int64_t value, const TString& column) {
if (std::numeric_limits<int32_t>::min() <= value && value <= std::numeric_limits<int32_t>::max()) {
return;
}
ythrow yexception() << "The value " << value << " is not representable as an int32 type for column " << column;
}
- void CheckCastUint32(int64_t value, const TString& column) {
+ static void CheckCastUint32(int64_t value, const TString& column) {
if (value >= 0 && value <= std::numeric_limits<uint32_t>::max()) {
return;
}
ythrow yexception() << "The value " << value << " is not representable as an uint32 type for column " << column;
}
- void CheckCastUint64(int64_t value, const TString& column) {
+ static void CheckCastUint64(int64_t value, const TString& column) {
if (value >= 0) {
return;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 1d52ad5424..006ce7669b 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -79,8 +79,11 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
public:
TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway)
: State_(std::move(state))
- , ListerFactory_(
- NS3Lister::MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery, 1, 100, 100))
+ , ListerFactory_(NS3Lister::MakeS3ListerFactory(
+ State_->Configuration->MaxInflightListsPerQuery,
+ State_->Configuration->ListingCallbackThreadCount,
+ State_->Configuration->ListingCallbackPerThreadQueueSize,
+ State_->Configuration->RegexpCacheSize))
, ListingStrategy_(MakeS3ListingStrategy(
gateway,
ListerFactory_,
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index a089e9385e..59342d9fb4 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -55,6 +55,13 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB;
MaxInflightListsPerQuery =
config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1;
+ ListingCallbackThreadCount = config.HasListingCallbackThreadCount()
+ ? config.GetListingCallbackThreadCount()
+ : 1;
+ ListingCallbackPerThreadQueueSize = config.HasListingCallbackPerThreadQueueSize()
+ ? config.GetListingCallbackPerThreadQueueSize()
+ : 100;
+ RegexpCacheSize = config.HasRegexpCacheSize() ? config.GetRegexpCacheSize() : 100;
AllowConcurrentListings =
config.HasAllowConcurrentListings() ? config.GetAllowConcurrentListings() : false;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index d936f88a31..d129bdf16f 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -52,6 +52,9 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher
ui64 MinDesiredDirectoriesOfFilesPerQuery;
ui64 MaxReadSizePerQuery;
ui64 MaxInflightListsPerQuery;
+ ui64 ListingCallbackThreadCount;
+ ui64 ListingCallbackPerThreadQueueSize;
+ ui64 RegexpCacheSize;
bool AllowLocalFiles;
bool AllowConcurrentListings;
};