aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2024-07-08 11:34:06 +0300
committerbabenko <babenko@yandex-team.com>2024-07-08 11:47:33 +0300
commitad2c69ac0b84bf01a4301f1dddbf914407e10e38 (patch)
treedaae2a7a31fb295b38cf4646e750905c4190727a
parent237b7b346919fc6926fa8fdc17b104347d609e9c (diff)
downloadydb-ad2c69ac0b84bf01a4301f1dddbf914407e10e38.tar.gz
YT-22170: Make ITransaction::Abort retriable
7cae5eadbc9fe56378d96cd3e7c903d8a2ede95d
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.cpp37
-rw-r--r--yt/yt/client/api/rpc_proxy/transaction_impl.h3
2 files changed, 28 insertions, 12 deletions
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
index 112829ea2d..3aef87b49c 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp
@@ -902,7 +902,7 @@ TFuture<void> TTransaction::DoAbort(
{
VERIFY_SPINLOCK_AFFINITY(SpinLock_);
- if (State_ == ETransactionState::Aborting || State_ == ETransactionState::Aborted) {
+ if (AbortPromise_) {
return AbortPromise_.ToFuture();
}
@@ -912,43 +912,59 @@ TFuture<void> TTransaction::DoAbort(
auto alienTransactions = AlienTransactions_;
+ AbortPromise_ = NewPromise<void>();
+ auto abortFuture = AbortPromise_.ToFuture();
+
guard->Release();
auto req = Proxy_.AbortTransaction();
ToProto(req->mutable_transaction_id(), GetId());
- AbortPromise_.TrySetFrom(req->Invoke().Apply(
+ req->Invoke().Subscribe(
BIND([=, this, this_ = MakeStrong(this)] (const TApiServiceProxy::TErrorOrRspAbortTransactionPtr& rspOrError) {
{
auto guard = Guard(SpinLock_);
- if (State_ != ETransactionState::Aborting) {
+ if (!AbortPromise_) {
YT_LOG_DEBUG(rspOrError, "Transaction is no longer aborting, abort response ignored");
return;
}
+ TError abortError;
if (rspOrError.IsOK()) {
YT_LOG_DEBUG("Transaction aborted");
} else if (rspOrError.FindMatching(NTransactionClient::EErrorCode::NoSuchTransaction)) {
- YT_LOG_DEBUG("Transaction has expired or was already aborted, ignored");
+ YT_LOG_DEBUG("Transaction has expired or was already aborted");
} else {
YT_LOG_DEBUG(rspOrError, "Error aborting transaction");
- THROW_ERROR_EXCEPTION("Error aborting transaction %v",
+ abortError = TError("Error aborting transaction %v",
GetId())
<< rspOrError;
}
- State_ = ETransactionState::Aborted;
- }
+ if (abortError.IsOK()) {
+ State_ = ETransactionState::Aborted;
+ } else {
+ State_ = ETransactionState::AbortFailed;
+ }
+
+ auto abortPromise = std::exchange(AbortPromise_, TPromise<void>());
- Aborted_.Fire(TError("Transaction aborted by user request"));
- })));
+ guard.Release();
+
+ if (abortError.IsOK()) {
+ Aborted_.Fire(TError("Transaction aborted by user request"));
+ }
+
+ abortPromise.Set(std::move(abortError));
+ }
+ }));
for (const auto& transaction : alienTransactions) {
YT_UNUSED_FUTURE(transaction->Abort());
}
- return AbortPromise_.ToFuture();
+ return abortFuture;
}
TFuture<void> TTransaction::SendPing()
@@ -987,7 +1003,6 @@ TFuture<void> TTransaction::SendPing()
GetId());
if (fireAborted) {
- AbortPromise_.TrySet();
Aborted_.Fire(error);
}
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h
index db5138879f..588469d752 100644
--- a/yt/yt/client/api/rpc_proxy/transaction_impl.h
+++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h
@@ -22,6 +22,7 @@ DEFINE_ENUM(ETransactionState,
(FlushedModifications)
(Aborting)
(Aborted)
+ (AbortFailed)
(Detached)
);
@@ -269,7 +270,7 @@ private:
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
ETransactionState State_ = ETransactionState::Active;
- const TPromise<void> AbortPromise_ = NewPromise<void>();
+ TPromise<void> AbortPromise_;
std::vector<NApi::ITransactionPtr> AlienTransactions_;
THashSet<NObjectClient::TCellId> AdditionalParticipantCellIds_;