aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-05-25 18:15:28 +0300
committersnaury <snaury@ydb.tech>2023-05-25 18:15:28 +0300
commit14c17870c6b053f007138d6f0f6188a30f30bc83 (patch)
treed25637ee0324ec9c2214307fa0668e78ca5ac039
parent86fb23705bc8eeee1c6a68ac18c4e844f2331d19 (diff)
downloadydb-14c17870c6b053f007138d6f0f6188a30f30bc83.tar.gz
Cache uncommitted write conflicts for distributed operations
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp3
-rw-r--r--ydb/core/tx/datashard/conflicts_cache.cpp366
-rw-r--r--ydb/core/tx/datashard/conflicts_cache.h208
-rw-r--r--ydb/core/tx/datashard/datashard.cpp34
-rw-r--r--ydb/core/tx/datashard/datashard.h4
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp30
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h9
-rw-r--r--ydb/core/tx/datashard/datashard_change_receiving.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h15
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp29
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h4
-rw-r--r--ydb/core/tx/datashard/datashard_repl_apply.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_trans_queue.cpp1
-rw-r--r--ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp1
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp2
-rw-r--r--ydb/core/tx/datashard/operation.h4
-rw-r--r--ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp1
-rw-r--r--ydb/core/tx/datashard/store_data_tx_unit.cpp5
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp2
27 files changed, 709 insertions, 22 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 4ab23cf5e3a..4971a76c62d 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -452,4 +452,5 @@ enum ETxTypes {
TXTYPE_VOLATILE_TX_ABORT = 74 [(TxTypeOpts) = {Name: "TxVolatileTxAbort"}];
TXTYPE_CDC_STREAM_SCAN_RUN = 75 [(TxTypeOpts) = {Name: "TTxCdcStreamScanRun"}];
TXTYPE_CDC_STREAM_SCAN_PROGRESS = 76 [(TxTypeOpts) = {Name: "TTxCdcStreamScanProgress"}];
+ TXTYPE_FIND_WRITE_CONFLICTS = 77 [(TxTypeOpts) = {Name: "TTxFindWriteConflicts"}];
}
diff --git a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
index 6f4d1978e52..28c5b364bbe 100644
--- a/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.darwin-x86_64.txt
@@ -176,6 +176,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
index 3e69f6c4fa9..41bb30c51fd 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-aarch64.txt
@@ -177,6 +177,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
index 3e69f6c4fa9..41bb30c51fd 100644
--- a/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.linux-x86_64.txt
@@ -177,6 +177,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp
diff --git a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
index e819352becc..a7eb4c861d0 100644
--- a/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/datashard/CMakeLists.windows-x86_64.txt
@@ -177,6 +177,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/check_snapshot_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/complete_data_tx_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/completed_operations_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/conflicts_cache.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_persistent_snapshot_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/create_table_unit.cpp
diff --git a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
index bd55d64eb20..aa2a99150d2 100644
--- a/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
+++ b/ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
@@ -90,6 +90,9 @@ EExecutionStatus TBuildAndWaitDependenciesUnit::Execute(TOperation::TPtr op,
}
if (!IsReadyToExecute(op)) {
+ // Cache write keys while operation waits in the queue
+ Pipeline.RegisterDistributedWrites(op, txc.DB);
+
TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
if (tx) {
// We should put conflicting tx into cache
diff --git a/ydb/core/tx/datashard/conflicts_cache.cpp b/ydb/core/tx/datashard/conflicts_cache.cpp
new file mode 100644
index 00000000000..dd02a5adf30
--- /dev/null
+++ b/ydb/core/tx/datashard/conflicts_cache.cpp
@@ -0,0 +1,366 @@
+#include "conflicts_cache.h"
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+TTableConflictsCache::TTableConflictsCache(ui64 localTid)
+ : LocalTid(localTid)
+{}
+
+void TTableConflictsCache::AddUncommittedWrite(TConstArrayRef<TCell> key, ui64 txId, NTable::TDatabase& db) {
+ if (WriteKeys.empty()) {
+ // Avoid expensive key lookup when we know cache is empty
+ return;
+ }
+
+ auto itWriteKey = WriteKeys.find(key);
+ if (itWriteKey == WriteKeys.end()) {
+ return; // we don't have matching distributed writes
+ }
+
+ auto* k = itWriteKey->second.get();
+ auto res = k->UncommittedWrites.insert(txId);
+ if (res.second) {
+ // New write key for this uncommitted tx
+ auto& p = UncommittedWrites[txId];
+ if (!p) {
+ p = std::make_unique<TUncommittedWrite>();
+ }
+ auto r = p->WriteKeys.insert(k);
+ Y_VERIFY(r.second);
+ AddRollbackOp(TRollbackOpRemoveUncommittedWrite{ txId, k }, db);
+ }
+}
+
+void TTableConflictsCache::RemoveUncommittedWrites(TConstArrayRef<TCell> key, NTable::TDatabase& db) {
+ if (UncommittedWrites.empty()) {
+ // Avoid expensive key lookup when we know there are no writes
+ return;
+ }
+
+ auto itWriteKey = WriteKeys.find(key);
+ if (itWriteKey == WriteKeys.end()) {
+ return; // we don't have matching distributed writes
+ }
+
+ auto* k = itWriteKey->second.get();
+ for (ui64 txId : k->UncommittedWrites) {
+ auto it = UncommittedWrites.find(txId);
+ Y_VERIFY(it != UncommittedWrites.end());
+ auto r = it->second->WriteKeys.erase(k);
+ Y_VERIFY(r);
+ AddRollbackOp(TRollbackOpAddUncommittedWrite{ txId, k }, db);
+ if (it->second->WriteKeys.empty()) {
+ AddRollbackOp(TRollbackOpRestoreUncommittedWrite{ txId, std::move(it->second) }, db);
+ UncommittedWrites.erase(it);
+ }
+ }
+
+ k->UncommittedWrites.clear();
+}
+
+void TTableConflictsCache::RemoveUncommittedWrites(ui64 txId, NTable::TDatabase& db) {
+ auto it = UncommittedWrites.find(txId);
+ if (it == UncommittedWrites.end()) {
+ return;
+ }
+
+ auto* p = it->second.get();
+
+ // Note: UncommittedWrite will be kept alive by rollback buffer
+ AddRollbackOp(TRollbackOpRestoreUncommittedWrite{ txId, std::move(it->second) }, db);
+ UncommittedWrites.erase(it);
+
+ // Note: on rollback these links will be restored
+ for (auto* k : p->WriteKeys) {
+ k->UncommittedWrites.erase(txId);
+ }
+}
+
+class TTableConflictsCache::TTxObserver : public NTable::ITransactionObserver {
+public:
+ TTxObserver() {}
+
+ void OnSkipUncommitted(ui64 txId) override {
+ Y_VERIFY(Target);
+ Target->insert(txId);
+ }
+
+ void OnSkipCommitted(const TRowVersion&) override {}
+ void OnSkipCommitted(const TRowVersion&, ui64) override {}
+ void OnApplyCommitted(const TRowVersion&) override {}
+ void OnApplyCommitted(const TRowVersion&, ui64) override {}
+
+ void SetTarget(absl::flat_hash_set<ui64>* target) {
+ Target = target;
+ }
+
+private:
+ absl::flat_hash_set<ui64>* Target = nullptr;
+};
+
+bool TTableConflictsCache::RegisterDistributedWrite(ui64 txId, const TOwnedCellVec& key, NTable::TDatabase& db) {
+ if (auto it = WriteKeys.find(key); it != WriteKeys.end()) {
+ // We already have this write key cached
+ auto* k = it->second.get();
+ auto& p = DistributedWrites[txId];
+ if (!p) {
+ p = std::make_unique<TDistributedWrite>();
+ }
+ auto r = p->WriteKeys.insert(k);
+ if (r.second) {
+ k->DistributedWrites++;
+ }
+ return true;
+ }
+
+ absl::flat_hash_set<ui64> txIds;
+
+ // Search for conflicts only when we know there are uncommitted changes right now
+ if (db.GetOpenTxCount(LocalTid)) {
+ if (!TxObserver) {
+ TxObserver = new TTxObserver();
+ }
+
+ static_cast<TTxObserver*>(TxObserver.Get())->SetTarget(&txIds);
+ auto res = db.SelectRowVersion(LocalTid, key, /* readFlags */ 0, nullptr, TxObserver);
+ static_cast<TTxObserver*>(TxObserver.Get())->SetTarget(nullptr);
+
+ if (res.Ready == NTable::EReady::Page) {
+ return false;
+ }
+
+ auto itTxIds = txIds.begin();
+ while (itTxIds != txIds.end()) {
+ // Filter removed transactions
+ if (db.HasRemovedTx(LocalTid, *itTxIds)) {
+ txIds.erase(itTxIds++);
+ } else {
+ ++itTxIds;
+ }
+ }
+ }
+
+ auto& k = WriteKeys[key];
+ Y_VERIFY(!k);
+ k = std::make_unique<TWriteKey>();
+ k->Key = key;
+
+ if (!txIds.empty()) {
+ k->UncommittedWrites = std::move(txIds);
+ for (ui64 txId : k->UncommittedWrites) {
+ auto& p = UncommittedWrites[txId];
+ if (!p) {
+ p = std::make_unique<TUncommittedWrite>();
+ }
+ auto r = p->WriteKeys.insert(k.get());
+ Y_VERIFY(r.second);
+ }
+ }
+
+ auto& p = DistributedWrites[txId];
+ if (!p) {
+ p = std::make_unique<TDistributedWrite>();
+ }
+ auto r = p->WriteKeys.insert(k.get());
+ Y_VERIFY(r.second);
+ k->DistributedWrites++;
+
+ return true;
+}
+
+void TTableConflictsCache::UnregisterDistributedWrites(ui64 txId) {
+ if (!RollbackOps.empty()) {
+ RollbackAllowed = false;
+ }
+
+ auto it = DistributedWrites.find(txId);
+ if (it == DistributedWrites.end()) {
+ return;
+ }
+
+ std::unique_ptr<TDistributedWrite> p = std::move(it->second);
+ DistributedWrites.erase(it);
+
+ for (auto* k : p->WriteKeys) {
+ Y_VERIFY(k->DistributedWrites > 0);
+ if (0 == --k->DistributedWrites) {
+ DropWriteKey(k);
+ }
+ }
+}
+
+void TTableConflictsCache::DropWriteKey(TWriteKey* k) {
+ Y_VERIFY(k->DistributedWrites == 0);
+
+ auto itWriteKey = WriteKeys.find(k->Key);
+ Y_VERIFY(itWriteKey != WriteKeys.end());
+ Y_VERIFY(itWriteKey->second.get() == k);
+
+ std::unique_ptr<TWriteKey> saved = std::move(itWriteKey->second);
+ WriteKeys.erase(itWriteKey);
+
+ for (ui64 txId : k->UncommittedWrites) {
+ auto it = UncommittedWrites.find(txId);
+ Y_VERIFY(it != UncommittedWrites.end());
+ it->second->WriteKeys.erase(k);
+ if (it->second->WriteKeys.empty()) {
+ UncommittedWrites.erase(it);
+ }
+ }
+}
+
+const absl::flat_hash_set<ui64>* TTableConflictsCache::FindUncommittedWrites(TConstArrayRef<TCell> key) {
+ if (WriteKeys.empty()) {
+ // Avoid expensive key lookup when we know cache is empty
+ return nullptr;
+ }
+
+ auto it = WriteKeys.find(key);
+ if (it == WriteKeys.end()) {
+ return nullptr;
+ }
+
+ return &it->second->UncommittedWrites;
+}
+
+void TTableConflictsCache::AddRollbackOp(TRollbackOp&& op, NTable::TDatabase& db) {
+ if (RollbackOps.empty()) {
+ db.OnCommit([this]() {
+ OnCommitChanges();
+ });
+ db.OnRollback([this]() {
+ OnRollbackChanges();
+ });
+ }
+ RollbackOps.push_back(std::move(op));
+}
+
+void TTableConflictsCache::OnCommitChanges() {
+ RollbackOps.clear();
+ RollbackAllowed = true;
+}
+
+void TTableConflictsCache::OnRollbackChanges() {
+ Y_VERIFY(RollbackAllowed, "Unexpected transaction rollback");
+
+ struct TPerformRollback {
+ TTableConflictsCache* Self;
+
+ void operator()(TRollbackOpAddUncommittedWrite& op) const {
+ auto it = Self->UncommittedWrites.find(op.TxId);
+ Y_VERIFY(it != Self->UncommittedWrites.end());
+ auto* p = it->second.get();
+ auto* k = op.WriteKey;
+ auto r1 = p->WriteKeys.insert(k);
+ Y_VERIFY(r1.second);
+ auto r2 = k->UncommittedWrites.insert(op.TxId);
+ Y_VERIFY(r2.second);
+ }
+
+ void operator()(TRollbackOpRemoveUncommittedWrite& op) const {
+ auto it = Self->UncommittedWrites.find(op.TxId);
+ Y_VERIFY(it != Self->UncommittedWrites.end());
+ auto* p = it->second.get();
+ auto* k = op.WriteKey;
+ auto r1 = p->WriteKeys.erase(k);
+ Y_VERIFY(r1);
+ auto r2 = k->UncommittedWrites.erase(op.TxId);
+ Y_VERIFY(r2);
+ if (p->WriteKeys.empty()) {
+ Self->UncommittedWrites.erase(it);
+ }
+ }
+
+ void operator()(TRollbackOpRestoreUncommittedWrite& op) const {
+ auto r1 = Self->UncommittedWrites.emplace(op.TxId, std::move(op.Data));
+ Y_VERIFY(r1.second);
+ auto& p = r1.first->second;
+ for (auto* k : p->WriteKeys) {
+ auto r2 = k->UncommittedWrites.insert(op.TxId);
+ Y_VERIFY(r2.second);
+ }
+ }
+ };
+
+ while (!RollbackOps.empty()) {
+ std::visit(TPerformRollback{ this }, RollbackOps.back());
+ RollbackOps.pop_back();
+ }
+}
+
+class TConflictsCache::TTxFindWriteConflicts
+ : public NTabletFlatExecutor::TTransactionBase<TDataShard>
+{
+public:
+ TTxFindWriteConflicts(TDataShard* self, ui64 txId)
+ : TBase(self)
+ , TxId(txId)
+ { }
+
+ TTxType GetTxType() const override { return TXTYPE_FIND_WRITE_CONFLICTS; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ auto& cache = Self->GetConflictsCache();
+ auto itWrites = cache.PendingWrites.find(TxId);
+ if (itWrites == cache.PendingWrites.end()) {
+ // Already cancelled
+ return true;
+ }
+
+ auto& writes = itWrites->second;
+ Y_VERIFY(!writes.empty());
+
+ auto dst = writes.begin();
+ for (auto it = writes.begin(); it != writes.end(); ++it) {
+ if (!cache.GetTableCache(it->LocalTid).RegisterDistributedWrite(TxId, it->WriteKey, txc.DB)) {
+ if (dst != it) {
+ *dst = std::move(*it);
+ }
+ ++dst;
+ }
+ }
+ writes.erase(dst, writes.end());
+
+ if (!writes.empty()) {
+ return false;
+ }
+
+ cache.PendingWrites.erase(itWrites);
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+
+private:
+ const ui64 TxId;
+};
+
+void TConflictsCache::RegisterDistributedWrites(ui64 txId, TPendingWrites&& writes, NTable::TDatabase& db) {
+ auto dst = writes.begin();
+ for (auto it = writes.begin(); it != writes.end(); ++it) {
+ if (!GetTableCache(it->LocalTid).RegisterDistributedWrite(txId, it->WriteKey, db)) {
+ if (dst != it) {
+ *dst = std::move(*it);
+ }
+ ++dst;
+ }
+ }
+ writes.erase(dst, writes.end());
+
+ if (!writes.empty()) {
+ PendingWrites[txId] = std::move(writes);
+ Self->EnqueueExecute(new TTxFindWriteConflicts(Self, txId));
+ }
+}
+
+void TConflictsCache::UnregisterDistributedWrites(ui64 txId) {
+ for (auto& pr : Tables) {
+ pr.second.UnregisterDistributedWrites(txId);
+ }
+ PendingWrites.erase(txId);
+}
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/conflicts_cache.h b/ydb/core/tx/datashard/conflicts_cache.h
new file mode 100644
index 00000000000..61c2b1ea748
--- /dev/null
+++ b/ydb/core/tx/datashard/conflicts_cache.h
@@ -0,0 +1,208 @@
+#pragma once
+#include "datashard_active_transaction.h"
+
+#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
+#include <library/cpp/containers/absl_flat_hash/flat_hash_set.h>
+#include <variant>
+#include <vector>
+
+namespace NKikimr::NDataShard {
+
+class TDataShard;
+
+/**
+ * Caches conflicts for distributed operations
+ *
+ * Basically holds a hash table which maps known operation write keys to a set
+ * of uncommitted writes at that key, as well as pending operations that may
+ * overwrite that key. When new uncommitted writes are performed they look up
+ * pending operations on their key and mark that their uncommitted write
+ * conflicts with those operations. This means later distributed operations
+ * don't need to search for conflicts on disk and may either gather a set of
+ * conflicts from memory, or trust that their conflicts are already accounted
+ * for.
+ */
+class TTableConflictsCache {
+ struct THashableKey {
+ TConstArrayRef<TCell> Cells;
+
+ template <typename H>
+ friend H AbslHashValue(H h, const THashableKey& key) {
+ h = H::combine(std::move(h), key.Cells.size());
+ for (const TCell& cell : key.Cells) {
+ h = H::combine(std::move(h), cell.IsNull());
+ if (!cell.IsNull()) {
+ h = H::combine(std::move(h), cell.Size());
+ h = H::combine_contiguous(std::move(h), cell.Data(), cell.Size());
+ }
+ }
+ return h;
+ }
+ };
+
+ struct TKeyHash {
+ using is_transparent = void;
+
+ bool operator()(TConstArrayRef<TCell> key) const {
+ return absl::Hash<THashableKey>()(THashableKey{ key });
+ }
+ };
+
+ struct TKeyEq {
+ using is_transparent = void;
+
+ bool operator()(TConstArrayRef<TCell> a, TConstArrayRef<TCell> b) const {
+ if (a.size() != b.size()) {
+ return false;
+ }
+
+ const TCell* pa = a.data();
+ const TCell* pb = b.data();
+ if (pa == pb) {
+ return true;
+ }
+
+ size_t left = a.size();
+ while (left > 0) {
+ if (pa->IsNull()) {
+ if (!pb->IsNull()) {
+ return false;
+ }
+ } else {
+ if (pb->IsNull()) {
+ return false;
+ }
+ if (pa->Size() != pb->Size()) {
+ return false;
+ }
+ if (pa->Size() > 0 && ::memcmp(pa->Data(), pb->Data(), pa->Size()) != 0) {
+ return false;
+ }
+ }
+ ++pa;
+ ++pb;
+ --left;
+ }
+
+ return true;
+ }
+ };
+
+ struct TWriteKey {
+ TOwnedCellVec Key;
+ absl::flat_hash_set<ui64> UncommittedWrites;
+ size_t DistributedWrites = 0;
+ };
+
+ struct TUncommittedWrite {
+ absl::flat_hash_set<TWriteKey*> WriteKeys;
+ };
+
+ struct TDistributedWrite {
+ absl::flat_hash_set<TWriteKey*> WriteKeys;
+ };
+
+ using TWriteKeys = absl::flat_hash_map<TOwnedCellVec, std::unique_ptr<TWriteKey>, TKeyHash, TKeyEq>;
+ using TUncommittedWrites = absl::flat_hash_map<ui64, std::unique_ptr<TUncommittedWrite>>;
+ using TDistributedWrites = absl::flat_hash_map<ui64, std::unique_ptr<TDistributedWrite>>;
+
+ class TTxObserver;
+
+public:
+ explicit TTableConflictsCache(ui64 localTid);
+
+ void AddUncommittedWrite(TConstArrayRef<TCell> key, ui64 txId, NTable::TDatabase& db);
+ void RemoveUncommittedWrites(TConstArrayRef<TCell> key, NTable::TDatabase& db);
+ void RemoveUncommittedWrites(ui64 txId, NTable::TDatabase& db);
+
+ bool RegisterDistributedWrite(ui64 txId, const TOwnedCellVec& key, NTable::TDatabase& db);
+ void UnregisterDistributedWrites(ui64 txId);
+
+ const absl::flat_hash_set<ui64>* FindUncommittedWrites(TConstArrayRef<TCell> key);
+
+private:
+ void DropWriteKey(TWriteKey* k);
+
+private:
+ struct TRollbackOpAddUncommittedWrite {
+ ui64 TxId;
+ TWriteKey* WriteKey;
+ };
+
+ struct TRollbackOpRemoveUncommittedWrite {
+ ui64 TxId;
+ TWriteKey* WriteKey;
+ };
+
+ struct TRollbackOpRestoreUncommittedWrite {
+ ui64 TxId;
+ std::unique_ptr<TUncommittedWrite> Data;
+ };
+
+ using TRollbackOp = std::variant<
+ TRollbackOpAddUncommittedWrite,
+ TRollbackOpRemoveUncommittedWrite,
+ TRollbackOpRestoreUncommittedWrite>;
+
+ void AddRollbackOp(TRollbackOp&& op, NTable::TDatabase& db);
+ void OnRollbackChanges();
+ void OnCommitChanges();
+
+private:
+ const ui32 LocalTid;
+ TWriteKeys WriteKeys;
+ TUncommittedWrites UncommittedWrites;
+ TDistributedWrites DistributedWrites;
+ NTable::ITransactionObserverPtr TxObserver;
+ std::vector<TRollbackOp> RollbackOps;
+ bool RollbackAllowed = false;
+};
+
+class TConflictsCache {
+public:
+ struct TPendingWrite {
+ ui32 LocalTid;
+ TOwnedCellVec WriteKey;
+
+ TPendingWrite(ui32 localTid, TOwnedCellVec writeKey)
+ : LocalTid(localTid)
+ , WriteKey(std::move(writeKey))
+ { }
+ };
+
+ using TPendingWrites = std::vector<TPendingWrite>;
+
+ class TTxFindWriteConflicts;
+
+public:
+ TConflictsCache(TDataShard* self)
+ : Self(self)
+ { }
+
+ TTableConflictsCache& GetTableCache(ui32 localTid) {
+ auto it = Tables.find(localTid);
+ if (it != Tables.end()) {
+ return it->second;
+ }
+ auto res = Tables.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(localTid),
+ std::forward_as_tuple(localTid));
+ Y_VERIFY(res.second);
+ return res.first->second;
+ }
+
+ void DropTableCaches(ui32 localTid) {
+ Tables.erase(localTid);
+ }
+
+ void RegisterDistributedWrites(ui64 txId, TPendingWrites&& writes, NTable::TDatabase& db);
+ void UnregisterDistributedWrites(ui64 txId);
+
+private:
+ TDataShard* const Self;
+ THashMap<ui32, TTableConflictsCache> Tables;
+ THashMap<ui64, TPendingWrites> PendingWrites; // TxId -> Writes
+};
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 13a11588c64..2dc2c39d736 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -130,6 +130,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, SnapshotManager(this)
, SchemaSnapshotManager(this)
, VolatileTxManager(this)
+ , ConflictsCache(this)
, DisableByKeyFilter(0, 0, 1)
, MaxTxInFly(15000, 0, 100000)
, MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll)
@@ -3947,14 +3948,8 @@ public:
}
void OnSkipUncommitted(ui64 txId) override {
- if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
- if (info->State != EVolatileTxState::Aborting) {
- Y_VERIFY(VolatileDependencies);
- VolatileDependencies->insert(txId);
- }
- } else {
- Self->SysLocksTable().BreakLock(txId);
- }
+ Y_VERIFY(VolatileDependencies);
+ Self->BreakWriteConflict(txId, *VolatileDependencies);
}
void OnSkipCommitted(const TRowVersion&) override {
@@ -4002,10 +3997,13 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl
{
const auto localTid = GetLocalTableId(tableId);
Y_VERIFY(localTid);
- const NTable::TScheme& scheme = db.GetScheme();
- const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(localTid);
- TSmallVec<TRawTypeValue> key;
- NMiniKQL::ConvertTableKeys(scheme, tableInfo, keyCells, key, nullptr);
+
+ if (auto* cached = GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) {
+ for (ui64 txId : *cached) {
+ BreakWriteConflict(txId, volatileDependencies);
+ }
+ return true;
+ }
if (!BreakWriteConflictsTxObserver) {
BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this);
@@ -4018,7 +4016,7 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl
// We are not actually interested in the row version, we only need to
// detect uncommitted transaction skips on the path to that version.
auto res = db.SelectRowVersion(
- localTid, key, /* readFlags */ 0,
+ localTid, keyCells, /* readFlags */ 0,
nullptr,
BreakWriteConflictsTxObserver);
@@ -4029,6 +4027,16 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl
return true;
}
+void TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies) {
+ if (auto* info = GetVolatileTxManager().FindByCommitTxId(txId)) {
+ if (info->State != EVolatileTxState::Aborting) {
+ volatileDependencies.insert(txId);
+ }
+ } else {
+ SysLocksTable().BreakLock(txId);
+ }
+}
+
class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
TTxGetOpenTxs(TDataShard* self, TEvDataShard::TEvGetOpenTxs::TPtr&& ev)
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index aa520ca4a27..0d79d0e7582 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -128,8 +128,10 @@ namespace NDataShard {
WaitingForGlobalTxId = 1ULL << 45,
// Operation is waiting for restart
WaitingForRestart = 1ULL << 46,
+ // Operation has write keys registered in the cache
+ DistributedWritesRegistered = 1ULL << 47,
- LastFlag = WaitingForRestart,
+ LastFlag = DistributedWritesRegistered,
PrivateFlagsMask = 0xFFFFFFFFFFFF0000ULL,
PreservedPrivateFlagsMask = ReadOnly | ProposeBlocker | NeedDiagnostics | GlobalReader
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 164351e7799..469935e2455 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -348,6 +348,7 @@ public:
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID());
DB.CommitTx(localTid, lockId, writeVersion);
+ Self->GetConflictsCache().GetTableCache(localTid).RemoveUncommittedWrites(lockId, DB);
if (!CommittedLockChanges.contains(lockId) && Self->HasLockChangeRecords(lockId)) {
if (auto* collector = GetChangeCollector(tableId)) {
@@ -516,6 +517,14 @@ public:
} else {
TEngineHost::UpdateRow(tableId, row, commands);
}
+
+ if (VolatileTxId) {
+ Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, VolatileTxId, Db);
+ } else if (LockTxId) {
+ Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, LockTxId, Db);
+ } else {
+ Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).RemoveUncommittedWrites(row, Db);
+ }
}
void EraseRow(const TTableId& tableId, const TArrayRef<const TCell>& row) override {
@@ -534,6 +543,14 @@ public:
Self->SetTableUpdateTime(tableId, Now);
TEngineHost::EraseRow(tableId, row);
+
+ if (VolatileTxId) {
+ Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, VolatileTxId, Db);
+ } else if (LockTxId) {
+ Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).AddUncommittedWrite(row, LockTxId, Db);
+ } else {
+ Self->GetConflictsCache().GetTableCache(LocalTableId(tableId)).RemoveUncommittedWrites(row, Db);
+ }
}
// Returns whether row belong this shard.
@@ -782,7 +799,7 @@ public:
return false;
}
- void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> row) {
+ void CheckWriteConflicts(const TTableId& tableId, TArrayRef<const TCell> keyCells) {
// When there are uncommitted changes (write locks) we must find which
// locks would break upon commit.
bool mustFindConflicts = Self->SysLocksTable().HasWriteLocks(tableId);
@@ -799,17 +816,20 @@ public:
const auto localTid = LocalTableId(tableId);
Y_VERIFY(localTid);
- const TScheme::TTableInfo* tableInfo = Scheme.GetTableInfo(localTid);
- TSmallVec<TRawTypeValue> key;
- ConvertTableKeys(Scheme, tableInfo, row, key, nullptr);
ui64 skipCount = 0;
NTable::ITransactionObserverPtr txObserver;
if (LockTxId) {
+ // We cannot use cached conflicts since we need to find skip count
txObserver = new TLockedWriteTxObserver(this, LockTxId, skipCount, localTid);
// Locked writes are immediate, increased latency is not critical
mustFindConflicts = true;
+ } else if (auto* cached = Self->GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) {
+ for (ui64 txId : *cached) {
+ BreakWriteConflict(txId);
+ }
+ return;
} else {
txObserver = new TWriteTxObserver(this);
// Prefer precise conflicts for non-distributed transactions
@@ -821,7 +841,7 @@ public:
// We are not actually interested in the row version, we only need to
// detect uncommitted transaction skips on the path to that version.
auto res = Db.SelectRowVersion(
- localTid, key, /* readFlags */ 0,
+ localTid, keyCells, /* readFlags */ 0,
nullptr, txObserver);
if (res.Ready == NTable::EReady::Page) {
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 7ddf8478bb1..b419ee1ecd1 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -501,6 +501,15 @@ public:
// TOperation iface.
void BuildExecutionPlan(bool loaded) override;
+ bool HasKeysInfo() const override
+ {
+ if (DataTx) {
+ return DataTx->TxInfo().Loaded;
+ }
+
+ return false;
+ }
+
const NMiniKQL::IEngineFlat::TValidationInfo &GetKeysInfo() const override
{
if (DataTx) {
diff --git a/ydb/core/tx/datashard/datashard_change_receiving.cpp b/ydb/core/tx/datashard/datashard_change_receiving.cpp
index da6d43e4586..1d7b3127d59 100644
--- a/ydb/core/tx/datashard/datashard_change_receiving.cpp
+++ b/ydb/core/tx/datashard/datashard_change_receiving.cpp
@@ -270,6 +270,7 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
}
txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId()));
+ Self->GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(KeyCells.GetCells(), txc.DB);
tableInfo.Stats.UpdateTime = TAppData::TimeProvider->Now();
AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_OK);
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index fe2a42421a9..10f5bcdbe4b 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -219,6 +219,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
throw TNeedGlobalTxId();
}
txc.DB.UpdateTx(writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId);
+ self->GetConflictsCache().GetTableCache(writeTableId).AddUncommittedWrite(keyCells.GetCells(), globalTxId, txc.DB);
if (!commitAdded) {
// Make sure we see our own changes on further iterations
userDb.AddCommitTxId(globalTxId, writeVersion);
@@ -226,6 +227,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
} else {
txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
+ self->GetConflictsCache().GetTableCache(writeTableId).RemoveUncommittedWrites(keyCells.GetCells(), txc.DB);
}
}
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 69151336e6a..d83446d3638 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -175,6 +175,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
throw TNeedGlobalTxId();
}
params.Txc->DB.UpdateTx(localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId);
+ self->GetConflictsCache().GetTableCache(localTableId).AddUncommittedWrite(keyCells.GetCells(), params.GlobalTxId, params.Txc->DB);
if (!commitAdded && userDb) {
// Make sure we see our own changes on further iterations
userDb->AddCommitTxId(params.GlobalTxId, params.WriteVersion);
@@ -182,6 +183,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
} else {
params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion);
+ self->GetConflictsCache().GetTableCache(localTableId).RemoveUncommittedWrites(keyCells.GetCells(), params.Txc->DB);
}
}
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 04b51f97bc2..7103ade2fb7 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -20,6 +20,7 @@
#include "progress_queue.h"
#include "read_iterator.h"
#include "volatile_tx.h"
+#include "conflicts_cache.h"
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
@@ -268,6 +269,7 @@ class TDataShard
friend class TSnapshotManager;
friend class TSchemaSnapshotManager;
friend class TVolatileTxManager;
+ friend class TConflictsCache;
friend class TCdcStreamScanManager;
friend class TReplicationSourceOffsetsClient;
friend class TReplicationSourceOffsetsServer;
@@ -1777,6 +1779,8 @@ public:
TVolatileTxManager& GetVolatileTxManager() { return VolatileTxManager; }
const TVolatileTxManager& GetVolatileTxManager() const { return VolatileTxManager; }
+ TConflictsCache& GetConflictsCache() { return ConflictsCache; }
+
TCdcStreamScanManager& GetCdcStreamScanManager() { return CdcStreamScanManager; }
const TCdcStreamScanManager& GetCdcStreamScanManager() const { return CdcStreamScanManager; }
@@ -1881,6 +1885,16 @@ public:
bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId,
TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies);
+ /**
+ * Handles a specific write conflict txId
+ *
+ * Prerequisites: TSetupSysLocks is active and caller does not have any
+ * uncommitted write locks.
+ *
+ * Either adds txId to volatile dependencies or breaks a known write lock.
+ */
+ void BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies);
+
private:
///
class TLoanReturnTracker {
@@ -2402,6 +2416,7 @@ private:
TSnapshotManager SnapshotManager;
TSchemaSnapshotManager SchemaSnapshotManager;
TVolatileTxManager VolatileTxManager;
+ TConflictsCache ConflictsCache;
TCdcStreamScanManager CdcStreamScanManager;
TReplicationSourceOffsetsServerLink ReplicationSourceOffsetsServer;
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
index c48d0f96f19..9285acb6df4 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.cpp
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -138,6 +138,7 @@ void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
// Removing the lock also removes any uncommitted data
if (DB.HasOpenTx(tid, lockId)) {
DB.RemoveTx(tid, lockId);
+ Self.GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(lockId, DB);
}
}
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 4fcb4330491..a38867a4dcf 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1143,15 +1143,17 @@ ui64 TPipeline::GetInactiveTxSize() const {
return res;
}
-void TPipeline::SaveForPropose(TValidatedDataTx::TPtr tx) {
+bool TPipeline::SaveForPropose(TValidatedDataTx::TPtr tx) {
Y_VERIFY(tx && tx->TxId());
if (DataTxCache.size() <= Config.LimitDataTxCache) {
ui64 quota = tx->GetTxSize() + tx->GetMemoryAllocated();
if (Self->TryCaptureTxCache(quota)) {
tx->SetTxCacheUsage(quota);
DataTxCache[tx->TxId()] = tx;
+ return true;
}
}
+ return false;
}
void TPipeline::SetProposed(ui64 txId, const TActorId& actorId) {
@@ -1441,6 +1443,31 @@ void TPipeline::BuildDataTx(TActiveTransaction *tx, TTransactionContext &txc, co
dataTx->ExtractKeys(false);
}
+void TPipeline::RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db)
+{
+ if (op->HasFlag(TTxFlags::DistributedWritesRegistered)) {
+ return;
+ }
+
+ // Try to cache write keys if possible
+ if (!op->IsImmediate() && op->HasKeysInfo()) {
+ auto& keysInfo = op->GetKeysInfo();
+ if (keysInfo.WritesCount > 0) {
+ TConflictsCache::TPendingWrites writes;
+ for (const auto& vk : keysInfo.Keys) {
+ const auto& k = *vk.Key;
+ if (vk.IsWrite && k.Range.Point && Self->IsUserTable(k.TableId)) {
+ writes.emplace_back(Self->GetLocalTableId(k.TableId), k.Range.GetOwnedFrom());
+ }
+ }
+ if (!writes.empty()) {
+ Self->GetConflictsCache().RegisterDistributedWrites(op->GetTxId(), std::move(writes), db);
+ }
+ op->SetFlag(TTxFlags::DistributedWritesRegistered);
+ }
+ }
+}
+
EExecutionStatus TPipeline::RunExecutionUnit(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx)
{
Y_VERIFY(!op->IsExecutionPlanFinished());
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 16d3e332a93..46211553bcb 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -107,7 +107,7 @@ public:
// tx propose
- void SaveForPropose(TValidatedDataTx::TPtr tx);
+ bool SaveForPropose(TValidatedDataTx::TPtr tx);
void SetProposed(ui64 txId, const TActorId& actorId);
void ForgetUnproposedTx(ui64 txId);
@@ -266,6 +266,8 @@ public:
return tx->RestoreTxData(Self, txc, ctx);
}
+ void RegisterDistributedWrites(const TOperation::TPtr& op, NTable::TDatabase& db);
+
// Execution units
TExecutionUnit &GetExecutionUnit(EExecutionUnitKind kind)
{
diff --git a/ydb/core/tx/datashard/datashard_repl_apply.cpp b/ydb/core/tx/datashard/datashard_repl_apply.cpp
index 205c5528a15..cd251152b61 100644
--- a/ydb/core/tx/datashard/datashard_repl_apply.cpp
+++ b/ydb/core/tx/datashard/datashard_repl_apply.cpp
@@ -175,6 +175,7 @@ public:
if (writeTxId) {
txc.DB.UpdateTx(userTable.LocalTid, rop, key, update, writeTxId);
+ Self->GetConflictsCache().GetTableCache(userTable.LocalTid).AddUncommittedWrite(keyCellVec.GetCells(), writeTxId, txc.DB);
} else {
if (!MvccReadWriteVersion) {
auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
@@ -184,6 +185,7 @@ public:
Self->SysLocksTable().BreakLocks(tableId, keyCellVec.GetCells());
txc.DB.Update(userTable.LocalTid, rop, key, update, *MvccReadWriteVersion);
+ Self->GetConflictsCache().GetTableCache(userTable.LocalTid).RemoveUncommittedWrites(keyCellVec.GetCells(), txc.DB);
}
return true;
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp
index a23b9fcbb81..c19243f55a1 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.cpp
+++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp
@@ -30,6 +30,7 @@ void TTransQueue::RemoveTxInFly(ui64 txId) {
TxsInFly.erase(it);
ProposeDelayers.erase(txId);
Self->SetCounter(COUNTER_TX_IN_FLY, TxsInFly.size());
+ Self->GetConflictsCache().UnregisterDistributedWrites(txId);
}
}
diff --git a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
index df567b199cb..244f19ea022 100644
--- a/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_commit_writes_tx_unit.cpp
@@ -45,6 +45,7 @@ public:
// FIXME: temporary break all locks, but we want to be smarter about which locks we break
DataShard.SysLocksTable().BreakAllLocks(fullTableId);
txc.DB.CommitTx(tableInfo.LocalTid, writeTxId, versions.WriteVersion);
+ DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(writeTxId, txc.DB);
if (Pipeline.AddLockDependencies(op, guardLocks)) {
if (txc.DB.HasChanges()) {
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index 46943e1f394..540c95a7b7e 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -188,6 +188,7 @@ public:
if (!volatileDependencies.empty() || volatileOrdered) {
txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId);
+ DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).AddUncommittedWrite(keyCells.GetCells(), globalTxId, txc.DB);
if (!commitAdded && userDb) {
// Make sure we see our own changes on further iterations
userDb->AddCommitTxId(globalTxId, writeVersion);
@@ -195,6 +196,7 @@ public:
}
} else {
txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion);
+ DataShard.GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(keyCells.GetCells(), txc.DB);
}
}
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 183d25acaae..41bbd646b4c 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -702,6 +702,10 @@ public:
*/
bool HasRuntimeConflicts() const noexcept;
+ virtual bool HasKeysInfo() const
+ {
+ return false;
+ }
virtual const NMiniKQL::IEngineFlat::TValidationInfo &GetKeysInfo() const
{
return EmptyKeysInfo;
diff --git a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
index 1ccb4e10396..d8f2f8e0e19 100644
--- a/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
+++ b/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
@@ -58,6 +58,7 @@ EExecutionStatus TReceiveSnapshotCleanupUnit::Execute(TOperation::TPtr op,
return EExecutionStatus::Reschedule;
}
txc.DB.RemoveTx(localTid, txId);
+ DataShard.GetConflictsCache().GetTableCache(localTid).RemoveUncommittedWrites(txId, txc.DB);
++removedTxs;
}
}
diff --git a/ydb/core/tx/datashard/store_data_tx_unit.cpp b/ydb/core/tx/datashard/store_data_tx_unit.cpp
index 929002a4ca9..d5898e857e2 100644
--- a/ydb/core/tx/datashard/store_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/store_data_tx_unit.cpp
@@ -47,7 +47,10 @@ EExecutionStatus TStoreDataTxUnit::Execute(TOperation::TPtr op,
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
Y_VERIFY(tx->GetDataTx());
- Pipeline.SaveForPropose(tx->GetDataTx());
+ bool cached = Pipeline.SaveForPropose(tx->GetDataTx());
+ if (cached) {
+ Pipeline.RegisterDistributedWrites(op, txc.DB);
+ }
Pipeline.ProposeTx(op, tx->GetTxBody(), txc, ctx);
if (!op->HasVolatilePrepareFlag()) {
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 43ea3405de6..593153f9857 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -37,6 +37,7 @@ namespace NKikimr::NDataShard {
for (ui64 commitTxId : info->CommitTxIds) {
if (txc.DB.HasOpenTx(tid, commitTxId)) {
txc.DB.CommitTx(tid, commitTxId, info->Version);
+ Self->GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(commitTxId, txc.DB);
}
}
}
@@ -145,6 +146,7 @@ namespace NKikimr::NDataShard {
for (ui64 commitTxId : info->CommitTxIds) {
if (txc.DB.HasOpenTx(tid, commitTxId)) {
txc.DB.RemoveTx(tid, commitTxId);
+ Self->GetConflictsCache().GetTableCache(tid).RemoveUncommittedWrites(commitTxId, txc.DB);
}
}
}