summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yt/yt/core/http/helpers.cpp1
-rw-r--r--yt/yt/core/rpc/grpc/server.cpp3
-rw-r--r--yt/yt/core/rpc/service_detail.cpp32
-rw-r--r--yt/yt/core/rpc/service_detail.h2
4 files changed, 24 insertions, 14 deletions
diff --git a/yt/yt/core/http/helpers.cpp b/yt/yt/core/http/helpers.cpp
index fd96bd5c3f8..52e56c0dfab 100644
--- a/yt/yt/core/http/helpers.cpp
+++ b/yt/yt/core/http/helpers.cpp
@@ -162,6 +162,7 @@ static const auto HeadersWhitelist = JoinSeq(", ", std::vector<TString>{
"Content-Type",
"Accept",
"Cache-Control",
+ "Request-Timeout",
"X-Csrf-Token",
"X-YT-Parameters",
"X-YT-Parameters0",
diff --git a/yt/yt/core/rpc/grpc/server.cpp b/yt/yt/core/rpc/grpc/server.cpp
index d4eaf72ff47..169bd8cb1ea 100644
--- a/yt/yt/core/rpc/grpc/server.cpp
+++ b/yt/yt/core/rpc/grpc/server.cpp
@@ -339,9 +339,6 @@ private:
~TCallHandler()
{
- if (ReplyBus_) {
- ReplyBus_->Terminate(TError(NYT::EErrorCode::Canceled, "GRPC call completed"));
- }
Owner_->OnCallHandlerDestroyed();
}
diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp
index 140aacf8e2e..f99333ac739 100644
--- a/yt/yt/core/rpc/service_detail.cpp
+++ b/yt/yt/core/rpc/service_detail.cpp
@@ -335,10 +335,18 @@ public:
~TServiceContext()
{
- if (!Replied_ && !CanceledList_.IsFired()) {
+ if (!Replied_) {
// Prevent alerting.
RequestInfoSet_ = true;
- Reply(TError(NRpc::EErrorCode::Unavailable, "Service is unable to complete your request"));
+ if (CanceledList_.IsFired()) {
+ if (TimedOutLatch_) {
+ Reply(TError(NYT::EErrorCode::Timeout, "Request timed out"));
+ } else {
+ Reply(TError(NYT::EErrorCode::Canceled, "Request canceled"));
+ }
+ } else {
+ Reply(TError(NRpc::EErrorCode::Unavailable, "Service is unable to complete your request"));
+ }
}
Finish();
@@ -466,7 +474,7 @@ public:
RequestId_);
if (RuntimeInfo_->Descriptor.StreamingEnabled) {
- static const auto CanceledError = TError("Request canceled");
+ static const auto CanceledError = TError(NYT::EErrorCode::Canceled, "Request canceled");
AbortStreamsUnlessClosed(CanceledError);
}
@@ -491,11 +499,12 @@ public:
stage);
if (RuntimeInfo_->Descriptor.StreamingEnabled) {
- static const auto TimedOutError = TError("Request timed out");
+ static const auto TimedOutError = TError(NYT::EErrorCode::Timeout, "Request timed out");
AbortStreamsUnlessClosed(TimedOutError);
}
CanceledList_.Fire(GetCanceledError());
+
MethodPerformanceCounters_->TimedOutRequestCounter.Increment();
// Guards from race with DoGuardedRun.
@@ -681,7 +690,7 @@ private:
bool IsRegistrable()
{
- if (RuntimeInfo_->Descriptor.Cancelable && !RequestHeader_->uncancelable()) {
+ if (Cancelable_) {
return true;
}
@@ -725,11 +734,11 @@ private:
BuildGlobalRequestInfo();
+ Cancelable_ = RuntimeInfo_->Descriptor.Cancelable && !RequestHeader_->uncancelable();
+
if (IsRegistrable()) {
Service_->RegisterRequest(this);
}
-
- Cancelable_ = RuntimeInfo_->Descriptor.Cancelable && !RequestHeader_->uncancelable();
}
void BuildGlobalRequestInfo()
@@ -2046,10 +2055,10 @@ void TServiceBase::OnRequestTimeout(TRequestId requestId, ERequestProcessingStag
context->HandleTimeout(stage);
}
-void TServiceBase::OnReplyBusTerminated(const IBusPtr& bus, const TError& error)
+void TServiceBase::OnReplyBusTerminated(const NYT::TWeakPtr<NYT::NBus::IBus>& busWeak, const TError& error)
{
std::vector<TServiceContextPtr> contexts;
- {
+ if (auto bus = busWeak.Lock()) {
auto* bucket = GetReplyBusBucket(bus);
auto guard = Guard(bucket->Lock);
auto it = bucket->ReplyBusToContexts.find(bus);
@@ -2111,7 +2120,7 @@ void TServiceBase::RegisterRequest(TServiceContext* context)
}
if (subscribe) {
- replyBus->SubscribeTerminated(BIND(&TServiceBase::OnReplyBusTerminated, MakeWeak(this), replyBus));
+ replyBus->SubscribeTerminated(BIND(&TServiceBase::OnReplyBusTerminated, MakeWeak(this), MakeWeak(replyBus.Get())));
}
auto pendingPayloads = GetAndErasePendingPayloads(requestId);
@@ -2144,6 +2153,9 @@ void TServiceBase::UnregisterRequest(TServiceContext* context)
if (it != bucket->ReplyBusToContexts.end()) {
auto& contexts = it->second;
contexts.erase(context);
+ if (contexts.empty()) {
+ bucket->ReplyBusToContexts.erase(it);
+ }
}
}
}
diff --git a/yt/yt/core/rpc/service_detail.h b/yt/yt/core/rpc/service_detail.h
index 5ae839ae0dd..dae71d0c1e3 100644
--- a/yt/yt/core/rpc/service_detail.h
+++ b/yt/yt/core/rpc/service_detail.h
@@ -955,7 +955,7 @@ private:
TError DoCheckRequestCodecs(const NRpc::NProto::TRequestHeader& header);
void OnRequestTimeout(TRequestId requestId, ERequestProcessingStage stage, bool aborted);
- void OnReplyBusTerminated(const NYT::NBus::IBusPtr& bus, const TError& error);
+ void OnReplyBusTerminated(const NYT::TWeakPtr<NYT::NBus::IBus>& busWeak, const TError& error);
void OnRequestAuthenticated(
const NProfiling::TWallTimer& timer,