diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-08-05 14:51:09 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-08-05 15:31:20 +0300 |
commit | 5213b839c544c58f58131b48686406008788c7b5 (patch) | |
tree | a773f7d84897e75becf76196a50638749daa706f | |
parent | a3f104c22b12182c435e7a95ed013d116380a566 (diff) | |
download | ydb-5213b839c544c58f58131b48686406008788c7b5.tar.gz |
YT-22212: Better logs in throttlers
16f9acd778a1e2782e6c0b967b0c01096f3f59be
-rw-r--r-- | yt/yt/core/concurrency/fair_throttler.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/concurrency/throughput_throttler.cpp | 34 |
2 files changed, 29 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp index 12e78b5ad4..3899c949e7 100644 --- a/yt/yt/core/concurrency/fair_throttler.cpp +++ b/yt/yt/core/concurrency/fair_throttler.cpp @@ -261,6 +261,7 @@ struct TBucketThrottleRequest TPromise<void> Promise = NewPromise<void>(); std::atomic<bool> Cancelled = false; NProfiling::TCpuInstant StartTime = NProfiling::GetCpuInstant(); + TGuid RequestId = TGuid::Create(); void Cancel(const TError& /*error*/) { @@ -371,7 +372,10 @@ public: request->Promise.OnCanceled(BIND(&TBucketThrottleRequest::Cancel, MakeWeak(request))); request->Promise.ToFuture().Subscribe(BIND(&TBucketThrottler::OnRequestComplete, MakeWeak(this), amount)); - YT_LOG_DEBUG("Started waiting for throttler (Amount: %v)", amount); + YT_LOG_DEBUG( + "Started waiting for throttler (Amount: %v, RequestId: %v)", + amount, + request->RequestId); auto guard = Guard(Lock_); Queue_.push_back(request); diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp index 0503dbf4f6..70a41ff07f 100644 --- a/yt/yt/core/concurrency/throughput_throttler.cpp +++ b/yt/yt/core/concurrency/throughput_throttler.cpp @@ -306,15 +306,21 @@ private: } // Enqueue request to be executed later. - YT_LOG_DEBUG("Started waiting for throttler (Amount: %v)", amount); auto promise = NewPromise<void>(); auto request = New<TThrottlerRequest>(amount); request->TraceContext = NTracing::CreateTraceContextFromCurrent("Throttler"); + + YT_LOG_DEBUG( + "Started waiting for throttler (Amount: %v, RequestTraceId: %v)", + amount, + request->TraceContext->GetTraceId()); + promise.OnCanceled(BIND([weakRequest = MakeWeak(request), amount, this, this_ = MakeStrong(this)] (const TError& error) { auto request = weakRequest.Lock(); if (request && !request->Set.test_and_set()) { NTracing::TTraceContextFinishGuard guard(std::move(request->TraceContext)); - YT_LOG_DEBUG("Canceled waiting for throttler (Amount: %v)", + YT_LOG_DEBUG( + "Canceled waiting for throttler (Amount: %v)", amount); request->Promise.Set(TError(NYT::EErrorCode::Canceled, "Throttled request canceled") << error); @@ -472,7 +478,8 @@ private: NTracing::TTraceContextGuard traceGuard(std::move(request->TraceContext)); auto waitTime = NProfiling::CpuDurationToDuration(NProfiling::GetCpuInstant() - request->StartTime); - YT_LOG_DEBUG("Finished waiting for throttler (Amount: %v, WaitTime: %v)", + YT_LOG_DEBUG( + "Finished waiting for throttler (Amount: %v, WaitTime: %v)", request->Amount, waitTime); @@ -872,7 +879,8 @@ public: IncomingRequests_.emplace_back(TIncomingRequest{amount, promise, incomingRequestId}); } - YT_LOG_DEBUG("Enqueued a request to the prefetching throttler (Id: %v, Amount: %v)", + YT_LOG_DEBUG( + "Enqueued a request to the prefetching throttler (Id: %v, Amount: %v)", incomingRequestId, amount); @@ -956,7 +964,8 @@ public: Available_ += amount; } - YT_LOG_DEBUG("Released from prefetching throttler (Amount: %v)", + YT_LOG_DEBUG( + "Released from prefetching throttler (Amount: %v)", amount); } @@ -1142,7 +1151,8 @@ private: prefetchAmount = PrefetchAmount_; } - YT_LOG_DEBUG("Request to the underlying throttler (Id: %v, UnderlyingAmount: %v, Balance: %v, Prefetch: %v, IncomingRps: %v, UnderlyingRps: %v)", + YT_LOG_DEBUG( + "Request to the underlying throttler (Id: %v, UnderlyingAmount: %v, Balance: %v, Prefetch: %v, IncomingRps: %v, UnderlyingRps: %v)", underlyingRequestId, underlyingAmount, balance, @@ -1184,7 +1194,8 @@ private: } PrefetchAmount_ = std::clamp(PrefetchAmount_, Config_->MinPrefetchAmount, Config_->MaxPrefetchAmount); - YT_LOG_DEBUG("Recalculate the amount to prefetch from the underlying throttler (RequestsInWindow: %v, Window: %v, UnderlyingRps: %v, TargetRps: %v, PrefetchAmount: %v)", + YT_LOG_DEBUG( + "Recalculate the amount to prefetch from the underlying throttler (RequestsInWindow: %v, Window: %v, UnderlyingRps: %v, TargetRps: %v, PrefetchAmount: %v)", UnderlyingRequests_.size(), Config_->Window, underlyingRps, @@ -1195,7 +1206,8 @@ private: //! Handles a response from the underlying throttler. void OnThrottlingResponse(i64 available, i64 id, const TError& error) { - YT_LOG_DEBUG("Response from the underlying throttler (Id: %v, Amount: %v, Result: %v)", + YT_LOG_DEBUG( + "Response from the underlying throttler (Id: %v, Amount: %v, Result: %v)", id, available, error.IsOK()); @@ -1241,7 +1253,8 @@ private: // a recursive call to #SatisfyIncomingRequests when the corresponding #promise is set. // So that #promise should be set without holding the #Lock_. auto result = request.Promise.TrySet(); - YT_LOG_DEBUG("Sent the response for the incoming request (Id: %v, Amount: %v, Result: %v)", + YT_LOG_DEBUG( + "Sent the response for the incoming request (Id: %v, Amount: %v, Result: %v)", request.Id, request.Amount, result); @@ -1269,7 +1282,8 @@ private: for (auto& request : fulfilled) { request.Promise.Set(error); - YT_LOG_DEBUG("Dropped the incoming request (Id: %v, Amount: %v)", + YT_LOG_DEBUG( + "Dropped the incoming request (Id: %v, Amount: %v)", request.Id, request.Amount); } |