aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-04-07 23:32:10 +0300
committersnaury <snaury@ydb.tech>2023-04-07 23:32:10 +0300
commit8aea54cda7486e7df055e9a7fbf1b2f7859b4870 (patch)
tree6a375e8d82351e15916e7c1e547beb523f510eea
parentd055c2573aa4521f926c24de1780f796b0dfe923 (diff)
downloadydb-8aea54cda7486e7df055e9a7fbf1b2f7859b4870.tar.gz
Allow explicit rollback in transactions
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp19
-rw-r--r--ydb/core/tablet_flat/flat_database.h21
-rw-r--r--ydb/core/tablet_flat/flat_exec_seat.cpp2
-rw-r--r--ydb/core/tablet_flat/flat_exec_seat.h2
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp8
-rw-r--r--ydb/core/tablet_flat/flat_executor_tx_env.h10
-rw-r--r--ydb/core/tablet_flat/flat_part_iface.h7
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h19
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_dbase.h15
-rw-r--r--ydb/core/tablet_flat/ut/ut_db_iface.cpp30
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.cpp6
-rw-r--r--ydb/core/tx/datashard/direct_tx_unit.cpp9
-rw-r--r--ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp6
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp14
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp18
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp29
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp2
17 files changed, 151 insertions, 66 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 656a6cbdde8..93c3411da37 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -412,6 +412,19 @@ void TDatabase::Begin(TTxStamp stamp, IPages& env)
NoMoreReadsFlag = false;
}
+void TDatabase::RollbackChanges()
+{
+ Y_VERIFY(Redo, "Transaction is not in progress");
+ Y_VERIFY(Env);
+
+ TTxStamp stamp = Change->Stamp;
+ IPages& env = *Env;
+
+ Commit(stamp, false, nullptr);
+ env.OnRollbackChanges();
+ Begin(stamp, env);
+}
+
TPartView TDatabase::GetPartView(ui32 tableId, const TLogoBlobID &bundle) const {
return Require(tableId)->GetPartView(bundle);
}
@@ -629,10 +642,13 @@ bool TDatabase::HasChanges() const
TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator *cookieAllocator)
{
+ TVector<std::function<void()>> onPersistent;
+
if (commit) {
for (auto& callback : OnCommit_) {
callback();
}
+ onPersistent = std::move(OnPersistent_);
} else {
auto it = OnRollback_.rbegin();
auto end = OnRollback_.rend();
@@ -644,6 +660,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
OnCommit_.clear();
OnRollback_.clear();
+ OnPersistent_.clear();
TempIterators.clear();
@@ -747,7 +764,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
Alter_ = nullptr;
Env = nullptr;
- return { std::move(Change) };
+ return { std::move(Change), std::move(onPersistent) };
}
TTable* TDatabase::Require(ui32 table) const noexcept
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 6da0eff70e3..9410e4906c0 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -45,6 +45,7 @@ public:
struct TProd {
THolder<TChange> Change;
+ TVector<std::function<void()>> OnPersistent;
};
struct TChg {
@@ -201,6 +202,13 @@ public:
*/
bool HasChanges() const;
+ /**
+ * Rollback all current transaction changes
+ *
+ * Similar to aborting transaction and then starting a new one
+ */
+ void RollbackChanges();
+
// executor interface
void Begin(TTxStamp, IPages& env);
TProd Commit(TTxStamp, bool commit, TCookieAllocator* = nullptr);
@@ -213,7 +221,7 @@ public:
TCompactionStats GetCompactionStats(ui32 table) const;
/**
- * Adds callback, which is called when database changes are committed
+ * Adds a callback, which is called when database changes are committed
*/
template<class TCallback>
void OnCommit(TCallback&& callback) {
@@ -221,7 +229,7 @@ public:
}
/**
- * Adds callback, which is called when database changes are rolled back
+ * Adds a callback, which is called when database changes are rolled back
*
* @param callback
*/
@@ -230,6 +238,14 @@ public:
OnRollback_.emplace_back(std::forward<TCallback>(callback));
}
+ /**
+ * Adds a callback, which is called when database changes are persistent
+ */
+ template<class TCallback>
+ void OnPersistent(TCallback&& callback) {
+ OnPersistent_.emplace_back(std::forward<TCallback>(callback));
+ }
+
private:
TTable* Require(ui32 tableId) const noexcept;
TTable* RequireForUpdate(ui32 tableId) const noexcept;
@@ -252,6 +268,7 @@ private:
TVector<std::function<void()>> OnCommit_;
TVector<std::function<void()>> OnRollback_;
+ TVector<std::function<void()>> OnPersistent_;
};
diff --git a/ydb/core/tablet_flat/flat_exec_seat.cpp b/ydb/core/tablet_flat/flat_exec_seat.cpp
index f113d1c811b..e4f4e683221 100644
--- a/ydb/core/tablet_flat/flat_exec_seat.cpp
+++ b/ydb/core/tablet_flat/flat_exec_seat.cpp
@@ -4,7 +4,7 @@ namespace NKikimr {
namespace NTabletFlatExecutor {
void TSeat::Complete(const TActorContext& ctx) noexcept {
- for (auto& callback : OnCommitted) {
+ for (auto& callback : OnPersistent) {
callback();
}
Self->Complete(ctx);
diff --git a/ydb/core/tablet_flat/flat_exec_seat.h b/ydb/core/tablet_flat/flat_exec_seat.h
index f37d3bfb0b6..2b3cd3a9fd8 100644
--- a/ydb/core/tablet_flat/flat_exec_seat.h
+++ b/ydb/core/tablet_flat/flat_exec_seat.h
@@ -52,7 +52,7 @@ namespace NTabletFlatExecutor {
TAutoPtr<TMemoryToken> AttachedMemory;
TIntrusivePtr<TMemoryGCToken> CapturedMemory;
- TVector<std::function<void()>> OnCommitted;
+ TVector<std::function<void()>> OnPersistent;
ETerminationReason TerminationReason = ETerminationReason::None;
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 4934e4658a1..b44da9ed047 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1615,10 +1615,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
} else if (done) {
Y_VERIFY(!txc.IsRescheduled());
Y_VERIFY(!seat->RequestedMemory);
- for (auto it = txc.OnCommit_.begin(); it != txc.OnCommit_.end(); ++it) {
- (*it)();
- }
- seat->OnCommitted = std::move(txc.OnCommitted_);
+ seat->OnPersistent = std::move(prod.OnPersistent);
CommitTransactionLog(seat, env, prod.Change, cpuTimer, ctx);
} else {
Y_VERIFY(!seat->CapturedMemory);
@@ -1626,9 +1623,6 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type "
<< NFmt::Do(*seat->Self) << " postoned w/o demands");
}
- for (auto it = txc.OnRollback_.rbegin(); it != txc.OnRollback_.rend(); ++it) {
- (*it)();
- }
PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx);
}
diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.h b/ydb/core/tablet_flat/flat_executor_tx_env.h
index bfad1f626fc..6288d5876ba 100644
--- a/ydb/core/tablet_flat/flat_executor_tx_env.h
+++ b/ydb/core/tablet_flat/flat_executor_tx_env.h
@@ -139,6 +139,16 @@ namespace NTabletFlatExecutor {
|| LoanConfirmation;
}
+ protected:
+ void OnRollbackChanges() noexcept override {
+ MakeSnap.clear();
+ DropSnap.Reset();
+ BorrowUpdates.clear();
+ LoanBundle.clear();
+ LoanTxStatus.clear();
+ LoanConfirmation.clear();
+ }
+
protected: /* IExecuting, tx stage func implementation */
void MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override;
diff --git a/ydb/core/tablet_flat/flat_part_iface.h b/ydb/core/tablet_flat/flat_part_iface.h
index bbfeab8f5de..68ca39afb0d 100644
--- a/ydb/core/tablet_flat/flat_part_iface.h
+++ b/ydb/core/tablet_flat/flat_part_iface.h
@@ -69,6 +69,13 @@ namespace NTable {
virtual TResult Locate(const TMemTable*, ui64 ref, ui32 tag) noexcept = 0;
virtual TResult Locate(const TPart*, ui64 ref, ELargeObj lob) noexcept = 0;
virtual const TSharedData* TryGetPage(const TPart* part, TPageId id, TGroupId groupId = { }) = 0;
+
+ /**
+ * Hook for cleaning up env on DB.RollbackChanges()
+ */
+ virtual void OnRollbackChanges() noexcept {
+ // nothing by default
+ }
};
}
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index eb452378f97..9b611f793e1 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -211,22 +211,10 @@ public:
~TTransactionContext() {}
- void OnCommit(std::function<void()> callback) {
- OnCommit_.emplace_back(std::move(callback));
- }
-
- void OnRollback(std::function<void()> callback) {
- OnRollback_.emplace_back(std::move(callback));
- }
-
- void OnCommitted(std::function<void()> callback) {
- OnCommitted_.emplace_back(std::move(callback));
- }
-
/**
- * Will rollback any changes and reschedule transaction for a retry
+ * Request transaction to restart at some later time
*
- * Transaction must return false from its Execute after calling this method.
+ * Transaction's Execute method must return false after calling this method.
*/
void Reschedule() {
Rescheduled_ = true;
@@ -244,9 +232,6 @@ public:
NTable::TDatabase &DB;
private:
- TVector<std::function<void()>> OnCommit_;
- TVector<std::function<void()>> OnRollback_;
- TVector<std::function<void()>> OnCommitted_;
bool Rescheduled_ = false;
};
diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
index 07cb17101a9..5f3ecaf1bd4 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
@@ -141,6 +141,14 @@ namespace NTest {
return *this;
}
+ TDbExec& RollbackChanges() {
+ Y_VERIFY(OnTx != EOnTx::None);
+
+ Base->RollbackChanges();
+
+ return *this;
+ }
+
TDbExec& Add(ui32 table, const TRow &row, ERowOp rop = ERowOp::Upsert)
{
const NTest::TRowTool tool(RowSchemeFor(table));
@@ -439,7 +447,8 @@ namespace NTest {
if (was != (real ? EOnTx::Real : EOnTx::Auto))
Y_FAIL("There is no active dbase tx");
- auto up = Base->Commit({ Gen, Step }, apply, Annex.Get()).Change;
+ auto prod = Base->Commit({ Gen, Step }, apply, Annex.Get());
+ auto up = std::move(prod.Change);
Env.reset();
Last = Max<ui32>(), Altered = false;
@@ -461,6 +470,10 @@ namespace NTest {
}
RedoLog.emplace_back(std::move(up));
+
+ for (auto& callback : prod.OnPersistent) {
+ callback();
+ }
}
ReadVersion = TRowVersion::Max();
diff --git a/ydb/core/tablet_flat/ut/ut_db_iface.cpp b/ydb/core/tablet_flat/ut/ut_db_iface.cpp
index dd35dcbcc58..53b893940b1 100644
--- a/ydb/core/tablet_flat/ut/ut_db_iface.cpp
+++ b/ydb/core/tablet_flat/ut/ut_db_iface.cpp
@@ -441,6 +441,36 @@ Y_UNIT_TEST_SUITE(DBase) {
UNIT_ASSERT(closed->Frozen.at(0)->GetBlobs()->Head == 3);
}
+ Y_UNIT_TEST(AnnexRollbackChanges)
+ {
+ auto alter = MakeAlter(1);
+
+ alter.SetRedo(32).SetFamilyBlobs(1, 0, Max<ui32>(), 24);
+
+ TDbExec me;
+
+ me.To(10).Begin().Apply(*alter.Flush()).Commit();
+
+ const TString large35("0123456789abcdef0123456789abcdef012");
+ const TString large42("0123456789abcdef0123456789abcdef0123456789");
+
+ me.To(12).Begin().PutN(1, "l35", 35_u64, large35);
+ me.To(13).RollbackChanges().PutN(1, "l42", 42_u64, large42).Commit();
+ me.To(14).Iter(1)
+ .Seek({}, ESeek::Lower).IsN("l42", 42_u64, large42)
+ .Next().Is(EReady::Gone);
+
+ UNIT_ASSERT(me.BackLog().Annex.size() == 1);
+ UNIT_ASSERT(me.BackLog().Annex[0].Data.size() == 8 + 42);
+
+ me.To(21).Replay(EPlay::Boot).Iter(1)
+ .Seek({}, ESeek::Lower).IsN("l42", 42_u64, large42)
+ .Next().Is(EReady::Gone);
+ me.To(22).Replay(EPlay::Redo).Iter(1)
+ .Seek({}, ESeek::Lower).IsN("l42", 42_u64, large42)
+ .Next().Is(EReady::Gone);
+ }
+
Y_UNIT_TEST(Outer)
{
auto alter = MakeAlter(1);
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index b40a9c8057c..9b489f0f108 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -171,7 +171,7 @@ bool TSnapshotManager::PromoteCompleteEdge(const TRowVersion& version, TTransact
NIceDb::TNiceDb db(txc.DB);
SetCompleteEdge(db, version);
- txc.OnCommitted([this, edge = CompleteEdge] {
+ txc.DB.OnPersistent([this, edge = CompleteEdge] {
this->CommittedCompleteEdge = edge;
});
@@ -294,7 +294,7 @@ void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedRea
PerformedUnprotectedReads = performedUnprotectedReads;
PerformedUnprotectedReadsUncommitted++;
- txc.OnCommitted([this] {
+ txc.DB.OnPersistent([this] {
this->PerformedUnprotectedReadsUncommitted--;
});
}
@@ -420,7 +420,7 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
Y_FAIL("Unexpected mvcc state# %d", (ui32)state);
}
- txc.OnCommitted([this, edge = CompleteEdge] {
+ txc.DB.OnPersistent([this, edge = CompleteEdge] {
this->CommittedCompleteEdge = edge;
});
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index d469f83e849..b5651fbd64c 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -56,15 +56,16 @@ public:
op->SetWaitingForGlobalTxIdFlag();
if (txc.DB.HasChanges()) {
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ txc.DB.RollbackChanges();
}
return EExecutionStatus::Continue;
}
if (Pipeline.AddLockDependencies(op, guardLocks)) {
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
op->ChangeRecords() = std::move(tx->GetCollectedChanges());
diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
index cc31f453a65..df567b199cb 100644
--- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
@@ -47,8 +47,10 @@ public:
txc.DB.CommitTx(tableInfo.LocalTid, writeTxId, versions.WriteVersion);
if (Pipeline.AddLockDependencies(op, guardLocks)) {
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE);
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index 802af68b786..c4a1995d9ae 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -30,7 +30,7 @@ private:
void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx);
private:
- class TRescheduleOpException : public yexception {};
+ class TRollbackAndWaitException : public yexception {};
};
TExecuteDataTxUnit::TExecuteDataTxUnit(TDataShard& dataShard,
@@ -193,15 +193,17 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
tx->ReleaseTxData(txc, ctx);
return EExecutionStatus::Restart;
- } catch (const TRescheduleOpException&) {
+ } catch (const TRollbackAndWaitException&) {
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Tablet " << DataShard.TabletID()
- << " needs to reschedule " << *op << " for dependencies");
+ << " needs to wait " << *op << " for dependencies");
tx->GetDataTx()->ResetCollectedChanges();
tx->ReleaseTxData(txc, ctx);
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
DataShard.IncCounter(COUNTER_WAIT_EXECUTE_LATENCY_MS, waitExecuteLatency.MilliSeconds());
@@ -253,7 +255,7 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
IEngineFlat::EResult engineResult = engine->Execute();
if (Pipeline.AddLockDependencies(op, guardLocks)) {
- throw TRescheduleOpException();
+ throw TRollbackAndWaitException();
}
if (engineResult != IEngineFlat::EResult::Ok) {
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 6a27d5a7db3..46943e1f394 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -63,13 +63,17 @@ public:
Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId);
}
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
if (Pipeline.AddLockDependencies(op, guardLocks)) {
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
if (changeCollector) {
@@ -95,8 +99,10 @@ public:
}
if (Pipeline.AddLockDependencies(op, guardLocks)) {
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
} else {
Y_FAIL_S("Invalid distributed erase tx: " << eraseTx->GetBody().ShortDebugString());
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 825f057ab75..5f111993c5a 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -264,11 +264,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
tx->ReleaseTxData(txc, ctx);
// Transaction may have made some changes before it detected
- // inconsistency, so we need to roll them back. We do this by
- // marking transaction for reschedule and restarting. The next
- // cycle will detect aborted operation and move along.
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ // inconsistency, so we need to roll them back.
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Executed;
}
if (!result && computeCtx.IsTabletNotReady()) {
@@ -291,8 +291,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
// Rollback database changes, if any
if (txc.DB.HasChanges()) {
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ txc.DB.RollbackChanges();
}
return EExecutionStatus::Continue;
@@ -302,8 +301,10 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
allocGuard.Release();
dataTx->ResetCollectedChanges();
tx->ReleaseTxData(txc, ctx);
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Continue;
}
Y_VERIFY(result);
@@ -396,11 +397,11 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
tx->ReleaseTxData(txc, ctx);
// Transaction may have made some changes before it hit the limit,
- // so we need to roll them back. We do this by marking transaction for
- // reschedule and restarting. The next cycle will detect aborted
- // operation and move along.
- txc.Reschedule();
- return EExecutionStatus::Restart;
+ // so we need to roll them back.
+ if (txc.DB.HasChanges()) {
+ txc.DB.RollbackChanges();
+ }
+ return EExecutionStatus::Executed;
} catch (const yexception& e) {
LOG_C("Exception while executing KQP transaction " << *op << " at " << tabletId << ": " << e.what());
if (op->IsReadOnly() || op->IsImmediate()) {
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 5587c77b1e2..02d0302bcb6 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -522,7 +522,7 @@ namespace NKikimr::NDataShard {
txc.DB.OnRollback([this, txId]() {
RollbackAddVolatileTx(txId);
});
- txc.OnCommitted([this, txId]() {
+ txc.DB.OnPersistent([this, txId]() {
auto* info = FindByTxId(txId);
Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId);
Y_VERIFY_S(!info->AddCommitted, "Unexpected commit of a committed volatile txId# " << txId);