diff options
author | snaury <snaury@ydb.tech> | 2023-02-28 10:25:42 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-02-28 10:25:42 +0300 |
commit | 9728c69ba0b0bb42c4bbbd872110add535b69207 (patch) | |
tree | e5dd32af9eafe95f7122734f15e195b246f5dce9 | |
parent | 6340e68e824b903a970de91f1b9fbf142a9ecc48 (diff) | |
download | ydb-9728c69ba0b0bb42c4bbbd872110add535b69207.tar.gz |
Commit change records with volatile transactions
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector.cpp | 49 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector.h | 46 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 43 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_common_upload.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_erase.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks_db.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 139 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 60 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/remove_lock_change_records.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.cpp | 50 | ||||
-rw-r--r-- | ydb/core/tx/datashard/volatile_tx.h | 2 |
16 files changed, 387 insertions, 31 deletions
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 7b5a154b52c..27f1076cdf5 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -1881,4 +1881,7 @@ message TTxVolatileDetails { // A list of volatile transaction ids that need to be committed or rolled back before this transaction commits. // This is because local db changes must be committed in the same order they performed writes. repeated uint64 Dependencies = 5 [packed = true]; + + // An optional change group for committing change collector changes + optional uint64 ChangeGroup = 6; } diff --git a/ydb/core/tx/datashard/change_collector.cpp b/ydb/core/tx/datashard/change_collector.cpp index facc452aabd..7ead8295aa2 100644 --- a/ydb/core/tx/datashard/change_collector.cpp +++ b/ydb/core/tx/datashard/change_collector.cpp @@ -11,18 +11,25 @@ namespace NDataShard { using namespace NMiniKQL; +ui64 TDataShardChangeGroupProvider::GetChangeGroup() { + if (!Group) { + NIceDb::TNiceDb db(Db); + Group = Self.AllocateChangeRecordGroup(db); + } + + return *Group; +} + class TChangeCollectorProxy : public IDataShardChangeCollector , public IBaseChangeCollectorSink { public: - TChangeCollectorProxy(TDataShard* self, NTable::TDatabase& db, bool isImmediateTx) + TChangeCollectorProxy(TDataShard* self, NTable::TDatabase& db, IDataShardChangeGroupProvider& groupProvider) : Self(self) , Db(db) + , GroupProvider(groupProvider) { - if (!isImmediateTx) { - Group = 0; - } } void AddUnderlying(THolder<IBaseChangeCollector> collector) { @@ -89,11 +96,7 @@ public: void CommitLockChanges(ui64 lockId, const TRowVersion& writeVersion) override { NIceDb::TNiceDb db(Db); - if (!Group) { - Group = Self->AllocateChangeRecordGroup(db); - } - - Self->CommitLockChangeRecords(db, lockId, *Group, writeVersion, Collected); + Self->CommitLockChangeRecords(db, lockId, GroupProvider.GetChangeGroup(), writeVersion, Collected); } TVersionState GetVersionState() override { @@ -117,12 +120,10 @@ public: TChangeRecordBuilder builder(kind); if (!WriteTxId) { - if (!Group) { - Group = Self->AllocateChangeRecordGroup(db); - } + ui64 group = GroupProvider.GetChangeGroup(); builder .WithOrder(Self->AllocateChangeRecordOrder(db)) - .WithGroup(*Group) + .WithGroup(group) .WithStep(WriteVersion.Step) .WithTxId(WriteVersion.TxId); } else { @@ -159,8 +160,8 @@ public: private: TDataShard* Self; NTable::TDatabase& Db; + IDataShardChangeGroupProvider& GroupProvider; - TMaybe<ui64> Group; TVector<THolder<IBaseChangeCollector>> Underlying; TVector<TChange> Collected; @@ -169,7 +170,13 @@ private: }; // TChangeCollectorProxy -IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) { +IDataShardChangeCollector* CreateChangeCollector( + TDataShard& dataShard, + IDataShardUserDb& userDb, + IDataShardChangeGroupProvider& groupProvider, + NTable::TDatabase& db, + const TUserTable& table) +{ const bool hasAsyncIndexes = table.HasAsyncIndexes(); const bool hasCdcStreams = table.HasCdcStreams(); @@ -177,7 +184,7 @@ IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataSha return nullptr; } - auto proxy = MakeHolder<TChangeCollectorProxy>(&dataShard, db, isImmediateTx); + auto proxy = MakeHolder<TChangeCollectorProxy>(&dataShard, db, groupProvider); if (hasAsyncIndexes) { proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, userDb, *proxy)); @@ -190,10 +197,16 @@ IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataSha return proxy.Release(); } -IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx) { +IDataShardChangeCollector* CreateChangeCollector( + TDataShard& dataShard, + IDataShardUserDb& userDb, + IDataShardChangeGroupProvider& groupProvider, + NTable::TDatabase& db, + ui64 tableId) +{ Y_VERIFY(dataShard.GetUserTables().contains(tableId)); const TUserTable& tableInfo = *dataShard.GetUserTables().at(tableId); - return CreateChangeCollector(dataShard, userDb, db, tableInfo, isImmediateTx); + return CreateChangeCollector(dataShard, userDb, groupProvider, db, tableInfo); } } // NDataShard diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h index 4ebc6742b68..2b9b034a827 100644 --- a/ydb/core/tx/datashard/change_collector.h +++ b/ydb/core/tx/datashard/change_collector.h @@ -11,6 +11,38 @@ struct TUserTable; class IDataShardUserDb; +class IDataShardChangeGroupProvider { +protected: + ~IDataShardChangeGroupProvider() = default; + +public: + virtual bool HasChangeGroup() const = 0; + virtual ui64 GetChangeGroup() = 0; +}; + +class TDataShardChangeGroupProvider final + : public IDataShardChangeGroupProvider +{ +public: + // Note: for distributed transactions group is expected to be 0 + TDataShardChangeGroupProvider(TDataShard& self, NTable::TDatabase& db, std::optional<ui64> group = std::nullopt) + : Self(self) + , Db(db) + , Group(group) + { } + + bool HasChangeGroup() const override { + return bool(Group); + } + + ui64 GetChangeGroup() override; + +private: + TDataShard& Self; + NTable::TDatabase& Db; + std::optional<ui64> Group; +}; + class IDataShardChangeCollector : public NMiniKQL::IChangeCollector { public: // basic change record's info @@ -37,8 +69,18 @@ public: virtual TVector<TChange>&& GetCollected() = 0; }; -IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx); -IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx); +IDataShardChangeCollector* CreateChangeCollector( + TDataShard& dataShard, + IDataShardUserDb& userDb, + IDataShardChangeGroupProvider& groupProvider, + NTable::TDatabase& db, + const TUserTable& table); +IDataShardChangeCollector* CreateChangeCollector( + TDataShard& dataShard, + IDataShardUserDb& userDb, + IDataShardChangeGroupProvider& groupProvider, + NTable::TDatabase& db, + ui64 tableId); } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index f5e4ce2da05..f55b108288b 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -193,7 +193,11 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) { } /// -class TDataShardEngineHost : public TEngineHost, public IDataShardUserDb { +class TDataShardEngineHost final + : public TEngineHost + , public IDataShardUserDb + , public IDataShardChangeGroupProvider +{ public: TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) : TEngineHost(db, counters, @@ -271,6 +275,24 @@ public: IsRepeatableSnapshot = true; } + bool HasChangeGroup() const override { + return bool(ChangeGroup); + } + + ui64 GetChangeGroup() override { + if (!ChangeGroup) { + if (IsImmediateTx) { + NIceDb::TNiceDb db(DB); + ChangeGroup = Self->AllocateChangeRecordGroup(db); + } else { + // Distributed transactions have their group set to zero + ChangeGroup = 0; + } + } + + return *ChangeGroup; + } + IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override { auto it = ChangeCollectors.find(tableId.PathId); if (it != ChangeCollectors.end()) { @@ -282,7 +304,12 @@ public: return it->second.Get(); } - it->second.Reset(CreateChangeCollector(*Self, *const_cast<TDataShardEngineHost*>(this), DB, tableId.PathId.LocalPathId, IsImmediateTx)); + it->second.Reset(CreateChangeCollector( + *Self, + *const_cast<TDataShardEngineHost*>(this), + *const_cast<TDataShardEngineHost*>(this), + DB, + tableId.PathId.LocalPathId)); return it->second.Get(); } @@ -368,6 +395,10 @@ public: return dependencies; } + std::optional<ui64> GetVolatileChangeGroup() const { + return ChangeGroup; + } + bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override { if (TSysTables::IsSystemTable(key.TableId)) return DataShardSysTable(key.TableId).IsValidKey(key); @@ -927,6 +958,7 @@ private: mutable absl::flat_hash_map<TPathId, NTable::ITransactionObserverPtr> TxObservers; mutable absl::flat_hash_set<ui64> VolatileCommitTxIds; mutable absl::flat_hash_set<ui64> VolatileDependencies; + std::optional<ui64> ChangeGroup = std::nullopt; }; // @@ -1156,6 +1188,13 @@ TVector<ui64> TEngineBay::GetVolatileDependencies() const { return host->GetVolatileDependencies(); } +std::optional<ui64> TEngineBay::GetVolatileChangeGroup() const { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + return host->GetVolatileChangeGroup(); +} + IEngineFlat * TEngineBay::GetEngine() { if (!Engine) { Engine = CreateEngineFlat(*EngineSettings); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 62bfe0a1971..42515d42446 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -114,6 +114,7 @@ public: TVector<ui64> GetVolatileCommitTxIds() const; TVector<ui64> GetVolatileDependencies() const; + std::optional<ui64> GetVolatileChangeGroup() const; void ResetCounters() { EngineHostCounters = TEngineHostCounters(); } const TEngineHostCounters& GetCounters() const { return EngineHostCounters; } diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 5ca5ff51642..5b114d06f39 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -190,6 +190,7 @@ public: TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); } TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); } + std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); } TActorId Source() const { return Source_; } void SetSource(const TActorId& actorId) { Source_ = actorId; } diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 9fd3330d9cc..6cd4da5e67c 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -61,9 +61,10 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId); TDataShardUserDb userDb(*self, txc.DB, readVersion); + TDataShardChangeGroupProvider groupProvider(*self, txc.DB); if (CollectChanges) { - ChangeCollector.Reset(CreateChangeCollector(*self, userDb, txc.DB, tableInfo, true)); + ChangeCollector.Reset(CreateChangeCollector(*self, userDb, groupProvider, txc.DB, tableInfo)); } // Prepare (id, Type) vector for value columns diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 2ad5d5e6595..e27bec92afd 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -60,6 +60,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } std::optional<TDataShardUserDb> userDb; + std::optional<TDataShardChangeGroupProvider> groupProvider; THolder<IEraseRowsCondition> condition; if (params) { @@ -69,7 +70,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } userDb.emplace(*self, params.Txc->DB, params.ReadVersion); - params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, params.Txc->DB, tableInfo, true)); + groupProvider.emplace(*self, params.Txc->DB); + params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, *groupProvider, params.Txc->DB, tableInfo)); } const bool breakWriteConflicts = self->SysLocksTable().HasWriteLocks(fullTableId); diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp index 6e57ee9bec4..3914bd484d0 100644 --- a/ydb/core/tx/datashard/datashard_locks_db.cpp +++ b/ydb/core/tx/datashard/datashard_locks_db.cpp @@ -111,7 +111,8 @@ void TDataShardLocksDb::PersistLockCounter(ui64 lockId, ui64 counter) { void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) { // We remove lock changes unless it's managed by volatile tx manager - if (!Self.GetVolatileTxManager().FindByCommitTxId(lockId)) { + bool isVolatile = Self.GetVolatileTxManager().FindByCommitTxId(lockId); + if (!isVolatile) { for (auto& pr : Self.GetUserTables()) { auto tid = pr.second->LocalTid; // Removing the lock also removes any uncommitted data @@ -126,7 +127,9 @@ void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) { db.Table<Schema::Locks>().Key(lockId).Delete(); HasChanges_ = true; - Self.ScheduleRemoveLockChanges(lockId); + if (!isVolatile) { + Self.ScheduleRemoveLockChanges(lockId); + } } void TDataShardLocksDb::PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags, const TString& data) { diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 552f2fc9e26..27da0724b61 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1154,8 +1154,9 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { NKikimrTxDataShard::TKqpLocks_ELocksOp Op = NKikimrTxDataShard::TKqpLocks::Commit; TVector<TLockInfo> Locks; - void AddLocks(const TVector<TLockInfo>& locks) { + TInjectLocks& AddLocks(const TVector<TLockInfo>& locks) { Locks.insert(Locks.end(), locks.begin(), locks.end()); + return *this; } }; @@ -3388,6 +3389,142 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "ERROR: WrongRequest\n"); } + Y_UNIT_TEST(LockedWriteWithAsyncIndexAndVolatileCommit) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + WaitTxNotification(server, sender, + AsyncAlterAddIndex(server, "/Root", "/Root/table-1", + TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync})); + + CreateShardedTable(server, sender, "/Root", "table-2", 1); + WaitTxNotification(server, sender, + AsyncAlterAddIndex(server, "/Root", "/Root/table-2", + TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync})); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to keys 1 and 2 using tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to keys 10 and 20 using tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 110), (20, 210) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + SimulateSleep(server, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` VIEW by_value + WHERE value in (1, 11, 21) + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-2` VIEW by_value + WHERE value in (10, 110, 210) + ORDER BY key + )")), + "{ items { uint32_value: 10 } items { uint32_value: 10 } }"); + + // Commit changes in tx 123 + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().AddLocks(locks1).AddLocks(locks2); + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0); + UPSERT INTO `/Root/table-2` (key, value) VALUES (0, 0); + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + SimulateSleep(server, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` VIEW by_value + WHERE value in (1, 11, 21) + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 21 } }"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-2` VIEW by_value + WHERE value in (10, 110, 210) + ORDER BY key + )")), + "{ items { uint32_value: 10 } items { uint32_value: 110 } }, " + "{ items { uint32_value: 20 } items { uint32_value: 210 } }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 3152351d6dd..ac595562f03 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1352,6 +1352,66 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { UNIT_ASSERT_VALUES_EQUAL(observedStatus, Ydb::StatusIds::SUCCESS); } + Y_UNIT_TEST(DistributedWriteWithAsyncIndex) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + }) + .Indexes({ + {"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}, + }); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + ExecSQL(server, sender, R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 3); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 30); + )"); + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Make sure changes are actually delivered + SimulateSleep(runtime, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, R"( + SELECT key, value + FROM `/Root/table-1` VIEW by_value + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 3 } }"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, R"( + SELECT key, value + FROM `/Root/table-2` VIEW by_value + ORDER BY key + )"), + "{ items { uint32_value: 10 } items { uint32_value: 10 } }, " + "{ items { uint32_value: 20 } items { uint32_value: 30 } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 318b422f879..b57b346ecf2 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -46,7 +46,8 @@ public: if (eraseTx->HasDependents()) { TDataShardUserDb userDb(DataShard, txc.DB, readVersion); - THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, txc.DB, request.GetTableId(), false)}; + TDataShardChangeGroupProvider groupProvider(DataShard, txc.DB, /* distributed tx group */ 0); + THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, groupProvider, txc.DB, request.GetTableId())}; auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize()); if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, changeCollector.Get())) { diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 4c20ff9f2a5..bd6400a9919 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -324,6 +324,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio commitTxIds, dataTx->GetVolatileDependencies(), participants, + dataTx->GetVolatileChangeGroup(), txc); } diff --git a/ydb/core/tx/datashard/remove_lock_change_records.cpp b/ydb/core/tx/datashard/remove_lock_change_records.cpp index b387c02f5a3..ccc2e8f4b7c 100644 --- a/ydb/core/tx/datashard/remove_lock_change_records.cpp +++ b/ydb/core/tx/datashard/remove_lock_change_records.cpp @@ -35,6 +35,12 @@ public: continue; } + if (Self->GetVolatileTxManager().FindByCommitTxId(lockId)) { + // Don't remove records that are managed by volatile tx manager + Self->PendingLockChangeRecordsToRemove.pop_back(); + continue; + } + while (!it->second.Changes.empty() && removed < MaxRecordsToRemove) { auto& record = it->second.Changes.back(); db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete(); diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 0f079b37df5..275ab15d439 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -14,6 +14,8 @@ namespace NKikimr::NDataShard { TTxType GetTxType() const override { return TXTYPE_VOLATILE_TX_COMMIT; } bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override { + NIceDb::TNiceDb db(txc.DB); + Y_VERIFY(Self->VolatileTxManager.PendingCommitTxScheduled); Self->VolatileTxManager.PendingCommitTxScheduled = false; @@ -36,7 +38,30 @@ namespace NKikimr::NDataShard { } } - // TODO: commit change records + auto getGroup = [&]() -> ui64 { + if (!info->ChangeGroup) { + if (info->Version.TxId != info->TxId) { + // Assume it's an immediate transaction and allocate new group + info->ChangeGroup = Self->AllocateChangeRecordGroup(db); + } else { + // Distributed transactions commit changes with group zero + info->ChangeGroup = 0; + } + } + return *info->ChangeGroup; + }; + + // First commit change records from any committed locks + for (ui64 commitTxId : info->CommitTxIds) { + if (commitTxId != info->TxId && Self->HasLockChangeRecords(commitTxId)) { + Self->CommitLockChangeRecords(db, commitTxId, getGroup(), info->Version, Collected); + } + } + + // Commit change records from the transaction itself + if (info->CommitTxIds.contains(info->TxId) && Self->HasLockChangeRecords(info->TxId)) { + Self->CommitLockChangeRecords(db, info->TxId, getGroup(), info->Version, Collected); + } Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc); @@ -53,6 +78,9 @@ namespace NKikimr::NDataShard { if (Delayed) { OnCommitted(ctx); } + if (Collected) { + Self->EnqueueChangeRecords(std::move(Collected)); + } } void OnCommitted(const TActorContext& ctx) { @@ -71,6 +99,7 @@ namespace NKikimr::NDataShard { private: ui64 TxId; + TVector<IDataShardChangeCollector::TChange> Collected; bool Delayed = false; }; @@ -107,8 +136,6 @@ namespace NKikimr::NDataShard { } } - // TODO: abort change records - Self->VolatileTxManager.PersistRemoveVolatileTx(TxId, txc); return true; } @@ -118,6 +145,9 @@ namespace NKikimr::NDataShard { Y_VERIFY(info && info->State == EVolatileTxState::Aborting); Y_VERIFY(info->AddCommitted); + // Make a copy since it will disappear soon + auto commitTxIds = info->CommitTxIds; + // Run callbacks only after we successfully persist aborted tx Self->VolatileTxManager.RunAbortCallbacks(info); @@ -127,6 +157,11 @@ namespace NKikimr::NDataShard { Self->VolatileTxManager.RemoveVolatileTx(TxId); + // Schedule removal of all lock changes we were supposed to commit + for (ui64 commitTxId : commitTxIds) { + Self->ScheduleRemoveLockChanges(commitTxId); + } + Self->CheckSplitCanStart(ctx); } @@ -235,6 +270,9 @@ namespace NKikimr::NDataShard { info->Version = TRowVersion(details.GetVersionStep(), details.GetVersionTxId()); info->CommitTxIds.insert(details.GetCommitTxIds().begin(), details.GetCommitTxIds().end()); info->Dependencies.insert(details.GetDependencies().begin(), details.GetDependencies().end()); + if (details.HasChangeGroup()) { + info->ChangeGroup = details.GetChangeGroup(); + } info->AddCommitted = true; // we loaded it from local db, so it is committed if (!rowset.Next()) { @@ -354,6 +392,7 @@ namespace NKikimr::NDataShard { TConstArrayRef<ui64> commitTxIds, TConstArrayRef<ui64> dependencies, TConstArrayRef<ui64> participants, + std::optional<ui64> changeGroup, TTransactionContext& txc) { using Schema = TDataShard::Schema; @@ -372,6 +411,7 @@ namespace NKikimr::NDataShard { info->CommitTxIds.insert(commitTxIds.begin(), commitTxIds.end()); info->Dependencies.insert(dependencies.begin(), dependencies.end()); info->Participants.insert(participants.begin(), participants.end()); + info->ChangeGroup = changeGroup; if (info->Participants.empty()) { // Transaction is committed when we don't have to wait for other participants @@ -417,6 +457,10 @@ namespace NKikimr::NDataShard { std::sort(m->begin(), m->end()); } + if (info->ChangeGroup) { + details.SetChangeGroup(*info->ChangeGroup); + } + db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Update( NIceDb::TUpdate<Schema::TxVolatileDetails::State>(info->State), NIceDb::TUpdate<Schema::TxVolatileDetails::Details>(std::move(details))); diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index be6202a8d1f..6af069540ec 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -48,6 +48,7 @@ namespace NKikimr::NDataShard { absl::flat_hash_set<ui64> Dependencies; absl::flat_hash_set<ui64> Dependents; absl::flat_hash_set<ui64> Participants; + std::optional<ui64> ChangeGroup; bool AddCommitted = false; absl::flat_hash_set<ui64> BlockedOperations; absl::flat_hash_set<ui64> WaitingRemovalOperations; @@ -175,6 +176,7 @@ namespace NKikimr::NDataShard { TConstArrayRef<ui64> commitTxIds, TConstArrayRef<ui64> dependencies, TConstArrayRef<ui64> participants, + std::optional<ui64> changeGroup, TTransactionContext& txc); bool AttachVolatileTxCallback( |