aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-08-12 20:36:06 +0300
committerbbiff <bbiff@yandex-team.com>2022-08-12 20:36:06 +0300
commitad8dd337a2ee267af0248601e38efe9669607a75 (patch)
tree92705d8f68777122ecdba63b8b45bc1d8943581d
parentcc65f4da6b407e808692063c6ea6a9a910a41166 (diff)
downloadydb-ad8dd337a2ee267af0248601e38efe9669607a75.tar.gz
add curl config
added curl config added some configs
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp48
-rw-r--r--ydb/library/yql/providers/common/proto/gateways_config.proto4
2 files changed, 41 insertions, 11 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 2d1512596d..2ff2b767c2 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -14,17 +14,24 @@
namespace NYql {
namespace {
+struct TCurlInitConfig {
+ ui64 ConnectionTimeout = 0;
+ ui64 BytesPerSecondLimit = 0;
+ ui64 BufferSize = CURL_MAX_WRITE_SIZE;
+};
+
class TEasyCurl {
public:
using TPtr = std::shared_ptr<TEasyCurl>;
+
enum class EMethod {
GET,
POST,
PUT
};
- TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false)
+ TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, const TCurlInitConfig& config = TCurlInitConfig())
: Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes)
{
switch (method) {
@@ -41,6 +48,9 @@ public:
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway");
curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
+ curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT, config.ConnectionTimeout);
+ curl_easy_setopt(Handle, CURLOPT_MAX_RECV_SPEED_LARGE, config.BytesPerSecondLimit);
+ curl_easy_setopt(Handle, CURLOPT_BUFFERSIZE, config.BufferSize);
if (!headers.empty()) {
Headers = std::accumulate(headers.cbegin(), headers.cend(), Headers,
@@ -113,15 +123,15 @@ public:
using TPtr = std::shared_ptr<TEasyCurlBuffer>;
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
- TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState)
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
+ TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
{
Output.Reserve(ExpectedSize);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState) {
- return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback), std::move(retryState));
+ static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig()) {
+ return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback), std::move(retryState), std::move(config));
}
size_t GetExpectedSize() const {
@@ -208,8 +218,9 @@ public:
size_t offset,
IHTTPGateway::TOnDownloadStart onStart,
IHTTPGateway::TOnNewDataPart onNewData,
- IHTTPGateway::TOnDownloadFinish onFinish)
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ IHTTPGateway::TOnDownloadFinish onFinish,
+ const TCurlInitConfig& config = TCurlInitConfig())
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
{}
static TPtr Make(
@@ -221,9 +232,10 @@ public:
size_t offset,
IHTTPGateway::TOnDownloadStart onStart,
IHTTPGateway::TOnNewDataPart onNewData,
- IHTTPGateway::TOnDownloadFinish onFinish)
+ IHTTPGateway::TOnDownloadFinish onFinish,
+ const TCurlInitConfig& config = TCurlInitConfig())
{
- return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish));
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish), std::move(config));
}
enum class EAction : i8 {
@@ -350,6 +362,18 @@ public:
if (httpGatewaysCfg->HasBuffersSizePerStream()) {
BuffersSizePerStream = httpGatewaysCfg->GetBuffersSizePerStream();
}
+
+ if (httpGatewaysCfg->HasConnectionTimeoutSeconds()) {
+ InitConfig.ConnectionTimeout = httpGatewaysCfg->GetConnectionTimeoutSeconds();
+ }
+
+ if (httpGatewaysCfg->HasBytesPerSecondLimit()) {
+ InitConfig.BytesPerSecondLimit = httpGatewaysCfg->GetBytesPerSecondLimit();
+ }
+
+ if (httpGatewaysCfg->HasDownloadBufferBytesLimit()) {
+ InitConfig.BufferSize = httpGatewaysCfg->GetDownloadBufferBytesLimit();
+ }
}
InitCurl();
@@ -367,6 +391,7 @@ private:
size_t MaxHandlers = 1024U;
size_t MaxSimulatenousDownloadsSize = 8_GB;
size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U;
+ TCurlInitConfig InitConfig;
void InitCurl() {
const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL);
@@ -532,7 +557,7 @@ private:
Rps->Inc();
const std::unique_lock lock(Sync);
- auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr);
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig);
Await.emplace(std::move(easy));
Wakeup(0U);
}
@@ -558,7 +583,7 @@ private:
if (easy->AddCallback(callback))
return;
- auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr);
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback), retryPolicy ? retryPolicy->CreateRetryState() : nullptr, InitConfig);
entry = easy;
Await.emplace(std::move(easy));
Wakeup(expectedSize);
@@ -609,6 +634,7 @@ private:
std::queue<TEasyCurlBuffer::TPtr> Await;
std::vector<TEasyCurlStream::TWeakPtr> Streams;
+
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
std::unordered_map<TKeyType, TEasyCurlBuffer::TWeakPtr, TKeyHash> Requests;
std::priority_queue<std::pair<TInstant, TEasyCurlBuffer::TPtr>> Delayed;
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 95c4a17cc1..00b0e069a2 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -3,6 +3,7 @@ option java_package = "ru.yandex.yql.proto";
import "ydb/library/yql/protos/clickhouse.proto";
+
/////////////////////////////// common ///////////////////////////////
message TActivationByHour {
@@ -56,6 +57,9 @@ message THttpGatewayConfig {
optional uint32 MaxInFlightCount = 1;
optional uint64 MaxSimulatenousDownloadsSize = 2;
optional uint32 BuffersSizePerStream = 3;
+ optional uint64 ConnectionTimeoutSeconds = 4;
+ optional uint64 BytesPerSecondLimit = 5;
+ optional uint64 DownloadBufferBytesLimit = 6;
}
/////////////////////////////// YT ///////////////////////////////