diff options
author | snaury <snaury@ydb.tech> | 2023-03-07 14:50:41 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-03-07 14:50:41 +0300 |
commit | 2f885def7c5026f00c52b9e82c9ccf3ed9921b6a (patch) | |
tree | b6670e40f8ccd73d0f3d7bb347f4d7d8c318b2c0 | |
parent | 34872f64f9aaa7edc7f19def053c0c403d7cad3b (diff) | |
download | ydb-2f885def7c5026f00c52b9e82c9ccf3ed9921b6a.tar.gz |
Handle volatile dependencies in uncommitted locks
10 files changed, 306 insertions, 18 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index f55b108288..8cd85db72a 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -321,6 +321,14 @@ public: return; } + if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min())) { + lock->ForAllVolatileDependencies([this](ui64 txId) { + if (VolatileDependencies.insert(txId).second && !VolatileTxId) { + VolatileTxId = EngineBay.GetTxId(); + } + }); + } + if (VolatileTxId) { LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Scheduling commit of lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); @@ -902,7 +910,9 @@ public: void AddWriteConflict(ui64 txId) const { if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { - Y_FAIL("TODO: add future lock dependency from %" PRIu64 " on %" PRIu64, LockTxId, info->TxId); + if (info->State != EVolatileTxState::Aborting) { + Self->SysLocksTable().AddVolatileDependency(info->TxId); + } } else { Self->SysLocksTable().AddWriteConflict(txId); } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 6baa4a909e..983f22933c 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -978,6 +978,14 @@ class TDataShard >; }; + struct LockVolatileDependencies : Table<35> { + struct LockId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct TxId : Column<2, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<LockId, TxId>; + using TColumns = TableColumns<LockId, TxId>; + }; + using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue, DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress, Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts, @@ -985,7 +993,8 @@ class TDataShard ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived, UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts, LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits, - TxVolatileDetails, TxVolatileParticipants, CdcStreamScans>; + TxVolatileDetails, TxVolatileParticipants, CdcStreamScans, + LockVolatileDependencies>; // These settings are persisted on each Init. So we use empty settings in order not to overwrite what // was changed by the user diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index a48b97301a..3b9b56fafd 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -158,6 +158,12 @@ void TLockInfo::PersistRemoveLock(ILocksDb* db) { Y_VERIFY(IsPersistent()); Y_VERIFY(db, "Cannot persist lock without a db"); + // Remove persistent volatile dependencies + for (ui64 txId : VolatileDependencies) { + db->PersistRemoveVolatileDependency(LockId, txId); + } + VolatileDependencies.clear(); + // Remove persistent conflicts for (auto& pr : ConflictLocks) { TLockInfo* otherLock = pr.first; @@ -236,6 +242,15 @@ void TLockInfo::AddConflict(TLockInfo* otherLock, ILocksDb* db) { } } +void TLockInfo::AddVolatileDependency(ui64 txId, ILocksDb* db) { + Y_VERIFY(LockId != txId, "Unexpected volatile dependency between a lock and itself"); + + if (VolatileDependencies.insert(txId).second && IsPersistent()) { + Y_VERIFY(db, "Cannot persist dependencies without a db"); + db->PersistAddVolatileDependency(LockId, txId); + } +} + void TLockInfo::PersistConflicts(ILocksDb* db) { Y_VERIFY(IsPersistent()); Y_VERIFY(db, "Cannot persist conflicts without a db"); @@ -252,6 +267,9 @@ void TLockInfo::PersistConflicts(ILocksDb* db) { db->PersistAddConflict(otherLock->LockId, LockId); } } + for (ui64 txId : VolatileDependencies) { + db->PersistAddVolatileDependency(LockId, txId); + } } void TLockInfo::CleanupConflicts() { @@ -272,6 +290,7 @@ void TLockInfo::CleanupConflicts() { otherLock->ConflictLocks.erase(this); } ConflictLocks.clear(); + VolatileDependencies.clear(); } } @@ -305,6 +324,12 @@ void TLockInfo::RestorePersistentConflict(TLockInfo* otherLock) { otherLock->ConflictLocks[this] |= ELockConflictFlags::BreakUsOnTheirCommit; } +void TLockInfo::RestorePersistentVolatileDependency(ui64 txId) { + Y_VERIFY(IsPersistent()); + + VolatileDependencies.insert(txId); +} + // TTableLocks void TTableLocks::AddShardLock(TLockInfo* lock) { @@ -775,6 +800,9 @@ TVector<TSysLocks::TLock> TSysLocks::ApplyLocks() { for (auto& writeConflictLock : Update->WriteConflictLocks) { lock->AddConflict(&writeConflictLock, Db); } + for (ui64 txId : Update->VolatileDependencies) { + lock->AddVolatileDependency(txId, Db); + } if (lock->GetWriteTables() && !lock->IsPersistent()) { // We need to persist a new lock @@ -999,6 +1027,12 @@ void TSysLocks::AddWriteConflict(const TTableId& tableId, const TArrayRef<const } } +void TSysLocks::AddVolatileDependency(ui64 txId) { + Y_VERIFY(Update && Update->LockTxId); + + Update->AddVolatileDependency(txId); +} + void TSysLocks::BreakAllLocks(const TTableId& tableId) { Y_VERIFY(Update); Y_VERIFY(!tableId.HasSamePath(TTableId(TSysTables::SysSchemeShard, TSysTables::SysTableLocks))); @@ -1145,6 +1179,9 @@ bool TSysLocks::Load(ILocksDb& db) { lock->RestorePersistentConflict(otherLock); } } + for (ui64 txId : lockRow.VolatileDependencies) { + lock->RestorePersistentVolatileDependency(txId); + } } return true; diff --git a/ydb/core/tx/datashard/datashard_locks.h b/ydb/core/tx/datashard/datashard_locks.h index 7d65c115ab..74834c85cb 100644 --- a/ydb/core/tx/datashard/datashard_locks.h +++ b/ydb/core/tx/datashard/datashard_locks.h @@ -8,6 +8,7 @@ #include <ydb/core/tablet/tablet_counters.h> #include <library/cpp/cache/cache.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> #include <util/generic/list.h> #include <util/generic/queue.h> #include <util/generic/set.h> @@ -42,6 +43,7 @@ public: TVector<TLockRange> Ranges; TVector<ui64> Conflicts; + TVector<ui64> VolatileDependencies; }; virtual bool Load(TVector<TLockRow>& rows) = 0; @@ -65,6 +67,10 @@ public: // Persist a conflict, i.e. this lock must break some other lock on commit virtual void PersistAddConflict(ui64 lockId, ui64 otherLockId) = 0; virtual void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) = 0; + + // Persist volatile dependencies, i.e. which undecided transactions must be waited for on commit + virtual void PersistAddVolatileDependency(ui64 lockId, ui64 txId) = 0; + virtual void PersistRemoveVolatileDependency(ui64 lockId, ui64 txId) = 0; }; class TLocksDataShard { @@ -310,11 +316,13 @@ public: void PersistRanges(ILocksDb* db); void AddConflict(TLockInfo* otherLock, ILocksDb* db); + void AddVolatileDependency(ui64 txId, ILocksDb* db); void PersistConflicts(ILocksDb* db); void CleanupConflicts(); void RestorePersistentRange(ui64 rangeId, const TPathId& tableId, ELockRangeFlags flags); void RestorePersistentConflict(TLockInfo* otherLock); + void RestorePersistentVolatileDependency(ui64 txId); template<class TCallback> void ForAllConflicts(TCallback&& callback) { @@ -323,6 +331,13 @@ public: } } + template<class TCallback> + void ForAllVolatileDependencies(TCallback&& callback) { + for (auto& item : VolatileDependencies) { + callback(item); + } + } + ui64 GetLastOpId() const { return LastOpId; } void SetLastOpId(ui64 opId) { LastOpId = opId; } @@ -367,6 +382,7 @@ private: // A set of locks we must break on commit THashMap<TLockInfo*, ELockConflictFlags> ConflictLocks; + absl::flat_hash_set<ui64> VolatileDependencies; TVector<TPersistentRange> PersistentRanges; ui64 LastOpId = 0; @@ -651,6 +667,7 @@ struct TLocksUpdate { TIntrusiveList<TLockInfo, TLockInfoReadConflictListTag> ReadConflictLocks; TIntrusiveList<TLockInfo, TLockInfoWriteConflictListTag> WriteConflictLocks; TIntrusiveList<TTableLocks, TTableLocksWriteConflictShardListTag> WriteConflictShardLocks; + absl::flat_hash_set<ui64> VolatileDependencies; TIntrusiveList<TLockInfo, TLockInfoEraseListTag> EraseLocks; @@ -721,6 +738,10 @@ struct TLocksUpdate { WriteConflictShardLocks.PushBack(table); } + void AddVolatileDependency(ui64 txId) { + VolatileDependencies.insert(txId); + } + void AddEraseLock(TLockInfo* lock) { EraseLocks.PushBack(lock); } @@ -814,6 +835,7 @@ public: void AddReadConflict(ui64 conflictId); void AddWriteConflict(ui64 conflictId); void AddWriteConflict(const TTableId& tableId, const TArrayRef<const TCell>& key); + void AddVolatileDependency(ui64 txId); void BreakAllLocks(const TTableId& tableId); void BreakSetLocks(); bool IsMyKey(const TArrayRef<const TCell>& key) const; diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp index 3914bd484d..54821486e0 100644 --- a/ydb/core/tx/datashard/datashard_locks_db.cpp +++ b/ydb/core/tx/datashard/datashard_locks_db.cpp @@ -74,6 +74,26 @@ bool TDataShardLocksDb::Load(TVector<TLockRow>& rows) { } } + // Load volatile dependencies + { + auto rowset = db.Table<Schema::LockVolatileDependencies>().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + auto lockId = rowset.GetValue<Schema::LockVolatileDependencies::LockId>(); + auto it = lockIndex.find(lockId); + if (it != lockIndex.end()) { + auto& lock = rows[it->second]; + auto txId = rowset.GetValue<Schema::LockVolatileDependencies::TxId>(); + lock.VolatileDependencies.push_back(txId); + } + if (!rowset.Next()) { + return false; + } + } + } + return true; } @@ -172,4 +192,18 @@ void TDataShardLocksDb::PersistRemoveConflict(ui64 lockId, ui64 otherLockId) { HasChanges_ = true; } +void TDataShardLocksDb::PersistAddVolatileDependency(ui64 lockId, ui64 txId) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockVolatileDependencies>().Key(lockId, txId).Update(); + HasChanges_ = true; +} + +void TDataShardLocksDb::PersistRemoveVolatileDependency(ui64 lockId, ui64 txId) { + using Schema = TDataShard::Schema; + NIceDb::TNiceDb db(DB); + db.Table<Schema::LockVolatileDependencies>().Key(lockId, txId).Delete(); + HasChanges_ = true; +} + } // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_locks_db.h b/ydb/core/tx/datashard/datashard_locks_db.h index ea2b2cd3b4..1aba7176ec 100644 --- a/ydb/core/tx/datashard/datashard_locks_db.h +++ b/ydb/core/tx/datashard/datashard_locks_db.h @@ -34,6 +34,10 @@ public: void PersistAddConflict(ui64 lockId, ui64 otherLockId) override; void PersistRemoveConflict(ui64 lockId, ui64 otherLockId) override; + // Persist volatile dependencies, i.e. which undecided transactions must be waited for on commit + void PersistAddVolatileDependency(ui64 lockId, ui64 txId) override; + void PersistRemoveVolatileDependency(ui64 lockId, ui64 txId) override; + private: TDataShard& Self; NTable::TDatabase& DB; diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 9994ee28b3..8bfc5c5ec3 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -19,21 +19,21 @@ namespace NKqpHelpers { template<class TResp> inline TResp AwaitResponse(TTestActorRuntime& runtime, NThreading::TFuture<TResp> f) { - size_t responses = 0; - TResp response; - f.Subscribe([&](NThreading::TFuture<TResp> fut){ - ++responses; - TResp r = fut.ExtractValueSync(); - response.Swap(&r); - }); - - TDispatchOptions options; - options.FinalEvents.emplace_back( - [&](IEventHandle& ) -> bool { return responses >= 1; } - ); - - runtime.DispatchEvents(options); - return response; + if (!f.HasValue() && !f.HasException()) { + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return f.HasValue() || f.HasException(); + }; + options.FinalEvents.emplace_back([&](IEventHandle&) { + return f.HasValue() || f.HasException(); + }); + + runtime.DispatchEvents(options); + + UNIT_ASSERT(f.HasValue() || f.HasException()); + } + + return f.ExtractValueSync(); } inline TString CreateSessionRPC(TTestActorRuntime& runtime, const TString& database = {}) { diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 27da0724b6..2c0a5c42cf 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -3525,6 +3525,120 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "{ items { uint32_value: 20 } items { uint32_value: 210 } }"); } + Y_UNIT_TEST(LockedWriteWithPendingVolatileCommit) { + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"value2", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10)")); + + SimulateSleep(runtime, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + observer.BlockReadSets = true; + + Cerr << "!!! Sending volatile upsert" << Endl; + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + TString volatileSessionId = CreateSessionRPC(runtime, "/Root"); + auto upsertResult = SendRequest( + runtime, + MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", volatileSessionId, "", true), + "/Root"); + SimulateSleep(runtime, TDuration::Seconds(1)); + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Should be 2 expectations + 2 commit decisions + UNIT_ASSERT(!upsertResult.HasValue()); + UNIT_ASSERT_VALUES_EQUAL(observer.BlockedReadSets.size(), 4u); + + // Start a snapshot read transaction, make sure not to touch the uncommitted 2 key + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 1 + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + + // Write uncommitted changes to keys 1 and 2 using tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value2) VALUES (1, 11), (2, 22) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value2) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + // This compaction verifies there's no commit race with the waiting + // distributed transaction. If commits happen in incorrect order we + // would observe unexpected results. + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + CompactTable(runtime, shard1, tableId1, false); + + observer.UnblockReadSets(); + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(upsertResult))), + "<empty>"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + SELECT key, value, value2 FROM `/Root/table-1` + ORDER BY key + )")), + "{ items { uint32_value: 1 } items { uint32_value: 1 } items { uint32_value: 11 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } items { uint32_value: 22 } }"); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index 18955ee931..e7feeae7a9 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -24,6 +24,7 @@ public: private: void ExecuteDataTx(TOperation::TPtr op, + TTransactionContext& txc, const TActorContext& ctx, TSetupSysLocks& guardLocks); void AddLocksToResult(TOperation::TPtr op, const TActorContext& ctx); @@ -158,7 +159,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, try { try { - ExecuteDataTx(op, ctx, guardLocks); + ExecuteDataTx(op, txc, ctx, guardLocks); } catch (const TNotReadyTabletException&) { // We want to try pinning (actually precharging) all required pages // before restarting the transaction, to minimize future restarts. @@ -219,6 +220,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, + TTransactionContext& txc, const TActorContext& ctx, TSetupSysLocks& guardLocks) { @@ -315,6 +317,20 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, DataShard.SysLocksTable().BreakSetLocks(); } + // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies + // Such transactions would have no participants and become immediately committed + if (auto commitTxIds = tx->GetDataTx()->GetVolatileCommitTxIds()) { + TVector<ui64> participants; // empty participants + DataShard.GetVolatileTxManager().PersistAddVolatileTx( + tx->GetTxId(), + writeVersion, + commitTxIds, + tx->GetDataTx()->GetVolatileDependencies(), + participants, + tx->GetDataTx()->GetVolatileChangeGroup(), + txc); + } + AddLocksToResult(op, ctx); Pipeline.AddCommittingOp(op); diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 0c51c0565c..1b47acab12 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1900,6 +1900,48 @@ } }, { + "TableId": 35, + "TableName": "LockVolatileDependencies", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "TxId", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { "TableId": 101, "TableName": "LockChangeRecords", "TableKey": [ |