aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorcapone212 <capone212@yandex-team.com>2023-11-13 00:54:01 +0300
committercapone212 <capone212@yandex-team.com>2023-11-13 01:15:43 +0300
commit4abf85baae59e2393868790fa21539430c9cc1b0 (patch)
treeea203575ed085dbcb043f1beb0faed9e9bf43a5b
parente24f824435e26ee6d92e01d20813c6f9e23efe8d (diff)
downloadydb-4abf85baae59e2393868790fa21539430c9cc1b0.tar.gz
YT-20212: Concurrency throttling for OverloadController
-rw-r--r--yt/yt/core/rpc/server_detail.cpp8
-rw-r--r--yt/yt/core/rpc/server_detail.h12
-rw-r--r--yt/yt/core/rpc/service.cpp23
-rw-r--r--yt/yt/core/rpc/service.h2
-rw-r--r--yt/yt/core/rpc/service_detail.cpp168
-rw-r--r--yt/yt/core/rpc/service_detail.h41
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.cpp2
-rw-r--r--yt/yt/core/rpc/unittests/mock/service.h4
8 files changed, 194 insertions, 66 deletions
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp
index fdf7c65e10..b5526d5c43 100644
--- a/yt/yt/core/rpc/server_detail.cpp
+++ b/yt/yt/core/rpc/server_detail.cpp
@@ -207,10 +207,10 @@ bool TServiceContextBase::IsReplied() const
return Replied_.load();
}
-void TServiceContextBase::SubscribeCanceled(const TClosure& /*callback*/)
+void TServiceContextBase::SubscribeCanceled(const TCanceledCallback& /*callback*/)
{ }
-void TServiceContextBase::UnsubscribeCanceled(const TClosure& /*callback*/)
+void TServiceContextBase::UnsubscribeCanceled(const TCanceledCallback& /*callback*/)
{ }
void TServiceContextBase::SubscribeReplied(const TClosure& /*callback*/)
@@ -593,12 +593,12 @@ void TServiceContextWrapper::SetComplete()
UnderlyingContext_->SetComplete();
}
-void TServiceContextWrapper::SubscribeCanceled(const TClosure& callback)
+void TServiceContextWrapper::SubscribeCanceled(const TCanceledCallback& callback)
{
UnderlyingContext_->SubscribeCanceled(callback);
}
-void TServiceContextWrapper::UnsubscribeCanceled(const TClosure& callback)
+void TServiceContextWrapper::UnsubscribeCanceled(const TCanceledCallback& callback)
{
UnderlyingContext_->UnsubscribeCanceled(callback);
}
diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h
index b746308d8b..6182658514 100644
--- a/yt/yt/core/rpc/server_detail.h
+++ b/yt/yt/core/rpc/server_detail.h
@@ -61,8 +61,10 @@ public:
const TSharedRefArray& GetResponseMessage() const override;
- void SubscribeCanceled(const TClosure& callback) override;
- void UnsubscribeCanceled(const TClosure& callback) override;
+ using TCanceledCallback = TCallback<void(const TError&)>;
+
+ void SubscribeCanceled(const TCanceledCallback& callback) override;
+ void UnsubscribeCanceled(const TCanceledCallback& callback) override;
void SubscribeReplied(const TClosure& callback) override;
void UnsubscribeReplied(const TClosure& callback) override;
@@ -200,8 +202,10 @@ public:
TFuture<TSharedRefArray> GetAsyncResponseMessage() const override;
const TSharedRefArray& GetResponseMessage() const override;
- void SubscribeCanceled(const TClosure& callback) override;
- void UnsubscribeCanceled(const TClosure& callback) override;
+ using TCanceledCallback = TCallback<void(const TError&)>;
+
+ void SubscribeCanceled(const TCanceledCallback& callback) override;
+ void UnsubscribeCanceled(const TCanceledCallback& callback) override;
void SubscribeReplied(const TClosure& callback) override;
void UnsubscribeReplied(const TClosure& callback) override;
diff --git a/yt/yt/core/rpc/service.cpp b/yt/yt/core/rpc/service.cpp
index 2ad7909776..95749e42d2 100644
--- a/yt/yt/core/rpc/service.cpp
+++ b/yt/yt/core/rpc/service.cpp
@@ -4,17 +4,6 @@ namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
-namespace {
-
-TError MakeCanceledError()
-{
- return TError("RPC request canceled");
-}
-
-} // namespace
-
-////////////////////////////////////////////////////////////////////////////////
-
void IServiceContext::SetRequestInfo()
{
SetRawRequestInfo(TString(), false);
@@ -34,8 +23,8 @@ void IServiceContext::ReplyFrom(TFuture<TSharedRefArray> asyncMessage)
Reply(TError(result));
}
}));
- SubscribeCanceled(BIND([asyncMessage = std::move(asyncMessage)] {
- asyncMessage.Cancel(MakeCanceledError());
+ SubscribeCanceled(BIND([asyncMessage = std::move(asyncMessage)] (const TError& error) {
+ asyncMessage.Cancel(error);
}));
}
@@ -44,8 +33,8 @@ void IServiceContext::ReplyFrom(TFuture<void> asyncError)
asyncError.Subscribe(BIND([this, this_ = MakeStrong(this)] (const TError& error) {
Reply(error);
}));
- SubscribeCanceled(BIND([asyncError = std::move(asyncError)] {
- asyncError.Cancel(MakeCanceledError());
+ SubscribeCanceled(BIND([asyncError = std::move(asyncError)] (const TError& error) {
+ asyncError.Cancel(error);
}));
}
@@ -55,8 +44,8 @@ void IServiceContext::ReplyFrom(TFuture<void> asyncError, const IInvokerPtr& inv
Reply(error);
})
.Via(invoker));
- SubscribeCanceled(BIND([asyncError = std::move(asyncError)] {
- asyncError.Cancel(MakeCanceledError());
+ SubscribeCanceled(BIND([asyncError = std::move(asyncError)] (const TError& error) {
+ asyncError.Cancel(error);
}));
}
diff --git a/yt/yt/core/rpc/service.h b/yt/yt/core/rpc/service.h
index 188e2f77aa..9dde3ec659 100644
--- a/yt/yt/core/rpc/service.h
+++ b/yt/yt/core/rpc/service.h
@@ -131,7 +131,7 @@ struct IServiceContext
virtual bool IsCanceled() const = 0;
//! Raised when request processing is canceled.
- DECLARE_INTERFACE_SIGNAL(void(), Canceled);
+ DECLARE_INTERFACE_SIGNAL(void(const TError&), Canceled);
//! Raised when Reply() was called. Allows doing some post-request stuff like extended structured logging.
DECLARE_INTERFACE_SIGNAL(void(), Replied);
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 7a04ab78ed..c137e35452 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -32,6 +32,17 @@
#include <library/cpp/yt/misc/tls.h>
+namespace NYT
+{
+ static TError operator<<(TError error, const std::optional<TError>& maybeError)
+ {
+ if (maybeError) {
+ return error << *maybeError;
+ }
+ return error;
+ }
+}
+
namespace NYT::NRpc {
using namespace NBus;
@@ -76,6 +87,34 @@ THandlerInvocationOptions THandlerInvocationOptions::SetResponseCodec(NCompressi
////////////////////////////////////////////////////////////////////////////////
+void TDynamicConcurrencyLimit::Reconfigure(int limit)
+{
+ ConfigLimit_.store(limit, std::memory_order::relaxed);
+ SetDynamicLimit(limit);
+}
+
+int TDynamicConcurrencyLimit::GetLimitFromConfiguration() const
+{
+ return ConfigLimit_.load(std::memory_order::relaxed);
+}
+
+int TDynamicConcurrencyLimit::GetDynamicLimit() const
+{
+ return DynamicLimit_.load(std::memory_order::relaxed);
+}
+
+void TDynamicConcurrencyLimit::SetDynamicLimit(std::optional<int> dynamicLimit)
+{
+ auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigLimit_.load(std::memory_order::relaxed);
+ auto oldLimit = DynamicLimit_.exchange(limit, std::memory_order::relaxed);
+
+ if (oldLimit != limit) {
+ Updated_.Fire();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
TServiceBase::TMethodDescriptor::TMethodDescriptor(
TString method,
TLiteHandler liteHandler,
@@ -264,12 +303,12 @@ public:
, RuntimeInfo_(acceptedRequest.RuntimeInfo)
, TraceContext_(std::move(acceptedRequest.TraceContext))
, RequestQueue_(acceptedRequest.RequestQueue)
+ , ThrottledError_(acceptedRequest.ThrottledError)
, MethodPerformanceCounters_(Service_->GetMethodPerformanceCounters(
RuntimeInfo_,
{GetAuthenticationIdentity().UserTag, RequestQueue_}))
, PerformanceCounters_(Service_->GetPerformanceCounters())
, ArriveInstant_(NProfiling::GetInstant())
-
{
YT_ASSERT(RequestMessage_);
YT_ASSERT(ReplyBus_);
@@ -371,12 +410,12 @@ public:
invoker->Invoke(BIND(&TServiceContext::DoRun, MakeStrong(this), handler));
}
- void SubscribeCanceled(const TClosure& callback) override
+ void SubscribeCanceled(const TCanceledCallback& callback) override
{
CanceledList_.Subscribe(callback);
}
- void UnsubscribeCanceled(const TClosure& callback) override
+ void UnsubscribeCanceled(const TCanceledCallback& callback) override
{
CanceledList_.Unsubscribe(callback);
}
@@ -396,9 +435,15 @@ public:
return CanceledList_.IsFired();
}
+ TError GetCanceledError() const
+ {
+ return TError(NYT::EErrorCode::Canceled, "RPC request is canceled")
+ << ThrottledError_;
+ }
+
void Cancel() override
{
- if (!CanceledList_.Fire()) {
+ if (!CanceledList_.Fire(GetCanceledError())) {
return;
}
@@ -420,21 +465,22 @@ public:
DoSetComplete();
}
- void HandleTimeout()
+ void HandleTimeout(ERequestProcessingStage stage)
{
if (TimedOutLatch_.exchange(true)) {
return;
}
- YT_LOG_DEBUG("Request timed out, canceling (RequestId: %v)",
- RequestId_);
+ YT_LOG_DEBUG("Request timed out, canceling (RequestId: %v, Stage: %v)",
+ RequestId_,
+ stage);
if (RuntimeInfo_->Descriptor.StreamingEnabled) {
static const auto TimedOutError = TError("Request timed out");
AbortStreamsUnlessClosed(TimedOutError);
}
- CanceledList_.Fire();
+ CanceledList_.Fire(GetCanceledError());
MethodPerformanceCounters_->TimedOutRequestCounter.Increment();
// Guards from race with DoGuardedRun.
@@ -554,6 +600,29 @@ public:
}
}
+ void BeforeEnqueued()
+ {
+ auto requestTimeout = GetTimeout();
+ if (!requestTimeout) {
+ return;
+ }
+
+ auto waitingTimeoutFraction = RuntimeInfo_->WaitingTimeoutFraction.load(std::memory_order::relaxed);
+ if (waitingTimeoutFraction == 0) {
+ return;
+ }
+
+ auto waitingTimeout = *requestTimeout * waitingTimeoutFraction;
+ WaitingTimeoutCookie_ = TDelayedExecutor::Submit(
+ BIND(&TServiceBase::OnRequestTimeout, Service_, RequestId_, ERequestProcessingStage::Waiting),
+ waitingTimeout);
+ }
+
+ void AfterDequeued()
+ {
+ TDelayedExecutor::CancelAndClear(WaitingTimeoutCookie_);
+ }
+
private:
const TServiceBasePtr Service_;
const TRequestId RequestId_;
@@ -561,15 +630,17 @@ private:
TRuntimeMethodInfo* const RuntimeInfo_;
const TTraceContextPtr TraceContext_;
TRequestQueue* const RequestQueue_;
+ std::optional<TError> ThrottledError_;
TMethodPerformanceCounters* const MethodPerformanceCounters_;
TPerformanceCounters* const PerformanceCounters_;
NCompression::ECodec RequestCodec_;
- TDelayedExecutorCookie TimeoutCookie_;
+ TDelayedExecutorCookie WaitingTimeoutCookie_;
+ TDelayedExecutorCookie ExecutingTimeoutCookie_;
bool Cancelable_ = false;
- TSingleShotCallbackList<void()> CanceledList_;
+ TSingleShotCallbackList<void(const TError&)> CanceledList_;
const TInstant ArriveInstant_;
std::optional<TInstant> RunInstant_;
@@ -734,7 +805,8 @@ private:
}
FinishLatch_ = true;
- TDelayedExecutor::CancelAndClear(TimeoutCookie_);
+ TDelayedExecutor::CancelAndClear(WaitingTimeoutCookie_);
+ TDelayedExecutor::CancelAndClear(ExecutingTimeoutCookie_);
if (IsRegistrable()) {
Service_->UnregisterRequest(this);
@@ -809,8 +881,8 @@ private:
return;
}
if (Cancelable_) {
- TimeoutCookie_ = TDelayedExecutor::Submit(
- BIND(&TServiceBase::OnRequestTimeout, Service_, RequestId_),
+ ExecutingTimeoutCookie_ = TDelayedExecutor::Submit(
+ BIND(&TServiceBase::OnRequestTimeout, Service_, RequestId_, ERequestProcessingStage::Executing),
remainingTimeout);
}
}
@@ -819,8 +891,8 @@ private:
// TODO(lukyan): Wrap in CancelableExecution.
auto fiberCanceler = GetCurrentFiberCanceler();
if (fiberCanceler) {
- auto cancelationHandler = BIND([fiberCanceler = std::move(fiberCanceler)] {
- fiberCanceler(TError("RPC request canceled"));
+ auto cancelationHandler = BIND([fiberCanceler = std::move(fiberCanceler)] (const TError& error) {
+ fiberCanceler(error);
});
if (!CanceledList_.TrySubscribe(std::move(cancelationHandler))) {
YT_LOG_DEBUG("Request was canceled before being run (RequestId: %v)",
@@ -1212,6 +1284,10 @@ bool TRequestQueue::Register(TServiceBase* service, TServiceBase::TRuntimeMethod
if (!Registered_.load(std::memory_order::relaxed)) {
Service_ = service;
RuntimeInfo_ = runtimeInfo;
+
+ RuntimeInfo_->ConcurrencyLimit.SubscribeUpdated(BIND(
+ &TRequestQueue::OnConcurrencyLimitChanged,
+ MakeWeak(this)));
}
Registered_.store(true, std::memory_order::release);
}
@@ -1219,6 +1295,13 @@ bool TRequestQueue::Register(TServiceBase* service, TServiceBase::TRuntimeMethod
return true;
}
+void TRequestQueue::OnConcurrencyLimitChanged()
+{
+ if (QueueSize_.load() > 0) {
+ ScheduleRequestsFromQueue();
+ }
+}
+
void TRequestQueue::TRequestThrottler::Reconfigure(const TThroughputThrottlerConfigPtr& config)
{
Throttler->Reconfigure(config ? config : New<TThroughputThrottlerConfig>());
@@ -1270,7 +1353,7 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
{
// Fast path.
auto newConcurrencySemaphore = IncrementConcurrency();
- if (newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.load(std::memory_order::relaxed) &&
+ if (newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit() &&
!AreThrottlersOverdrafted())
{
RunRequest(std::move(context));
@@ -1280,7 +1363,10 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context)
// Slow path.
DecrementConcurrency();
IncrementQueueSize();
- Queue_.enqueue(std::move(context));
+
+ context->BeforeEnqueued();
+ YT_VERIFY(Queue_.enqueue(std::move(context)));
+
ScheduleRequestsFromQueue();
}
@@ -1317,7 +1403,7 @@ void TRequestQueue::ScheduleRequestsFromQueue()
#endif
// NB: Racy, may lead to overcommit in concurrency semaphore and request bytes throttler.
- auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.load(std::memory_order::relaxed);
+ auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit();
while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit) {
if (AreThrottlersOverdrafted()) {
SubscribeToThrottlers();
@@ -1325,15 +1411,22 @@ void TRequestQueue::ScheduleRequestsFromQueue()
}
TServiceBase::TServiceContextPtr context;
- while (!Queue_.try_dequeue(context)) {
- // False negatives are possible in Moody Camel.
- if (QueueSize_.load() > 0) {
- continue;
+ while (!context) {
+ if (!Queue_.try_dequeue(context)) {
+ // False negatives are possible in Moody Camel.
+ if (QueueSize_.load() > 0) {
+ continue;
+ }
+ return;
+ }
+
+ DecrementQueueSize();
+ context->AfterDequeued();
+ if (context->IsCanceled()) {
+ context.Reset();
}
- return;
}
- DecrementQueueSize();
IncrementConcurrency();
RunRequest(std::move(context));
}
@@ -1534,13 +1627,16 @@ void TServiceBase::HandleRequest(
auto* requestQueue = GetRequestQueue(runtimeInfo, *header);
RegisterRequestQueue(runtimeInfo, requestQueue);
+ auto maybeThrottled = GetThrottledError(*header);
+
if (requestQueue->IsQueueLimitSizeExceeded()) {
runtimeInfo->RequestQueueSizeLimitErrorCounter.Increment();
replyError(TError(
NRpc::EErrorCode::RequestQueueSizeLimitExceeded,
"Request queue size limit exceeded")
<< TErrorAttribute("limit", runtimeInfo->QueueSizeLimit.load())
- << TErrorAttribute("queue", requestQueue->GetName()));
+ << TErrorAttribute("queue", requestQueue->GetName())
+ << maybeThrottled);
return;
}
@@ -1554,7 +1650,8 @@ void TServiceBase::HandleRequest(
std::move(traceContext),
std::move(header),
std::move(message),
- requestQueue
+ requestQueue,
+ maybeThrottled
};
if (!IsAuthenticationNeeded(acceptedRequest)) {
@@ -1874,14 +1971,14 @@ TError TServiceBase::DoCheckRequestFeatures(const NRpc::NProto::TRequestHeader&
return {};
}
-void TServiceBase::OnRequestTimeout(TRequestId requestId, bool /*aborted*/)
+void TServiceBase::OnRequestTimeout(TRequestId requestId, ERequestProcessingStage stage, bool /*aborted*/)
{
auto context = FindRequest(requestId);
if (!context) {
return;
}
- context->HandleTimeout();
+ context->HandleTimeout(stage);
}
void TServiceBase::OnReplyBusTerminated(const IBusPtr& bus, const TError& error)
@@ -2286,7 +2383,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
runtimeInfo->Heavy.store(descriptor.Options.Heavy);
runtimeInfo->QueueSizeLimit.store(descriptor.QueueSizeLimit);
- runtimeInfo->ConcurrencyLimit.store(descriptor.ConcurrencyLimit);
+ runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit);
runtimeInfo->LogLevel.store(descriptor.LogLevel);
runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout);
@@ -2298,7 +2395,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe
return runtimeInfo->QueueSizeLimit.load(std::memory_order::relaxed);
});
profiler.AddFuncGauge("/concurrency_limit", MakeStrong(this), [=] {
- return runtimeInfo->ConcurrencyLimit.load(std::memory_order::relaxed);
+ return runtimeInfo->ConcurrencyLimit.GetDynamicLimit();
});
return runtimeInfo;
@@ -2359,7 +2456,7 @@ void TServiceBase::DoConfigure(
const auto& descriptor = runtimeInfo->Descriptor;
runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy));
runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit));
- runtimeInfo->ConcurrencyLimit.store(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
+ runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit));
runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel));
runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout));
runtimeInfo->Pooled.store(methodConfig->Pooled.value_or(config->Pooled.value_or(descriptor.Pooled)));
@@ -2386,10 +2483,15 @@ void TServiceBase::DoConfigure(
} catch (const std::exception& ex) {
THROW_ERROR_EXCEPTION("Error configuring RPC service %v",
ServiceId_.ServiceName)
- << ex;
+ << TError(ex);
}
}
+std::optional<TError> TServiceBase::GetThrottledError(const NProto::TRequestHeader& /*requestHeader*/)
+{
+ return {};
+}
+
void TServiceBase::Configure(
const TServiceCommonConfigPtr& configDefaults,
const INodePtr& configNode)
@@ -2401,7 +2503,7 @@ void TServiceBase::Configure(
} catch (const std::exception& ex) {
THROW_ERROR_EXCEPTION("Error parsing RPC service %v config",
ServiceId_.ServiceName)
- << ex;
+ << TError(ex);
}
} else {
config = New<TServiceConfig>();
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index 6dd81db2be..3545fe2f0d 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -47,6 +47,13 @@ namespace NYT::NRpc {
////////////////////////////////////////////////////////////////////////////////
+DEFINE_ENUM(ERequestProcessingStage,
+ (Waiting)
+ (Executing)
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
template <class TRequestMessage>
class TTypedServiceRequest
: public TRequestMessage
@@ -246,7 +253,7 @@ public:
underlyingContext->Reply(TError(
NRpc::EErrorCode::ProtocolError,
"Error deserializing request attachments")
- << ex);
+ << TError(ex));
return false;
}
@@ -458,6 +465,24 @@ TRequestQueuePtr CreateRequestQueue(TString name, const NProfiling::TProfiler& p
////////////////////////////////////////////////////////////////////////////////
+class TDynamicConcurrencyLimit
+{
+public:
+ DEFINE_SIGNAL(void(), Updated);
+
+ void Reconfigure(int limit);
+ int GetLimitFromConfiguration() const;
+
+ int GetDynamicLimit() const;
+ void SetDynamicLimit(std::optional<int> dynamicLimit);
+
+private:
+ std::atomic<int> ConfigLimit_ = 0;
+ std::atomic<int> DynamicLimit_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
//! Provides a base for implementing IService.
class TServiceBase
: public virtual IService
@@ -682,7 +707,10 @@ protected:
std::atomic<bool> Pooled = true;
std::atomic<int> QueueSizeLimit = 0;
- std::atomic<int> ConcurrencyLimit = 0;
+
+ TDynamicConcurrencyLimit ConcurrencyLimit;
+ std::atomic<double> WaitingTimeoutFraction = 0;
+
NProfiling::TCounter RequestQueueSizeLimitErrorCounter;
NProfiling::TCounter UnauthenticatedRequestsCounter;
@@ -763,7 +791,7 @@ protected:
//! Registers a method handler.
//! This call is must be performed prior to service registration.
- TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor);
+ virtual TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor);
//! Register a feature as being supported by server.
//! This call is must be performed prior to service registration.
@@ -815,6 +843,8 @@ protected:
const TServiceCommonConfigPtr& configDefaults,
const TServiceConfigPtr& config);
+ virtual std::optional<TError> GetThrottledError(const NProto::TRequestHeader& requestHeader);
+
protected:
void ReplyError(
TError error,
@@ -910,6 +940,7 @@ private:
std::unique_ptr<NRpc::NProto::TRequestHeader> Header;
TSharedRefArray Message;
TRequestQueue* RequestQueue;
+ std::optional<TError> ThrottledError;
};
void DoDeclareServerFeature(int featureId);
@@ -917,7 +948,7 @@ private:
TError DoCheckRequestProtocol(const NRpc::NProto::TRequestHeader& header);
TError DoCheckRequestFeatures(const NRpc::NProto::TRequestHeader& header);
- void OnRequestTimeout(TRequestId requestId, bool aborted);
+ void OnRequestTimeout(TRequestId requestId, ERequestProcessingStage stage, bool aborted);
void OnReplyBusTerminated(const NYT::NBus::IBusPtr& bus, const TError& error);
void OnRequestAuthenticated(
@@ -1044,6 +1075,8 @@ private:
bool AreThrottlersOverdrafted() const;
void AcquireThrottlers(const TServiceBase::TServiceContextPtr& context);
void SubscribeToThrottlers();
+
+ void OnConcurrencyLimitChanged();
};
DEFINE_REFCOUNTED_TYPE(TRequestQueue)
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp
index fdc5354ca6..601d95c22c 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.cpp
+++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp
@@ -228,7 +228,7 @@ public:
context->SetRequestInfo();
auto promise = NewPromise<void>();
- context->SubscribeCanceled(BIND([=] () mutable {
+ context->SubscribeCanceled(BIND([=] (const TError&) mutable {
promise.Set();
}));
diff --git a/yt/yt/core/rpc/unittests/mock/service.h b/yt/yt/core/rpc/unittests/mock/service.h
index c78905c5b7..21204b12d9 100644
--- a/yt/yt/core/rpc/unittests/mock/service.h
+++ b/yt/yt/core/rpc/unittests/mock/service.h
@@ -198,14 +198,14 @@ public:
MOCK_METHOD(
void,
SubscribeCanceled,
- (const TCallback<void()>& callback),
+ (const TCallback<void(const TError&)>& callback),
(override)
);
MOCK_METHOD(
void,
UnsubscribeCanceled,
- (const TCallback<void()>& callback),
+ (const TCallback<void(const TError&)>& callback),
(override)
);