diff options
author | bbiff <bbiff@yandex-team.com> | 2022-08-12 20:36:06 +0300 |
---|---|---|
committer | bbiff <bbiff@yandex-team.com> | 2022-08-12 20:36:06 +0300 |
commit | ad8dd337a2ee267af0248601e38efe9669607a75 (patch) | |
tree | 92705d8f68777122ecdba63b8b45bc1d8943581d | |
parent | cc65f4da6b407e808692063c6ea6a9a910a41166 (diff) | |
download | ydb-ad8dd337a2ee267af0248601e38efe9669607a75.tar.gz |
add curl config
added curl config
added some configs
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 48 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/proto/gateways_config.proto | 4 |
2 files changed, 41 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 2d1512596d..2ff2b767c2 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -14,17 +14,24 @@ namespace NYql { namespace { +struct TCurlInitConfig { + ui64 ConnectionTimeout = 0; + ui64 BytesPerSecondLimit = 0; + ui64 BufferSize = CURL_MAX_WRITE_SIZE; +}; + class TEasyCurl { public: using TPtr = std::shared_ptr<TEasyCurl>; + enum class EMethod { GET, POST, PUT }; - TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false) + TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, const TCurlInitConfig& config = TCurlInitConfig()) : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes) { switch (method) { @@ -41,6 +48,9 @@ public: curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway"); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT, config.ConnectionTimeout); + curl_easy_setopt(Handle, CURLOPT_MAX_RECV_SPEED_LARGE, config.BytesPerSecondLimit); + curl_easy_setopt(Handle, CURLOPT_BUFFERSIZE, config.BufferSize); if (!headers.empty()) { Headers = std::accumulate(headers.cbegin(), headers.cend(), Headers, @@ -113,15 +123,15 @@ public: using TPtr = std::shared_ptr<TEasyCurlBuffer>; using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; - TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) + TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState)) { Output.Reserve(ExpectedSize); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState) { - return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback), std::move(retryState)); + static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) { + return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback), std::move(retryState), std::move(config)); } size_t GetExpectedSize() const { @@ -208,8 +218,9 @@ public: size_t offset, IHTTPGateway::TOnDownloadStart onStart, IHTTPGateway::TOnNewDataPart onNewData, - IHTTPGateway::TOnDownloadFinish onFinish) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + IHTTPGateway::TOnDownloadFinish onFinish, + const TCurlInitConfig& config = TCurlInitConfig()) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) {} static TPtr Make( @@ -221,9 +232,10 @@ public: size_t offset, IHTTPGateway::TOnDownloadStart onStart, IHTTPGateway::TOnNewDataPart onNewData, - IHTTPGateway::TOnDownloadFinish onFinish) + IHTTPGateway::TOnDownloadFinish onFinish, + const TCurlInitConfig& config = TCurlInitConfig()) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish)); + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config)); } enum class EAction : i8 { @@ -350,6 +362,18 @@ public: if (httpGatewaysCfg->HasBuffersSizePerStream()) { BuffersSizePerStream = httpGatewaysCfg->GetBuffersSizePerStream(); } + + if (httpGatewaysCfg->HasConnectionTimeoutSeconds()) { + InitConfig.ConnectionTimeout = httpGatewaysCfg->GetConnectionTimeoutSeconds(); + } + + if (httpGatewaysCfg->HasBytesPerSecondLimit()) { + InitConfig.BytesPerSecondLimit = httpGatewaysCfg->GetBytesPerSecondLimit(); + } + + if (httpGatewaysCfg->HasDownloadBufferBytesLimit()) { + InitConfig.BufferSize = httpGatewaysCfg->GetDownloadBufferBytesLimit(); + } } InitCurl(); @@ -367,6 +391,7 @@ private: size_t MaxHandlers = 1024U; size_t MaxSimulatenousDownloadsSize = 8_GB; size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U; + TCurlInitConfig InitConfig; void InitCurl() { const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL); @@ -532,7 +557,7 @@ private: Rps->Inc(); const std::unique_lock lock(Sync); - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig); Await.emplace(std::move(easy)); Wakeup(0U); } @@ -558,7 +583,7 @@ private: if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig); entry = easy; Await.emplace(std::move(easy)); Wakeup(expectedSize); @@ -609,6 +634,7 @@ private: std::queue<TEasyCurlBuffer::TPtr> Await; std::vector<TEasyCurlStream::TWeakPtr> Streams; + std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated; std::unordered_map<TKeyType, TEasyCurlBuffer::TWeakPtr, TKeyHash> Requests; std::priority_queue<std::pair<TInstant, TEasyCurlBuffer::TPtr>> Delayed; diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 95c4a17cc1..00b0e069a2 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -3,6 +3,7 @@ option java_package = "ru.yandex.yql.proto"; import "ydb/library/yql/protos/clickhouse.proto"; + /////////////////////////////// common /////////////////////////////// message TActivationByHour { @@ -56,6 +57,9 @@ message THttpGatewayConfig { optional uint32 MaxInFlightCount = 1; optional uint64 MaxSimulatenousDownloadsSize = 2; optional uint32 BuffersSizePerStream = 3; + optional uint64 ConnectionTimeoutSeconds = 4; + optional uint64 BytesPerSecondLimit = 5; + optional uint64 DownloadBufferBytesLimit = 6; } /////////////////////////////// YT /////////////////////////////// |