diff options
author | Sergey Uzhakov <uzhastik@gmail.com> | 2022-05-26 00:01:14 +0300 |
---|---|---|
committer | Sergey Uzhakov <uzhastik@gmail.com> | 2022-05-26 00:01:14 +0300 |
commit | de2b15d82a058e78241f05b6f68c4f99ee57cd5e (patch) | |
tree | b93d4784c23c9739cb729a67a7e7888796afead5 | |
parent | 55377746cfa5edfc6be95ea6fa2242d7e811d69d (diff) | |
download | ydb-de2b15d82a058e78241f05b6f68c4f99ee57cd5e.tar.gz |
add AwaitQueueTopExpectedSize sensor, download speed, reject huge files downloading
ref:f663acbfd53452c64ec492226a47dadf1c2418cc
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 40 |
1 files changed, 27 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 8ec8f0d6318..c9b16436abe 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -18,8 +18,8 @@ public: using TPtr = std::shared_ptr<TEasyCurl>; using TWeakPtr = std::weak_ptr<TEasyCurl>; - TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData) - : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter) + TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData) + : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes) { curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L); @@ -72,7 +72,9 @@ public: private: static size_t WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) { - return static_cast<TEasyCurl*>(userp)->Write(contents, size, nmemb); + auto self = static_cast<TEasyCurl*>(userp); + self->DownloadedBytes->Add(size * nmemb); + return self->Write(contents, size, nmemb); }; static size_t @@ -85,19 +87,20 @@ private: CURL *const Handle; curl_slist* Headers = nullptr; const NMonitoring::TDynamicCounters::TCounterPtr Counter; + const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes; }; class TEasyCurlBuffer : public TEasyCurl { public: - TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) - : TEasyCurl(counter, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) + TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) + : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer) { Output.Reserve(expectedSize); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { - return std::make_shared<TEasyCurlBuffer>(counter, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) { + return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); } private: bool AddCallback(IHTTPGateway::TOnResult callback) final { @@ -155,13 +158,13 @@ private: class TEasyCurlStream : public TEasyCurl { public: - TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) - : TEasyCurl(counter, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) + TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) + : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)) { } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { - return std::make_shared<TEasyCurlStream>(counter, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish)); } private: bool AddCallback(IHTTPGateway::TOnResult) final { return false; } @@ -226,6 +229,8 @@ public: , OutputMemory(Counters->GetCounter("OutputMemory")) , PerformCycles(Counters->GetCounter("PerformCycles", true)) , AwaitQueue(Counters->GetCounter("AwaitQueue")) + , AwaitQueueTopExpectedSize(Counters->GetCounter("AwaitQueueTopExpectedSize")) + , DownloadedBytes(Counters->GetCounter("DownloadedBytes", true)) { if (httpGatewaysCfg) { if (httpGatewaysCfg->HasMaxInFlightCount()) { @@ -305,6 +310,8 @@ private: size_t FillHandlers() { const std::unique_lock lock(Sync); + const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize(); + AwaitQueueTopExpectedSize->Set(topExpectedSize); while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) { AllocatedSize += Await.front()->GetExpectedSize(); const auto handle = Await.front()->GetHandle(); @@ -379,6 +386,11 @@ private: { Rps->Inc(); + if (expectedSize > MaxSimulatenousDownloadsSize) { + TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize); + callback(TIssues{error}); + return; + } const std::unique_lock lock(Sync); auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; StraightInFlight->Set(Requests.size()); @@ -386,7 +398,7 @@ private: if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); entry = easy; Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); Await.emplace(std::move(easy)); @@ -401,7 +413,7 @@ private: TOnDownloadFinish onFinish) final { constexpr auto buffersSize = CURL_MAX_WRITE_SIZE << 4U; - auto easy = TEasyCurlStream::Make(InFlight, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish)); + auto easy = TEasyCurlStream::Make(InFlight, DownloadedBytes, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); Await.emplace(std::move(easy)); Wakeup(buffersSize); @@ -470,6 +482,8 @@ private: const NMonitoring::TDynamicCounters::TCounterPtr OutputMemory; const NMonitoring::TDynamicCounters::TCounterPtr PerformCycles; const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueue; + const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopExpectedSize; + const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes; TTaskScheduler TaskScheduler; }; |