diff options
author | snaury <[email protected]> | 2023-03-21 16:15:01 +0300 |
---|---|---|
committer | snaury <[email protected]> | 2023-03-21 16:15:01 +0300 |
commit | 1b107053ec82e21829bba51c11637b78c238a17f (patch) | |
tree | e6ca58748f08d6c2766357c088f4c0acdc4e79c6 | |
parent | aaa853101d85cd190217f71ace35b2210ccc77b8 (diff) |
Handle volatile transactions when checking bulk erase conditions
-rw-r--r-- | ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_direct_erase.cpp | 2 |
2 files changed, 18 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp index aea50ffbe3c..6e0ce9fd114 100644 --- a/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp @@ -2,6 +2,7 @@ #include "datashard_distributed_erase.h" #include "datashard_impl.h" #include "datashard_pipeline.h" +#include "datashard_user_db.h" #include "erase_rows_condition.h" #include "execution_unit_ctors.h" @@ -75,8 +76,8 @@ public: { } - bool IsReadyToExecute(TOperation::TPtr) const override { - return true; + bool IsReadyToExecute(TOperation::TPtr op) const override { + return !op->HasRuntimeConflicts(); } EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override { @@ -92,6 +93,7 @@ public: const auto& request = eraseTx->GetRequest(); const ui64 tableId = request.GetTableId(); + const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId); Y_VERIFY(DataShard.GetUserTables().contains(tableId)); const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId); @@ -102,6 +104,7 @@ public: const auto tags = MakeTags(condition->Tags(), eraseTx->GetIndexColumnIds()); auto readVersion = DataShard.GetReadWriteVersions(tx).ReadVersion; + TDataShardUserDb userDb(DataShard, txc.DB, readVersion); bool pageFault = false; TDynBitMap confirmedRows; @@ -136,7 +139,7 @@ public: } NTable::TRowState row; - const auto ready = txc.DB.Select(tableInfo.LocalTid, key, tags, row, 0, readVersion); + const auto ready = userDb.SelectRow(fullTableId, key, tags, row); if (pageFault) { continue; @@ -159,6 +162,17 @@ public: } } + if (!userDb.GetVolatileReadDependencies().empty()) { + for (ui64 txId : userDb.GetVolatileReadDependencies()) { + op->AddVolatileDependency(txId); + bool ok = DataShard.GetVolatileTxManager().AttachBlockedOperation(txId, op->GetTxId()); + Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId); + } + Y_VERIFY(!txc.DB.HasChanges(), + "Unexpected database changes while building distributed erase outgoing readsets"); + return EExecutionStatus::Continue; + } + if (pageFault) { return EExecutionStatus::Restart; } diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index cf6505c307a..69151336e6a 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -126,7 +126,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( if (condition) { NTable::TRowState row; - const auto ready = params.Txc->DB.Select(localTableId, key, condition->Tags(), row, 0, params.ReadVersion); + const auto ready = userDb->SelectRow(fullTableId, key, condition->Tags(), row); switch (ready) { case NTable::EReady::Page: |