aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-08-05 14:51:09 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-08-05 15:31:20 +0300
commit5213b839c544c58f58131b48686406008788c7b5 (patch)
treea773f7d84897e75becf76196a50638749daa706f
parenta3f104c22b12182c435e7a95ed013d116380a566 (diff)
downloadydb-5213b839c544c58f58131b48686406008788c7b5.tar.gz
YT-22212: Better logs in throttlers
16f9acd778a1e2782e6c0b967b0c01096f3f59be
-rw-r--r--yt/yt/core/concurrency/fair_throttler.cpp6
-rw-r--r--yt/yt/core/concurrency/throughput_throttler.cpp34
2 files changed, 29 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp
index 12e78b5ad4..3899c949e7 100644
--- a/yt/yt/core/concurrency/fair_throttler.cpp
+++ b/yt/yt/core/concurrency/fair_throttler.cpp
@@ -261,6 +261,7 @@ struct TBucketThrottleRequest
TPromise<void> Promise = NewPromise<void>();
std::atomic<bool> Cancelled = false;
NProfiling::TCpuInstant StartTime = NProfiling::GetCpuInstant();
+ TGuid RequestId = TGuid::Create();
void Cancel(const TError& /*error*/)
{
@@ -371,7 +372,10 @@ public:
request->Promise.OnCanceled(BIND(&TBucketThrottleRequest::Cancel, MakeWeak(request)));
request->Promise.ToFuture().Subscribe(BIND(&TBucketThrottler::OnRequestComplete, MakeWeak(this), amount));
- YT_LOG_DEBUG("Started waiting for throttler (Amount: %v)", amount);
+ YT_LOG_DEBUG(
+ "Started waiting for throttler (Amount: %v, RequestId: %v)",
+ amount,
+ request->RequestId);
auto guard = Guard(Lock_);
Queue_.push_back(request);
diff --git a/yt/yt/core/concurrency/throughput_throttler.cpp b/yt/yt/core/concurrency/throughput_throttler.cpp
index 0503dbf4f6..70a41ff07f 100644
--- a/yt/yt/core/concurrency/throughput_throttler.cpp
+++ b/yt/yt/core/concurrency/throughput_throttler.cpp
@@ -306,15 +306,21 @@ private:
}
// Enqueue request to be executed later.
- YT_LOG_DEBUG("Started waiting for throttler (Amount: %v)", amount);
auto promise = NewPromise<void>();
auto request = New<TThrottlerRequest>(amount);
request->TraceContext = NTracing::CreateTraceContextFromCurrent("Throttler");
+
+ YT_LOG_DEBUG(
+ "Started waiting for throttler (Amount: %v, RequestTraceId: %v)",
+ amount,
+ request->TraceContext->GetTraceId());
+
promise.OnCanceled(BIND([weakRequest = MakeWeak(request), amount, this, this_ = MakeStrong(this)] (const TError& error) {
auto request = weakRequest.Lock();
if (request && !request->Set.test_and_set()) {
NTracing::TTraceContextFinishGuard guard(std::move(request->TraceContext));
- YT_LOG_DEBUG("Canceled waiting for throttler (Amount: %v)",
+ YT_LOG_DEBUG(
+ "Canceled waiting for throttler (Amount: %v)",
amount);
request->Promise.Set(TError(NYT::EErrorCode::Canceled, "Throttled request canceled")
<< error);
@@ -472,7 +478,8 @@ private:
NTracing::TTraceContextGuard traceGuard(std::move(request->TraceContext));
auto waitTime = NProfiling::CpuDurationToDuration(NProfiling::GetCpuInstant() - request->StartTime);
- YT_LOG_DEBUG("Finished waiting for throttler (Amount: %v, WaitTime: %v)",
+ YT_LOG_DEBUG(
+ "Finished waiting for throttler (Amount: %v, WaitTime: %v)",
request->Amount,
waitTime);
@@ -872,7 +879,8 @@ public:
IncomingRequests_.emplace_back(TIncomingRequest{amount, promise, incomingRequestId});
}
- YT_LOG_DEBUG("Enqueued a request to the prefetching throttler (Id: %v, Amount: %v)",
+ YT_LOG_DEBUG(
+ "Enqueued a request to the prefetching throttler (Id: %v, Amount: %v)",
incomingRequestId,
amount);
@@ -956,7 +964,8 @@ public:
Available_ += amount;
}
- YT_LOG_DEBUG("Released from prefetching throttler (Amount: %v)",
+ YT_LOG_DEBUG(
+ "Released from prefetching throttler (Amount: %v)",
amount);
}
@@ -1142,7 +1151,8 @@ private:
prefetchAmount = PrefetchAmount_;
}
- YT_LOG_DEBUG("Request to the underlying throttler (Id: %v, UnderlyingAmount: %v, Balance: %v, Prefetch: %v, IncomingRps: %v, UnderlyingRps: %v)",
+ YT_LOG_DEBUG(
+ "Request to the underlying throttler (Id: %v, UnderlyingAmount: %v, Balance: %v, Prefetch: %v, IncomingRps: %v, UnderlyingRps: %v)",
underlyingRequestId,
underlyingAmount,
balance,
@@ -1184,7 +1194,8 @@ private:
}
PrefetchAmount_ = std::clamp(PrefetchAmount_, Config_->MinPrefetchAmount, Config_->MaxPrefetchAmount);
- YT_LOG_DEBUG("Recalculate the amount to prefetch from the underlying throttler (RequestsInWindow: %v, Window: %v, UnderlyingRps: %v, TargetRps: %v, PrefetchAmount: %v)",
+ YT_LOG_DEBUG(
+ "Recalculate the amount to prefetch from the underlying throttler (RequestsInWindow: %v, Window: %v, UnderlyingRps: %v, TargetRps: %v, PrefetchAmount: %v)",
UnderlyingRequests_.size(),
Config_->Window,
underlyingRps,
@@ -1195,7 +1206,8 @@ private:
//! Handles a response from the underlying throttler.
void OnThrottlingResponse(i64 available, i64 id, const TError& error)
{
- YT_LOG_DEBUG("Response from the underlying throttler (Id: %v, Amount: %v, Result: %v)",
+ YT_LOG_DEBUG(
+ "Response from the underlying throttler (Id: %v, Amount: %v, Result: %v)",
id,
available,
error.IsOK());
@@ -1241,7 +1253,8 @@ private:
// a recursive call to #SatisfyIncomingRequests when the corresponding #promise is set.
// So that #promise should be set without holding the #Lock_.
auto result = request.Promise.TrySet();
- YT_LOG_DEBUG("Sent the response for the incoming request (Id: %v, Amount: %v, Result: %v)",
+ YT_LOG_DEBUG(
+ "Sent the response for the incoming request (Id: %v, Amount: %v, Result: %v)",
request.Id,
request.Amount,
result);
@@ -1269,7 +1282,8 @@ private:
for (auto& request : fulfilled) {
request.Promise.Set(error);
- YT_LOG_DEBUG("Dropped the incoming request (Id: %v, Amount: %v)",
+ YT_LOG_DEBUG(
+ "Dropped the incoming request (Id: %v, Amount: %v)",
request.Id,
request.Amount);
}