aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-02-28 10:25:42 +0300
committersnaury <snaury@ydb.tech>2023-02-28 10:25:42 +0300
commit9728c69ba0b0bb42c4bbbd872110add535b69207 (patch)
treee5dd32af9eafe95f7122734f15e195b246f5dce9
parent6340e68e824b903a970de91f1b9fbf142a9ecc48 (diff)
downloadydb-9728c69ba0b0bb42c4bbbd872110add535b69207.tar.gz
Commit change records with volatile transactions
-rw-r--r--ydb/core/protos/tx_datashard.proto3
-rw-r--r--ydb/core/tx/datashard/change_collector.cpp49
-rw-r--r--ydb/core/tx/datashard/change_collector.h46
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp43
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h1
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h1
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp139
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp60
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp3
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp1
-rw-r--r--ydb/core/tx/datashard/remove_lock_change_records.cpp6
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp50
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h2
16 files changed, 387 insertions, 31 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 7b5a154b52c..27f1076cdf5 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -1881,4 +1881,7 @@ message TTxVolatileDetails {
// A list of volatile transaction ids that need to be committed or rolled back before this transaction commits.
// This is because local db changes must be committed in the same order they performed writes.
repeated uint64 Dependencies = 5 [packed = true];
+
+ // An optional change group for committing change collector changes
+ optional uint64 ChangeGroup = 6;
}
diff --git a/ydb/core/tx/datashard/change_collector.cpp b/ydb/core/tx/datashard/change_collector.cpp
index facc452aabd..7ead8295aa2 100644
--- a/ydb/core/tx/datashard/change_collector.cpp
+++ b/ydb/core/tx/datashard/change_collector.cpp
@@ -11,18 +11,25 @@ namespace NDataShard {
using namespace NMiniKQL;
+ui64 TDataShardChangeGroupProvider::GetChangeGroup() {
+ if (!Group) {
+ NIceDb::TNiceDb db(Db);
+ Group = Self.AllocateChangeRecordGroup(db);
+ }
+
+ return *Group;
+}
+
class TChangeCollectorProxy
: public IDataShardChangeCollector
, public IBaseChangeCollectorSink
{
public:
- TChangeCollectorProxy(TDataShard* self, NTable::TDatabase& db, bool isImmediateTx)
+ TChangeCollectorProxy(TDataShard* self, NTable::TDatabase& db, IDataShardChangeGroupProvider& groupProvider)
: Self(self)
, Db(db)
+ , GroupProvider(groupProvider)
{
- if (!isImmediateTx) {
- Group = 0;
- }
}
void AddUnderlying(THolder<IBaseChangeCollector> collector) {
@@ -89,11 +96,7 @@ public:
void CommitLockChanges(ui64 lockId, const TRowVersion& writeVersion) override {
NIceDb::TNiceDb db(Db);
- if (!Group) {
- Group = Self->AllocateChangeRecordGroup(db);
- }
-
- Self->CommitLockChangeRecords(db, lockId, *Group, writeVersion, Collected);
+ Self->CommitLockChangeRecords(db, lockId, GroupProvider.GetChangeGroup(), writeVersion, Collected);
}
TVersionState GetVersionState() override {
@@ -117,12 +120,10 @@ public:
TChangeRecordBuilder builder(kind);
if (!WriteTxId) {
- if (!Group) {
- Group = Self->AllocateChangeRecordGroup(db);
- }
+ ui64 group = GroupProvider.GetChangeGroup();
builder
.WithOrder(Self->AllocateChangeRecordOrder(db))
- .WithGroup(*Group)
+ .WithGroup(group)
.WithStep(WriteVersion.Step)
.WithTxId(WriteVersion.TxId);
} else {
@@ -159,8 +160,8 @@ public:
private:
TDataShard* Self;
NTable::TDatabase& Db;
+ IDataShardChangeGroupProvider& GroupProvider;
- TMaybe<ui64> Group;
TVector<THolder<IBaseChangeCollector>> Underlying;
TVector<TChange> Collected;
@@ -169,7 +170,13 @@ private:
}; // TChangeCollectorProxy
-IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) {
+IDataShardChangeCollector* CreateChangeCollector(
+ TDataShard& dataShard,
+ IDataShardUserDb& userDb,
+ IDataShardChangeGroupProvider& groupProvider,
+ NTable::TDatabase& db,
+ const TUserTable& table)
+{
const bool hasAsyncIndexes = table.HasAsyncIndexes();
const bool hasCdcStreams = table.HasCdcStreams();
@@ -177,7 +184,7 @@ IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataSha
return nullptr;
}
- auto proxy = MakeHolder<TChangeCollectorProxy>(&dataShard, db, isImmediateTx);
+ auto proxy = MakeHolder<TChangeCollectorProxy>(&dataShard, db, groupProvider);
if (hasAsyncIndexes) {
proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, userDb, *proxy));
@@ -190,10 +197,16 @@ IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataSha
return proxy.Release();
}
-IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx) {
+IDataShardChangeCollector* CreateChangeCollector(
+ TDataShard& dataShard,
+ IDataShardUserDb& userDb,
+ IDataShardChangeGroupProvider& groupProvider,
+ NTable::TDatabase& db,
+ ui64 tableId)
+{
Y_VERIFY(dataShard.GetUserTables().contains(tableId));
const TUserTable& tableInfo = *dataShard.GetUserTables().at(tableId);
- return CreateChangeCollector(dataShard, userDb, db, tableInfo, isImmediateTx);
+ return CreateChangeCollector(dataShard, userDb, groupProvider, db, tableInfo);
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h
index 4ebc6742b68..2b9b034a827 100644
--- a/ydb/core/tx/datashard/change_collector.h
+++ b/ydb/core/tx/datashard/change_collector.h
@@ -11,6 +11,38 @@ struct TUserTable;
class IDataShardUserDb;
+class IDataShardChangeGroupProvider {
+protected:
+ ~IDataShardChangeGroupProvider() = default;
+
+public:
+ virtual bool HasChangeGroup() const = 0;
+ virtual ui64 GetChangeGroup() = 0;
+};
+
+class TDataShardChangeGroupProvider final
+ : public IDataShardChangeGroupProvider
+{
+public:
+ // Note: for distributed transactions group is expected to be 0
+ TDataShardChangeGroupProvider(TDataShard& self, NTable::TDatabase& db, std::optional<ui64> group = std::nullopt)
+ : Self(self)
+ , Db(db)
+ , Group(group)
+ { }
+
+ bool HasChangeGroup() const override {
+ return bool(Group);
+ }
+
+ ui64 GetChangeGroup() override;
+
+private:
+ TDataShard& Self;
+ NTable::TDatabase& Db;
+ std::optional<ui64> Group;
+};
+
class IDataShardChangeCollector : public NMiniKQL::IChangeCollector {
public:
// basic change record's info
@@ -37,8 +69,18 @@ public:
virtual TVector<TChange>&& GetCollected() = 0;
};
-IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx);
-IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx);
+IDataShardChangeCollector* CreateChangeCollector(
+ TDataShard& dataShard,
+ IDataShardUserDb& userDb,
+ IDataShardChangeGroupProvider& groupProvider,
+ NTable::TDatabase& db,
+ const TUserTable& table);
+IDataShardChangeCollector* CreateChangeCollector(
+ TDataShard& dataShard,
+ IDataShardUserDb& userDb,
+ IDataShardChangeGroupProvider& groupProvider,
+ NTable::TDatabase& db,
+ ui64 tableId);
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index f5e4ce2da05..f55b108288b 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -193,7 +193,11 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) {
}
///
-class TDataShardEngineHost : public TEngineHost, public IDataShardUserDb {
+class TDataShardEngineHost final
+ : public TEngineHost
+ , public IDataShardUserDb
+ , public IDataShardChangeGroupProvider
+{
public:
TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now)
: TEngineHost(db, counters,
@@ -271,6 +275,24 @@ public:
IsRepeatableSnapshot = true;
}
+ bool HasChangeGroup() const override {
+ return bool(ChangeGroup);
+ }
+
+ ui64 GetChangeGroup() override {
+ if (!ChangeGroup) {
+ if (IsImmediateTx) {
+ NIceDb::TNiceDb db(DB);
+ ChangeGroup = Self->AllocateChangeRecordGroup(db);
+ } else {
+ // Distributed transactions have their group set to zero
+ ChangeGroup = 0;
+ }
+ }
+
+ return *ChangeGroup;
+ }
+
IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override {
auto it = ChangeCollectors.find(tableId.PathId);
if (it != ChangeCollectors.end()) {
@@ -282,7 +304,12 @@ public:
return it->second.Get();
}
- it->second.Reset(CreateChangeCollector(*Self, *const_cast<TDataShardEngineHost*>(this), DB, tableId.PathId.LocalPathId, IsImmediateTx));
+ it->second.Reset(CreateChangeCollector(
+ *Self,
+ *const_cast<TDataShardEngineHost*>(this),
+ *const_cast<TDataShardEngineHost*>(this),
+ DB,
+ tableId.PathId.LocalPathId));
return it->second.Get();
}
@@ -368,6 +395,10 @@ public:
return dependencies;
}
+ std::optional<ui64> GetVolatileChangeGroup() const {
+ return ChangeGroup;
+ }
+
bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override {
if (TSysTables::IsSystemTable(key.TableId))
return DataShardSysTable(key.TableId).IsValidKey(key);
@@ -927,6 +958,7 @@ private:
mutable absl::flat_hash_map<TPathId, NTable::ITransactionObserverPtr> TxObservers;
mutable absl::flat_hash_set<ui64> VolatileCommitTxIds;
mutable absl::flat_hash_set<ui64> VolatileDependencies;
+ std::optional<ui64> ChangeGroup = std::nullopt;
};
//
@@ -1156,6 +1188,13 @@ TVector<ui64> TEngineBay::GetVolatileDependencies() const {
return host->GetVolatileDependencies();
}
+std::optional<ui64> TEngineBay::GetVolatileChangeGroup() const {
+ Y_VERIFY(EngineHost);
+
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ return host->GetVolatileChangeGroup();
+}
+
IEngineFlat * TEngineBay::GetEngine() {
if (!Engine) {
Engine = CreateEngineFlat(*EngineSettings);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 62bfe0a1971..42515d42446 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -114,6 +114,7 @@ public:
TVector<ui64> GetVolatileCommitTxIds() const;
TVector<ui64> GetVolatileDependencies() const;
+ std::optional<ui64> GetVolatileChangeGroup() const;
void ResetCounters() { EngineHostCounters = TEngineHostCounters(); }
const TEngineHostCounters& GetCounters() const { return EngineHostCounters; }
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 5ca5ff51642..5b114d06f39 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -190,6 +190,7 @@ public:
TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); }
TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); }
+ std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); }
TActorId Source() const { return Source_; }
void SetSource(const TActorId& actorId) { Source_ = actorId; }
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 9fd3330d9cc..6cd4da5e67c 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -61,9 +61,10 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId);
TDataShardUserDb userDb(*self, txc.DB, readVersion);
+ TDataShardChangeGroupProvider groupProvider(*self, txc.DB);
if (CollectChanges) {
- ChangeCollector.Reset(CreateChangeCollector(*self, userDb, txc.DB, tableInfo, true));
+ ChangeCollector.Reset(CreateChangeCollector(*self, userDb, groupProvider, txc.DB, tableInfo));
}
// Prepare (id, Type) vector for value columns
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 2ad5d5e6595..e27bec92afd 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -60,6 +60,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
std::optional<TDataShardUserDb> userDb;
+ std::optional<TDataShardChangeGroupProvider> groupProvider;
THolder<IEraseRowsCondition> condition;
if (params) {
@@ -69,7 +70,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
userDb.emplace(*self, params.Txc->DB, params.ReadVersion);
- params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, params.Txc->DB, tableInfo, true));
+ groupProvider.emplace(*self, params.Txc->DB);
+ params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, *groupProvider, params.Txc->DB, tableInfo));
}
const bool breakWriteConflicts = self->SysLocksTable().HasWriteLocks(fullTableId);
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
index 6e57ee9bec4..3914bd484d0 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.cpp
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -111,7 +111,8 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) {
void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
// We remove lock changes unless it's managed by volatile tx manager
- if (!Self.GetVolatileTxManager().FindByCommitTxId(lockId)) {
+ bool isVolatile = Self.GetVolatileTxManager().FindByCommitTxId(lockId);
+ if (!isVolatile) {
for (auto& pr : Self.GetUserTables()) {
auto tid = pr.second->LocalTid;
// Removing the lock also removes any uncommitted data
@@ -126,7 +127,9 @@ void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
db.Table<Schema::Locks>().Key(lockId).Delete();
HasChanges_ = true;
- Self.ScheduleRemoveLockChanges(lockId);
+ if (!isVolatile) {
+ Self.ScheduleRemoveLockChanges(lockId);
+ }
}
void TDataShardLocksDb::PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags, const TString& data) {
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 552f2fc9e26..27da0724b61 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1154,8 +1154,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
NKikimrTxDataShard::TKqpLocks_ELocksOp Op = NKikimrTxDataShard::TKqpLocks::Commit;
TVector<TLockInfo> Locks;
- void AddLocks(const TVector<TLockInfo>& locks) {
+ TInjectLocks& AddLocks(const TVector<TLockInfo>& locks) {
Locks.insert(Locks.end(), locks.begin(), locks.end());
+ return *this;
}
};
@@ -3388,6 +3389,142 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"ERROR: WrongRequest\n");
}
+ Y_UNIT_TEST(LockedWriteWithAsyncIndexAndVolatileCommit) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ WaitTxNotification(server, sender,
+ AsyncAlterAddIndex(server, "/Root", "/Root/table-1",
+ TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}));
+
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ WaitTxNotification(server, sender,
+ AsyncAlterAddIndex(server, "/Root", "/Root/table-2",
+ TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}));
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to keys 1 and 2 using tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to keys 10 and 20 using tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 110), (20, 210)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1` VIEW by_value
+ WHERE value in (1, 11, 21)
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-2` VIEW by_value
+ WHERE value in (10, 110, 210)
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }");
+
+ // Commit changes in tx 123
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().AddLocks(locks1).AddLocks(locks2);
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0);
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1` VIEW by_value
+ WHERE value in (1, 11, 21)
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 11 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 21 } }");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-2` VIEW by_value
+ WHERE value in (10, 110, 210)
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 10 } items { uint32_value: 110 } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 210 } }");
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 3152351d6dd..ac595562f03 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1352,6 +1352,66 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
UNIT_ASSERT_VALUES_EQUAL(observedStatus, Ydb::StatusIds::SUCCESS);
}
+ Y_UNIT_TEST(DistributedWriteWithAsyncIndex) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ })
+ .Indexes({
+ {"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync},
+ });
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+ ExecSQL(server, sender, R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 30);
+ )");
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Make sure changes are actually delivered
+ SimulateSleep(runtime, TDuration::Seconds(1));
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, R"(
+ SELECT key, value
+ FROM `/Root/table-1` VIEW by_value
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 3 } }");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, R"(
+ SELECT key, value
+ FROM `/Root/table-2` VIEW by_value
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 30 } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 318b422f879..b57b346ecf2 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -46,7 +46,8 @@ public:
if (eraseTx->HasDependents()) {
TDataShardUserDb userDb(DataShard, txc.DB, readVersion);
- THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, txc.DB, request.GetTableId(), false)};
+ TDataShardChangeGroupProvider groupProvider(DataShard, txc.DB, /* distributed tx group */ 0);
+ THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, groupProvider, txc.DB, request.GetTableId())};
auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize());
if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, changeCollector.Get())) {
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 4c20ff9f2a5..bd6400a9919 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -324,6 +324,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
commitTxIds,
dataTx->GetVolatileDependencies(),
participants,
+ dataTx->GetVolatileChangeGroup(),
txc);
}
diff --git a/ydb/core/tx/datashard/remove_lock_change_records.cpp b/ydb/core/tx/datashard/remove_lock_change_records.cpp
index b387c02f5a3..ccc2e8f4b7c 100644
--- a/ydb/core/tx/datashard/remove_lock_change_records.cpp
+++ b/ydb/core/tx/datashard/remove_lock_change_records.cpp
@@ -35,6 +35,12 @@ public:
continue;
}
+ if (Self->GetVolatileTxManager().FindByCommitTxId(lockId)) {
+ // Don't remove records that are managed by volatile tx manager
+ Self->PendingLockChangeRecordsToRemove.pop_back();
+ continue;
+ }
+
while (!it->second.Changes.empty() && removed < MaxRecordsToRemove) {
auto& record = it->second.Changes.back();
db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete();
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 0f079b37df5..275ab15d439 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -14,6 +14,8 @@ namespace NKikimr::NDataShard {
TTxType GetTxType() const override { return TXTYPE_VOLATILE_TX_COMMIT; }
bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
+ NIceDb::TNiceDb db(txc.DB);
+
Y_VERIFY(Self->VolatileTxManager.PendingCommitTxScheduled);
Self->VolatileTxManager.PendingCommitTxScheduled = false;
@@ -36,7 +38,30 @@ namespace NKikimr::NDataShard {
}
}
- // TODO: commit change records
+ auto getGroup = [&]() -> ui64 {
+ if (!info->ChangeGroup) {
+ if (info->Version.TxId != info->TxId) {
+ // Assume it's an immediate transaction and allocate new group
+ info->ChangeGroup = Self->AllocateChangeRecordGroup(db);
+ } else {
+ // Distributed transactions commit changes with group zero
+ info->ChangeGroup = 0;
+ }
+ }
+ return *info->ChangeGroup;
+ };
+
+ // First commit change records from any committed locks
+ for (ui64 commitTxId : info->CommitTxIds) {
+ if (commitTxId != info->TxId && Self->HasLockChangeRecords(commitTxId)) {
+ Self->CommitLockChangeRecords(db, commitTxId, getGroup(), info->Version, Collected);
+ }
+ }
+
+ // Commit change records from the transaction itself
+ if (info->CommitTxIds.contains(info->TxId) && Self->HasLockChangeRecords(info->TxId)) {
+ Self->CommitLockChangeRecords(db, info->TxId, getGroup(), info->Version, Collected);
+ }
Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc);
@@ -53,6 +78,9 @@ namespace NKikimr::NDataShard {
if (Delayed) {
OnCommitted(ctx);
}
+ if (Collected) {
+ Self->EnqueueChangeRecords(std::move(Collected));
+ }
}
void OnCommitted(const TActorContext& ctx) {
@@ -71,6 +99,7 @@ namespace NKikimr::NDataShard {
private:
ui64 TxId;
+ TVector<IDataShardChangeCollector::TChange> Collected;
bool Delayed = false;
};
@@ -107,8 +136,6 @@ namespace NKikimr::NDataShard {
}
}
- // TODO: abort change records
-
Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc);
return true;
}
@@ -118,6 +145,9 @@ namespace NKikimr::NDataShard {
Y_VERIFY(info && info->State == EVolatileTxState::Aborting);
Y_VERIFY(info->AddCommitted);
+ // Make a copy since it will disappear soon
+ auto commitTxIds = info->CommitTxIds;
+
// Run callbacks only after we successfully persist aborted tx
Self->VolatileTxManager.RunAbortCallbacks(info);
@@ -127,6 +157,11 @@ namespace NKikimr::NDataShard {
Self->VolatileTxManager.RemoveVolatileTx(TxId);
+ // Schedule removal of all lock changes we were supposed to commit
+ for (ui64 commitTxId : commitTxIds) {
+ Self->ScheduleRemoveLockChanges(commitTxId);
+ }
+
Self->CheckSplitCanStart(ctx);
}
@@ -235,6 +270,9 @@ namespace NKikimr::NDataShard {
info->Version = TRowVersion(details.GetVersionStep(), details.GetVersionTxId());
info->CommitTxIds.insert(details.GetCommitTxIds().begin(), details.GetCommitTxIds().end());
info->Dependencies.insert(details.GetDependencies().begin(), details.GetDependencies().end());
+ if (details.HasChangeGroup()) {
+ info->ChangeGroup = details.GetChangeGroup();
+ }
info->AddCommitted = true; // we loaded it from local db, so it is committed
if (!rowset.Next()) {
@@ -354,6 +392,7 @@ namespace NKikimr::NDataShard {
TConstArrayRef<ui64> commitTxIds,
TConstArrayRef<ui64> dependencies,
TConstArrayRef<ui64> participants,
+ std::optional<ui64> changeGroup,
TTransactionContext& txc)
{
using Schema = TDataShard::Schema;
@@ -372,6 +411,7 @@ namespace NKikimr::NDataShard {
info->CommitTxIds.insert(commitTxIds.begin(), commitTxIds.end());
info->Dependencies.insert(dependencies.begin(), dependencies.end());
info->Participants.insert(participants.begin(), participants.end());
+ info->ChangeGroup = changeGroup;
if (info->Participants.empty()) {
// Transaction is committed when we don't have to wait for other participants
@@ -417,6 +457,10 @@ namespace NKikimr::NDataShard {
std::sort(m->begin(), m->end());
}
+ if (info->ChangeGroup) {
+ details.SetChangeGroup(*info->ChangeGroup);
+ }
+
db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Update(
NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State),
NIceDb::TUpdate<Schema::TxVolatileDetails::Details>(std::move(details)));
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index be6202a8d1f..6af069540ec 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -48,6 +48,7 @@ namespace NKikimr::NDataShard {
absl::flat_hash_set<ui64> Dependencies;
absl::flat_hash_set<ui64> Dependents;
absl::flat_hash_set<ui64> Participants;
+ std::optional<ui64> ChangeGroup;
bool AddCommitted = false;
absl::flat_hash_set<ui64> BlockedOperations;
absl::flat_hash_set<ui64> WaitingRemovalOperations;
@@ -175,6 +176,7 @@ namespace NKikimr::NDataShard {
TConstArrayRef<ui64> commitTxIds,
TConstArrayRef<ui64> dependencies,
TConstArrayRef<ui64> participants,
+ std::optional<ui64> changeGroup,
TTransactionContext& txc);
bool AttachVolatileTxCallback(