author    | snaury <snaury@ydb.tech> | 2023-05-25 18:15:28 +0300
committer | snaury <snaury@ydb.tech> | 2023-05-25 18:15:28 +0300
commit    | 14c17870c6b053f007138d6f0f6188a30f30bc83 (patch)
tree      | d25637ee0324ec9c2214307fa0668e78ca5ac039
parent    | 86fb23705bc8eeee1c6a68ac18c4e844f2331d19 (diff)
download  | ydb-14c17870c6b053f007138d6f0f6188a30f30bc83.tar.gz
Cache uncommitted write conflicts for distributed operations
27 files changed, 709 insertions(+), 22 deletions(-)
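The core of the patch is a per-table TTableConflictsCache: planned (distributed) transactions register the point write keys extracted from their validation info, and every immediate or locked write that later touches one of those keys records its uncommitted txId against the cached key. When the distributed operation finally executes, BreakWriteConflicts/CheckWriteConflicts can take the conflict set straight from memory instead of re-scanning the table with SelectRowVersion. The sketch below is a simplified illustration of that bookkeeping, assuming std:: containers and plain string keys in place of the patch's absl hash maps, TOwnedCellVec keys, and rollback log; names and signatures are illustrative, not the real YDB API.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Simplified stand-in for the per-table cache added in conflicts_cache.h/.cpp.
// The real code keys by TOwnedCellVec with transparent absl hashing and keeps a
// rollback log tied to the local database transaction; both are omitted here.
class TSimpleTableConflictsCache {
    struct TWriteKey {
        std::unordered_set<uint64_t> UncommittedWrites; // txIds that wrote this key
        size_t DistributedWrites = 0;                   // how many planned txs care about it
    };

    std::unordered_map<std::string, TWriteKey> WriteKeys;                    // key -> state
    std::unordered_map<uint64_t, std::unordered_set<std::string>> ByDistTx;  // planned txId -> keys

public:
    // Called when a distributed (planned) tx declares it will write `key`.
    void RegisterDistributedWrite(uint64_t txId, const std::string& key,
                                  const std::unordered_set<uint64_t>& openTxsAtKey) {
        auto& k = WriteKeys[key];
        if (k.DistributedWrites++ == 0) {
            // First interested tx: snapshot currently open (uncommitted) writes at the key,
            // found once via a storage scan in the real code (db.SelectRowVersion).
            k.UncommittedWrites = openTxsAtKey;
        }
        ByDistTx[txId].insert(key);
    }

    // Called when an immediate/locked write touches `key` under uncommitted txId.
    void AddUncommittedWrite(const std::string& key, uint64_t txId) {
        auto it = WriteKeys.find(key);
        if (it != WriteKeys.end()) {
            it->second.UncommittedWrites.insert(txId); // remember the conflict in memory
        }
    }

    // Conflict check for a planned tx: nullptr means "key not cached, fall back to a scan".
    const std::unordered_set<uint64_t>* FindUncommittedWrites(const std::string& key) const {
        auto it = WriteKeys.find(key);
        return it != WriteKeys.end() ? &it->second.UncommittedWrites : nullptr;
    }

    // Planned tx finished (committed or removed from the in-fly set): drop its keys.
    void UnregisterDistributedWrites(uint64_t txId) {
        auto it = ByDistTx.find(txId);
        if (it == ByDistTx.end()) return;
        for (const auto& key : it->second) {
            auto kit = WriteKeys.find(key);
            if (kit != WriteKeys.end() && --kit->second.DistributedWrites == 0) {
                WriteKeys.erase(kit);
            }
        }
        ByDistTx.erase(it);
    }
};
```

The DistributedWrites refcount mirrors the patch's TWriteKey::DistributedWrites counter: a key stays cached only while at least one in-fly planned transaction still intends to write it, which is why TTransQueue::RemoveTxInFly now calls UnregisterDistributedWrites.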
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 4ab23cf5e3a..4971a76c62d 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -452,4 +452,5 @@ enum ETxTypes { TXTYPE_VOLATILE_TX_ABORT = 74 [(TxTypeOpts) = {Name: "TxVolatileTxAbort"}]; TXTYPE_CDC_STREAM_SCAN_RUN = 75 [(TxTypeOpts) = {Name: "TTxCdcStreamScanRun"}]; TXTYPE_CDC_STREAM_SCAN_PROGRESS = 76 [(TxTypeOpts) = {Name: "TTxCdcStreamScanProgress"}]; + TXTYPE_FIND_WRITE_CONFLICTS = 77 [(TxTypeOpts) = {Name: "TTxFindWriteConflicts"}]; } diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt index 6f4d1978e52..28c5b364bbe 100644 --- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt @@ -176,6 +176,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt index 3e69f6c4fa9..41bb30c51fd 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt @@ -177,6 +177,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt index 3e69f6c4fa9..41bb30c51fd 100644 --- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt @@ -177,6 +177,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt index e819352becc..a7eb4c861d0 100644 --- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt @@ -177,6 +177,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp diff --git a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp index bd55d64eb20..aa2a99150d2 100644 --- a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp +++ b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp @@ -90,6 +90,9 @@ EExecutionStatus TBuildAndWaitDependenciesUnit::Execute(TOperation::TPtr op, } if (!IsReadyToExecute(op)) { + // Cache write keys while operation waits in the queue + Pipeline.RegisterDistributedWrites(op, txc.DB); + TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get()); if (tx) { // We should put conflicting tx into cache diff --git a/ydb/core/tx/datashard/conflicts_cache.cpp b/ydb/core/tx/datashard/conflicts_cache.cpp new file mode 100644 index 00000000000..dd02a5adf30 --- /dev/null +++ b/ydb/core/tx/datashard/conflicts_cache.cpp @@ -0,0 +1,366 @@ +#include "conflicts_cache.h" +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +TTableConflictsCache::TTableConflictsCache(ui64 localTid) + : LocalTid(localTid) +{} + +void TTableConflictsCache::AddUncommittedWrite(TConstArrayRef<TCell> key, ui64 txId, NTable::TDatabase& db) { + if (WriteKeys.empty()) { + // Avoid expensive key lookup when we know cache is empty + return; + } + + auto itWriteKey = WriteKeys.find(key); + if (itWriteKey == WriteKeys.end()) { + return; // we don't have matching distributed writes + } + + auto* k = itWriteKey->second.get(); + auto res = k->UncommittedWrites.insert(txId); + if (res.second) { + // New write key for this uncommitted tx + auto& p = UncommittedWrites[txId]; + if (!p) { + p = std::make_unique<TUncommittedWrite>(); + } + auto r = p->WriteKeys.insert(k); + Y_VERIFY(r.second); + AddRollbackOp(TRollbackOpRemoveUncommittedWrite{ txId, k }, db); + } +} + +void TTableConflictsCache::RemoveUncommittedWrites(TConstArrayRef<TCell> key, NTable::TDatabase& db) { + if (UncommittedWrites.empty()) { + // Avoid expensive kep lookup when we know there are no writes + return; + } + + auto itWriteKey = WriteKeys.find(key); + if (itWriteKey == WriteKeys.end()) { + return; // we don't have matching distributed writes + } + + auto* k = itWriteKey->second.get(); + for (ui64 txId : k->UncommittedWrites) { + auto it = UncommittedWrites.find(txId); + Y_VERIFY(it != UncommittedWrites.end()); + auto r = it->second->WriteKeys.erase(k); + Y_VERIFY(r); + AddRollbackOp(TRollbackOpAddUncommittedWrite{ txId, k }, db); + if (it->second->WriteKeys.empty()) { + AddRollbackOp(TRollbackOpRestoreUncommittedWrite{ txId, std::move(it->second) }, db); + UncommittedWrites.erase(it); + } + } + + k->UncommittedWrites.clear(); +} + +void TTableConflictsCache::RemoveUncommittedWrites(ui64 txId, NTable::TDatabase& db) { + auto it = UncommittedWrites.find(txId); + if (it == UncommittedWrites.end()) { + return; + } + + auto* p = it->second.get(); + + // Note: UncommittedWrite will be kept alive by rollback buffer + AddRollbackOp(TRollbackOpRestoreUncommittedWrite{ txId, std::move(it->second) }, db); + UncommittedWrites.erase(it); + + // Note: on rollback these links will be restored + for (auto* k : p->WriteKeys) { + k->UncommittedWrites.erase(txId); + } +} + +class 
TTableConflictsCache::TTxObserver : public NTable::ITransactionObserver { +public: + TTxObserver() {} + + void OnSkipUncommitted(ui64 txId) override { + Y_VERIFY(Target); + Target->insert(txId); + } + + void OnSkipCommitted(const TRowVersion&) override {} + void OnSkipCommitted(const TRowVersion&, ui64) override {} + void OnApplyCommitted(const TRowVersion&) override {} + void OnApplyCommitted(const TRowVersion&, ui64) override {} + + void SetTarget(absl::flat_hash_set<ui64>* target) { + Target = target; + } + +private: + absl::flat_hash_set<ui64>* Target = nullptr; +}; + +bool TTableConflictsCache::RegisterDistributedWrite(ui64 txId, const TOwnedCellVec& key, NTable::TDatabase& db) { + if (auto it = WriteKeys.find(key); it != WriteKeys.end()) { + // We already have this write key cached + auto* k = it->second.get(); + auto& p = DistributedWrites[txId]; + if (!p) { + p = std::make_unique<TDistributedWrite>(); + } + auto r = p->WriteKeys.insert(k); + if (r.second) { + k->DistributedWrites++; + } + return true; + } + + absl::flat_hash_set<ui64> txIds; + + // Search for conflicts only when we know there are uncommitted changes right now + if (db.GetOpenTxCount(LocalTid)) { + if (!TxObserver) { + TxObserver = new TTxObserver(); + } + + static_cast<TTxObserver*>(TxObserver.Get())->SetTarget(&txIds); + auto res = db.SelectRowVersion(LocalTid, key, /* readFlags */ 0, nullptr, TxObserver); + static_cast<TTxObserver*>(TxObserver.Get())->SetTarget(nullptr); + + if (res.Ready == NTable::EReady::Page) { + return false; + } + + auto itTxIds = txIds.begin(); + while (itTxIds != txIds.end()) { + // Filter removed transactions + if (db.HasRemovedTx(LocalTid, *itTxIds)) { + txIds.erase(itTxIds++); + } else { + ++itTxIds; + } + } + } + + auto& k = WriteKeys[key]; + Y_VERIFY(!k); + k = std::make_unique<TWriteKey>(); + k->Key = key; + + if (!txIds.empty()) { + k->UncommittedWrites = std::move(txIds); + for (ui64 txId : k->UncommittedWrites) { + auto& p = UncommittedWrites[txId]; + if (!p) { + p = std::make_unique<TUncommittedWrite>(); + } + auto r = p->WriteKeys.insert(k.get()); + Y_VERIFY(r.second); + } + } + + auto& p = DistributedWrites[txId]; + if (!p) { + p = std::make_unique<TDistributedWrite>(); + } + auto r = p->WriteKeys.insert(k.get()); + Y_VERIFY(r.second); + k->DistributedWrites++; + + return true; +} + +void TTableConflictsCache::UnregisterDistributedWrites(ui64 txId) { + if (!RollbackOps.empty()) { + RollbackAllowed = false; + } + + auto it = DistributedWrites.find(txId); + if (it == DistributedWrites.end()) { + return; + } + + std::unique_ptr<TDistributedWrite> p = std::move(it->second); + DistributedWrites.erase(it); + + for (auto* k : p->WriteKeys) { + Y_VERIFY(k->DistributedWrites > 0); + if (0 == --k->DistributedWrites) { + DropWriteKey(k); + } + } +} + +void TTableConflictsCache::DropWriteKey(TWriteKey* k) { + Y_VERIFY(k->DistributedWrites == 0); + + auto itWriteKey = WriteKeys.find(k->Key); + Y_VERIFY(itWriteKey != WriteKeys.end()); + Y_VERIFY(itWriteKey->second.get() == k); + + std::unique_ptr<TWriteKey> saved = std::move(itWriteKey->second); + WriteKeys.erase(itWriteKey); + + for (ui64 txId : k->UncommittedWrites) { + auto it = UncommittedWrites.find(txId); + Y_VERIFY(it != UncommittedWrites.end()); + it->second->WriteKeys.erase(k); + if (it->second->WriteKeys.empty()) { + UncommittedWrites.erase(it); + } + } +} + +const absl::flat_hash_set<ui64>* TTableConflictsCache::FindUncommittedWrites(TConstArrayRef<TCell> key) { + if (WriteKeys.empty()) { + // Avoid expensive key lookup when we 
know cache is empty + return nullptr; + } + + auto it = WriteKeys.find(key); + if (it == WriteKeys.end()) { + return nullptr; + } + + return &it->second->UncommittedWrites; +} + +void TTableConflictsCache::AddRollbackOp(TRollbackOp&& op, NTable::TDatabase& db) { + if (RollbackOps.empty()) { + db.OnCommit([this]() { + OnCommitChanges(); + }); + db.OnRollback([this]() { + OnRollbackChanges(); + }); + } + RollbackOps.push_back(std::move(op)); +} + +void TTableConflictsCache::OnCommitChanges() { + RollbackOps.clear(); + RollbackAllowed = true; +} + +void TTableConflictsCache::OnRollbackChanges() { + Y_VERIFY(RollbackAllowed, "Unexpected transaction rollback"); + + struct TPerformRollback { + TTableConflictsCache* Self; + + void operator()(TRollbackOpAddUncommittedWrite& op) const { + auto it = Self->UncommittedWrites.find(op.TxId); + Y_VERIFY(it != Self->UncommittedWrites.end()); + auto* p = it->second.get(); + auto* k = op.WriteKey; + auto r1 = p->WriteKeys.insert(k); + Y_VERIFY(r1.second); + auto r2 = k->UncommittedWrites.insert(op.TxId); + Y_VERIFY(r2.second); + } + + void operator()(TRollbackOpRemoveUncommittedWrite& op) const { + auto it = Self->UncommittedWrites.find(op.TxId); + Y_VERIFY(it != Self->UncommittedWrites.end()); + auto* p = it->second.get(); + auto* k = op.WriteKey; + auto r1 = p->WriteKeys.erase(k); + Y_VERIFY(r1); + auto r2 = k->UncommittedWrites.erase(op.TxId); + Y_VERIFY(r2); + if (p->WriteKeys.empty()) { + Self->UncommittedWrites.erase(it); + } + } + + void operator()(TRollbackOpRestoreUncommittedWrite& op) const { + auto r1 = Self->UncommittedWrites.emplace(op.TxId, std::move(op.Data)); + Y_VERIFY(r1.second); + auto& p = r1.first->second; + for (auto* k : p->WriteKeys) { + auto r2 = k->UncommittedWrites.insert(op.TxId); + Y_VERIFY(r2.second); + } + } + }; + + while (!RollbackOps.empty()) { + std::visit(TPerformRollback{ this }, RollbackOps.back()); + RollbackOps.pop_back(); + } +} + +class TConflictsCache::TTxFindWriteConflicts + : public NTabletFlatExecutor::TTransactionBase<TDataShard> +{ +public: + TTxFindWriteConflicts(TDataShard* self, ui64 txId) + : TBase(self) + , TxId(txId) + { } + + TTxType GetTxType() const override { return TXTYPE_FIND_WRITE_CONFLICTS; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + auto& cache = Self->GetConflictsCache(); + auto itWrites = cache.PendingWrites.find(TxId); + if (itWrites == cache.PendingWrites.end()) { + // Already cancelled + return true; + } + + auto& writes = itWrites->second; + Y_VERIFY(!writes.empty()); + + auto dst = writes.begin(); + for (auto it = writes.begin(); it != writes.end(); ++it) { + if (!cache.GetTableCache(it->LocalTid).RegisterDistributedWrite(TxId, it->WriteKey, txc.DB)) { + if (dst != it) { + *dst = std::move(*it); + } + ++dst; + } + } + writes.erase(dst, writes.end()); + + if (!writes.empty()) { + return false; + } + + cache.PendingWrites.erase(itWrites); + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } + +private: + const ui64 TxId; +}; + +void TConflictsCache::RegisterDistributedWrites(ui64 txId, TPendingWrites&& writes, NTable::TDatabase& db) { + auto dst = writes.begin(); + for (auto it = writes.begin(); it != writes.end(); ++it) { + if (!GetTableCache(it->LocalTid).RegisterDistributedWrite(txId, it->WriteKey, db)) { + if (dst != it) { + *dst = std::move(*it); + } + ++dst; + } + } + writes.erase(dst, writes.end()); + + if (!writes.empty()) { + PendingWrites[txId] = std::move(writes); + Self->EnqueueExecute(new 
TTxFindWriteConflicts(Self, txId)); + } +} + +void TConflictsCache::UnregisterDistributedWrites(ui64 txId) { + for (auto& pr : Tables) { + pr.second.UnregisterDistributedWrites(txId); + } + PendingWrites.erase(txId); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/conflicts_cache.h b/ydb/core/tx/datashard/conflicts_cache.h new file mode 100644 index 00000000000..61c2b1ea748 --- /dev/null +++ b/ydb/core/tx/datashard/conflicts_cache.h @@ -0,0 +1,208 @@ +#pragma once +#include "datashard_active_transaction.h" + +#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h> +#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h> +#include <variant> +#include <vector> + +namespace NKikimr::NDataShard { + +class TDataShard; + +/** + * Caches conflicts for distributed operations + * + * Basically holds a hash table which maps known operation write keys to a set + * of uncommitted writes at that key, as well as pending operations that may + * overwrite that key. When new uncommitted writes are performed they lookup + * pending operations on their key and mark that their uncommitted write + * conflicts with those operations. This means later distributed operations + * don't need to search for conflicts on disk and may either gather a set of + * conflicts from memory, or trust that their conflicts are already accounted + * for. + */ +class TTableConflictsCache { + struct THashableKey { + TConstArrayRef<TCell> Cells; + + template <typename H> + friend H AbslHashValue(H h, const THashableKey& key) { + h = H::combine(std::move(h), key.Cells.size()); + for (const TCell& cell : key.Cells) { + h = H::combine(std::move(h), cell.IsNull()); + if (!cell.IsNull()) { + h = H::combine(std::move(h), cell.Size()); + h = H::combine_contiguous(std::move(h), cell.Data(), cell.Size()); + } + } + return h; + } + }; + + struct TKeyHash { + using is_transparent = void; + + bool operator()(TConstArrayRef<TCell> key) const { + return absl::Hash<THashableKey>()(THashableKey{ key }); + } + }; + + struct TKeyEq { + using is_transparent = void; + + bool operator()(TConstArrayRef<TCell> a, TConstArrayRef<TCell> b) const { + if (a.size() != b.size()) { + return false; + } + + const TCell* pa = a.data(); + const TCell* pb = b.data(); + if (pa == pb) { + return true; + } + + size_t left = a.size(); + while (left > 0) { + if (pa->IsNull()) { + if (!pb->IsNull()) { + return false; + } + } else { + if (pb->IsNull()) { + return false; + } + if (pa->Size() != pb->Size()) { + return false; + } + if (pa->Size() > 0 && ::memcmp(pa->Data(), pb->Data(), pa->Size()) != 0) { + return false; + } + } + ++pa; + ++pb; + --left; + } + + return true; + } + }; + + struct TWriteKey { + TOwnedCellVec Key; + absl::flat_hash_set<ui64> UncommittedWrites; + size_t DistributedWrites = 0; + }; + + struct TUncommittedWrite { + absl::flat_hash_set<TWriteKey*> WriteKeys; + }; + + struct TDistributedWrite { + absl::flat_hash_set<TWriteKey*> WriteKeys; + }; + + using TWriteKeys = absl::flat_hash_map<TOwnedCellVec, std::unique_ptr<TWriteKey>, TKeyHash, TKeyEq>; + using TUncommittedWrites = absl::flat_hash_map<ui64, std::unique_ptr<TUncommittedWrite>>; + using TDistributedWrites = absl::flat_hash_map<ui64, std::unique_ptr<TDistributedWrite>>; + + class TTxObserver; + +public: + explicit TTableConflictsCache(ui64 localTid); + + void AddUncommittedWrite(TConstArrayRef<TCell> key, ui64 txId, NTable::TDatabase& db); + void RemoveUncommittedWrites(TConstArrayRef<TCell> key, NTable::TDatabase& db); + void 
RemoveUncommittedWrites(ui64 txId, NTable::TDatabase& db); + + bool RegisterDistributedWrite(ui64 txId, const TOwnedCellVec& key, NTable::TDatabase& db); + void UnregisterDistributedWrites(ui64 txId); + + const absl::flat_hash_set<ui64>* FindUncommittedWrites(TConstArrayRef<TCell> key); + +private: + void DropWriteKey(TWriteKey* k); + +private: + struct TRollbackOpAddUncommittedWrite { + ui64 TxId; + TWriteKey* WriteKey; + }; + + struct TRollbackOpRemoveUncommittedWrite { + ui64 TxId; + TWriteKey* WriteKey; + }; + + struct TRollbackOpRestoreUncommittedWrite { + ui64 TxId; + std::unique_ptr<TUncommittedWrite> Data; + }; + + using TRollbackOp = std::variant< + TRollbackOpAddUncommittedWrite, + TRollbackOpRemoveUncommittedWrite, + TRollbackOpRestoreUncommittedWrite>; + + void AddRollbackOp(TRollbackOp&& op, NTable::TDatabase& db); + void OnRollbackChanges(); + void OnCommitChanges(); + +private: + const ui32 LocalTid; + TWriteKeys WriteKeys; + TUncommittedWrites UncommittedWrites; + TDistributedWrites DistributedWrites; + NTable::ITransactionObserverPtr TxObserver; + std::vector<TRollbackOp> RollbackOps; + bool RollbackAllowed = false; +}; + +class TConflictsCache { +public: + struct TPendingWrite { + ui32 LocalTid; + TOwnedCellVec WriteKey; + + TPendingWrite(ui32 localTid, TOwnedCellVec writeKey) + : LocalTid(localTid) + , WriteKey(std::move(writeKey)) + { } + }; + + using TPendingWrites = std::vector<TPendingWrite>; + + class TTxFindWriteConflicts; + +public: + TConflictsCache(TDataShard* self) + : Self(self) + { } + + TTableConflictsCache& GetTableCache(ui32 localTid) { + auto it = Tables.find(localTid); + if (it != Tables.end()) { + return it->second; + } + auto res = Tables.emplace( + std::piecewise_construct, + std::forward_as_tuple(localTid), + std::forward_as_tuple(localTid)); + Y_VERIFY(res.second); + return res.first->second; + } + + void DropTableCaches(ui32 localTid) { + Tables.erase(localTid); + } + + void RegisterDistributedWrites(ui64 txId, TPendingWrites&& writes, NTable::TDatabase& db); + void UnregisterDistributedWrites(ui64 txId); + +private: + TDataShard* const Self; + THashMap<ui32, TTableConflictsCache> Tables; + THashMap<ui64, TPendingWrites> PendingWrites; // TxId -> Writes +}; + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 13a11588c64..2dc2c39d736 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -130,6 +130,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , SnapshotManager(this) , SchemaSnapshotManager(this) , VolatileTxManager(this) + , ConflictsCache(this) , DisableByKeyFilter(0, 0, 1) , MaxTxInFly(15000, 0, 100000) , MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll) @@ -3947,14 +3948,8 @@ public: } void OnSkipUncommitted(ui64 txId) override { - if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { - if (info->State != EVolatileTxState::Aborting) { - Y_VERIFY(VolatileDependencies); - VolatileDependencies->insert(txId); - } - } else { - Self->SysLocksTable().BreakLock(txId); - } + Y_VERIFY(VolatileDependencies); + Self->BreakWriteConflict(txId, *VolatileDependencies); } void OnSkipCommitted(const TRowVersion&) override { @@ -4002,10 +3997,13 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl { const auto localTid = GetLocalTableId(tableId); Y_VERIFY(localTid); - const NTable::TScheme& scheme = db.GetScheme(); - const NTable::TScheme::TTableInfo* tableInfo = 
scheme.GetTableInfo(localTid); - TSmallVec<TRawTypeValue> key; - NMiniKQL::ConvertTableKeys(scheme, tableInfo, keyCells, key, nullptr); + + if (auto* cached = GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) { + for (ui64 txId : *cached) { + BreakWriteConflict(txId, volatileDependencies); + } + return true; + } if (!BreakWriteConflictsTxObserver) { BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this); @@ -4018,7 +4016,7 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl // We are not actually interested in the row version, we only need to // detect uncommitted transaction skips on the path to that version. auto res = db.SelectRowVersion( - localTid, key, /* readFlags */ 0, + localTid, keyCells, /* readFlags */ 0, nullptr, BreakWriteConflictsTxObserver); @@ -4029,6 +4027,16 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl return true; } +void TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies) { + if (auto* info = GetVolatileTxManager().FindByCommitTxId(txId)) { + if (info->State != EVolatileTxState::Aborting) { + volatileDependencies.insert(txId); + } + } else { + SysLocksTable().BreakLock(txId); + } +} + class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> { public: TTxGetOpenTxs(TDataShard* self, TEvDataShard::TEvGetOpenTxs::TPtr&& ev) diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index aa520ca4a27..0d79d0e7582 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -128,8 +128,10 @@ namespace NDataShard { WaitingForGlobalTxId = 1ULL << 45, // Operation is waiting for restart WaitingForRestart = 1ULL << 46, + // Operation has write keys registered in the cache + DistributedWritesRegistered = 1ULL << 47, - LastFlag = WaitingForRestart, + LastFlag = DistributedWritesRegistered, PrivateFlagsMask = 0xFFFFFFFFFFFF0000ULL, PreservedPrivateFlagsMask = ReadOnly | ProposeBlocker | NeedDiagnostics | GlobalReader diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 164351e7799..469935e2455 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -348,6 +348,7 @@ public: LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); DB.CommitTx(localTid, lockId, writeVersion); + Self->GetConflictsCache().GetTableCache(localTid).RemoveUncommittedWrites(lockId, DB); if (!CommittedLockChanges.contains(lockId) && Self->HasLockChangeRecords(lockId)) { if (auto* collector = GetChangeCollector(tableId)) { @@ -516,6 +517,14 @@ public: } else { TEngineHost::UpdateRow(tableId, row, commands); } + + if (VolatileTxId) { + Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, VolatileTxId, Db); + } else if (LockTxId) { + Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, LockTxId, Db); + } else { + Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).RemoveUncommittedWrites(row, Db); + } } void EraseRow(const TTableId& tableId, const TArrayRef<const TCell>& row) override { @@ -534,6 +543,14 @@ public: Self->SetTableUpdateTime(tableId, Now); TEngineHost::EraseRow(tableId, row); + + if (VolatileTxId) { + 
Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, VolatileTxId, Db); + } else if (LockTxId) { + Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, LockTxId, Db); + } else { + Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).RemoveUncommittedWrites(row, Db); + } } // Returns whether row belong this shard. @@ -782,7 +799,7 @@ public: return false; } - void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> row) { + void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> keyCells) { // When there are uncommitted changes (write locks) we must find which // locks would break upon commit. bool mustFindConflicts = Self->SysLocksTable().HasWriteLocks(tableId); @@ -799,17 +816,20 @@ public: const auto localTid = LocalTableId(tableId); Y_VERIFY(localTid); - const TScheme::TTableInfo* tableInfo = Scheme.GetTableInfo(localTid); - TSmallVec<TRawTypeValue> key; - ConvertTableKeys(Scheme, tableInfo, row, key, nullptr); ui64 skipCount = 0; NTable::ITransactionObserverPtr txObserver; if (LockTxId) { + // We cannot use cached conflicts since we need to find skip count txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid); // Locked writes are immediate, increased latency is not critical mustFindConflicts = true; + } else if (auto* cached = Self->GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) { + for (ui64 txId : *cached) { + BreakWriteConflict(txId); + } + return; } else { txObserver = new TWriteTxObserver(this); // Prefer precise conflicts for non-distributed transactions @@ -821,7 +841,7 @@ public: // We are not actually interested in the row version, we only need to // detect uncommitted transaction skips on the path to that version. auto res = Db.SelectRowVersion( - localTid, key, /* readFlags */ 0, + localTid, keyCells, /* readFlags */ 0, nullptr, txObserver); if (res.Ready == NTable::EReady::Page) { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 7ddf8478bb1..b419ee1ecd1 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -501,6 +501,15 @@ public: // TOperation iface. 
void BuildExecutionPlan(bool loaded) override; + bool HasKeysInfo() const override + { + if (DataTx) { + return DataTx->TxInfo().Loaded; + } + + return false; + } + const NMiniKQL::IEngineFlat::TValidationInfo &GetKeysInfo() const override { if (DataTx) { diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp index da6d43e4586..1d7b3127d59 100644 --- a/ydb/core/tx/datashard/datashard_change_receiving.cpp +++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp @@ -270,6 +270,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> { } txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId())); + Self->GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(KeyCells.GetCells(), txc.DB); tableInfo.Stats.UpdateTime = TAppData::TimeProvider->Now(); AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_OK); diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index fe2a42421a9..10f5bcdbe4b 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -219,6 +219,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans throw TNeedGlobalTxId(); } txc.DB.UpdateTx(writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId); + self->GetConflictsCache().GetTableCache(writeTableId).AddUncommittedWrite(keyCells.GetCells(), globalTxId, txc.DB); if (!commitAdded) { // Make sure we see our own changes on further iterations userDb.AddCommitTxId(globalTxId, writeVersion); @@ -226,6 +227,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } } else { txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion); + self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(keyCells.GetCells(), txc.DB); } } diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 69151336e6a..d83446d3638 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -175,6 +175,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( throw TNeedGlobalTxId(); } params.Txc->DB.UpdateTx(localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId); + self->GetConflictsCache().GetTableCache(localTableId).AddUncommittedWrite(keyCells.GetCells(), params.GlobalTxId, params.Txc->DB); if (!commitAdded && userDb) { // Make sure we see our own changes on further iterations userDb->AddCommitTxId(params.GlobalTxId, params.WriteVersion); @@ -182,6 +183,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } } else { params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion); + self->GetConflictsCache().GetTableCache(localTableId).RemoveUncommittedWrites(keyCells.GetCells(), params.Txc->DB); } } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 04b51f97bc2..7103ade2fb7 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -20,6 +20,7 @@ #include "progress_queue.h" #include "read_iterator.h" #include "volatile_tx.h" +#include "conflicts_cache.h" #include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> @@ -268,6 +269,7 @@ class TDataShard friend class 
TSnapshotManager; friend class TSchemaSnapshotManager; friend class TVolatileTxManager; + friend class TConflictsCache; friend class TCdcStreamScanManager; friend class TReplicationSourceOffsetsClient; friend class TReplicationSourceOffsetsServer; @@ -1777,6 +1779,8 @@ public: TVolatileTxManager& GetVolatileTxManager() { return VolatileTxManager; } const TVolatileTxManager& GetVolatileTxManager() const { return VolatileTxManager; } + TConflictsCache& GetConflictsCache() { return ConflictsCache; } + TCdcStreamScanManager& GetCdcStreamScanManager() { return CdcStreamScanManager; } const TCdcStreamScanManager& GetCdcStreamScanManager() const { return CdcStreamScanManager; } @@ -1881,6 +1885,16 @@ public: bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies); + /** + * Handles a specific write conflict txId + * + * Prerequisites: TSetupSysLocks is active and caller does not have any + * uncommitted write locks. + * + * Either adds txId to volatile dependencies or breaks a known write lock. + */ + void BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies); + private: /// class TLoanReturnTracker { @@ -2402,6 +2416,7 @@ private: TSnapshotManager SnapshotManager; TSchemaSnapshotManager SchemaSnapshotManager; TVolatileTxManager VolatileTxManager; + TConflictsCache ConflictsCache; TCdcStreamScanManager CdcStreamScanManager; TReplicationSourceOffsetsServerLink ReplicationSourceOffsetsServer; diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp index c48d0f96f19..9285acb6df4 100644 --- a/ydb/core/tx/datashard/datashard_locks_db.cpp +++ b/ydb/core/tx/datashard/datashard_locks_db.cpp @@ -138,6 +138,7 @@ void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) { // Removing the lock also removes any uncommitted data if (DB.HasOpenTx(tid, lockId)) { DB.RemoveTx(tid, lockId); + Self.GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(lockId, DB); } } } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 4fcb4330491..a38867a4dcf 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1143,15 +1143,17 @@ ui64 TPipeline::GetInactiveTxSize() const { return res; } -void TPipeline::SaveForPropose(TValidatedDataTx::TPtr tx) { +bool TPipeline::SaveForPropose(TValidatedDataTx::TPtr tx) { Y_VERIFY(tx && tx->TxId()); if (DataTxCache.size() <= Config.LimitDataTxCache) { ui64 quota = tx->GetTxSize() + tx->GetMemoryAllocated(); if (Self->TryCaptureTxCache(quota)) { tx->SetTxCacheUsage(quota); DataTxCache[tx->TxId()] = tx; + return true; } } + return false; } void TPipeline::SetProposed(ui64 txId, const TActorId& actorId) { @@ -1441,6 +1443,31 @@ void TPipeline::BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, co dataTx->ExtractKeys(false); } +void TPipeline::RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db) +{ + if (op->HasFlag(TTxFlags::DistributedWritesRegistered)) { + return; + } + + // Try to cache write keys if possible + if (!op->IsImmediate() && op->HasKeysInfo()) { + auto& keysInfo = op->GetKeysInfo(); + if (keysInfo.WritesCount > 0) { + TConflictsCache::TPendingWrites writes; + for (const auto& vk : keysInfo.Keys) { + const auto& k = *vk.Key; + if (vk.IsWrite && k.Range.Point && Self->IsUserTable(k.TableId)) { + writes.emplace_back(Self->GetLocalTableId(k.TableId), 
k.Range.GetOwnedFrom()); + } + } + if (!writes.empty()) { + Self->GetConflictsCache().RegisterDistributedWrites(op->GetTxId(), std::move(writes), db); + } + op->SetFlag(TTxFlags::DistributedWritesRegistered); + } + } +} + EExecutionStatus TPipeline::RunExecutionUnit(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx) { Y_VERIFY(!op->IsExecutionPlanFinished()); diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index 16d3e332a93..46211553bcb 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -107,7 +107,7 @@ public: // tx propose - void SaveForPropose(TValidatedDataTx::TPtr tx); + bool SaveForPropose(TValidatedDataTx::TPtr tx); void SetProposed(ui64 txId, const TActorId& actorId); void ForgetUnproposedTx(ui64 txId); @@ -266,6 +266,8 @@ public: return tx->RestoreTxData(Self, txc, ctx); } + void RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db); + // Execution units TExecutionUnit &GetExecutionUnit(EExecutionUnitKind kind) { diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp index 205c5528a15..cd251152b61 100644 --- a/ydb/core/tx/datashard/datashard_repl_apply.cpp +++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp @@ -175,6 +175,7 @@ public: if (writeTxId) { txc.DB.UpdateTx(userTable.LocalTid, rop, key, update, writeTxId); + Self->GetConflictsCache().GetTableCache(userTable.LocalTid).AddUncommittedWrite(keyCellVec.GetCells(), writeTxId, txc.DB); } else { if (!MvccReadWriteVersion) { auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); @@ -184,6 +185,7 @@ public: Self->SysLocksTable().BreakLocks(tableId, keyCellVec.GetCells()); txc.DB.Update(userTable.LocalTid, rop, key, update, *MvccReadWriteVersion); + Self->GetConflictsCache().GetTableCache(userTable.LocalTid).RemoveUncommittedWrites(keyCellVec.GetCells(), txc.DB); } return true; diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp index a23b9fcbb81..c19243f55a1 100644 --- a/ydb/core/tx/datashard/datashard_trans_queue.cpp +++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp @@ -30,6 +30,7 @@ void TTransQueue::RemoveTxInFly(ui64 txId) { TxsInFly.erase(it); ProposeDelayers.erase(txId); Self->SetCounter(COUNTER_TX_IN_FLY, TxsInFly.size()); + Self->GetConflictsCache().UnregisterDistributedWrites(txId); } } diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp index df567b199cb..244f19ea022 100644 --- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp @@ -45,6 +45,7 @@ public: // FIXME: temporary break all locks, but we want to be smarter about which locks we break DataShard.SysLocksTable().BreakAllLocks(fullTableId); txc.DB.CommitTx(tableInfo.LocalTid, writeTxId, versions.WriteVersion); + DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(writeTxId, txc.DB); if (Pipeline.AddLockDependencies(op, guardLocks)) { if (txc.DB.HasChanges()) { diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index 46943e1f394..540c95a7b7e 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -188,6 +188,7 @@ public: if 
(!volatileDependencies.empty() || volatileOrdered) { txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId); + DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).AddUncommittedWrite(keyCells.GetCells(), globalTxId, txc.DB); if (!commitAdded && userDb) { // Make sure we see our own changes on further iterations userDb->AddCommitTxId(globalTxId, writeVersion); @@ -195,6 +196,7 @@ public: } } else { txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion); + DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(keyCells.GetCells(), txc.DB); } } diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 183d25acaae..41bbd646b4c 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -702,6 +702,10 @@ public: */ bool HasRuntimeConflicts() const noexcept; + virtual bool HasKeysInfo() const + { + return false; + } virtual const NMiniKQL::IEngineFlat::TValidationInfo &GetKeysInfo() const { return EmptyKeysInfo; diff --git a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp index 1ccb4e10396..d8f2f8e0e19 100644 --- a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp +++ b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp @@ -58,6 +58,7 @@ EExecutionStatus TReceiveSnapshotCleanupUnit::Execute(TOperation::TPtr op, return EExecutionStatus::Reschedule; } txc.DB.RemoveTx(localTid, txId); + DataShard.GetConflictsCache().GetTableCache(localTid).RemoveUncommittedWrites(txId, txc.DB); ++removedTxs; } } diff --git a/ydb/core/tx/datashard/store_data_tx_unit.cpp b/ydb/core/tx/datashard/store_data_tx_unit.cpp index 929002a4ca9..d5898e857e2 100644 --- a/ydb/core/tx/datashard/store_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/store_data_tx_unit.cpp @@ -47,7 +47,10 @@ EExecutionStatus TStoreDataTxUnit::Execute(TOperation::TPtr op, Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); Y_VERIFY(tx->GetDataTx()); - Pipeline.SaveForPropose(tx->GetDataTx()); + bool cached = Pipeline.SaveForPropose(tx->GetDataTx()); + if (cached) { + Pipeline.RegisterDistributedWrites(op, txc.DB); + } Pipeline.ProposeTx(op, tx->GetTxBody(), txc, ctx); if (!op->HasVolatilePrepareFlag()) { diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 43ea3405de6..593153f9857 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -37,6 +37,7 @@ namespace NKikimr::NDataShard { for (ui64 commitTxId : info->CommitTxIds) { if (txc.DB.HasOpenTx(tid, commitTxId)) { txc.DB.CommitTx(tid, commitTxId, info->Version); + Self->GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(commitTxId, txc.DB); } } } @@ -145,6 +146,7 @@ namespace NKikimr::NDataShard { for (ui64 commitTxId : info->CommitTxIds) { if (txc.DB.HasOpenTx(tid, commitTxId)) { txc.DB.RemoveTx(tid, commitTxId); + Self->GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(commitTxId, txc.DB); } } } |
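One detail worth noting in conflicts_cache.cpp is how the cache stays consistent with the local tablet transaction: every in-memory mutation pushes an inverse operation onto RollbackOps, and OnCommit/OnRollback callbacks registered with NTable::TDatabase either clear that log or replay it in reverse. The sketch below shows the same undo-log pattern in isolation, assuming a hypothetical TDb stand-in for the database hooks and only two rollback-op variants; it illustrates the idea rather than the actual classes.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <variant>
#include <vector>

// Hypothetical stand-in for NTable::TDatabase's OnCommit/OnRollback hooks.
struct TDb {
    std::vector<std::function<void()>> CommitHooks, RollbackHooks;
    void OnCommit(std::function<void()> f) { CommitHooks.push_back(std::move(f)); }
    void OnRollback(std::function<void()> f) { RollbackHooks.push_back(std::move(f)); }
};

class TUndoLogExample {
    struct TRemoveUncommittedWrite { uint64_t TxId; std::string Key; }; // undo of an insert
    struct TAddUncommittedWrite { uint64_t TxId; std::string Key; };    // undo of an erase
    using TRollbackOp = std::variant<TRemoveUncommittedWrite, TAddUncommittedWrite>;

    std::unordered_map<std::string, std::unordered_set<uint64_t>> UncommittedByKey;
    std::vector<TRollbackOp> RollbackOps;

    void AddRollbackOp(TRollbackOp&& op, TDb& db) {
        if (RollbackOps.empty()) {
            // Subscribe once per local transaction, like AddRollbackOp in the patch.
            db.OnCommit([this] { RollbackOps.clear(); });
            db.OnRollback([this] { Rollback(); });
        }
        RollbackOps.push_back(std::move(op));
    }

    void Rollback() {
        struct TApply {
            TUndoLogExample* Self;
            void operator()(TRemoveUncommittedWrite& op) const { Self->UncommittedByKey[op.Key].erase(op.TxId); }
            void operator()(TAddUncommittedWrite& op) const { Self->UncommittedByKey[op.Key].insert(op.TxId); }
        };
        while (!RollbackOps.empty()) {                 // undo in reverse order
            std::visit(TApply{this}, RollbackOps.back());
            RollbackOps.pop_back();
        }
    }

public:
    void AddUncommittedWrite(const std::string& key, uint64_t txId, TDb& db) {
        if (UncommittedByKey[key].insert(txId).second) {
            AddRollbackOp(TRemoveUncommittedWrite{txId, key}, db);
        }
    }

    void RemoveUncommittedWrites(const std::string& key, TDb& db) {
        for (uint64_t txId : UncommittedByKey[key]) {
            AddRollbackOp(TAddUncommittedWrite{txId, key}, db);
        }
        UncommittedByKey[key].clear();
    }
};
```

The RollbackAllowed flag in the real cache adds one more safeguard on top of this pattern: once UnregisterDistributedWrites has discarded state that cannot be restored, an unexpected rollback of the same local transaction is treated as a hard error.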