summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorignat <[email protected]>2023-12-13 12:53:13 +0300
committerignat <[email protected]>2023-12-13 13:45:04 +0300
commitc9afe4ec27e10d5b6b023166ec7e5f8624f6e538 (patch)
tree530f70af85e2ef266403d58e87a3a92c5db4b2fb
parent87a600285aeff7ae820a4ae457c642696d0105af (diff)
YT-20686: Fix TResponseKeeper
-rw-r--r--yt/yt/core/rpc/response_keeper.cpp22
1 files changed, 13 insertions, 9 deletions
diff --git a/yt/yt/core/rpc/response_keeper.cpp b/yt/yt/core/rpc/response_keeper.cpp
index a5c6ac602c1..e2525ca596d 100644
--- a/yt/yt/core/rpc/response_keeper.cpp
+++ b/yt/yt/core/rpc/response_keeper.cpp
@@ -103,6 +103,7 @@ public:
return DoFindRequest(id, isRetry);
}
+ [[nodiscard]]
std::function<void()> EndRequest(
TMutationId id,
TSharedRefArray response,
@@ -157,6 +158,7 @@ public:
}
}
+ [[nodiscard]]
std::function<void()> EndRequest(
TMutationId id,
TErrorOr<TSharedRefArray> responseOrError,
@@ -227,10 +229,13 @@ public:
context->GetAsyncResponseMessage()
.Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TErrorOr<TSharedRefArray>& responseMessageOrError) {
if (!responseMessageOrError.IsOK()) {
- EndRequest(
+ if (auto setResponseKeeperPromise = EndRequest(
mutationId,
CreateErrorResponseMessage(responseMessageOrError),
- /*remember*/ false);
+ /*remember*/ false))
+ {
+ setResponseKeeperPromise();
+ }
return;
}
@@ -240,10 +245,9 @@ public:
YT_VERIFY(TryParseResponseHeader(responseMessage, &header));
bool remember = FromProto<NRpc::EErrorCode>(header.error().code()) != NRpc::EErrorCode::Unavailable;
- EndRequest(
- mutationId,
- responseMessage,
- remember);
+ if (auto setResponseKeeperPromise = EndRequest(mutationId, responseMessage, remember)) {
+ setResponseKeeperPromise();
+ }
}).Via(Invoker_));
}
@@ -344,7 +348,7 @@ private:
return;
}
- YT_LOG_DEBUG("Response Keeper eviction tick started");
+ YT_LOG_DEBUG("Response keeper eviction tick started");
NProfiling::TWallTimer timer;
int counter = 0;
@@ -358,7 +362,7 @@ private:
if (++counter % Config_->EvictionTickTimeCheckPeriod == 0) {
if (timer.GetElapsedTime() > Config_->MaxEvictionTickTime) {
- YT_LOG_DEBUG("Response Keeper eviction tick interrupted (ResponseCount: %v)",
+ YT_LOG_DEBUG("Response keeper eviction tick interrupted (ResponseCount: %v)",
counter);
return;
}
@@ -372,7 +376,7 @@ private:
ResponseEvictionQueue_.pop();
}
- YT_LOG_DEBUG("Response Keeper eviction tick completed (ResponseCount: %v)",
+ YT_LOG_DEBUG("Response keeper eviction tick completed (ResponseCount: %v)",
counter);
}
};