diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-14 17:39:37 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-09-14 17:39:37 +0300 |
commit | 332da01f2cfe1a99b6b7bfeda9ebe3b0a39a7112 (patch) | |
tree | c93edca22b538b4216c915333d01e9eb04252d3f | |
parent | 459f5cf25ea6d6180fbbb8ef048d73a41876b124 (diff) | |
download | ydb-332da01f2cfe1a99b6b7bfeda9ebe3b0a39a7112.tar.gz |
Pause curl handle on backpressure.
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 61 |
1 files changed, 25 insertions, 36 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index f8324a1ad4..21a1427dcb 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -357,26 +357,22 @@ public: Stop = -1, None = 0, Work = 1, - StopAndDrop = Stop + Drop + Init = 2 }; EAction GetAction(size_t buffersSize) { + if (!Started) { + Started = true; + return EAction::Init; + } + if (Cancelled) { - const auto ret = Working ? EAction::StopAndDrop : EAction::Drop; - Working = false; - return ret; + return EAction::Drop; } - if (Working != Counter->load() < buffersSize) { - if (const auto limit = GetSizeLimit(); !Working && limit && Position >= limit) { - OnFinish(TIssues()); - return EAction::Drop; - } - if (Working = !Working) - SkipTo(Position); - else - LastHttpResponseCode = 0L; - return Working ? EAction::Work : EAction::Stop; + if (buffersSize && Paused != Counter->load() >= buffersSize) { + Paused = !Paused; + return Paused ? EAction::Stop : EAction::Work; } return EAction::None; @@ -388,7 +384,6 @@ public: } private: void Fail(const TIssue& error) final { - Working = false; if (!Cancelled) OnFinish(TIssues{error}); } @@ -397,22 +392,17 @@ private: if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); - Working = false; if (!Cancelled) OnFinish(TIssues()); } size_t Write(void* contents, size_t size, size_t nmemb) final { - if (!LastHttpResponseCode) { - curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &LastHttpResponseCode); - if (!FirstHttpResponseCode) - OnStart(FirstHttpResponseCode = LastHttpResponseCode); - else if (FirstHttpResponseCode != LastHttpResponseCode) - Cancel(TIssue(TStringBuilder() << "HTTP response has been changed from " << FirstHttpResponseCode << " to " << LastHttpResponseCode)); + if (!HttpResponseCode) { + curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &HttpResponseCode); + OnStart(HttpResponseCode); } const auto realsize = size * nmemb; - Position += realsize; if (!Cancelled) OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter)); return realsize; @@ -426,11 +416,10 @@ private: const IHTTPGateway::TOnDownloadFinish OnFinish; const std::shared_ptr<std::atomic_size_t> Counter; - bool Working = false; - size_t Position = 0ULL; + bool Started = false; + bool Paused = false; bool Cancelled = false; - long FirstHttpResponseCode = 0L; - long LastHttpResponseCode = 0L; + long HttpResponseCode = 0L; }; using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; @@ -529,7 +518,7 @@ public: private: size_t MaxHandlers = 1024U; size_t MaxSimulatenousDownloadsSize = 8_GB; - size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 1U; + size_t BuffersSizePerStream = CURL_MAX_WRITE_SIZE << 3U; TCurlInitConfig InitConfig; void InitCurl() { @@ -589,22 +578,22 @@ private: size_t FillHandlers() { const std::unique_lock lock(Sync); - for (auto it = Streams.cbegin(); Streams.cend() != it;) { if (const auto& stream = it->lock()) { + const auto streamHandle = stream->GetHandle(); switch (stream->GetAction(BuffersSizePerStream)) { - case TEasyCurlStream::EAction::Drop: - Allocated.erase(stream->GetHandle()); + case TEasyCurlStream::EAction::Init: + curl_multi_add_handle(Handle, streamHandle); break; case TEasyCurlStream::EAction::Work: - curl_multi_add_handle(Handle, stream->GetHandle()); + curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT); break; case TEasyCurlStream::EAction::Stop: - curl_multi_remove_handle(Handle, stream->GetHandle()); + curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE); break; - case TEasyCurlStream::EAction::StopAndDrop: - curl_multi_remove_handle(Handle, stream->GetHandle()); - Allocated.erase(stream->GetHandle()); + case TEasyCurlStream::EAction::Drop: + curl_multi_remove_handle(Handle, streamHandle); + Allocated.erase(streamHandle); break; case TEasyCurlStream::EAction::None: break; |