diff options
author | shakurov <shakurov@yandex-team.com> | 2023-11-29 15:06:48 +0300 |
---|---|---|
committer | shakurov <shakurov@yandex-team.com> | 2023-11-29 17:10:11 +0300 |
commit | 7a36afae44e104d93478e1d97945c81bf4c6e1d3 (patch) | |
tree | e06ffc2b6e81c770a7cc446286a9734856be0f5b | |
parent | 4bf5797bef28b63bcd3085c9c45eb259ed92803a (diff) | |
download | ydb-7a36afae44e104d93478e1d97945c81bf4c6e1d3.tar.gz |
In decorated automaton, offload setting LocalCommitPromise and response keeper's promise to a non-automaton thread.
-rw-r--r-- | yt/yt/core/rpc/response_keeper.cpp | 32 | ||||
-rw-r--r-- | yt/yt/core/rpc/response_keeper.h | 17 |
2 files changed, 34 insertions, 15 deletions
diff --git a/yt/yt/core/rpc/response_keeper.cpp b/yt/yt/core/rpc/response_keeper.cpp index 1f62ef041d..d055c4ca56 100644 --- a/yt/yt/core/rpc/response_keeper.cpp +++ b/yt/yt/core/rpc/response_keeper.cpp @@ -108,14 +108,17 @@ public: return DoFindRequest(id, isRetry); } - void EndRequest(TMutationId id, TSharedRefArray response, bool remember) override + std::function<void()> EndRequest( + TMutationId id, + TSharedRefArray response, + bool remember) override { auto guard = WriterGuard(Lock_); YT_ASSERT(id); if (!Started_) { - return; + return {}; } if (!response) { @@ -148,30 +151,37 @@ public: } } + guard.Release(); + if (promise) { - guard.Release(); - promise.TrySet(response); + return [promise = std::move(promise), response = std::move(response)] { + promise.TrySet(std::move(response)); + }; + } else { + return {}; } } - void EndRequest(TMutationId id, TErrorOr<TSharedRefArray> responseOrError, bool remember) override + std::function<void()> EndRequest( + TMutationId id, + TErrorOr<TSharedRefArray> responseOrError, + bool remember) override { YT_ASSERT(id); if (responseOrError.IsOK()) { - EndRequest(id, std::move(responseOrError.Value()), remember); - return; + return EndRequest(id, std::move(responseOrError.Value()), remember); } auto guard = WriterGuard(Lock_); if (!Started_) { - return; + return {}; } auto it = PendingResponses_.find(id); if (it == PendingResponses_.end()) { - return; + return {}; } auto promise = std::move(it->second); @@ -179,7 +189,9 @@ public: guard.Release(); - promise.TrySet(TError(std::move(responseOrError))); + return [promise = std::move(promise), responseOrError = std::move(responseOrError)] { + promise.TrySet(std::move(responseOrError)); + }; } void CancelPendingRequests(const TError& error) override diff --git a/yt/yt/core/rpc/response_keeper.h b/yt/yt/core/rpc/response_keeper.h index ce75c8808b..70b15a89f5 100644 --- a/yt/yt/core/rpc/response_keeper.h +++ b/yt/yt/core/rpc/response_keeper.h @@ -59,15 +59,22 @@ public: //! Called when a request with a given mutation #id is finished and a #response is ready. /* - * The latter #response is pushed to every subscriber waiting for the future - * previously returned by #TryBeginRequest. Additionally, if #remember is true, - * #response is remembered and returned by future calls to #TryBeginRequest. + * If #remember is true, the latter #response is remembered and returned by + * future calls to #TryBeginRequest. + * Additionally, a call to the returned function object will push #response + * to every subscriber waiting for the future previously returned by + * #TryBeginRequest. Such a call must be done, or the subscribers will get + * a 'promise abandoned' error. + * NB: the returned function object may be null if there weren't any + * requests associated with #mutationId (or if response keeper isn't started). */ - virtual void EndRequest(TMutationId id, TSharedRefArray response, bool remember = true) = 0; + [[nodiscard]] + virtual std::function<void()> EndRequest(TMutationId id, TSharedRefArray response, bool remember = true) = 0; //! Similar to its non-error counterpart but also accepts errors. //! Note that these are never remembered and are just propagated to the listeners. - virtual void EndRequest(TMutationId id, TErrorOr<TSharedRefArray> responseOrError, bool remember = true) = 0; + [[nodiscard]] + virtual std::function<void()> EndRequest(TMutationId id, TErrorOr<TSharedRefArray> responseOrError, bool remember = true) = 0; //! Forgets all pending requests, which were previously registered via #TryBeginRequest. virtual void CancelPendingRequests(const TError& error) = 0; |