diff options
| author | ignat <[email protected]> | 2023-12-13 12:53:13 +0300 |
|---|---|---|
| committer | ignat <[email protected]> | 2023-12-13 13:45:04 +0300 |
| commit | c9afe4ec27e10d5b6b023166ec7e5f8624f6e538 (patch) | |
| tree | 530f70af85e2ef266403d58e87a3a92c5db4b2fb | |
| parent | 87a600285aeff7ae820a4ae457c642696d0105af (diff) | |
YT-20686: Fix TResponseKeeper
| -rw-r--r-- | yt/yt/core/rpc/response_keeper.cpp | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/yt/yt/core/rpc/response_keeper.cpp b/yt/yt/core/rpc/response_keeper.cpp index a5c6ac602c1..e2525ca596d 100644 --- a/yt/yt/core/rpc/response_keeper.cpp +++ b/yt/yt/core/rpc/response_keeper.cpp @@ -103,6 +103,7 @@ public: return DoFindRequest(id, isRetry); } + [[nodiscard]] std::function<void()> EndRequest( TMutationId id, TSharedRefArray response, @@ -157,6 +158,7 @@ public: } } + [[nodiscard]] std::function<void()> EndRequest( TMutationId id, TErrorOr<TSharedRefArray> responseOrError, @@ -227,10 +229,13 @@ public: context->GetAsyncResponseMessage() .Subscribe(BIND([=, this, this_ = MakeStrong(this)] (const TErrorOr<TSharedRefArray>& responseMessageOrError) { if (!responseMessageOrError.IsOK()) { - EndRequest( + if (auto setResponseKeeperPromise = EndRequest( mutationId, CreateErrorResponseMessage(responseMessageOrError), - /*remember*/ false); + /*remember*/ false)) + { + setResponseKeeperPromise(); + } return; } @@ -240,10 +245,9 @@ public: YT_VERIFY(TryParseResponseHeader(responseMessage, &header)); bool remember = FromProto<NRpc::EErrorCode>(header.error().code()) != NRpc::EErrorCode::Unavailable; - EndRequest( - mutationId, - responseMessage, - remember); + if (auto setResponseKeeperPromise = EndRequest(mutationId, responseMessage, remember)) { + setResponseKeeperPromise(); + } }).Via(Invoker_)); } @@ -344,7 +348,7 @@ private: return; } - YT_LOG_DEBUG("Response Keeper eviction tick started"); + YT_LOG_DEBUG("Response keeper eviction tick started"); NProfiling::TWallTimer timer; int counter = 0; @@ -358,7 +362,7 @@ private: if (++counter % Config_->EvictionTickTimeCheckPeriod == 0) { if (timer.GetElapsedTime() > Config_->MaxEvictionTickTime) { - YT_LOG_DEBUG("Response Keeper eviction tick interrupted (ResponseCount: %v)", + YT_LOG_DEBUG("Response keeper eviction tick interrupted (ResponseCount: %v)", counter); return; } @@ -372,7 +376,7 @@ private: ResponseEvictionQueue_.pop(); } - YT_LOG_DEBUG("Response Keeper eviction tick completed (ResponseCount: %v)", + YT_LOG_DEBUG("Response keeper eviction tick completed (ResponseCount: %v)", counter); } }; |
