diff options
author | eshcherbin <eshcherbin@yandex-team.com> | 2023-10-24 17:33:21 +0300 |
---|---|---|
committer | eshcherbin <eshcherbin@yandex-team.com> | 2023-10-24 18:11:34 +0300 |
commit | f9cbd389cd6379f7e439acaf9ea76c19433940ce (patch) | |
tree | b48952ef1665ccaa4102dc2445975a7206cc524a | |
parent | 3547fbbc82a002091ef67079148ed1725dcd7d5e (diff) | |
download | ydb-f9cbd389cd6379f7e439acaf9ea76c19433940ce.tar.gz |
Fix incorrect algorithm for computing average wait time yet again
-rw-r--r-- | yt/yt/core/concurrency/fair_share_invoker_pool.cpp | 33 |
1 files changed, 21 insertions, 12 deletions
diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp index 0ae163a2cf9..30b9e00c2d2 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp @@ -190,7 +190,9 @@ private: public: void OnActionEnqueued(TInstant now) { - UpdateTotalWaitTime(now); + UpdateLatestObservedTime(now); + + TotalWaitTime_ += LatestObservedTime_ - now; ActionEnqueueTimes_.push(now); ++EnqueuedActionCount_; } @@ -198,19 +200,20 @@ private: void OnActionDequeued() { YT_VERIFY(!ActionEnqueueTimes_.empty()); - YT_VERIFY(LastTotalWaitTimeUpdateTime_); auto actionEnqueueTime = ActionEnqueueTimes_.front(); - auto actionRecordedWaitTime = *LastTotalWaitTimeUpdateTime_ - actionEnqueueTime; - TotalWaitTime_ -= actionRecordedWaitTime; - + TotalWaitTime_ -= LatestObservedTime_ - actionEnqueueTime; ActionEnqueueTimes_.pop(); ++DequeuedActionCount_; + + if (ActionEnqueueTimes_.empty()) { + YT_VERIFY(TotalWaitTime_ == TDuration::Zero()); + } } TInvokerStatistics GetInvokerStatistics(TInstant now) const { - UpdateTotalWaitTime(now); + UpdateLatestObservedTime(now); auto waitingActionCount = std::ssize(ActionEnqueueTimes_); auto averageWaitTime = waitingActionCount > 0 @@ -228,17 +231,23 @@ private: private: TRingQueue<TInstant> ActionEnqueueTimes_; mutable TDuration TotalWaitTime_; - mutable std::optional<TInstant> LastTotalWaitTimeUpdateTime_; + mutable TInstant LatestObservedTime_; i64 EnqueuedActionCount_ = 0; i64 DequeuedActionCount_ = 0; - void UpdateTotalWaitTime(TInstant now) const + void UpdateLatestObservedTime(TInstant now) const { - auto singleActionWaitTimeDelta = now - LastTotalWaitTimeUpdateTime_.value_or(now); - int waitingActionCount = std::ssize(ActionEnqueueTimes_); - TotalWaitTime_ += waitingActionCount * singleActionWaitTimeDelta; + if (now <= LatestObservedTime_) { + return; + } + + if (!ActionEnqueueTimes_.empty()) { + auto singleActionWaitTimeDelta = now - LatestObservedTime_; + int waitingActionCount = std::ssize(ActionEnqueueTimes_); + TotalWaitTime_ += waitingActionCount * singleActionWaitTimeDelta; + } - LastTotalWaitTimeUpdateTime_ = now; + LatestObservedTime_ = now; } }; |