diff options
author | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-03-02 22:13:47 +0300 |
---|---|---|
committer | dinmukhammed <dinmukhammed@yandex-team.ru> | 2022-03-02 22:13:47 +0300 |
commit | 6be490464800b21f433af2009a805f0370411b95 (patch) | |
tree | 1f490a2c8fb47c4d0a4d764432047cf939da9d33 | |
parent | b3e58e6a1f6b37f788fce4205f7a9484ca355607 (diff) | |
download | ydb-6be490464800b21f433af2009a805f0370411b95.tar.gz |
YQ-916 Modified InFlight metric logic
Fixed InFlight logic
ref:bb4ff9db96b60ce5aa1d7c6d3b49cdbe9361c1d2
-rw-r--r-- | ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index d894276833..2a0a4649c8 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -71,20 +71,23 @@ public: return true; } - void Fail(const TIssue& error) { + size_t Fail(const TIssue& error) { TIssues issues{error}; const std::unique_lock lock(SyncCallbacks); + size_t callbacksSize = Callbacks.size(); while (!Callbacks.empty()) { Callbacks.top()(issues); Callbacks.pop(); } + return callbacksSize; } - void Done(CURLcode result) { + size_t Done(CURLcode result) { if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); const std::unique_lock lock(SyncCallbacks); + size_t callbacksSize = Callbacks.size(); while (!Callbacks.empty()) { if (1U == Callbacks.size()) Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer))); @@ -92,6 +95,7 @@ public: Callbacks.top()(IHTTPGateway::TContent(Buffer)); Callbacks.pop(); } + return callbacksSize; } private: static size_t @@ -148,6 +152,7 @@ public: : Counters(std::move(counters)) , Rps(Counters->GetCounter("Requests", true)) , InFlight(Counters->GetCounter("InFlight")) + , StraightInFlight(Counters->GetCounter("StraightInFlight")) , MaxInFlight(Counters->GetCounter("MaxInFlight")) , AllocatedMemory(Counters->GetCounter("AllocatedMemory")) , MaxAllocatedMemory(Counters->GetCounter("MaxAllocatedMemory")) @@ -235,13 +240,13 @@ private: Await.pop(); curl_multi_add_handle(Handle, handle); } - AllocatedMemory->Set(AllocatedSize); return Allocated.size(); } void Done(CURL* handle, CURLcode result) { TEasyCurl::TPtr easy; + bool isRetry = false; { const std::unique_lock lock(Sync); if (const auto it = Allocated.find(handle); Allocated.cend() != it) { @@ -251,7 +256,7 @@ private: if (const auto stateIt = Easy2RetryState.find(easy); stateIt != Easy2RetryState.end()) { if (const auto& nextRetryDelay = stateIt->second->GetNextRetryDelay(httpResponseCode)) { - Y_VERIFY(TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay)); + Y_VERIFY(isRetry = TaskScheduler.Add(new THttpGatewayTask(easy, Singleton), *nextRetryDelay)); } else { Easy2RetryState.erase(stateIt); } @@ -263,9 +268,8 @@ private: if (Await.empty() && Allocated.empty()) Requests.clear(); } - if (easy) { - InFlight->Dec(); - easy->Done(result); + if (!isRetry && easy) { + InFlight->Sub(easy->Done(result)); } } @@ -286,9 +290,8 @@ private: } const TIssue error(curl_multi_strerror(result)); - InFlight->Sub(works.size()); while (!works.empty()) { - works.top()->Fail(error); + InFlight->Sub(works.top()->Fail(error)); works.pop(); } } @@ -306,6 +309,7 @@ private: const std::unique_lock lock(Sync); auto& entry = Requests[TKeyType(url, headers, data, retryPolicy)]; + StraightInFlight->Set(Requests.size()); if (const auto& easy = entry.lock()) if (easy->AddCallback(std::move(callback))) return; @@ -372,6 +376,7 @@ private: const NMonitoring::TDynamicCounterPtr Counters; const NMonitoring::TDynamicCounters::TCounterPtr Rps; const NMonitoring::TDynamicCounters::TCounterPtr InFlight; + const NMonitoring::TDynamicCounters::TCounterPtr StraightInFlight; // doesn't consider merged requests which use one curl const NMonitoring::TDynamicCounters::TCounterPtr MaxInFlight; const NMonitoring::TDynamicCounters::TCounterPtr AllocatedMemory; const NMonitoring::TDynamicCounters::TCounterPtr MaxAllocatedMemory; |