diff options
author | babenko <babenko@yandex-team.com> | 2024-07-08 11:34:06 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2024-07-08 11:47:33 +0300 |
commit | ad2c69ac0b84bf01a4301f1dddbf914407e10e38 (patch) | |
tree | daae2a7a31fb295b38cf4646e750905c4190727a | |
parent | 237b7b346919fc6926fa8fdc17b104347d609e9c (diff) | |
download | ydb-ad2c69ac0b84bf01a4301f1dddbf914407e10e38.tar.gz |
YT-22170: Make ITransaction::Abort retriable
7cae5eadbc9fe56378d96cd3e7c903d8a2ede95d
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.cpp | 37 | ||||
-rw-r--r-- | yt/yt/client/api/rpc_proxy/transaction_impl.h | 3 |
2 files changed, 28 insertions, 12 deletions
diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp index 112829ea2d..3aef87b49c 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.cpp +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.cpp @@ -902,7 +902,7 @@ TFuture<void> TTransaction::DoAbort( { VERIFY_SPINLOCK_AFFINITY(SpinLock_); - if (State_ == ETransactionState::Aborting || State_ == ETransactionState::Aborted) { + if (AbortPromise_) { return AbortPromise_.ToFuture(); } @@ -912,43 +912,59 @@ TFuture<void> TTransaction::DoAbort( auto alienTransactions = AlienTransactions_; + AbortPromise_ = NewPromise<void>(); + auto abortFuture = AbortPromise_.ToFuture(); + guard->Release(); auto req = Proxy_.AbortTransaction(); ToProto(req->mutable_transaction_id(), GetId()); - AbortPromise_.TrySetFrom(req->Invoke().Apply( + req->Invoke().Subscribe( BIND([=, this, this_ = MakeStrong(this)] (const TApiServiceProxy::TErrorOrRspAbortTransactionPtr& rspOrError) { { auto guard = Guard(SpinLock_); - if (State_ != ETransactionState::Aborting) { + if (!AbortPromise_) { YT_LOG_DEBUG(rspOrError, "Transaction is no longer aborting, abort response ignored"); return; } + TError abortError; if (rspOrError.IsOK()) { YT_LOG_DEBUG("Transaction aborted"); } else if (rspOrError.FindMatching(NTransactionClient::EErrorCode::NoSuchTransaction)) { - YT_LOG_DEBUG("Transaction has expired or was already aborted, ignored"); + YT_LOG_DEBUG("Transaction has expired or was already aborted"); } else { YT_LOG_DEBUG(rspOrError, "Error aborting transaction"); - THROW_ERROR_EXCEPTION("Error aborting transaction %v", + abortError = TError("Error aborting transaction %v", GetId()) << rspOrError; } - State_ = ETransactionState::Aborted; - } + if (abortError.IsOK()) { + State_ = ETransactionState::Aborted; + } else { + State_ = ETransactionState::AbortFailed; + } + + auto abortPromise = std::exchange(AbortPromise_, TPromise<void>()); - Aborted_.Fire(TError("Transaction aborted by user request")); - }))); + guard.Release(); + + if (abortError.IsOK()) { + Aborted_.Fire(TError("Transaction aborted by user request")); + } + + abortPromise.Set(std::move(abortError)); + } + })); for (const auto& transaction : alienTransactions) { YT_UNUSED_FUTURE(transaction->Abort()); } - return AbortPromise_.ToFuture(); + return abortFuture; } TFuture<void> TTransaction::SendPing() @@ -987,7 +1003,6 @@ TFuture<void> TTransaction::SendPing() GetId()); if (fireAborted) { - AbortPromise_.TrySet(); Aborted_.Fire(error); } diff --git a/yt/yt/client/api/rpc_proxy/transaction_impl.h b/yt/yt/client/api/rpc_proxy/transaction_impl.h index db5138879f..588469d752 100644 --- a/yt/yt/client/api/rpc_proxy/transaction_impl.h +++ b/yt/yt/client/api/rpc_proxy/transaction_impl.h @@ -22,6 +22,7 @@ DEFINE_ENUM(ETransactionState, (FlushedModifications) (Aborting) (Aborted) + (AbortFailed) (Detached) ); @@ -269,7 +270,7 @@ private: YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); ETransactionState State_ = ETransactionState::Active; - const TPromise<void> AbortPromise_ = NewPromise<void>(); + TPromise<void> AbortPromise_; std::vector<NApi::ITransactionPtr> AlienTransactions_; THashSet<NObjectClient::TCellId> AdditionalParticipantCellIds_; |