diff options
author | snaury <snaury@ydb.tech> | 2023-04-07 23:32:10 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-04-07 23:32:10 +0300 |
commit | 8aea54cda7486e7df055e9a7fbf1b2f7859b4870 (patch) | |
tree | 6a375e8d82351e15916e7c1e547beb523f510eea | |
parent | d055c2573aa4521f926c24de1780f796b0dfe923 (diff) | |
download | ydb-8aea54cda7486e7df055e9a7fbf1b2f7859b4870.tar.gz |
Allow explicit rollback in transactions
-rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 21 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_exec_seat.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_exec_seat.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_tx_env.h | 10 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_part_iface.h | 7 | ||||
-rw-r--r-- | ydb/core/tablet_flat/tablet_flat_executor.h | 19 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/test_dbase.h | 15 | ||||
-rw-r--r-- | ydb/core/tablet_flat/ut/ut_db_iface.cpp | 30 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_snapshots.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/direct_tx_unit.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_data_tx_unit.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 29 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 2 |
17 files changed, 151 insertions, 66 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 656a6cbdde8..93c3411da37 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -412,6 +412,19 @@ void TDatabase::Begin(TTxStamp stamp, IPages& env) NoMoreReadsFlag = false; } +void TDatabase::RollbackChanges() +{ + Y_VERIFY(Redo, "Transaction is not in progress"); + Y_VERIFY(Env); + + TTxStamp stamp = Change->Stamp; + IPages& env = *Env; + + Commit(stamp, false, nullptr); + env.OnRollbackChanges(); + Begin(stamp, env); +} + TPartView TDatabase::GetPartView(ui32 tableId, const TLogoBlobID &bundle) const { return Require(tableId)->GetPartView(bundle); } @@ -629,10 +642,13 @@ bool TDatabase::HasChanges() const TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator *cookieAllocator) { + TVector<std::function<void()>> onPersistent; + if (commit) { for (auto& callback : OnCommit_) { callback(); } + onPersistent = std::move(OnPersistent_); } else { auto it = OnRollback_.rbegin(); auto end = OnRollback_.rend(); @@ -644,6 +660,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator OnCommit_.clear(); OnRollback_.clear(); + OnPersistent_.clear(); TempIterators.clear(); @@ -747,7 +764,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator Alter_ = nullptr; Env = nullptr; - return { std::move(Change) }; + return { std::move(Change), std::move(onPersistent) }; } TTable* TDatabase::Require(ui32 table) const noexcept diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 6da0eff70e3..9410e4906c0 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -45,6 +45,7 @@ public: struct TProd { THolder<TChange> Change; + TVector<std::function<void()>> OnPersistent; }; struct TChg { @@ -201,6 +202,13 @@ public: */ bool HasChanges() const; + /** + * Rollback all current transaction changes + * + * Similar to aborting transaction and then starting a new one + */ + void RollbackChanges(); + // executor interface void Begin(TTxStamp, IPages& env); TProd Commit(TTxStamp, bool commit, TCookieAllocator* = nullptr); @@ -213,7 +221,7 @@ public: TCompactionStats GetCompactionStats(ui32 table) const; /** - * Adds callback, which is called when database changes are committed + * Adds a callback, which is called when database changes are committed */ template<class TCallback> void OnCommit(TCallback&& callback) { @@ -221,7 +229,7 @@ public: } /** - * Adds callback, which is called when database changes are rolled back + * Adds a callback, which is called when database changes are rolled back * * @param callback */ @@ -230,6 +238,14 @@ public: OnRollback_.emplace_back(std::forward<TCallback>(callback)); } + /** + * Adds a callback, which is called when database changes are persistent + */ + template<class TCallback> + void OnPersistent(TCallback&& callback) { + OnPersistent_.emplace_back(std::forward<TCallback>(callback)); + } + private: TTable* Require(ui32 tableId) const noexcept; TTable* RequireForUpdate(ui32 tableId) const noexcept; @@ -252,6 +268,7 @@ private: TVector<std::function<void()>> OnCommit_; TVector<std::function<void()>> OnRollback_; + TVector<std::function<void()>> OnPersistent_; }; diff --git a/ydb/core/tablet_flat/flat_exec_seat.cpp b/ydb/core/tablet_flat/flat_exec_seat.cpp index f113d1c811b..e4f4e683221 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.cpp +++ b/ydb/core/tablet_flat/flat_exec_seat.cpp @@ -4,7 +4,7 @@ namespace NKikimr { namespace NTabletFlatExecutor { void TSeat::Complete(const TActorContext& ctx) noexcept { - for (auto& callback : OnCommitted) { + for (auto& callback : OnPersistent) { callback(); } Self->Complete(ctx); diff --git a/ydb/core/tablet_flat/flat_exec_seat.h b/ydb/core/tablet_flat/flat_exec_seat.h index f37d3bfb0b6..2b3cd3a9fd8 100644 --- a/ydb/core/tablet_flat/flat_exec_seat.h +++ b/ydb/core/tablet_flat/flat_exec_seat.h @@ -52,7 +52,7 @@ namespace NTabletFlatExecutor { TAutoPtr<TMemoryToken> AttachedMemory; TIntrusivePtr<TMemoryGCToken> CapturedMemory; - TVector<std::function<void()>> OnCommitted; + TVector<std::function<void()>> OnPersistent; ETerminationReason TerminationReason = ETerminationReason::None; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 4934e4658a1..b44da9ed047 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1615,10 +1615,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct } else if (done) { Y_VERIFY(!txc.IsRescheduled()); Y_VERIFY(!seat->RequestedMemory); - for (auto it = txc.OnCommit_.begin(); it != txc.OnCommit_.end(); ++it) { - (*it)(); - } - seat->OnCommitted = std::move(txc.OnCommitted_); + seat->OnPersistent = std::move(prod.OnPersistent); CommitTransactionLog(seat, env, prod.Change, cpuTimer, ctx); } else { Y_VERIFY(!seat->CapturedMemory); @@ -1626,9 +1623,6 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type " << NFmt::Do(*seat->Self) << " postoned w/o demands"); } - for (auto it = txc.OnRollback_.rbegin(); it != txc.OnRollback_.rend(); ++it) { - (*it)(); - } PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx); } diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.h b/ydb/core/tablet_flat/flat_executor_tx_env.h index bfad1f626fc..6288d5876ba 100644 --- a/ydb/core/tablet_flat/flat_executor_tx_env.h +++ b/ydb/core/tablet_flat/flat_executor_tx_env.h @@ -139,6 +139,16 @@ namespace NTabletFlatExecutor { || LoanConfirmation; } + protected: + void OnRollbackChanges() noexcept override { + MakeSnap.clear(); + DropSnap.Reset(); + BorrowUpdates.clear(); + LoanBundle.clear(); + LoanTxStatus.clear(); + LoanConfirmation.clear(); + } + protected: /* IExecuting, tx stage func implementation */ void MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override; diff --git a/ydb/core/tablet_flat/flat_part_iface.h b/ydb/core/tablet_flat/flat_part_iface.h index bbfeab8f5de..68ca39afb0d 100644 --- a/ydb/core/tablet_flat/flat_part_iface.h +++ b/ydb/core/tablet_flat/flat_part_iface.h @@ -69,6 +69,13 @@ namespace NTable { virtual TResult Locate(const TMemTable*, ui64 ref, ui32 tag) noexcept = 0; virtual TResult Locate(const TPart*, ui64 ref, ELargeObj lob) noexcept = 0; virtual const TSharedData* TryGetPage(const TPart* part, TPageId id, TGroupId groupId = { }) = 0; + + /** + * Hook for cleaning up env on DB.RollbackChanges() + */ + virtual void OnRollbackChanges() noexcept { + // nothing by default + } }; } diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index eb452378f97..9b611f793e1 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -211,22 +211,10 @@ public: ~TTransactionContext() {} - void OnCommit(std::function<void()> callback) { - OnCommit_.emplace_back(std::move(callback)); - } - - void OnRollback(std::function<void()> callback) { - OnRollback_.emplace_back(std::move(callback)); - } - - void OnCommitted(std::function<void()> callback) { - OnCommitted_.emplace_back(std::move(callback)); - } - /** - * Will rollback any changes and reschedule transaction for a retry + * Request transaction to restart at some later time * - * Transaction must return false from its Execute after calling this method. + * Transaction's Execute method must return false after calling this method. */ void Reschedule() { Rescheduled_ = true; @@ -244,9 +232,6 @@ public: NTable::TDatabase &DB; private: - TVector<std::function<void()>> OnCommit_; - TVector<std::function<void()>> OnRollback_; - TVector<std::function<void()>> OnCommitted_; bool Rescheduled_ = false; }; diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h index 07cb17101a9..5f3ecaf1bd4 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h +++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h @@ -141,6 +141,14 @@ namespace NTest { return *this; } + TDbExec& RollbackChanges() { + Y_VERIFY(OnTx != EOnTx::None); + + Base->RollbackChanges(); + + return *this; + } + TDbExec& Add(ui32 table, const TRow &row, ERowOp rop = ERowOp::Upsert) { const NTest::TRowTool tool(RowSchemeFor(table)); @@ -439,7 +447,8 @@ namespace NTest { if (was != (real ? EOnTx::Real : EOnTx::Auto)) Y_FAIL("There is no active dbase tx"); - auto up = Base->Commit({ Gen, Step }, apply, Annex.Get()).Change; + auto prod = Base->Commit({ Gen, Step }, apply, Annex.Get()); + auto up = std::move(prod.Change); Env.reset(); Last = Max<ui32>(), Altered = false; @@ -461,6 +470,10 @@ namespace NTest { } RedoLog.emplace_back(std::move(up)); + + for (auto& callback : prod.OnPersistent) { + callback(); + } } ReadVersion = TRowVersion::Max(); diff --git a/ydb/core/tablet_flat/ut/ut_db_iface.cpp b/ydb/core/tablet_flat/ut/ut_db_iface.cpp index dd35dcbcc58..53b893940b1 100644 --- a/ydb/core/tablet_flat/ut/ut_db_iface.cpp +++ b/ydb/core/tablet_flat/ut/ut_db_iface.cpp @@ -441,6 +441,36 @@ Y_UNIT_TEST_SUITE(DBase) { UNIT_ASSERT(closed->Frozen.at(0)->GetBlobs()->Head == 3); } + Y_UNIT_TEST(AnnexRollbackChanges) + { + auto alter = MakeAlter(1); + + alter.SetRedo(32).SetFamilyBlobs(1, 0, Max<ui32>(), 24); + + TDbExec me; + + me.To(10).Begin().Apply(*alter.Flush()).Commit(); + + const TString large35("0123456789abcdef0123456789abcdef012"); + const TString large42("0123456789abcdef0123456789abcdef0123456789"); + + me.To(12).Begin().PutN(1, "l35", 35_u64, large35); + me.To(13).RollbackChanges().PutN(1, "l42", 42_u64, large42).Commit(); + me.To(14).Iter(1) + .Seek({}, ESeek::Lower).IsN("l42", 42_u64, large42) + .Next().Is(EReady::Gone); + + UNIT_ASSERT(me.BackLog().Annex.size() == 1); + UNIT_ASSERT(me.BackLog().Annex[0].Data.size() == 8 + 42); + + me.To(21).Replay(EPlay::Boot).Iter(1) + .Seek({}, ESeek::Lower).IsN("l42", 42_u64, large42) + .Next().Is(EReady::Gone); + me.To(22).Replay(EPlay::Redo).Iter(1) + .Seek({}, ESeek::Lower).IsN("l42", 42_u64, large42) + .Next().Is(EReady::Gone); + } + Y_UNIT_TEST(Outer) { auto alter = MakeAlter(1); diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp index b40a9c8057c..9b489f0f108 100644 --- a/ydb/core/tx/datashard/datashard_snapshots.cpp +++ b/ydb/core/tx/datashard/datashard_snapshots.cpp @@ -171,7 +171,7 @@ bool TSnapshotManager::PromoteCompleteEdge(const TRowVersion& version, TTransact NIceDb::TNiceDb db(txc.DB); SetCompleteEdge(db, version); - txc.OnCommitted([this, edge = CompleteEdge] { + txc.DB.OnPersistent([this, edge = CompleteEdge] { this->CommittedCompleteEdge = edge; }); @@ -294,7 +294,7 @@ void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedRea PerformedUnprotectedReads = performedUnprotectedReads; PerformedUnprotectedReadsUncommitted++; - txc.OnCommitted([this] { + txc.DB.OnPersistent([this] { this->PerformedUnprotectedReadsUncommitted--; }); } @@ -420,7 +420,7 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext Y_FAIL("Unexpected mvcc state# %d", (ui32)state); } - txc.OnCommitted([this, edge = CompleteEdge] { + txc.DB.OnPersistent([this, edge = CompleteEdge] { this->CommittedCompleteEdge = edge; }); diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index d469f83e849..b5651fbd64c 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -56,15 +56,16 @@ public: op->SetWaitingForGlobalTxIdFlag(); if (txc.DB.HasChanges()) { - txc.Reschedule(); - return EExecutionStatus::Restart; + txc.DB.RollbackChanges(); } return EExecutionStatus::Continue; } if (Pipeline.AddLockDependencies(op, guardLocks)) { - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } op->ChangeRecords() = std::move(tx->GetCollectedChanges()); diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp index cc31f453a65..df567b199cb 100644 --- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp @@ -47,8 +47,10 @@ public: txc.DB.CommitTx(tableInfo.LocalTid, writeTxId, versions.WriteVersion); if (Pipeline.AddLockDependencies(op, guardLocks)) { - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE); diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 802af68b786..c4a1995d9ae 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -30,7 +30,7 @@ private: void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx); private: - class TRescheduleOpException : public yexception {}; + class TRollbackAndWaitException : public yexception {}; }; TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard, @@ -193,15 +193,17 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, tx->ReleaseTxData(txc, ctx); return EExecutionStatus::Restart; - } catch (const TRescheduleOpException&) { + } catch (const TRollbackAndWaitException&) { LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID() - << " needs to reschedule " << *op << " for dependencies"); + << " needs to wait " << *op << " for dependencies"); tx->GetDataTx()->ResetCollectedChanges(); tx->ReleaseTxData(txc, ctx); - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } DataShard.IncCounter(COUNTER_WAIT_EXECUTE_LATENCY_MS, waitExecuteLatency.MilliSeconds()); @@ -253,7 +255,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, IEngineFlat::EResult engineResult = engine->Execute(); if (Pipeline.AddLockDependencies(op, guardLocks)) { - throw TRescheduleOpException(); + throw TRollbackAndWaitException(); } if (engineResult != IEngineFlat::EResult::Ok) { diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 6a27d5a7db3..46943e1f394 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -63,13 +63,17 @@ public: Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId); } - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } if (Pipeline.AddLockDependencies(op, guardLocks)) { - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } if (changeCollector) { @@ -95,8 +99,10 @@ public: } if (Pipeline.AddLockDependencies(op, guardLocks)) { - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } } else { Y_FAIL_S("Invalid distributed erase tx: " << eraseTx->GetBody().ShortDebugString()); diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 825f057ab75..5f111993c5a 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -264,11 +264,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio tx->ReleaseTxData(txc, ctx); // Transaction may have made some changes before it detected - // inconsistency, so we need to roll them back. We do this by - // marking transaction for reschedule and restarting. The next - // cycle will detect aborted operation and move along. - txc.Reschedule(); - return EExecutionStatus::Restart; + // inconsistency, so we need to roll them back. + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Executed; } if (!result && computeCtx.IsTabletNotReady()) { @@ -291,8 +291,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio // Rollback database changes, if any if (txc.DB.HasChanges()) { - txc.Reschedule(); - return EExecutionStatus::Restart; + txc.DB.RollbackChanges(); } return EExecutionStatus::Continue; @@ -302,8 +301,10 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio allocGuard.Release(); dataTx->ResetCollectedChanges(); tx->ReleaseTxData(txc, ctx); - txc.Reschedule(); - return EExecutionStatus::Restart; + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Continue; } Y_VERIFY(result); @@ -396,11 +397,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio tx->ReleaseTxData(txc, ctx); // Transaction may have made some changes before it hit the limit, - // so we need to roll them back. We do this by marking transaction for - // reschedule and restarting. The next cycle will detect aborted - // operation and move along. - txc.Reschedule(); - return EExecutionStatus::Restart; + // so we need to roll them back. + if (txc.DB.HasChanges()) { + txc.DB.RollbackChanges(); + } + return EExecutionStatus::Executed; } catch (const yexception& e) { LOG_C("Exception while executing KQP transaction " << *op << " at " << tabletId << ": " << e.what()); if (op->IsReadOnly() || op->IsImmediate()) { diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 5587c77b1e2..02d0302bcb6 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -522,7 +522,7 @@ namespace NKikimr::NDataShard { txc.DB.OnRollback([this, txId]() { RollbackAddVolatileTx(txId); }); - txc.OnCommitted([this, txId]() { + txc.DB.OnPersistent([this, txId]() { auto* info = FindByTxId(txId); Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId); Y_VERIFY_S(!info->AddCommitted, "Unexpected commit of a committed volatile txId# " << txId); |