aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-03-07 14:50:41 +0300
committersnaury <snaury@ydb.tech>2023-03-07 14:50:41 +0300
commit2f885def7c5026f00c52b9e82c9ccf3ed9921b6a (patch)
treeb6670e40f8ccd73d0f3d7bb347f4d7d8c318b2c0
parent34872f64f9aaa7edc7f19def053c0c403d7cad3b (diff)
downloadydb-2f885def7c5026f00c52b9e82c9ccf3ed9921b6a.tar.gz
Handle volatile dependencies in uncommitted locks
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h11
-rw-r--r--ydb/core/tx/datashard/datashard_locks.cpp37
-rw-r--r--ydb/core/tx/datashard/datashard_locks.h22
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.cpp34
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.h4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h30
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp114
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp18
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema42
10 files changed, 306 insertions, 18 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index f55b108288..8cd85db72a 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -321,6 +321,14 @@ public:
return;
}
+ if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min())) {
+ lock->ForAllVolatileDependencies([this](ui64 txId) {
+ if (VolatileDependencies.insert(txId).second && !VolatileTxId) {
+ VolatileTxId = EngineBay.GetTxId();
+ }
+ });
+ }
+
if (VolatileTxId) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Scheduling commit of lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID());
@@ -902,7 +910,9 @@ public:
void AddWriteConflict(ui64 txId) const {
if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
- Y_FAIL("TODO: add future lock dependency from %" PRIu64 " on %" PRIu64, LockTxId, info->TxId);
+ if (info->State != EVolatileTxState::Aborting) {
+ Self->SysLocksTable().AddVolatileDependency(info->TxId);
+ }
} else {
Self->SysLocksTable().AddWriteConflict(txId);
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 6baa4a909e..983f22933c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -978,6 +978,14 @@ class TDataShard
>;
};
+ struct LockVolatileDependencies : Table<35> {
+ struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct TxId : Column<2, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<LockId, TxId>;
+ using TColumns = TableColumns<LockId, TxId>;
+ };
+
using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue,
DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress,
Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts,
@@ -985,7 +993,8 @@ class TDataShard
ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived,
UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts,
LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits,
- TxVolatileDetails, TxVolatileParticipants, CdcStreamScans>;
+ TxVolatileDetails, TxVolatileParticipants, CdcStreamScans,
+ LockVolatileDependencies>;
// These settings are persisted on each Init. So we use empty settings in order not to overwrite what
// was changed by the user
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index a48b97301a..3b9b56fafd 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -158,6 +158,12 @@ void TLockInfo::PersistRemoveLock(ILocksDb* db) {
Y_VERIFY(IsPersistent());
Y_VERIFY(db, "Cannot persist lock without a db");
+ // Remove persistent volatile dependencies
+ for (ui64 txId : VolatileDependencies) {
+ db->PersistRemoveVolatileDependency(LockId, txId);
+ }
+ VolatileDependencies.clear();
+
// Remove persistent conflicts
for (auto& pr : ConflictLocks) {
TLockInfo* otherLock = pr.first;
@@ -236,6 +242,15 @@ void TLockInfo::AddConflict(TLockInfo* otherLock, ILocksDb* db) {
}
}
+void TLockInfo::AddVolatileDependency(ui64 txId, ILocksDb* db) {
+ Y_VERIFY(LockId != txId, "Unexpected volatile dependency between a lock and itself");
+
+ if (VolatileDependencies.insert(txId).second && IsPersistent()) {
+ Y_VERIFY(db, "Cannot persist dependencies without a db");
+ db->PersistAddVolatileDependency(LockId, txId);
+ }
+}
+
void TLockInfo::PersistConflicts(ILocksDb* db) {
Y_VERIFY(IsPersistent());
Y_VERIFY(db, "Cannot persist conflicts without a db");
@@ -252,6 +267,9 @@ void TLockInfo::PersistConflicts(ILocksDb* db) {
db->PersistAddConflict(otherLock->LockId, LockId);
}
}
+ for (ui64 txId : VolatileDependencies) {
+ db->PersistAddVolatileDependency(LockId, txId);
+ }
}
void TLockInfo::CleanupConflicts() {
@@ -272,6 +290,7 @@ void TLockInfo::CleanupConflicts() {
otherLock->ConflictLocks.erase(this);
}
ConflictLocks.clear();
+ VolatileDependencies.clear();
}
}
@@ -305,6 +324,12 @@ void TLockInfo::RestorePersistentConflict(TLockInfo* otherLock) {
otherLock->ConflictLocks[this] |= ELockConflictFlags::BreakUsOnTheirCommit;
}
+void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) {
+ Y_VERIFY(IsPersistent());
+
+ VolatileDependencies.insert(txId);
+}
+
// TTableLocks
void TTableLocks::AddShardLock(TLockInfo* lock) {
@@ -775,6 +800,9 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() {
for (auto& writeConflictLock : Update->WriteConflictLocks) {
lock->AddConflict(&writeConflictLock, Db);
}
+ for (ui64 txId : Update->VolatileDependencies) {
+ lock->AddVolatileDependency(txId, Db);
+ }
if (lock->GetWriteTables() && !lock->IsPersistent()) {
// We need to persist a new lock
@@ -999,6 +1027,12 @@ void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const
}
}
+void TSysLocks::AddVolatileDependency(ui64 txId) {
+ Y_VERIFY(Update && Update->LockTxId);
+
+ Update->AddVolatileDependency(txId);
+}
+
void TSysLocks::BreakAllLocks(const TTableId& tableId) {
Y_VERIFY(Update);
Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks)));
@@ -1145,6 +1179,9 @@ bool TSysLocks::Load(ILocksDb& db) {
lock->RestorePersistentConflict(otherLock);
}
}
+ for (ui64 txId : lockRow.VolatileDependencies) {
+ lock->RestorePersistentVolatileDependency(txId);
+ }
}
return true;
diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h
index 7d65c115ab..74834c85cb 100644
--- a/ydb/core/tx/datashard/datashard_locks.h
+++ b/ydb/core/tx/datashard/datashard_locks.h
@@ -8,6 +8,7 @@
#include <ydb/core/tablet/tablet_counters.h>
#include <library/cpp/cache/cache.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
#include <util/generic/list.h>
#include <util/generic/queue.h>
#include <util/generic/set.h>
@@ -42,6 +43,7 @@ public:
TVector<TLockRange> Ranges;
TVector<ui64> Conflicts;
+ TVector<ui64> VolatileDependencies;
};
virtual bool Load(TVector<TLockRow>& rows) = 0;
@@ -65,6 +67,10 @@ public:
// Persist a conflict, i.e. this lock must break some other lock on commit
virtual void PersistAddConflict(ui64 lockId, ui64 otherLockId) = 0;
virtual void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) = 0;
+
+ // Persist volatile dependencies, i.e. which undecided transactions must be waited for on commit
+ virtual void PersistAddVolatileDependency(ui64 lockId, ui64 txId) = 0;
+ virtual void PersistRemoveVolatileDependency(ui64 lockId, ui64 txId) = 0;
};
class TLocksDataShard {
@@ -310,11 +316,13 @@ public:
void PersistRanges(ILocksDb* db);
void AddConflict(TLockInfo* otherLock, ILocksDb* db);
+ void AddVolatileDependency(ui64 txId, ILocksDb* db);
void PersistConflicts(ILocksDb* db);
void CleanupConflicts();
void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags);
void RestorePersistentConflict(TLockInfo* otherLock);
+ void RestorePersistentVolatileDependency(ui64 txId);
template<class TCallback>
void ForAllConflicts(TCallback&& callback) {
@@ -323,6 +331,13 @@ public:
}
}
+ template<class TCallback>
+ void ForAllVolatileDependencies(TCallback&& callback) {
+ for (auto& item : VolatileDependencies) {
+ callback(item);
+ }
+ }
+
ui64 GetLastOpId() const { return LastOpId; }
void SetLastOpId(ui64 opId) { LastOpId = opId; }
@@ -367,6 +382,7 @@ private:
// A set of locks we must break on commit
THashMap<TLockInfo*, ELockConflictFlags> ConflictLocks;
+ absl::flat_hash_set<ui64> VolatileDependencies;
TVector<TPersistentRange> PersistentRanges;
ui64 LastOpId = 0;
@@ -651,6 +667,7 @@ struct TLocksUpdate {
TIntrusiveList<TLockInfo, TLockInfoReadConflictListTag> ReadConflictLocks;
TIntrusiveList<TLockInfo, TLockInfoWriteConflictListTag> WriteConflictLocks;
TIntrusiveList<TTableLocks, TTableLocksWriteConflictShardListTag> WriteConflictShardLocks;
+ absl::flat_hash_set<ui64> VolatileDependencies;
TIntrusiveList<TLockInfo, TLockInfoEraseListTag> EraseLocks;
@@ -721,6 +738,10 @@ struct TLocksUpdate {
WriteConflictShardLocks.PushBack(table);
}
+ void AddVolatileDependency(ui64 txId) {
+ VolatileDependencies.insert(txId);
+ }
+
void AddEraseLock(TLockInfo* lock) {
EraseLocks.PushBack(lock);
}
@@ -814,6 +835,7 @@ public:
void AddReadConflict(ui64 conflictId);
void AddWriteConflict(ui64 conflictId);
void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key);
+ void AddVolatileDependency(ui64 txId);
void BreakAllLocks(const TTableId& tableId);
void BreakSetLocks();
bool IsMyKey(const TArrayRef<const TCell>& key) const;
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
index 3914bd484d..54821486e0 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.cpp
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -74,6 +74,26 @@ bool TDataShardLocksDb::Load(TVector<TLockRow>& rows) {
}
}
+ // Load volatile dependencies
+ {
+ auto rowset = db.Table<Schema::LockVolatileDependencies>().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ auto lockId = rowset.GetValue<Schema::LockVolatileDependencies::LockId>();
+ auto it = lockIndex.find(lockId);
+ if (it != lockIndex.end()) {
+ auto& lock = rows[it->second];
+ auto txId = rowset.GetValue<Schema::LockVolatileDependencies::TxId>();
+ lock.VolatileDependencies.push_back(txId);
+ }
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
return true;
}
@@ -172,4 +192,18 @@ void TDataShardLocksDb::PersistRemoveConflict(ui64 lockId, ui64 otherLockId) {
HasChanges_ = true;
}
+void TDataShardLocksDb::PersistAddVolatileDependency(ui64 lockId, ui64 txId) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockVolatileDependencies>().Key(lockId, txId).Update();
+ HasChanges_ = true;
+}
+
+void TDataShardLocksDb::PersistRemoveVolatileDependency(ui64 lockId, ui64 txId) {
+ using Schema = TDataShard::Schema;
+ NIceDb::TNiceDb db(DB);
+ db.Table<Schema::LockVolatileDependencies>().Key(lockId, txId).Delete();
+ HasChanges_ = true;
+}
+
} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_locks_db.h b/ydb/core/tx/datashard/datashard_locks_db.h
index ea2b2cd3b4..1aba7176ec 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.h
+++ b/ydb/core/tx/datashard/datashard_locks_db.h
@@ -34,6 +34,10 @@ public:
void PersistAddConflict(ui64 lockId, ui64 otherLockId) override;
void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) override;
+ // Persist volatile dependencies, i.e. which undecided transactions must be waited for on commit
+ void PersistAddVolatileDependency(ui64 lockId, ui64 txId) override;
+ void PersistRemoveVolatileDependency(ui64 lockId, ui64 txId) override;
+
private:
TDataShard& Self;
NTable::TDatabase& DB;
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 9994ee28b3..8bfc5c5ec3 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -19,21 +19,21 @@ namespace NKqpHelpers {
template<class TResp>
inline TResp AwaitResponse(TTestActorRuntime& runtime, NThreading::TFuture<TResp> f) {
- size_t responses = 0;
- TResp response;
- f.Subscribe([&](NThreading::TFuture<TResp> fut){
- ++responses;
- TResp r = fut.ExtractValueSync();
- response.Swap(&r);
- });
-
- TDispatchOptions options;
- options.FinalEvents.emplace_back(
- [&](IEventHandle& ) -> bool { return responses >= 1; }
- );
-
- runtime.DispatchEvents(options);
- return response;
+ if (!f.HasValue() && !f.HasException()) {
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return f.HasValue() || f.HasException();
+ };
+ options.FinalEvents.emplace_back([&](IEventHandle&) {
+ return f.HasValue() || f.HasException();
+ });
+
+ runtime.DispatchEvents(options);
+
+ UNIT_ASSERT(f.HasValue() || f.HasException());
+ }
+
+ return f.ExtractValueSync();
}
inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) {
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 27da0724b6..2c0a5c42cf 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3525,6 +3525,120 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 20 } items { uint32_value: 210 } }");
}
+ Y_UNIT_TEST(LockedWriteWithPendingVolatileCommit) {
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"value2", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10)"));
+
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+ observer.BlockReadSets = true;
+
+ Cerr << "!!! Sending volatile upsert" << Endl;
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+ TString volatileSessionId = CreateSessionRPC(runtime, "/Root");
+ auto upsertResult = SendRequest(
+ runtime,
+ MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", volatileSessionId, "", true),
+ "/Root");
+ SimulateSleep(runtime, TDuration::Seconds(1));
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Should be 2 expectations + 2 commit decisions
+ UNIT_ASSERT(!upsertResult.HasValue());
+ UNIT_ASSERT_VALUES_EQUAL(observer.BlockedReadSets.size(), 4u);
+
+ // Start a snapshot read transaction, make sure not to touch the uncommitted 2 key
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 1
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to keys 1 and 2 using tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value2) VALUES (1, 11), (2, 22)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value2) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ // This compaction verifies there's no commit race with the waiting
+ // distributed transaction. If commits happen in incorrect order we
+ // would observe unexpected results.
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+ CompactTable(runtime, shard1, tableId1, false);
+
+ observer.UnblockReadSets();
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(upsertResult))),
+ "<empty>");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } items { uint32_value: 11 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } items { uint32_value: 22 } }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index 18955ee931..e7feeae7a9 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -24,6 +24,7 @@ public:
private:
void ExecuteDataTx(TOperation::TPtr op,
+ TTransactionContext& txc,
const TActorContext& ctx,
TSetupSysLocks& guardLocks);
void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx);
@@ -158,7 +159,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
try {
try {
- ExecuteDataTx(op, ctx, guardLocks);
+ ExecuteDataTx(op, txc, ctx, guardLocks);
} catch (const TNotReadyTabletException&) {
// We want to try pinning (actually precharging) all required pages
// before restarting the transaction, to minimize future restarts.
@@ -219,6 +220,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}
void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
+ TTransactionContext& txc,
const TActorContext& ctx,
TSetupSysLocks& guardLocks)
{
@@ -315,6 +317,20 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
DataShard.SysLocksTable().BreakSetLocks();
}
+ // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies
+ // Such transactions would have no participants and become immediately committed
+ if (auto commitTxIds = tx->GetDataTx()->GetVolatileCommitTxIds()) {
+ TVector<ui64> participants; // empty participants
+ DataShard.GetVolatileTxManager().PersistAddVolatileTx(
+ tx->GetTxId(),
+ writeVersion,
+ commitTxIds,
+ tx->GetDataTx()->GetVolatileDependencies(),
+ participants,
+ tx->GetDataTx()->GetVolatileChangeGroup(),
+ txc);
+ }
+
AddLocksToResult(op, ctx);
Pipeline.AddCommittingOp(op);
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 0c51c0565c..1b47acab12 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1900,6 +1900,48 @@
}
},
{
+ "TableId": 35,
+ "TableName": "LockVolatileDependencies",
+ "TableKey": [
+ 1,
+ 2
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "TxId",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
"TableId": 101,
"TableName": "LockChangeRecords",
"TableKey": [