diff options
| -rw-r--r-- | yt/yt/core/http/helpers.cpp | 1 | ||||
| -rw-r--r-- | yt/yt/core/rpc/grpc/server.cpp | 3 | ||||
| -rw-r--r-- | yt/yt/core/rpc/service_detail.cpp | 32 | ||||
| -rw-r--r-- | yt/yt/core/rpc/service_detail.h | 2 |
4 files changed, 24 insertions, 14 deletions
diff --git a/yt/yt/core/http/helpers.cpp b/yt/yt/core/http/helpers.cpp index fd96bd5c3f8..52e56c0dfab 100644 --- a/yt/yt/core/http/helpers.cpp +++ b/yt/yt/core/http/helpers.cpp @@ -162,6 +162,7 @@ static const auto HeadersWhitelist = JoinSeq(", ", std::vector<TString>{ "Content-Type", "Accept", "Cache-Control", + "Request-Timeout", "X-Csrf-Token", "X-YT-Parameters", "X-YT-Parameters0", diff --git a/yt/yt/core/rpc/grpc/server.cpp b/yt/yt/core/rpc/grpc/server.cpp index d4eaf72ff47..169bd8cb1ea 100644 --- a/yt/yt/core/rpc/grpc/server.cpp +++ b/yt/yt/core/rpc/grpc/server.cpp @@ -339,9 +339,6 @@ private: ~TCallHandler() { - if (ReplyBus_) { - ReplyBus_->Terminate(TError(NYT::EErrorCode::Canceled, "GRPC call completed")); - } Owner_->OnCallHandlerDestroyed(); } diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 140aacf8e2e..f99333ac739 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -335,10 +335,18 @@ public: ~TServiceContext() { - if (!Replied_ && !CanceledList_.IsFired()) { + if (!Replied_) { // Prevent alerting. RequestInfoSet_ = true; - Reply(TError(NRpc::EErrorCode::Unavailable, "Service is unable to complete your request")); + if (CanceledList_.IsFired()) { + if (TimedOutLatch_) { + Reply(TError(NYT::EErrorCode::Timeout, "Request timed out")); + } else { + Reply(TError(NYT::EErrorCode::Canceled, "Request canceled")); + } + } else { + Reply(TError(NRpc::EErrorCode::Unavailable, "Service is unable to complete your request")); + } } Finish(); @@ -466,7 +474,7 @@ public: RequestId_); if (RuntimeInfo_->Descriptor.StreamingEnabled) { - static const auto CanceledError = TError("Request canceled"); + static const auto CanceledError = TError(NYT::EErrorCode::Canceled, "Request canceled"); AbortStreamsUnlessClosed(CanceledError); } @@ -491,11 +499,12 @@ public: stage); if (RuntimeInfo_->Descriptor.StreamingEnabled) { - static const auto TimedOutError = TError("Request timed out"); + static const auto TimedOutError = TError(NYT::EErrorCode::Timeout, "Request timed out"); AbortStreamsUnlessClosed(TimedOutError); } CanceledList_.Fire(GetCanceledError()); + MethodPerformanceCounters_->TimedOutRequestCounter.Increment(); // Guards from race with DoGuardedRun. @@ -681,7 +690,7 @@ private: bool IsRegistrable() { - if (RuntimeInfo_->Descriptor.Cancelable && !RequestHeader_->uncancelable()) { + if (Cancelable_) { return true; } @@ -725,11 +734,11 @@ private: BuildGlobalRequestInfo(); + Cancelable_ = RuntimeInfo_->Descriptor.Cancelable && !RequestHeader_->uncancelable(); + if (IsRegistrable()) { Service_->RegisterRequest(this); } - - Cancelable_ = RuntimeInfo_->Descriptor.Cancelable && !RequestHeader_->uncancelable(); } void BuildGlobalRequestInfo() @@ -2046,10 +2055,10 @@ void TServiceBase::OnRequestTimeout(TRequestId requestId, ERequestProcessingStag context->HandleTimeout(stage); } -void TServiceBase::OnReplyBusTerminated(const IBusPtr& bus, const TError& error) +void TServiceBase::OnReplyBusTerminated(const NYT::TWeakPtr<NYT::NBus::IBus>& busWeak, const TError& error) { std::vector<TServiceContextPtr> contexts; - { + if (auto bus = busWeak.Lock()) { auto* bucket = GetReplyBusBucket(bus); auto guard = Guard(bucket->Lock); auto it = bucket->ReplyBusToContexts.find(bus); @@ -2111,7 +2120,7 @@ void TServiceBase::RegisterRequest(TServiceContext* context) } if (subscribe) { - replyBus->SubscribeTerminated(BIND(&TServiceBase::OnReplyBusTerminated, MakeWeak(this), replyBus)); + replyBus->SubscribeTerminated(BIND(&TServiceBase::OnReplyBusTerminated, MakeWeak(this), MakeWeak(replyBus.Get()))); } auto pendingPayloads = GetAndErasePendingPayloads(requestId); @@ -2144,6 +2153,9 @@ void TServiceBase::UnregisterRequest(TServiceContext* context) if (it != bucket->ReplyBusToContexts.end()) { auto& contexts = it->second; contexts.erase(context); + if (contexts.empty()) { + bucket->ReplyBusToContexts.erase(it); + } } } } diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h index 5ae839ae0dd..dae71d0c1e3 100644 --- a/yt/yt/core/rpc/service_detail.h +++ b/yt/yt/core/rpc/service_detail.h @@ -955,7 +955,7 @@ private: TError DoCheckRequestCodecs(const NRpc::NProto::TRequestHeader& header); void OnRequestTimeout(TRequestId requestId, ERequestProcessingStage stage, bool aborted); - void OnReplyBusTerminated(const NYT::NBus::IBusPtr& bus, const TError& error); + void OnReplyBusTerminated(const NYT::TWeakPtr<NYT::NBus::IBus>& busWeak, const TError& error); void OnRequestAuthenticated( const NProfiling::TWallTimer& timer, |
