aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Uzhakov <uzhastik@gmail.com>2022-05-26 00:01:14 +0300
committerSergey Uzhakov <uzhastik@gmail.com>2022-05-26 00:01:14 +0300
commitde2b15d82a058e78241f05b6f68c4f99ee57cd5e (patch)
treeb93d4784c23c9739cb729a67a7e7888796afead5
parent55377746cfa5edfc6be95ea6fa2242d7e811d69d (diff)
downloadydb-de2b15d82a058e78241f05b6f68c4f99ee57cd5e.tar.gz
add AwaitQueueTopExpectedSize sensor, download speed, reject huge files downloading
ref:f663acbfd53452c64ec492226a47dadf1c2418cc
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp40
1 files changed, 27 insertions, 13 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 8ec8f0d6318..c9b16436abe 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -18,8 +18,8 @@ public:
using TPtr = std::shared_ptr<TEasyCurl>;
using TWeakPtr = std::weak_ptr<TEasyCurl>;
- TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData)
- : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter)
+ TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, bool withData)
+ : Offset(offset), ExpectedSize(expectedSize), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes)
{
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L);
@@ -72,7 +72,9 @@ public:
private:
static size_t
WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) {
- return static_cast<TEasyCurl*>(userp)->Write(contents, size, nmemb);
+ auto self = static_cast<TEasyCurl*>(userp);
+ self->DownloadedBytes->Add(size * nmemb);
+ return self->Write(contents, size, nmemb);
};
static size_t
@@ -85,19 +87,20 @@ private:
CURL *const Handle;
curl_slist* Headers = nullptr;
const NMonitoring::TDynamicCounters::TCounterPtr Counter;
+ const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes;
};
class TEasyCurlBuffer : public TEasyCurl {
public:
- TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
- : TEasyCurl(counter, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer)
+ TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback)
+ : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, !data.empty()), Data(std::move(data)), Input(Data), Output(Buffer)
{
Output.Reserve(expectedSize);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
- return std::make_shared<TEasyCurlBuffer>(counter, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnResult callback) {
+ return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
}
private:
bool AddCallback(IHTTPGateway::TOnResult callback) final {
@@ -155,13 +158,13 @@ private:
class TEasyCurlStream : public TEasyCurl {
public:
- TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
- : TEasyCurl(counter, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish))
+ TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
+ : TEasyCurl(counter, downloadedBytes, url, headers, offset, expectedSize, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish))
{
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
- return std::make_shared<TEasyCurlStream>(counter, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, std::size_t offset, std::size_t expectedSize, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, expectedSize, std::move(onNewData), std::move(onFinish));
}
private:
bool AddCallback(IHTTPGateway::TOnResult) final { return false; }
@@ -226,6 +229,8 @@ public:
, OutputMemory(Counters->GetCounter("OutputMemory"))
, PerformCycles(Counters->GetCounter("PerformCycles", true))
, AwaitQueue(Counters->GetCounter("AwaitQueue"))
+ , AwaitQueueTopExpectedSize(Counters->GetCounter("AwaitQueueTopExpectedSize"))
+ , DownloadedBytes(Counters->GetCounter("DownloadedBytes", true))
{
if (httpGatewaysCfg) {
if (httpGatewaysCfg->HasMaxInFlightCount()) {
@@ -305,6 +310,8 @@ private:
size_t FillHandlers() {
const std::unique_lock lock(Sync);
+ const ui64 topExpectedSize = Await.empty() ? 0 : Await.front()->GetExpectedSize();
+ AwaitQueueTopExpectedSize->Set(topExpectedSize);
while (!Await.empty() && Allocated.size() < MaxHandlers && AllocatedSize + Await.front()->GetExpectedSize() <= MaxSimulatenousDownloadsSize) {
AllocatedSize += Await.front()->GetExpectedSize();
const auto handle = Await.front()->GetHandle();
@@ -379,6 +386,11 @@ private:
{
Rps->Inc();
+ if (expectedSize > MaxSimulatenousDownloadsSize) {
+ TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize);
+ callback(TIssues{error});
+ return;
+ }
const std::unique_lock lock(Sync);
auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
StraightInFlight->Set(Requests.size());
@@ -386,7 +398,7 @@ private:
if (easy->AddCallback(callback))
return;
- auto easy = TEasyCurlBuffer::Make(InFlight, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
entry = easy;
Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
Await.emplace(std::move(easy));
@@ -401,7 +413,7 @@ private:
TOnDownloadFinish onFinish) final
{
constexpr auto buffersSize = CURL_MAX_WRITE_SIZE << 4U;
- auto easy = TEasyCurlStream::Make(InFlight, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish));
+ auto easy = TEasyCurlStream::Make(InFlight, DownloadedBytes, std::move(url), std::move(headers), offset, buffersSize, std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
Await.emplace(std::move(easy));
Wakeup(buffersSize);
@@ -470,6 +482,8 @@ private:
const NMonitoring::TDynamicCounters::TCounterPtr OutputMemory;
const NMonitoring::TDynamicCounters::TCounterPtr PerformCycles;
const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueue;
+ const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopExpectedSize;
+ const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes;
TTaskScheduler TaskScheduler;
};