aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreshcherbin <eshcherbin@yandex-team.com>2023-10-24 17:33:21 +0300
committereshcherbin <eshcherbin@yandex-team.com>2023-10-24 18:11:34 +0300
commitf9cbd389cd6379f7e439acaf9ea76c19433940ce (patch)
treeb48952ef1665ccaa4102dc2445975a7206cc524a
parent3547fbbc82a002091ef67079148ed1725dcd7d5e (diff)
downloadydb-f9cbd389cd6379f7e439acaf9ea76c19433940ce.tar.gz
Fix incorrect algorithm for computing average wait time yet again
-rw-r--r--yt/yt/core/concurrency/fair_share_invoker_pool.cpp33
1 files changed, 21 insertions, 12 deletions
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
index 0ae163a2cf9..30b9e00c2d2 100644
--- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp
@@ -190,7 +190,9 @@ private:
public:
void OnActionEnqueued(TInstant now)
{
- UpdateTotalWaitTime(now);
+ UpdateLatestObservedTime(now);
+
+ TotalWaitTime_ += LatestObservedTime_ - now;
ActionEnqueueTimes_.push(now);
++EnqueuedActionCount_;
}
@@ -198,19 +200,20 @@ private:
void OnActionDequeued()
{
YT_VERIFY(!ActionEnqueueTimes_.empty());
- YT_VERIFY(LastTotalWaitTimeUpdateTime_);
auto actionEnqueueTime = ActionEnqueueTimes_.front();
- auto actionRecordedWaitTime = *LastTotalWaitTimeUpdateTime_ - actionEnqueueTime;
- TotalWaitTime_ -= actionRecordedWaitTime;
-
+ TotalWaitTime_ -= LatestObservedTime_ - actionEnqueueTime;
ActionEnqueueTimes_.pop();
++DequeuedActionCount_;
+
+ if (ActionEnqueueTimes_.empty()) {
+ YT_VERIFY(TotalWaitTime_ == TDuration::Zero());
+ }
}
TInvokerStatistics GetInvokerStatistics(TInstant now) const
{
- UpdateTotalWaitTime(now);
+ UpdateLatestObservedTime(now);
auto waitingActionCount = std::ssize(ActionEnqueueTimes_);
auto averageWaitTime = waitingActionCount > 0
@@ -228,17 +231,23 @@ private:
private:
TRingQueue<TInstant> ActionEnqueueTimes_;
mutable TDuration TotalWaitTime_;
- mutable std::optional<TInstant> LastTotalWaitTimeUpdateTime_;
+ mutable TInstant LatestObservedTime_;
i64 EnqueuedActionCount_ = 0;
i64 DequeuedActionCount_ = 0;
- void UpdateTotalWaitTime(TInstant now) const
+ void UpdateLatestObservedTime(TInstant now) const
{
- auto singleActionWaitTimeDelta = now - LastTotalWaitTimeUpdateTime_.value_or(now);
- int waitingActionCount = std::ssize(ActionEnqueueTimes_);
- TotalWaitTime_ += waitingActionCount * singleActionWaitTimeDelta;
+ if (now <= LatestObservedTime_) {
+ return;
+ }
+
+ if (!ActionEnqueueTimes_.empty()) {
+ auto singleActionWaitTimeDelta = now - LatestObservedTime_;
+ int waitingActionCount = std::ssize(ActionEnqueueTimes_);
+ TotalWaitTime_ += waitingActionCount * singleActionWaitTimeDelta;
+ }
- LastTotalWaitTimeUpdateTime_ = now;
+ LatestObservedTime_ = now;
}
};