aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshakurov <shakurov@yandex-team.com>2023-11-29 15:06:48 +0300
committershakurov <shakurov@yandex-team.com>2023-11-29 17:10:11 +0300
commit7a36afae44e104d93478e1d97945c81bf4c6e1d3 (patch)
treee06ffc2b6e81c770a7cc446286a9734856be0f5b
parent4bf5797bef28b63bcd3085c9c45eb259ed92803a (diff)
downloadydb-7a36afae44e104d93478e1d97945c81bf4c6e1d3.tar.gz
In decorated automaton, offload setting LocalCommitPromise and response keeper's promise to a non-automaton thread.
-rw-r--r--yt/yt/core/rpc/response_keeper.cpp32
-rw-r--r--yt/yt/core/rpc/response_keeper.h17
2 files changed, 34 insertions, 15 deletions
diff --git a/yt/yt/core/rpc/response_keeper.cpp b/yt/yt/core/rpc/response_keeper.cpp
index 1f62ef041d..d055c4ca56 100644
--- a/yt/yt/core/rpc/response_keeper.cpp
+++ b/yt/yt/core/rpc/response_keeper.cpp
@@ -108,14 +108,17 @@ public:
return DoFindRequest(id, isRetry);
}
- void EndRequest(TMutationId id, TSharedRefArray response, bool remember) override
+ std::function<void()> EndRequest(
+ TMutationId id,
+ TSharedRefArray response,
+ bool remember) override
{
auto guard = WriterGuard(Lock_);
YT_ASSERT(id);
if (!Started_) {
- return;
+ return {};
}
if (!response) {
@@ -148,30 +151,37 @@ public:
}
}
+ guard.Release();
+
if (promise) {
- guard.Release();
- promise.TrySet(response);
+ return [promise = std::move(promise), response = std::move(response)] {
+ promise.TrySet(std::move(response));
+ };
+ } else {
+ return {};
}
}
- void EndRequest(TMutationId id, TErrorOr<TSharedRefArray> responseOrError, bool remember) override
+ std::function<void()> EndRequest(
+ TMutationId id,
+ TErrorOr<TSharedRefArray> responseOrError,
+ bool remember) override
{
YT_ASSERT(id);
if (responseOrError.IsOK()) {
- EndRequest(id, std::move(responseOrError.Value()), remember);
- return;
+ return EndRequest(id, std::move(responseOrError.Value()), remember);
}
auto guard = WriterGuard(Lock_);
if (!Started_) {
- return;
+ return {};
}
auto it = PendingResponses_.find(id);
if (it == PendingResponses_.end()) {
- return;
+ return {};
}
auto promise = std::move(it->second);
@@ -179,7 +189,9 @@ public:
guard.Release();
- promise.TrySet(TError(std::move(responseOrError)));
+ return [promise = std::move(promise), responseOrError = std::move(responseOrError)] {
+ promise.TrySet(std::move(responseOrError));
+ };
}
void CancelPendingRequests(const TError& error) override
diff --git a/yt/yt/core/rpc/response_keeper.h b/yt/yt/core/rpc/response_keeper.h
index ce75c8808b..70b15a89f5 100644
--- a/yt/yt/core/rpc/response_keeper.h
+++ b/yt/yt/core/rpc/response_keeper.h
@@ -59,15 +59,22 @@ public:
//! Called when a request with a given mutation #id is finished and a #response is ready.
/*
- * The latter #response is pushed to every subscriber waiting for the future
- * previously returned by #TryBeginRequest. Additionally, if #remember is true,
- * #response is remembered and returned by future calls to #TryBeginRequest.
+ * If #remember is true, the latter #response is remembered and returned by
+ * future calls to #TryBeginRequest.
+ * Additionally, a call to the returned function object will push #response
+ * to every subscriber waiting for the future previously returned by
+ * #TryBeginRequest. Such a call must be done, or the subscribers will get
+ * a 'promise abandoned' error.
+ * NB: the returned function object may be null if there weren't any
+ * requests associated with #mutationId (or if response keeper isn't started).
*/
- virtual void EndRequest(TMutationId id, TSharedRefArray response, bool remember = true) = 0;
+ [[nodiscard]]
+ virtual std::function<void()> EndRequest(TMutationId id, TSharedRefArray response, bool remember = true) = 0;
//! Similar to its non-error counterpart but also accepts errors.
//! Note that these are never remembered and are just propagated to the listeners.
- virtual void EndRequest(TMutationId id, TErrorOr<TSharedRefArray> responseOrError, bool remember = true) = 0;
+ [[nodiscard]]
+ virtual std::function<void()> EndRequest(TMutationId id, TErrorOr<TSharedRefArray> responseOrError, bool remember = true) = 0;
//! Forgets all pending requests, which were previously registered via #TryBeginRequest.
virtual void CancelPendingRequests(const TError& error) = 0;