aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordinmukhammed <dinmukhammed@yandex-team.ru>2022-03-02 22:13:47 +0300
committerdinmukhammed <dinmukhammed@yandex-team.ru>2022-03-02 22:13:47 +0300
commit6be490464800b21f433af2009a805f0370411b95 (patch)
tree1f490a2c8fb47c4d0a4d764432047cf939da9d33
parentb3e58e6a1f6b37f788fce4205f7a9484ca355607 (diff)
downloadydb-6be490464800b21f433af2009a805f0370411b95.tar.gz
YQ-916 Modified InFlight metric logic
Fixed InFlight logic ref:bb4ff9db96b60ce5aa1d7c6d3b49cdbe9361c1d2
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp23
1 files changed, 14 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index d894276833..2a0a4649c8 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -71,20 +71,23 @@ public:
return true;
}
- void Fail(const TIssue& error) {
+ size_t Fail(const TIssue& error) {
TIssues issues{error};
const std::unique_lock lock(SyncCallbacks);
+ size_t callbacksSize = Callbacks.size();
while (!Callbacks.empty()) {
Callbacks.top()(issues);
Callbacks.pop();
}
+ return callbacksSize;
}
- void Done(CURLcode result) {
+ size_t Done(CURLcode result) {
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
const std::unique_lock lock(SyncCallbacks);
+ size_t callbacksSize = Callbacks.size();
while (!Callbacks.empty()) {
if (1U == Callbacks.size())
Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer)));
@@ -92,6 +95,7 @@ public:
Callbacks.top()(IHTTPGateway::TContent(Buffer));
Callbacks.pop();
}
+ return callbacksSize;
}
private:
static size_t
@@ -148,6 +152,7 @@ public:
: Counters(std::move(counters))
, Rps(Counters->GetCounter("Requests", true))
, InFlight(Counters->GetCounter("InFlight"))
+ , StraightInFlight(Counters->GetCounter("StraightInFlight"))
, MaxInFlight(Counters->GetCounter("MaxInFlight"))
, AllocatedMemory(Counters->GetCounter("AllocatedMemory"))
, MaxAllocatedMemory(Counters->GetCounter("MaxAllocatedMemory"))
@@ -235,13 +240,13 @@ private:
Await.pop();
curl_multi_add_handle(Handle, handle);
}
-
AllocatedMemory->Set(AllocatedSize);
return Allocated.size();
}
void Done(CURL* handle, CURLcode result) {
TEasyCurl::TPtr easy;
+ bool isRetry = false;
{
const std::unique_lock lock(Sync);
if (const auto it = Allocated.find(handle); Allocated.cend() != it) {
@@ -251,7 +256,7 @@ private:
if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) {
if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) {
- Y_VERIFY(TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay));
+ Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay));
} else {
Easy2RetryState.erase(stateIt);
}
@@ -263,9 +268,8 @@ private:
if (Await.empty() && Allocated.empty())
Requests.clear();
}
- if (easy) {
- InFlight->Dec();
- easy->Done(result);
+ if (!isRetry && easy) {
+ InFlight->Sub(easy->Done(result));
}
}
@@ -286,9 +290,8 @@ private:
}
const TIssue error(curl_multi_strerror(result));
- InFlight->Sub(works.size());
while (!works.empty()) {
- works.top()->Fail(error);
+ InFlight->Sub(works.top()->Fail(error));
works.pop();
}
}
@@ -306,6 +309,7 @@ private:
const std::unique_lock lock(Sync);
auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)];
+ StraightInFlight->Set(Requests.size());
if (const auto& easy = entry.lock())
if (easy->AddCallback(std::move(callback)))
return;
@@ -372,6 +376,7 @@ private:
const NMonitoring::TDynamicCounterPtr Counters;
const NMonitoring::TDynamicCounters::TCounterPtr Rps;
const NMonitoring::TDynamicCounters::TCounterPtr InFlight;
+ const NMonitoring::TDynamicCounters::TCounterPtr StraightInFlight; // doesn't consider merged requests which use one curl
const NMonitoring::TDynamicCounters::TCounterPtr MaxInFlight;
const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory;
const NMonitoring::TDynamicCounters::TCounterPtr MaxAllocatedMemory;