diff options
author | capone212 <capone212@yandex-team.com> | 2023-11-13 00:54:01 +0300 |
---|---|---|
committer | capone212 <capone212@yandex-team.com> | 2023-11-13 01:15:43 +0300 |
commit | 4abf85baae59e2393868790fa21539430c9cc1b0 (patch) | |
tree | ea203575ed085dbcb043f1beb0faed9e9bf43a5b | |
parent | e24f824435e26ee6d92e01d20813c6f9e23efe8d (diff) | |
download | ydb-4abf85baae59e2393868790fa21539430c9cc1b0.tar.gz |
YT-20212: Concurrency throttling for OverloadController
-rw-r--r-- | yt/yt/core/rpc/server_detail.cpp | 8 | ||||
-rw-r--r-- | yt/yt/core/rpc/server_detail.h | 12 | ||||
-rw-r--r-- | yt/yt/core/rpc/service.cpp | 23 | ||||
-rw-r--r-- | yt/yt/core/rpc/service.h | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 168 | ||||
-rw-r--r-- | yt/yt/core/rpc/service_detail.h | 41 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/test_service.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/mock/service.h | 4 |
8 files changed, 194 insertions, 66 deletions
diff --git a/yt/yt/core/rpc/server_detail.cpp b/yt/yt/core/rpc/server_detail.cpp index fdf7c65e10..b5526d5c43 100644 --- a/yt/yt/core/rpc/server_detail.cpp +++ b/yt/yt/core/rpc/server_detail.cpp @@ -207,10 +207,10 @@ bool TServiceContextBase::IsReplied() const return Replied_.load(); } -void TServiceContextBase::SubscribeCanceled(const TClosure& /*callback*/) +void TServiceContextBase::SubscribeCanceled(const TCanceledCallback& /*callback*/) { } -void TServiceContextBase::UnsubscribeCanceled(const TClosure& /*callback*/) +void TServiceContextBase::UnsubscribeCanceled(const TCanceledCallback& /*callback*/) { } void TServiceContextBase::SubscribeReplied(const TClosure& /*callback*/) @@ -593,12 +593,12 @@ void TServiceContextWrapper::SetComplete() UnderlyingContext_->SetComplete(); } -void TServiceContextWrapper::SubscribeCanceled(const TClosure& callback) +void TServiceContextWrapper::SubscribeCanceled(const TCanceledCallback& callback) { UnderlyingContext_->SubscribeCanceled(callback); } -void TServiceContextWrapper::UnsubscribeCanceled(const TClosure& callback) +void TServiceContextWrapper::UnsubscribeCanceled(const TCanceledCallback& callback) { UnderlyingContext_->UnsubscribeCanceled(callback); } diff --git a/yt/yt/core/rpc/server_detail.h b/yt/yt/core/rpc/server_detail.h index b746308d8b..6182658514 100644 --- a/yt/yt/core/rpc/server_detail.h +++ b/yt/yt/core/rpc/server_detail.h @@ -61,8 +61,10 @@ public: const TSharedRefArray& GetResponseMessage() const override; - void SubscribeCanceled(const TClosure& callback) override; - void UnsubscribeCanceled(const TClosure& callback) override; + using TCanceledCallback = TCallback<void(const TError&)>; + + void SubscribeCanceled(const TCanceledCallback& callback) override; + void UnsubscribeCanceled(const TCanceledCallback& callback) override; void SubscribeReplied(const TClosure& callback) override; void UnsubscribeReplied(const TClosure& callback) override; @@ -200,8 +202,10 @@ public: TFuture<TSharedRefArray> GetAsyncResponseMessage() const override; const TSharedRefArray& GetResponseMessage() const override; - void SubscribeCanceled(const TClosure& callback) override; - void UnsubscribeCanceled(const TClosure& callback) override; + using TCanceledCallback = TCallback<void(const TError&)>; + + void SubscribeCanceled(const TCanceledCallback& callback) override; + void UnsubscribeCanceled(const TCanceledCallback& callback) override; void SubscribeReplied(const TClosure& callback) override; void UnsubscribeReplied(const TClosure& callback) override; diff --git a/yt/yt/core/rpc/service.cpp b/yt/yt/core/rpc/service.cpp index 2ad7909776..95749e42d2 100644 --- a/yt/yt/core/rpc/service.cpp +++ b/yt/yt/core/rpc/service.cpp @@ -4,17 +4,6 @@ namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// -namespace { - -TError MakeCanceledError() -{ - return TError("RPC request canceled"); -} - -} // namespace - -//////////////////////////////////////////////////////////////////////////////// - void IServiceContext::SetRequestInfo() { SetRawRequestInfo(TString(), false); @@ -34,8 +23,8 @@ void IServiceContext::ReplyFrom(TFuture<TSharedRefArray> asyncMessage) Reply(TError(result)); } })); - SubscribeCanceled(BIND([asyncMessage = std::move(asyncMessage)] { - asyncMessage.Cancel(MakeCanceledError()); + SubscribeCanceled(BIND([asyncMessage = std::move(asyncMessage)] (const TError& error) { + asyncMessage.Cancel(error); })); } @@ -44,8 +33,8 @@ void IServiceContext::ReplyFrom(TFuture<void> asyncError) asyncError.Subscribe(BIND([this, this_ = MakeStrong(this)] (const TError& error) { Reply(error); })); - SubscribeCanceled(BIND([asyncError = std::move(asyncError)] { - asyncError.Cancel(MakeCanceledError()); + SubscribeCanceled(BIND([asyncError = std::move(asyncError)] (const TError& error) { + asyncError.Cancel(error); })); } @@ -55,8 +44,8 @@ void IServiceContext::ReplyFrom(TFuture<void> asyncError, const IInvokerPtr& inv Reply(error); }) .Via(invoker)); - SubscribeCanceled(BIND([asyncError = std::move(asyncError)] { - asyncError.Cancel(MakeCanceledError()); + SubscribeCanceled(BIND([asyncError = std::move(asyncError)] (const TError& error) { + asyncError.Cancel(error); })); } diff --git a/yt/yt/core/rpc/service.h b/yt/yt/core/rpc/service.h index 188e2f77aa..9dde3ec659 100644 --- a/yt/yt/core/rpc/service.h +++ b/yt/yt/core/rpc/service.h @@ -131,7 +131,7 @@ struct IServiceContext virtual bool IsCanceled() const = 0; //! Raised when request processing is canceled. - DECLARE_INTERFACE_SIGNAL(void(), Canceled); + DECLARE_INTERFACE_SIGNAL(void(const TError&), Canceled); //! Raised when Reply() was called. Allows doing some post-request stuff like extended structured logging. DECLARE_INTERFACE_SIGNAL(void(), Replied); diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 7a04ab78ed..c137e35452 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -32,6 +32,17 @@ #include <library/cpp/yt/misc/tls.h> +namespace NYT +{ + static TError operator<<(TError error, const std::optional<TError>& maybeError) + { + if (maybeError) { + return error << *maybeError; + } + return error; + } +} + namespace NYT::NRpc { using namespace NBus; @@ -76,6 +87,34 @@ THandlerInvocationOptions THandlerInvocationOptions::SetResponseCodec(NCompressi //////////////////////////////////////////////////////////////////////////////// +void TDynamicConcurrencyLimit::Reconfigure(int limit) +{ + ConfigLimit_.store(limit, std::memory_order::relaxed); + SetDynamicLimit(limit); +} + +int TDynamicConcurrencyLimit::GetLimitFromConfiguration() const +{ + return ConfigLimit_.load(std::memory_order::relaxed); +} + +int TDynamicConcurrencyLimit::GetDynamicLimit() const +{ + return DynamicLimit_.load(std::memory_order::relaxed); +} + +void TDynamicConcurrencyLimit::SetDynamicLimit(std::optional<int> dynamicLimit) +{ + auto limit = dynamicLimit.has_value() ? *dynamicLimit : ConfigLimit_.load(std::memory_order::relaxed); + auto oldLimit = DynamicLimit_.exchange(limit, std::memory_order::relaxed); + + if (oldLimit != limit) { + Updated_.Fire(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + TServiceBase::TMethodDescriptor::TMethodDescriptor( TString method, TLiteHandler liteHandler, @@ -264,12 +303,12 @@ public: , RuntimeInfo_(acceptedRequest.RuntimeInfo) , TraceContext_(std::move(acceptedRequest.TraceContext)) , RequestQueue_(acceptedRequest.RequestQueue) + , ThrottledError_(acceptedRequest.ThrottledError) , MethodPerformanceCounters_(Service_->GetMethodPerformanceCounters( RuntimeInfo_, {GetAuthenticationIdentity().UserTag, RequestQueue_})) , PerformanceCounters_(Service_->GetPerformanceCounters()) , ArriveInstant_(NProfiling::GetInstant()) - { YT_ASSERT(RequestMessage_); YT_ASSERT(ReplyBus_); @@ -371,12 +410,12 @@ public: invoker->Invoke(BIND(&TServiceContext::DoRun, MakeStrong(this), handler)); } - void SubscribeCanceled(const TClosure& callback) override + void SubscribeCanceled(const TCanceledCallback& callback) override { CanceledList_.Subscribe(callback); } - void UnsubscribeCanceled(const TClosure& callback) override + void UnsubscribeCanceled(const TCanceledCallback& callback) override { CanceledList_.Unsubscribe(callback); } @@ -396,9 +435,15 @@ public: return CanceledList_.IsFired(); } + TError GetCanceledError() const + { + return TError(NYT::EErrorCode::Canceled, "RPC request is canceled") + << ThrottledError_; + } + void Cancel() override { - if (!CanceledList_.Fire()) { + if (!CanceledList_.Fire(GetCanceledError())) { return; } @@ -420,21 +465,22 @@ public: DoSetComplete(); } - void HandleTimeout() + void HandleTimeout(ERequestProcessingStage stage) { if (TimedOutLatch_.exchange(true)) { return; } - YT_LOG_DEBUG("Request timed out, canceling (RequestId: %v)", - RequestId_); + YT_LOG_DEBUG("Request timed out, canceling (RequestId: %v, Stage: %v)", + RequestId_, + stage); if (RuntimeInfo_->Descriptor.StreamingEnabled) { static const auto TimedOutError = TError("Request timed out"); AbortStreamsUnlessClosed(TimedOutError); } - CanceledList_.Fire(); + CanceledList_.Fire(GetCanceledError()); MethodPerformanceCounters_->TimedOutRequestCounter.Increment(); // Guards from race with DoGuardedRun. @@ -554,6 +600,29 @@ public: } } + void BeforeEnqueued() + { + auto requestTimeout = GetTimeout(); + if (!requestTimeout) { + return; + } + + auto waitingTimeoutFraction = RuntimeInfo_->WaitingTimeoutFraction.load(std::memory_order::relaxed); + if (waitingTimeoutFraction == 0) { + return; + } + + auto waitingTimeout = *requestTimeout * waitingTimeoutFraction; + WaitingTimeoutCookie_ = TDelayedExecutor::Submit( + BIND(&TServiceBase::OnRequestTimeout, Service_, RequestId_, ERequestProcessingStage::Waiting), + waitingTimeout); + } + + void AfterDequeued() + { + TDelayedExecutor::CancelAndClear(WaitingTimeoutCookie_); + } + private: const TServiceBasePtr Service_; const TRequestId RequestId_; @@ -561,15 +630,17 @@ private: TRuntimeMethodInfo* const RuntimeInfo_; const TTraceContextPtr TraceContext_; TRequestQueue* const RequestQueue_; + std::optional<TError> ThrottledError_; TMethodPerformanceCounters* const MethodPerformanceCounters_; TPerformanceCounters* const PerformanceCounters_; NCompression::ECodec RequestCodec_; - TDelayedExecutorCookie TimeoutCookie_; + TDelayedExecutorCookie WaitingTimeoutCookie_; + TDelayedExecutorCookie ExecutingTimeoutCookie_; bool Cancelable_ = false; - TSingleShotCallbackList<void()> CanceledList_; + TSingleShotCallbackList<void(const TError&)> CanceledList_; const TInstant ArriveInstant_; std::optional<TInstant> RunInstant_; @@ -734,7 +805,8 @@ private: } FinishLatch_ = true; - TDelayedExecutor::CancelAndClear(TimeoutCookie_); + TDelayedExecutor::CancelAndClear(WaitingTimeoutCookie_); + TDelayedExecutor::CancelAndClear(ExecutingTimeoutCookie_); if (IsRegistrable()) { Service_->UnregisterRequest(this); @@ -809,8 +881,8 @@ private: return; } if (Cancelable_) { - TimeoutCookie_ = TDelayedExecutor::Submit( - BIND(&TServiceBase::OnRequestTimeout, Service_, RequestId_), + ExecutingTimeoutCookie_ = TDelayedExecutor::Submit( + BIND(&TServiceBase::OnRequestTimeout, Service_, RequestId_, ERequestProcessingStage::Executing), remainingTimeout); } } @@ -819,8 +891,8 @@ private: // TODO(lukyan): Wrap in CancelableExecution. auto fiberCanceler = GetCurrentFiberCanceler(); if (fiberCanceler) { - auto cancelationHandler = BIND([fiberCanceler = std::move(fiberCanceler)] { - fiberCanceler(TError("RPC request canceled")); + auto cancelationHandler = BIND([fiberCanceler = std::move(fiberCanceler)] (const TError& error) { + fiberCanceler(error); }); if (!CanceledList_.TrySubscribe(std::move(cancelationHandler))) { YT_LOG_DEBUG("Request was canceled before being run (RequestId: %v)", @@ -1212,6 +1284,10 @@ bool TRequestQueue::Register(TServiceBase* service, TServiceBase::TRuntimeMethod if (!Registered_.load(std::memory_order::relaxed)) { Service_ = service; RuntimeInfo_ = runtimeInfo; + + RuntimeInfo_->ConcurrencyLimit.SubscribeUpdated(BIND( + &TRequestQueue::OnConcurrencyLimitChanged, + MakeWeak(this))); } Registered_.store(true, std::memory_order::release); } @@ -1219,6 +1295,13 @@ bool TRequestQueue::Register(TServiceBase* service, TServiceBase::TRuntimeMethod return true; } +void TRequestQueue::OnConcurrencyLimitChanged() +{ + if (QueueSize_.load() > 0) { + ScheduleRequestsFromQueue(); + } +} + void TRequestQueue::TRequestThrottler::Reconfigure(const TThroughputThrottlerConfigPtr& config) { Throttler->Reconfigure(config ? config : New<TThroughputThrottlerConfig>()); @@ -1270,7 +1353,7 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) { // Fast path. auto newConcurrencySemaphore = IncrementConcurrency(); - if (newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.load(std::memory_order::relaxed) && + if (newConcurrencySemaphore <= RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit() && !AreThrottlersOverdrafted()) { RunRequest(std::move(context)); @@ -1280,7 +1363,10 @@ void TRequestQueue::OnRequestArrived(TServiceBase::TServiceContextPtr context) // Slow path. DecrementConcurrency(); IncrementQueueSize(); - Queue_.enqueue(std::move(context)); + + context->BeforeEnqueued(); + YT_VERIFY(Queue_.enqueue(std::move(context))); + ScheduleRequestsFromQueue(); } @@ -1317,7 +1403,7 @@ void TRequestQueue::ScheduleRequestsFromQueue() #endif // NB: Racy, may lead to overcommit in concurrency semaphore and request bytes throttler. - auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.load(std::memory_order::relaxed); + auto concurrencyLimit = RuntimeInfo_->ConcurrencyLimit.GetDynamicLimit(); while (QueueSize_.load() > 0 && Concurrency_.load() < concurrencyLimit) { if (AreThrottlersOverdrafted()) { SubscribeToThrottlers(); @@ -1325,15 +1411,22 @@ void TRequestQueue::ScheduleRequestsFromQueue() } TServiceBase::TServiceContextPtr context; - while (!Queue_.try_dequeue(context)) { - // False negatives are possible in Moody Camel. - if (QueueSize_.load() > 0) { - continue; + while (!context) { + if (!Queue_.try_dequeue(context)) { + // False negatives are possible in Moody Camel. + if (QueueSize_.load() > 0) { + continue; + } + return; + } + + DecrementQueueSize(); + context->AfterDequeued(); + if (context->IsCanceled()) { + context.Reset(); } - return; } - DecrementQueueSize(); IncrementConcurrency(); RunRequest(std::move(context)); } @@ -1534,13 +1627,16 @@ void TServiceBase::HandleRequest( auto* requestQueue = GetRequestQueue(runtimeInfo, *header); RegisterRequestQueue(runtimeInfo, requestQueue); + auto maybeThrottled = GetThrottledError(*header); + if (requestQueue->IsQueueLimitSizeExceeded()) { runtimeInfo->RequestQueueSizeLimitErrorCounter.Increment(); replyError(TError( NRpc::EErrorCode::RequestQueueSizeLimitExceeded, "Request queue size limit exceeded") << TErrorAttribute("limit", runtimeInfo->QueueSizeLimit.load()) - << TErrorAttribute("queue", requestQueue->GetName())); + << TErrorAttribute("queue", requestQueue->GetName()) + << maybeThrottled); return; } @@ -1554,7 +1650,8 @@ void TServiceBase::HandleRequest( std::move(traceContext), std::move(header), std::move(message), - requestQueue + requestQueue, + maybeThrottled }; if (!IsAuthenticationNeeded(acceptedRequest)) { @@ -1874,14 +1971,14 @@ TError TServiceBase::DoCheckRequestFeatures(const NRpc::NProto::TRequestHeader& return {}; } -void TServiceBase::OnRequestTimeout(TRequestId requestId, bool /*aborted*/) +void TServiceBase::OnRequestTimeout(TRequestId requestId, ERequestProcessingStage stage, bool /*aborted*/) { auto context = FindRequest(requestId); if (!context) { return; } - context->HandleTimeout(); + context->HandleTimeout(stage); } void TServiceBase::OnReplyBusTerminated(const IBusPtr& bus, const TError& error) @@ -2286,7 +2383,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe runtimeInfo->Heavy.store(descriptor.Options.Heavy); runtimeInfo->QueueSizeLimit.store(descriptor.QueueSizeLimit); - runtimeInfo->ConcurrencyLimit.store(descriptor.ConcurrencyLimit); + runtimeInfo->ConcurrencyLimit.Reconfigure(descriptor.ConcurrencyLimit); runtimeInfo->LogLevel.store(descriptor.LogLevel); runtimeInfo->LoggingSuppressionTimeout.store(descriptor.LoggingSuppressionTimeout); @@ -2298,7 +2395,7 @@ TServiceBase::TRuntimeMethodInfoPtr TServiceBase::RegisterMethod(const TMethodDe return runtimeInfo->QueueSizeLimit.load(std::memory_order::relaxed); }); profiler.AddFuncGauge("/concurrency_limit", MakeStrong(this), [=] { - return runtimeInfo->ConcurrencyLimit.load(std::memory_order::relaxed); + return runtimeInfo->ConcurrencyLimit.GetDynamicLimit(); }); return runtimeInfo; @@ -2359,7 +2456,7 @@ void TServiceBase::DoConfigure( const auto& descriptor = runtimeInfo->Descriptor; runtimeInfo->Heavy.store(methodConfig->Heavy.value_or(descriptor.Options.Heavy)); runtimeInfo->QueueSizeLimit.store(methodConfig->QueueSizeLimit.value_or(descriptor.QueueSizeLimit)); - runtimeInfo->ConcurrencyLimit.store(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); + runtimeInfo->ConcurrencyLimit.Reconfigure(methodConfig->ConcurrencyLimit.value_or(descriptor.ConcurrencyLimit)); runtimeInfo->LogLevel.store(methodConfig->LogLevel.value_or(descriptor.LogLevel)); runtimeInfo->LoggingSuppressionTimeout.store(methodConfig->LoggingSuppressionTimeout.value_or(descriptor.LoggingSuppressionTimeout)); runtimeInfo->Pooled.store(methodConfig->Pooled.value_or(config->Pooled.value_or(descriptor.Pooled))); @@ -2386,10 +2483,15 @@ void TServiceBase::DoConfigure( } catch (const std::exception& ex) { THROW_ERROR_EXCEPTION("Error configuring RPC service %v", ServiceId_.ServiceName) - << ex; + << TError(ex); } } +std::optional<TError> TServiceBase::GetThrottledError(const NProto::TRequestHeader& /*requestHeader*/) +{ + return {}; +} + void TServiceBase::Configure( const TServiceCommonConfigPtr& configDefaults, const INodePtr& configNode) @@ -2401,7 +2503,7 @@ void TServiceBase::Configure( } catch (const std::exception& ex) { THROW_ERROR_EXCEPTION("Error parsing RPC service %v config", ServiceId_.ServiceName) - << ex; + << TError(ex); } } else { config = New<TServiceConfig>(); diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index 6dd81db2be..3545fe2f0d 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -47,6 +47,13 @@ namespace NYT::NRpc { //////////////////////////////////////////////////////////////////////////////// +DEFINE_ENUM(ERequestProcessingStage, + (Waiting) + (Executing) +); + +//////////////////////////////////////////////////////////////////////////////// + template <class TRequestMessage> class TTypedServiceRequest : public TRequestMessage @@ -246,7 +253,7 @@ public: underlyingContext->Reply(TError( NRpc::EErrorCode::ProtocolError, "Error deserializing request attachments") - << ex); + << TError(ex)); return false; } @@ -458,6 +465,24 @@ TRequestQueuePtr CreateRequestQueue(TString name, const NProfiling::TProfiler& p //////////////////////////////////////////////////////////////////////////////// +class TDynamicConcurrencyLimit +{ +public: + DEFINE_SIGNAL(void(), Updated); + + void Reconfigure(int limit); + int GetLimitFromConfiguration() const; + + int GetDynamicLimit() const; + void SetDynamicLimit(std::optional<int> dynamicLimit); + +private: + std::atomic<int> ConfigLimit_ = 0; + std::atomic<int> DynamicLimit_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + //! Provides a base for implementing IService. class TServiceBase : public virtual IService @@ -682,7 +707,10 @@ protected: std::atomic<bool> Pooled = true; std::atomic<int> QueueSizeLimit = 0; - std::atomic<int> ConcurrencyLimit = 0; + + TDynamicConcurrencyLimit ConcurrencyLimit; + std::atomic<double> WaitingTimeoutFraction = 0; + NProfiling::TCounter RequestQueueSizeLimitErrorCounter; NProfiling::TCounter UnauthenticatedRequestsCounter; @@ -763,7 +791,7 @@ protected: //! Registers a method handler. //! This call is must be performed prior to service registration. - TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor); + virtual TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor); //! Register a feature as being supported by server. //! This call is must be performed prior to service registration. @@ -815,6 +843,8 @@ protected: const TServiceCommonConfigPtr& configDefaults, const TServiceConfigPtr& config); + virtual std::optional<TError> GetThrottledError(const NProto::TRequestHeader& requestHeader); + protected: void ReplyError( TError error, @@ -910,6 +940,7 @@ private: std::unique_ptr<NRpc::NProto::TRequestHeader> Header; TSharedRefArray Message; TRequestQueue* RequestQueue; + std::optional<TError> ThrottledError; }; void DoDeclareServerFeature(int featureId); @@ -917,7 +948,7 @@ private: TError DoCheckRequestProtocol(const NRpc::NProto::TRequestHeader& header); TError DoCheckRequestFeatures(const NRpc::NProto::TRequestHeader& header); - void OnRequestTimeout(TRequestId requestId, bool aborted); + void OnRequestTimeout(TRequestId requestId, ERequestProcessingStage stage, bool aborted); void OnReplyBusTerminated(const NYT::NBus::IBusPtr& bus, const TError& error); void OnRequestAuthenticated( @@ -1044,6 +1075,8 @@ private: bool AreThrottlersOverdrafted() const; void AcquireThrottlers(const TServiceBase::TServiceContextPtr& context); void SubscribeToThrottlers(); + + void OnConcurrencyLimitChanged(); }; DEFINE_REFCOUNTED_TYPE(TRequestQueue) diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp index fdc5354ca6..601d95c22c 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.cpp +++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp @@ -228,7 +228,7 @@ public: context->SetRequestInfo(); auto promise = NewPromise<void>(); - context->SubscribeCanceled(BIND([=] () mutable { + context->SubscribeCanceled(BIND([=] (const TError&) mutable { promise.Set(); })); diff --git a/yt/yt/core/rpc/unittests/mock/service.h b/yt/yt/core/rpc/unittests/mock/service.h index c78905c5b7..21204b12d9 100644 --- a/yt/yt/core/rpc/unittests/mock/service.h +++ b/yt/yt/core/rpc/unittests/mock/service.h @@ -198,14 +198,14 @@ public: MOCK_METHOD( void, SubscribeCanceled, - (const TCallback<void()>& callback), + (const TCallback<void(const TError&)>& callback), (override) ); MOCK_METHOD( void, UnsubscribeCanceled, - (const TCallback<void()>& callback), + (const TCallback<void(const TError&)>& callback), (override) ); |