summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <[email protected]>2023-03-21 16:15:01 +0300
committersnaury <[email protected]>2023-03-21 16:15:01 +0300
commit1b107053ec82e21829bba51c11637b78c238a17f (patch)
treee6ca58748f08d6c2766357c088f4c0acdc4e79c6
parentaaa853101d85cd190217f71ace35b2210ccc77b8 (diff)
Handle volatile transactions when checking bulk erase conditions
-rw-r--r--ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp20
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp2
2 files changed, 18 insertions, 4 deletions
diff --git a/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp
index aea50ffbe3c..6e0ce9fd114 100644
--- a/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp
+++ b/ydb/core/tx/datashard/build_distributed_erase_tx_out_rs_unit.cpp
@@ -2,6 +2,7 @@
#include "datashard_distributed_erase.h"
#include "datashard_impl.h"
#include "datashard_pipeline.h"
+#include "datashard_user_db.h"
#include "erase_rows_condition.h"
#include "execution_unit_ctors.h"
@@ -75,8 +76,8 @@ public:
{
}
- bool IsReadyToExecute(TOperation::TPtr) const override {
- return true;
+ bool IsReadyToExecute(TOperation::TPtr op) const override {
+ return !op->HasRuntimeConflicts();
}
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext&) override {
@@ -92,6 +93,7 @@ public:
const auto& request = eraseTx->GetRequest();
const ui64 tableId = request.GetTableId();
+ const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
Y_VERIFY(DataShard.GetUserTables().contains(tableId));
const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId);
@@ -102,6 +104,7 @@ public:
const auto tags = MakeTags(condition->Tags(), eraseTx->GetIndexColumnIds());
auto readVersion = DataShard.GetReadWriteVersions(tx).ReadVersion;
+ TDataShardUserDb userDb(DataShard, txc.DB, readVersion);
bool pageFault = false;
TDynBitMap confirmedRows;
@@ -136,7 +139,7 @@ public:
}
NTable::TRowState row;
- const auto ready = txc.DB.Select(tableInfo.LocalTid, key, tags, row, 0, readVersion);
+ const auto ready = userDb.SelectRow(fullTableId, key, tags, row);
if (pageFault) {
continue;
@@ -159,6 +162,17 @@ public:
}
}
+ if (!userDb.GetVolatileReadDependencies().empty()) {
+ for (ui64 txId : userDb.GetVolatileReadDependencies()) {
+ op->AddVolatileDependency(txId);
+ bool ok = DataShard.GetVolatileTxManager().AttachBlockedOperation(txId, op->GetTxId());
+ Y_VERIFY_S(ok, "Unexpected failure to attach " << *op << " to volatile tx " << txId);
+ }
+ Y_VERIFY(!txc.DB.HasChanges(),
+ "Unexpected database changes while building distributed erase outgoing readsets");
+ return EExecutionStatus::Continue;
+ }
+
if (pageFault) {
return EExecutionStatus::Restart;
}
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index cf6505c307a..69151336e6a 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -126,7 +126,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
if (condition) {
NTable::TRowState row;
- const auto ready = params.Txc->DB.Select(localTableId, key, condition->Tags(), row, 0, params.ReadVersion);
+ const auto ready = userDb->SelectRow(fullTableId, key, condition->Tags(), row);
switch (ready) {
case NTable::EReady::Page: