aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-09-14 17:39:37 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-09-14 17:39:37 +0300
commit332da01f2cfe1a99b6b7bfeda9ebe3b0a39a7112 (patch)
treec93edca22b538b4216c915333d01e9eb04252d3f
parent459f5cf25ea6d6180fbbb8ef048d73a41876b124 (diff)
downloadydb-332da01f2cfe1a99b6b7bfeda9ebe3b0a39a7112.tar.gz
Pause curl handle on backpressure.
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp61
1 files changed, 25 insertions, 36 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index f8324a1ad4..21a1427dcb 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -357,26 +357,22 @@ public:
Stop = -1,
None = 0,
Work = 1,
- StopAndDrop = Stop + Drop
+ Init = 2
};
EAction GetAction(size_t buffersSize) {
+ if (!Started) {
+ Started = true;
+ return EAction::Init;
+ }
+
if (Cancelled) {
- const auto ret = Working ? EAction::StopAndDrop : EAction::Drop;
- Working = false;
- return ret;
+ return EAction::Drop;
}
- if (Working != Counter->load() < buffersSize) {
- if (const auto limit = GetSizeLimit(); !Working && limit && Position >= limit) {
- OnFinish(TIssues());
- return EAction::Drop;
- }
- if (Working = !Working)
- SkipTo(Position);
- else
- LastHttpResponseCode = 0L;
- return Working ? EAction::Work : EAction::Stop;
+ if (buffersSize && Paused != Counter->load() >= buffersSize) {
+ Paused = !Paused;
+ return Paused ? EAction::Stop : EAction::Work;
}
return EAction::None;
@@ -388,7 +384,6 @@ public:
}
private:
void Fail(const TIssue& error) final {
- Working = false;
if (!Cancelled)
OnFinish(TIssues{error});
}
@@ -397,22 +392,17 @@ private:
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
- Working = false;
if (!Cancelled)
OnFinish(TIssues());
}
size_t Write(void* contents, size_t size, size_t nmemb) final {
- if (!LastHttpResponseCode) {
- curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &LastHttpResponseCode);
- if (!FirstHttpResponseCode)
- OnStart(FirstHttpResponseCode = LastHttpResponseCode);
- else if (FirstHttpResponseCode != LastHttpResponseCode)
- Cancel(TIssue(TStringBuilder() << "HTTP response has been changed from " << FirstHttpResponseCode << " to " << LastHttpResponseCode));
+ if (!HttpResponseCode) {
+ curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &HttpResponseCode);
+ OnStart(HttpResponseCode);
}
const auto realsize = size * nmemb;
- Position += realsize;
if (!Cancelled)
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter));
return realsize;
@@ -426,11 +416,10 @@ private:
const IHTTPGateway::TOnDownloadFinish OnFinish;
const std::shared_ptr<std::atomic_size_t> Counter;
- bool Working = false;
- size_t Position = 0ULL;
+ bool Started = false;
+ bool Paused = false;
bool Cancelled = false;
- long FirstHttpResponseCode = 0L;
- long LastHttpResponseCode = 0L;
+ long HttpResponseCode = 0L;
};
using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
@@ -529,7 +518,7 @@ public:
private:
size_t MaxHandlers = 1024U;
size_t MaxSimulatenousDownloadsSize = 8_GB;
- size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U;
+ size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 3U;
TCurlInitConfig InitConfig;
void InitCurl() {
@@ -589,22 +578,22 @@ private:
size_t FillHandlers() {
const std::unique_lock lock(Sync);
-
for (auto it = Streams.cbegin(); Streams.cend() != it;) {
if (const auto& stream = it->lock()) {
+ const auto streamHandle = stream->GetHandle();
switch (stream->GetAction(BuffersSizePerStream)) {
- case TEasyCurlStream::EAction::Drop:
- Allocated.erase(stream->GetHandle());
+ case TEasyCurlStream::EAction::Init:
+ curl_multi_add_handle(Handle, streamHandle);
break;
case TEasyCurlStream::EAction::Work:
- curl_multi_add_handle(Handle, stream->GetHandle());
+ curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT);
break;
case TEasyCurlStream::EAction::Stop:
- curl_multi_remove_handle(Handle, stream->GetHandle());
+ curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE);
break;
- case TEasyCurlStream::EAction::StopAndDrop:
- curl_multi_remove_handle(Handle, stream->GetHandle());
- Allocated.erase(stream->GetHandle());
+ case TEasyCurlStream::EAction::Drop:
+ curl_multi_remove_handle(Handle, streamHandle);
+ Allocated.erase(streamHandle);
break;
case TEasyCurlStream::EAction::None:
break;