aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-03-20 15:17:56 +0300
committersnaury <snaury@ydb.tech>2023-03-20 15:17:56 +0300
commit3949f900d72edad109de1ea0e6213f2092f86790 (patch)
tree3c4cd16841ab452104bf58afd2b935e1e5fc54f5
parente4db80492edac29772697c7d2663a2f10ee94ecf (diff)
downloadydb-3949f900d72edad109de1ea0e6213f2092f86790.tar.gz
Handle volatile conflicts in bulk operations
-rw-r--r--ydb/core/tx/datashard/change_collector.h6
-rw-r--r--ydb/core/tx/datashard/datashard.cpp48
-rw-r--r--ydb/core/tx/datashard/datashard.h2
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp54
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h2
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp32
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.h2
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp36
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.h11
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.cpp12
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.h8
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.h4
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h12
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp29
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h7
-rw-r--r--ydb/core/tx/datashard/datashard_unsafe_upload.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp123
-rw-r--r--ydb/core/tx/datashard/direct_tx_unit.cpp28
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp57
-rw-r--r--ydb/core/tx/datashard/operation.cpp2
-rw-r--r--ydb/core/tx/datashard/operation.h22
-rw-r--r--ydb/core/tx/datashard/volatile_tx.cpp102
-rw-r--r--ydb/core/tx/datashard/volatile_tx.h29
27 files changed, 560 insertions, 111 deletions
diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h
index 2b9b034a827..b8171933ea6 100644
--- a/ydb/core/tx/datashard/change_collector.h
+++ b/ydb/core/tx/datashard/change_collector.h
@@ -16,7 +16,7 @@ protected:
~IDataShardChangeGroupProvider() = default;
public:
- virtual bool HasChangeGroup() const = 0;
+ virtual std::optional<ui64> GetCurrentChangeGroup() const = 0;
virtual ui64 GetChangeGroup() = 0;
};
@@ -31,8 +31,8 @@ public:
, Group(group)
{ }
- bool HasChangeGroup() const override {
- return bool(Group);
+ std::optional<ui64> GetCurrentChangeGroup() const override {
+ return Group;
}
ui64 GetChangeGroup() override;
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 206585a7ce1..1797b482798 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3879,6 +3879,8 @@ void SendViaSession(const TActorId& sessionId,
}
class TBreakWriteConflictsTxObserver : public NTable::ITransactionObserver {
+ friend class TBreakWriteConflictsTxObserverVolatileDependenciesGuard;
+
public:
TBreakWriteConflictsTxObserver(TDataShard* self)
: Self(self)
@@ -3886,7 +3888,14 @@ public:
}
void OnSkipUncommitted(ui64 txId) override {
- Self->SysLocksTable().BreakLock(txId);
+ if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) {
+ if (info->State != EVolatileTxState::Aborting) {
+ Y_VERIFY(VolatileDependencies);
+ VolatileDependencies->insert(txId);
+ }
+ } else {
+ Self->SysLocksTable().BreakLock(txId);
+ }
}
void OnSkipCommitted(const TRowVersion&) override {
@@ -3907,9 +3916,31 @@ public:
private:
TDataShard* Self;
+ absl::flat_hash_set<ui64>* VolatileDependencies = nullptr;
};
-bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells) {
+class TBreakWriteConflictsTxObserverVolatileDependenciesGuard {
+public:
+ TBreakWriteConflictsTxObserverVolatileDependenciesGuard(
+ TBreakWriteConflictsTxObserver* observer,
+ absl::flat_hash_set<ui64>& volatileDependencies)
+ : Observer(observer)
+ {
+ Y_VERIFY(!Observer->VolatileDependencies);
+ Observer->VolatileDependencies = &volatileDependencies;
+ }
+
+ ~TBreakWriteConflictsTxObserverVolatileDependenciesGuard() {
+ Observer->VolatileDependencies = nullptr;
+ }
+
+private:
+ TBreakWriteConflictsTxObserver* const Observer;
+};
+
+bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId,
+ TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies)
+{
const auto localTid = GetLocalTableId(tableId);
Y_VERIFY(localTid);
const NTable::TScheme& scheme = db.GetScheme();
@@ -3921,6 +3952,10 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl
BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this);
}
+ TBreakWriteConflictsTxObserverVolatileDependenciesGuard guard(
+ static_cast<TBreakWriteConflictsTxObserver*>(BreakWriteConflictsTxObserver.Get()),
+ volatileDependencies);
+
// We are not actually interested in the row version, we only need to
// detect uncommitted transaction skips on the path to that version.
auto res = db.SelectRowVersion(
@@ -3966,6 +4001,15 @@ void TDataShard::Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorConte
Execute(new TTxGetOpenTxs(this, std::move(ev)), ctx);
}
+void TDataShard::Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev, const TActorContext& ctx) {
+ auto op = Pipeline.FindOp(ev->Cookie);
+ if (op && op->HasWaitingForGlobalTxIdFlag()) {
+ Pipeline.ProvideGlobalTxId(op, ev->Get()->TxId);
+ Pipeline.AddCandidateOp(op);
+ PlanQueue.Progress(ctx);
+ }
+}
+
} // NDataShard
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index b45c0a0affa..5b3a9fb36a7 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -122,6 +122,8 @@ namespace NDataShard {
WaitingForAsyncJob = 1ULL << 43,
// Operation must complete before results sending
WaitCompletion = 1ULL << 44,
+ // Waiting for global tx id allocation
+ WaitingForGlobalTxId = 1ULL << 45,
LastFlag = WaitCompletion,
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 819affd1509..485e7562493 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -275,8 +275,8 @@ public:
IsRepeatableSnapshot = true;
}
- bool HasChangeGroup() const override {
- return bool(ChangeGroup);
+ std::optional<ui64> GetCurrentChangeGroup() const override {
+ return ChangeGroup;
}
ui64 GetChangeGroup() override {
@@ -321,7 +321,7 @@ public:
return;
}
- if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min())) {
+ if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min()); lock && !VolatileCommitOrdered) {
lock->ForAllVolatileDependencies([this](ui64 txId) {
if (VolatileDependencies.insert(txId).second && !VolatileTxId) {
VolatileTxId = EngineBay.GetTxId();
@@ -390,17 +390,8 @@ public:
return commitTxIds;
}
- TVector<ui64> GetVolatileDependencies() const {
- TVector<ui64> dependencies;
-
- if (!VolatileDependencies.empty() && !VolatileCommitOrdered) {
- dependencies.reserve(VolatileDependencies.size());
- for (ui64 dependency : VolatileDependencies) {
- dependencies.push_back(dependency);
- }
- }
-
- return dependencies;
+ const absl::flat_hash_set<ui64>& GetVolatileDependencies() const {
+ return VolatileDependencies;
}
std::optional<ui64> GetVolatileChangeGroup() const {
@@ -842,6 +833,7 @@ public:
VolatileTxId = EngineBay.GetTxId();
}
VolatileCommitOrdered = true;
+ VolatileDependencies.clear();
}
return;
}
@@ -956,7 +948,7 @@ public:
// even though they may not be persistent yet, since this tx will
// also perform writes, and either it fails, or future generation
// could not have possibly committed it already.
- if (info->State != EVolatileTxState::Aborting) {
+ if (info->State != EVolatileTxState::Aborting && !VolatileCommitOrdered) {
if (!VolatileTxId) {
// All further writes will use this VolatileTxId and will
// add it to VolatileCommitTxIds, forcing it to be committed
@@ -1220,7 +1212,7 @@ TVector<ui64> TEngineBay::GetVolatileCommitTxIds() const {
return host->GetVolatileCommitTxIds();
}
-TVector<ui64> TEngineBay::GetVolatileDependencies() const {
+const absl::flat_hash_set<ui64>& TEngineBay::GetVolatileDependencies() const {
Y_VERIFY(EngineHost);
auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index f6d6446fd29..0c83a561869 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -113,7 +113,7 @@ public:
void ResetCollectedChanges();
TVector<ui64> GetVolatileCommitTxIds() const;
- TVector<ui64> GetVolatileDependencies() const;
+ const absl::flat_hash_set<ui64>& GetVolatileDependencies() const;
std::optional<ui64> GetVolatileChangeGroup() const;
bool GetVolatileCommitOrdered() const;
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index c55cf209f5e..7f19015743a 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -12,6 +12,7 @@ class TTxDirectBase : public TTransactionBase<TDataShard> {
TOperation::TPtr Op;
TVector<EExecutionUnitKind> CompleteList;
+ bool WaitComplete = false;
public:
TTxDirectBase(TDataShard* ds, TEvRequest ev)
@@ -30,29 +31,68 @@ public:
if (Ev) {
const ui64 tieBreaker = Self->NextTieBreakerIndex++;
- Op = new TDirectTransaction(tieBreaker, ctx.Now(), tieBreaker, Ev);
+ Op = new TDirectTransaction(ctx.Now(), tieBreaker, Ev);
Op->BuildExecutionPlan(false);
Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
Ev = nullptr;
+ Op->IncrementInProgress();
}
+ Y_VERIFY(Op && Op->IsInProgress() && !Op->GetExecutionPlan().empty());
+
auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx);
- if (!CompleteList.empty()) {
- return true;
- } else if (status == EExecutionStatus::Restart) {
- return false;
+
+ switch (status) {
+ case EExecutionStatus::Restart:
+ return false;
+
+ case EExecutionStatus::Reschedule:
+ Y_FAIL("Unexpected Reschedule status while handling a direct operation");
+
+ case EExecutionStatus::Executed:
+ case EExecutionStatus::Continue:
+ Op->DecrementInProgress();
+ break;
+
+ case EExecutionStatus::WaitComplete:
+ WaitComplete = true;
+ break;
+
+ case EExecutionStatus::ExecutedNoMoreRestarts:
+ case EExecutionStatus::DelayComplete:
+ case EExecutionStatus::DelayCompleteNoMoreRestarts:
+ Y_FAIL_S("unexpected execution status " << status << " for operation "
+ << *Op << " " << Op->GetKind() << " at " << Self->TabletID());
+ }
+
+ if (WaitComplete || !CompleteList.empty()) {
+ // Keep current operation
} else {
Op = nullptr;
- return true;
}
+
+ return true;
}
void Complete(const TActorContext& ctx) override {
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TTxDirectBase(" << GetTxType() << ") Complete"
<< ": at tablet# " << Self->TabletID());
- Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
+ if (Op) {
+ if (!CompleteList.empty()) {
+ Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
+ }
+
+ if (WaitComplete) {
+ Op->DecrementInProgress();
+
+ if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) {
+ Self->Pipeline.AddCandidateOp(Op);
+ }
+ }
+ }
+
if (Self->Pipeline.CanRunAnotherOp()) {
Self->PlanQueue.Progress(ctx);
}
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index be2ba9a37a5..d641c4086cf 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -897,8 +897,8 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation {
static constexpr ui32 Flags = NTxDataShard::TTxFlags::ReadOnly | NTxDataShard::TTxFlags::Immediate;
public:
- TReadOperation(TDataShard* ds, ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev)
- : TOperation(TBasicOpInfo(txId, EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex))
+ TReadOperation(TDataShard* ds, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev)
+ : TOperation(TBasicOpInfo(EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex))
, Self(ds)
, Sender(ev->Sender)
, Request(ev->Release().Release())
@@ -1723,7 +1723,7 @@ public:
if (Ev) {
const ui64 tieBreaker = Self->NextTieBreakerIndex++;
- Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev);
+ Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev);
Op->BuildExecutionPlan(false);
Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op);
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 1ffdab103bd..7ddf8478bb1 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -189,7 +189,7 @@ public:
void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); }
TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); }
- TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); }
+ const absl::flat_hash_set<ui64>& GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); }
std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); }
bool GetVolatileCommitOrdered() const { return EngineBay.GetVolatileCommitOrdered(); }
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 6cd4da5e67c..61d132b9e94 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -15,7 +15,7 @@ TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest:
template <typename TEvRequest, typename TEvResponse>
bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTransactionContext& txc,
- const TRowVersion& readVersion, const TRowVersion& writeVersion)
+ const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId)
{
const auto& record = Ev->Get()->Record;
Result = MakeHolder<TEvResponse>(self->TabletID());
@@ -58,7 +58,9 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
const bool readForTableShadow = writeToTableShadow && !shadowTableId;
const ui32 writeTableId = writeToTableShadow && shadowTableId ? shadowTableId : localTableId;
- const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId);
+ const bool breakWriteConflicts = BreakLocks && (
+ self->SysLocksTable().HasWriteLocks(fullTableId) ||
+ self->GetVolatileTxManager().GetTxMap());
TDataShardUserDb userDb(*self, txc.DB, readVersion);
TDataShardChangeGroupProvider groupProvider(*self, txc.DB);
@@ -91,6 +93,8 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
bool pageFault = false;
NTable::TRowState rowState;
+ absl::flat_hash_set<ui64> volatileDependencies;
+
ui64 bytes = 0;
for (const auto& r : record.GetRows()) {
// TODO: use safe parsing!
@@ -188,7 +192,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
if (BreakLocks) {
if (breakWriteConflicts) {
- if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) {
+ if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
pageFault = true;
}
@@ -201,7 +205,14 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
}
- txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
+ if (!volatileDependencies.empty()) {
+ if (!globalTxId) {
+ throw TNeedGlobalTxId();
+ }
+ txc.DB.UpdateTx(writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId);
+ } else {
+ txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion);
+ }
}
if (pageFault) {
@@ -212,6 +223,19 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
return false;
}
+ if (!volatileDependencies.empty()) {
+ self->GetVolatileTxManager().PersistAddVolatileTx(
+ globalTxId,
+ writeVersion,
+ /* commitTxIds */ { globalTxId },
+ volatileDependencies,
+ /* participants */ { },
+ groupProvider.GetCurrentChangeGroup(),
+ /* ordered */ false,
+ txc);
+ // Note: transaction is already committed, no additional waiting needed
+ }
+
self->IncCounter(COUNTER_UPLOAD_ROWS, record.GetRows().size());
self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, bytes);
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 7c7f79c99ee..50284f42979 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -21,7 +21,7 @@ public:
explicit TCommonUploadOps(typename TEvRequest::TPtr& ev, bool breakLocks, bool collectChanges);
protected:
- bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion);
+ bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId);
void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
const TEvRequest* GetRequest() const;
TEvResponse* GetResult();
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index e27bec92afd..2d7c033bb4a 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -74,7 +74,11 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, *groupProvider, params.Txc->DB, tableInfo));
}
- const bool breakWriteConflicts = self->SysLocksTable().HasWriteLocks(fullTableId);
+ const bool breakWriteConflicts = (
+ self->SysLocksTable().HasWriteLocks(fullTableId) ||
+ self->GetVolatileTxManager().GetTxMap());
+
+ absl::flat_hash_set<ui64> volatileDependencies;
bool pageFault = false;
for (const auto& serializedKey : request.GetKeyColumns()) {
@@ -144,7 +148,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
if (breakWriteConflicts) {
- if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells())) {
+ if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
pageFault = true;
}
}
@@ -154,7 +158,15 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
- params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion);
+
+ if (!volatileDependencies.empty()) {
+ if (!params.GlobalTxId) {
+ throw TNeedGlobalTxId();
+ }
+ params.Txc->DB.UpdateTx(localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId);
+ } else {
+ params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion);
+ }
}
if (pageFault) {
@@ -165,6 +177,19 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
return EStatus::PageFault;
}
+ if (!volatileDependencies.empty()) {
+ self->GetVolatileTxManager().PersistAddVolatileTx(
+ params.GlobalTxId,
+ params.WriteVersion,
+ /* commitTxIds */ { params.GlobalTxId },
+ volatileDependencies,
+ /* participants */ { },
+ groupProvider ? groupProvider->GetCurrentChangeGroup() : std::nullopt,
+ /* ordered */ false,
+ *params.Txc);
+ // Note: transaction is already committed, no additional waiting needed
+ }
+
status = NKikimrTxDataShard::TEvEraseRowsResponse::OK;
return EStatus::Success;
}
@@ -184,14 +209,15 @@ bool TDirectTxErase::CheckRequest(TDataShard* self, const NKikimrTxDataShard::TE
}
bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
- const TRowVersion& readVersion, const TRowVersion& writeVersion)
+ const TRowVersion& readVersion, const TRowVersion& writeVersion,
+ ui64 globalTxId)
{
const auto& record = Ev->Get()->Record;
Result = MakeHolder<TEvDataShard::TEvEraseRowsResponse>();
Result->Record.SetTabletID(self->TabletID());
- const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion);
+ const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion, globalTxId);
NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status;
TString error;
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index c79ef35bd17..9904fd404e9 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -22,20 +22,23 @@ class TDirectTxErase : public IDirectTx {
TTransactionContext* const Txc;
const TRowVersion ReadVersion;
const TRowVersion WriteVersion;
+ const ui64 GlobalTxId;
private:
explicit TExecuteParams(TDirectTxErase* tx, TTransactionContext* txc,
- const TRowVersion& readVersion, const TRowVersion& writeVersion)
+ const TRowVersion& readVersion, const TRowVersion& writeVersion,
+ const ui64 globalTxId)
: Tx(tx)
, Txc(txc)
, ReadVersion(readVersion)
, WriteVersion(writeVersion)
+ , GlobalTxId(globalTxId)
{
}
public:
static TExecuteParams ForCheck() {
- return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion());
+ return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion(), 0);
}
template <typename... Args>
@@ -68,7 +71,9 @@ public:
static bool CheckRequest(TDataShard* self, const NKikimrTxDataShard::TEvEraseRowsRequest& request,
NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error);
- bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
+ bool Execute(TDataShard* self, TTransactionContext& txc,
+ const TRowVersion& readVersion, const TRowVersion& writeVersion,
+ ui64 globalTxId) override;
TDirectTxResult GetResult(TDataShard* self) override;
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
index f83c8ef5504..3b228035e64 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
@@ -5,14 +5,14 @@
namespace NKikimr {
namespace NDataShard {
-TDirectTransaction::TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev)
- : TOperation(TBasicOpInfo(txId, EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex))
+TDirectTransaction::TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev)
+ : TOperation(TBasicOpInfo(EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex))
, Impl(new TDirectTxUpload(ev))
{
}
-TDirectTransaction::TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev)
- : TOperation(TBasicOpInfo(txId, EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex))
+TDirectTransaction::TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev)
+ : TOperation(TBasicOpInfo(EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex))
, Impl(new TDirectTxErase(ev))
{
}
@@ -32,7 +32,9 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded)
bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) {
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
- if (!Impl->Execute(self, txc, readVersion, writeVersion))
+
+ // NOTE: may throw TNeedGlobalTxId exception, which is handled in direct tx unit
+ if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId()))
return false;
if (self->IsMvccEnabled()) {
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h
index 99933431cad..f2bfbc94f28 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.h
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.h
@@ -20,15 +20,17 @@ struct TDirectTxResult {
class IDirectTx {
public:
virtual ~IDirectTx() = default;
- virtual bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) = 0;
+ virtual bool Execute(TDataShard* self, TTransactionContext& txc,
+ const TRowVersion& readVersion, const TRowVersion& writeVersion,
+ ui64 globalTxId) = 0;
virtual TDirectTxResult GetResult(TDataShard* self) = 0;
virtual TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const = 0;
};
class TDirectTransaction : public TOperation {
public:
- TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
- TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev);
+ TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
+ TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev);
void BuildExecutionPlan(bool) override;
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp
index 61b586f0970..21dd40d7b6d 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp
@@ -8,8 +8,11 @@ TDirectTxUpload::TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev)
{
}
-bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) {
- return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion);
+bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc,
+ const TRowVersion& readVersion, const TRowVersion& writeVersion,
+ ui64 globalTxId)
+{
+ return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion, globalTxId);
}
TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) {
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index 17a94b2b06b..4a59bdeb806 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -14,7 +14,9 @@ class TDirectTxUpload : public IDirectTx
public:
explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
- bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
+ bool Execute(TDataShard* self, TTransactionContext& txc,
+ const TRowVersion& readVersion, const TRowVersion& writeVersion,
+ ui64 globalTxId) override;
TDirectTxResult GetResult(TDataShard* self) override;
TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index c977cbb7209..1cf51f6fd2e 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -23,6 +23,7 @@
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/base/appdata.h>
@@ -149,6 +150,8 @@ enum class TSwitchState {
class TDataShardEngineHost;
struct TSetupSysLocks;
+class TNeedGlobalTxId : public yexception {};
+
///
class TDataShard
: public TActor<TDataShard>
@@ -1204,6 +1207,7 @@ class TDataShard
void Handle(TEvTxProcessing::TEvInterruptTransaction::TPtr &ev, const TActorContext &ctx) {
ForwardEventToOperation(ev, ctx);
}
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr &ev, const TActorContext &ctx);
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr ev, const TActorContext &ctx);
void Handle(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx);
@@ -1849,12 +1853,13 @@ public:
*
* Prerequisites: TSetupSysLocks is active and caller does not have any
* uncommitted write locks.
- * Note: the specified table should have some write locks, otherwise
- * this call is a very expensive no-op.
+ * Note: the specified table should have potential conflicting changes,
+ * otherwise this call is a very expensive no-op.
*
* Returns true on success and false on page fault.
*/
- bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells);
+ bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId,
+ TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies);
private:
///
@@ -2697,6 +2702,7 @@ protected:
HFuncTraced(TEvTxProcessing::TEvStreamClearanceResponse, Handle);
HFuncTraced(TEvTxProcessing::TEvStreamClearancePending, Handle);
HFuncTraced(TEvTxProcessing::TEvInterruptTransaction, Handle);
+ HFuncTraced(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
HFuncTraced(TEvPrivate::TEvProgressTransaction, Handle);
HFuncTraced(TEvPrivate::TEvCleanupTransaction, Handle);
HFuncTraced(TEvPrivate::TEvDelayedProposeTransaction, Handle);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 655e898effc..f8e8e543219 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -2000,4 +2000,33 @@ bool TPipeline::AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& gu
return addedDependencies;
}
+void TPipeline::ProvideGlobalTxId(const TOperation::TPtr& op, ui64 globalTxId) {
+ Y_VERIFY(op->HasWaitingForGlobalTxIdFlag());
+ ui64 localTxId = op->GetTxId();
+
+ auto itImmediate = ImmediateOps.find(localTxId);
+ Y_VERIFY(itImmediate != ImmediateOps.end());
+ ImmediateOps.erase(itImmediate);
+ auto itActive = ActiveOps.find(op->GetStepOrder());
+ Y_VERIFY(itActive != ActiveOps.end());
+ ActiveOps.erase(itActive);
+ bool removedCandidate = false;
+ auto itCandidate = CandidateOps.find(op->GetStepOrder());
+ if (itCandidate != CandidateOps.end()) {
+ CandidateOps.erase(itCandidate);
+ removedCandidate = true;
+ }
+
+ op->SetGlobalTxId(globalTxId);
+ op->SetWaitingForGlobalTxIdFlag(false);
+ auto resImmediate = ImmediateOps.emplace(op->GetTxId(), op);
+ Y_VERIFY(resImmediate.second);
+ auto resActive = ActiveOps.emplace(op->GetStepOrder(), op);
+ Y_VERIFY(resActive.second);
+ if (removedCandidate) {
+ auto resCandidate = CandidateOps.emplace(op->GetStepOrder(), op);
+ Y_VERIFY(resCandidate.second);
+ }
+}
+
}}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index e8c06dea8a1..fcfc94059ce 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -374,6 +374,13 @@ public:
*/
bool AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& guardLocks);
+ /**
+ * Provides a global txId for the waiting operation
+ *
+ * The operation must have a WaitingForGlobalTxId flag.
+ */
+ void ProvideGlobalTxId(const TOperation::TPtr& op, ui64 globalTxId);
+
private:
struct TStoredExecutionProfile {
TBasicOpInfo OpInfo;
diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
index 1aec8f60214..16d4766971b 100644
--- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
@@ -11,7 +11,9 @@ TDataShard::TTxUnsafeUploadRows::TTxUnsafeUploadRows(TDataShard* ds, TEvDataShar
bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TActorContext&) {
auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
- if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion)) {
+
+ // NOTE: will not throw TNeedGlobalTxId since we set breakLocks to false
+ if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion, /* globalTxId */ 0)) {
return false;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index b8d9145f426..be43f27767c 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -2,6 +2,9 @@
#include "datashard_ut_common_kqp.h"
#include "datashard_active_transaction.h"
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/tx_proxy/upload_rows.h>
+
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
namespace NKikimr {
@@ -1412,6 +1415,126 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 20 } items { uint32_value: 30 } }");
}
+ Y_UNIT_TEST(DistributedWriteThenBulkUpsert) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ {"value2", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);");
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ ui64 maxReadSetStep = 0;
+ bool captureReadSets = true;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep());
+ if (captureReadSets) {
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets");
+ UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u);
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Write to key 2 using bulk upsert
+ {
+ using TRows = TVector<std::pair<TSerializedCellVec, TString>>;
+ using TRowTypes = TVector<std::pair<TString, Ydb::Type>>;
+
+ auto types = std::make_shared<TRowTypes>();
+
+ Ydb::Type type;
+ type.set_type_id(Ydb::Type::UINT32);
+ types->emplace_back("key", type);
+ types->emplace_back("value", type);
+
+ auto rows = std::make_shared<TRows>();
+
+ TVector<TCell> key{ TCell::Make(ui32(2)) };
+ TVector<TCell> values{ TCell::Make(ui32(22)) };
+ TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key));
+ TString serializedValues(TSerializedCellVec::Serialize(values));
+ rows->emplace_back(serializedKey, serializedValues);
+
+ auto upsertSender = runtime.AllocateEdgeActor();
+ auto actor = NTxProxy::CreateUploadRowsInternal(upsertSender, "/Root/table-1", types, rows);
+ runtime.Register(actor);
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(upsertSender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, Ydb::StatusIds::SUCCESS);
+ }
+
+ // This compaction verifies there's no commit race with the waiting
+ // distributed transaction. If commits happen in incorrect order we
+ // would observe unexpected results.
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+ CompactTable(runtime, shard1, tableId1, false);
+
+ runtime.SetObserverFunc(prevObserverFunc);
+ for (auto& ev : capturedReadSets) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "<empty>");
+
+ // Verify the result
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value, value2 FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value, value2 FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 22 } items { uint32_value: 42 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } items { null_flag_value: NULL_VALUE } }, "
+ "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp
index 1d0ea8ad17d..d469f83e849 100644
--- a/ydb/core/tx/datashard/direct_tx_unit.cpp
+++ b/ydb/core/tx/datashard/direct_tx_unit.cpp
@@ -19,12 +19,16 @@ public:
}
bool IsReadyToExecute(TOperation::TPtr op) const override {
- return !op->HasRuntimeConflicts();
+ return !op->HasRuntimeConflicts() && !op->HasWaitingForGlobalTxIdFlag();
}
EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);
+ if (op->HasWaitingForGlobalTxIdFlag()) {
+ return EExecutionStatus::Continue;
+ }
+
if (op->IsImmediate()) {
// Every time we execute immediate transaction we may choose a new mvcc version
op->MvccReadWriteVersion.reset();
@@ -36,8 +40,26 @@ public:
TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get());
Y_VERIFY(tx != nullptr);
- if (!tx->Execute(&DataShard, txc)) {
- return EExecutionStatus::Restart;
+ try {
+ if (!tx->Execute(&DataShard, txc)) {
+ return EExecutionStatus::Restart;
+ }
+ } catch (const TNeedGlobalTxId&) {
+ Y_VERIFY_S(op->GetGlobalTxId() == 0,
+ "Unexpected TNeedGlobalTxId exception for direct operation with TxId# " << op->GetGlobalTxId());
+ Y_VERIFY_S(op->IsImmediate(),
+ "Unexpected TNeedGlobalTxId exception for a non-immediate operation with TxId# " << op->GetTxId());
+
+ ctx.Send(MakeTxProxyID(),
+ new TEvTxUserProxy::TEvAllocateTxId(),
+ 0, op->GetTxId());
+ op->SetWaitingForGlobalTxIdFlag();
+
+ if (txc.DB.HasChanges()) {
+ txc.Reschedule();
+ return EExecutionStatus::Restart;
+ }
+ return EExecutionStatus::Continue;
}
if (Pipeline.AddLockDependencies(op, guardLocks)) {
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index b57b346ecf2..5c73693fce7 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -50,7 +50,9 @@ public:
THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, groupProvider, txc.DB, request.GetTableId())};
auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize());
- if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, changeCollector.Get())) {
+ if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, op->GetGlobalTxId(),
+ &groupProvider, changeCollector.Get()))
+ {
return EExecutionStatus::Restart;
}
@@ -75,7 +77,8 @@ public:
Y_VERIFY(body.ParseFromArray(rs.Body.data(), rs.Body.size()));
Y_VERIFY(presentRows.contains(rs.Origin));
- const bool ok = Execute(txc, request, presentRows.at(rs.Origin), DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), writeVersion);
+ const bool ok = Execute(txc, request, presentRows.at(rs.Origin),
+ DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), writeVersion, op->GetGlobalTxId());
Y_VERIFY(ok);
}
}
@@ -98,6 +101,8 @@ public:
bool Execute(TTransactionContext& txc, const NKikimrTxDataShard::TEvEraseRowsRequest& request,
const TDynBitMap& presentRows, const TDynBitMap& confirmedRows, const TRowVersion& writeVersion,
+ ui64 globalTxId,
+ TDataShardChangeGroupProvider* groupProvider = nullptr,
IDataShardChangeCollector* changeCollector = nullptr)
{
const ui64 tableId = request.GetTableId();
@@ -107,6 +112,10 @@ public:
const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId);
const bool breakWriteConflicts = DataShard.SysLocksTable().HasWriteLocks(fullTableId);
+ bool checkVolatileDependencies = bool(DataShard.GetVolatileTxManager().GetTxMap());
+
+ absl::flat_hash_set<ui64> volatileDependencies;
+ bool volatileOrdered = false;
size_t row = 0;
bool pageFault = false;
@@ -128,15 +137,27 @@ public:
key.emplace_back(TRawTypeValue(cell.AsRef(), kt));
}
- if (changeCollector) {
- if (!changeCollector->OnUpdate(fullTableId, tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion)) {
- pageFault = true;
+ if (breakWriteConflicts || checkVolatileDependencies) {
+ if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) {
+ if (breakWriteConflicts) {
+ pageFault = true;
+ } else if (checkVolatileDependencies) {
+ checkVolatileDependencies = false;
+ volatileDependencies.clear();
+ volatileOrdered = true;
+ }
}
}
- if (breakWriteConflicts) {
- if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) {
- pageFault = true;
+ if (changeCollector) {
+ if (!volatileDependencies.empty() || volatileOrdered) {
+ if (!changeCollector->OnUpdateTx(fullTableId, tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId)) {
+ pageFault = true;
+ }
+ } else {
+ if (!changeCollector->OnUpdate(fullTableId, tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion)) {
+ pageFault = true;
+ }
}
}
@@ -145,13 +166,31 @@ public:
}
DataShard.SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells());
- txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion);
+
+ if (!volatileDependencies.empty() || volatileOrdered) {
+ txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId);
+ } else {
+ txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion);
+ }
}
if (pageFault && changeCollector) {
changeCollector->OnRestart();
}
+ if (!volatileDependencies.empty() || volatileOrdered) {
+ DataShard.GetVolatileTxManager().PersistAddVolatileTx(
+ globalTxId,
+ writeVersion,
+ /* commitTxIds */ { globalTxId },
+ volatileDependencies,
+ /* participants */ { },
+ groupProvider ? groupProvider->GetCurrentChangeGroup() : std::nullopt,
+ volatileOrdered,
+ txc);
+ // Note: transaction is already committed, no additional waiting needed
+ }
+
return !pageFault;
}
diff --git a/ydb/core/tx/datashard/operation.cpp b/ydb/core/tx/datashard/operation.cpp
index 1b09efee406..3cd1c432576 100644
--- a/ydb/core/tx/datashard/operation.cpp
+++ b/ydb/core/tx/datashard/operation.cpp
@@ -33,7 +33,7 @@ void PrintDeps(const TOperation *op,
void TBasicOpInfo::Serialize(NKikimrTxDataShard::TBasicOpInfo &info) const
{
- info.SetTxId(TxId);
+ info.SetTxId(GetTxId());
info.SetStep(Step);
info.SetKind(ToString(Kind));
info.SetReceivedAt(ReceivedAt.GetValue());
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 004d10a406d..1569832efbb 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -115,7 +115,7 @@ public:
TBasicOpInfo()
: Kind(EOperationKind::Unknown)
, Flags(0)
- , TxId(0)
+ , GlobalTxId(0)
, Step(0)
, MinStep(0)
, MaxStep(0)
@@ -123,6 +123,15 @@ public:
{
}
+ TBasicOpInfo(EOperationKind kind,
+ ui64 flags,
+ ui64 maxStep,
+ TInstant receivedAt,
+ ui64 tieBreakerIndex)
+ : TBasicOpInfo(0, kind, flags, maxStep, receivedAt, tieBreakerIndex)
+ {
+ }
+
TBasicOpInfo(ui64 txId,
EOperationKind kind,
ui64 flags,
@@ -131,7 +140,7 @@ public:
ui64 tieBreakerIndex)
: Kind(kind)
, Flags(flags)
- , TxId(txId)
+ , GlobalTxId(txId)
, Step(0)
, ReceivedAt(receivedAt)
, MinStep(0)
@@ -342,10 +351,15 @@ public:
bool HasWaitCompletionFlag() const { return HasFlag(TTxFlags::WaitCompletion); }
void SetWaitCompletionFlag(bool val = true) { SetFlag(TTxFlags::WaitCompletion, val); }
+ bool HasWaitingForGlobalTxIdFlag() const { return HasFlag(TTxFlags::WaitingForGlobalTxId); }
+ void SetWaitingForGlobalTxIdFlag(bool val = true) { SetFlag(TTxFlags::WaitingForGlobalTxId, val); }
+
///////////////////////////////////
// OPERATION ID AND PLAN //
///////////////////////////////////
- ui64 GetTxId() const { return TxId; }
+ ui64 GetTxId() const { return GlobalTxId ? GlobalTxId : TieBreakerIndex; }
+ ui64 GetGlobalTxId() const { return GlobalTxId; }
+ void SetGlobalTxId(ui64 txId) { GlobalTxId = txId; }
ui64 GetStep() const { return Step; }
void SetStep(ui64 step) { Step = step; }
@@ -391,7 +405,7 @@ protected:
EOperationKind Kind;
// See TTxFlags.
ui64 Flags;
- ui64 TxId;
+ ui64 GlobalTxId;
ui64 Step;
TInstant ReceivedAt;
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index aebb0516cd1..5587c77b1e2 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -19,16 +19,19 @@ namespace NKikimr::NDataShard {
Y_VERIFY(Self->VolatileTxManager.PendingCommitTxScheduled);
Self->VolatileTxManager.PendingCommitTxScheduled = false;
- Y_VERIFY(!Self->VolatileTxManager.PendingCommits.empty());
- TxId = Self->VolatileTxManager.PendingCommits.front();
- Self->VolatileTxManager.PendingCommits.pop_front();
+ // We may have changed our mind
+ if (Self->VolatileTxManager.PendingCommits.Empty()) {
+ TxId = 0;
+ return true;
+ }
+
+ auto* info = Self->VolatileTxManager.PendingCommits.PopFront();
+ Y_VERIFY(info && info->State == EVolatileTxState::Committed);
+ TxId = info->TxId;
// Schedule another transaction if needed
Self->VolatileTxManager.RunPendingCommitTx();
- auto* info = Self->VolatileTxManager.FindByTxId(TxId);
- Y_VERIFY(info && info->State == EVolatileTxState::Committed);
-
for (auto& pr : Self->GetUserTables()) {
auto tid = pr.second->LocalTid;
for (ui64 commitTxId : info->CommitTxIds) {
@@ -77,9 +80,14 @@ namespace NKikimr::NDataShard {
}
void Complete(const TActorContext& ctx) override {
+ if (TxId == 0) {
+ return;
+ }
+
if (Delayed) {
OnCommitted(ctx);
}
+
if (Collected) {
Self->EnqueueChangeRecords(std::move(Collected));
}
@@ -119,16 +127,19 @@ namespace NKikimr::NDataShard {
Y_VERIFY(Self->VolatileTxManager.PendingAbortTxScheduled);
Self->VolatileTxManager.PendingAbortTxScheduled = false;
- Y_VERIFY(!Self->VolatileTxManager.PendingAborts.empty());
- TxId = Self->VolatileTxManager.PendingAborts.front();
- Self->VolatileTxManager.PendingAborts.pop_front();
+ // We may have changed our mind
+ if (Self->VolatileTxManager.PendingAborts.Empty()) {
+ TxId = 0;
+ return true;
+ }
+
+ auto* info = Self->VolatileTxManager.PendingAborts.PopFront();
+ Y_VERIFY(info && info->State == EVolatileTxState::Aborting);
+ TxId = info->TxId;
// Schedule another transaction if needed
Self->VolatileTxManager.RunPendingAbortTx();
- auto* info = Self->VolatileTxManager.FindByTxId(TxId);
- Y_VERIFY(info && info->State == EVolatileTxState::Aborting);
-
for (auto& pr : Self->GetUserTables()) {
auto tid = pr.second->LocalTid;
for (ui64 commitTxId : info->CommitTxIds) {
@@ -143,6 +154,10 @@ namespace NKikimr::NDataShard {
}
void Complete(const TActorContext& ctx) override {
+ if (TxId == 0) {
+ return;
+ }
+
auto* info = Self->VolatileTxManager.FindByTxId(TxId);
Y_VERIFY(info && info->State == EVolatileTxState::Aborting);
Y_VERIFY(info->AddCommitted);
@@ -237,11 +252,11 @@ namespace NKikimr::NDataShard {
break;
case EVolatileTxState::Committed:
if (ReadyToDbCommit(pr.second.get())) {
- PendingCommits.push_back(pr.first);
+ PendingCommits.PushBack(pr.second.get());
}
break;
case EVolatileTxState::Aborting:
- PendingAborts.push_back(pr.first);
+ PendingAborts.PushBack(pr.second.get());
Y_FAIL("FIXME: unexpected persistent aborting state");
break;
}
@@ -417,7 +432,7 @@ namespace NKikimr::NDataShard {
void TVolatileTxManager::PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,
- TConstArrayRef<ui64> dependencies,
+ const absl::flat_hash_set<ui64>& dependencies,
TConstArrayRef<ui64> participants,
std::optional<ui64> changeGroup,
bool commitOrdered,
@@ -437,7 +452,7 @@ namespace NKikimr::NDataShard {
info->TxId = txId;
info->Version = version;
info->CommitTxIds.insert(commitTxIds.begin(), commitTxIds.end());
- info->Dependencies.insert(dependencies.begin(), dependencies.end());
+ info->Dependencies = dependencies;
info->Participants.insert(participants.begin(), participants.end());
info->ChangeGroup = changeGroup;
info->CommitOrder = NextCommitOrder++;
@@ -504,6 +519,9 @@ namespace NKikimr::NDataShard {
db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update();
}
+ txc.DB.OnRollback([this, txId]() {
+ RollbackAddVolatileTx(txId);
+ });
txc.OnCommitted([this, txId]() {
auto* info = FindByTxId(txId);
Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId);
@@ -513,15 +531,37 @@ namespace NKikimr::NDataShard {
RunCommitCallbacks(info);
}
});
- txc.OnRollback([txId]() {
- Y_VERIFY_S(false, "Unexpected rollback of volatile txId# " << txId);
- });
if (ReadyToDbCommit(info)) {
AddPendingCommit(info->TxId);
}
}
+ void TVolatileTxManager::RollbackAddVolatileTx(ui64 txId) {
+ auto* info = FindByTxId(txId);
+ Y_VERIFY_S(info, "Rollback cannot find volatile txId# " << txId);
+
+ // Unlink dependencies
+ for (ui64 dependencyTxId : info->Dependencies) {
+ if (auto* dependency = FindByTxId(dependencyTxId)) {
+ dependency->Dependents.erase(txId);
+ }
+ }
+
+ // Unlink commits
+ for (ui64 commitTxId : info->CommitTxIds) {
+ TxMap->Remove(commitTxId);
+ VolatileTxByCommitTxId.erase(commitTxId);
+ }
+
+ VolatileTxByVersion.erase(info);
+
+ // FIXME: do we need to handle WaitingSnapshotEvents somehow?
+
+ // This will also unlink from linked lists
+ VolatileTxs.erase(txId);
+ }
+
void TVolatileTxManager::PersistRemoveVolatileTx(ui64 txId, TTransactionContext& txc) {
using Schema = TDataShard::Schema;
@@ -543,7 +583,8 @@ namespace NKikimr::NDataShard {
Y_VERIFY_S(info->Dependencies.empty(), "Unexpected remove of volatile tx " << txId << " with dependencies");
Y_VERIFY_S(info->Dependents.empty(), "Unexpected remove of volatile tx " << txId << " with dependents");
- Y_VERIFY_S(info->Empty(), "Unexpected remove of volatile tx " << txId << " which is in commit order linked list");
+ Y_VERIFY_S(!info->IsInList<TVolatileTxInfoCommitOrderListTag>(),
+ "Unexpected remove of volatile tx " << txId << " which is in commit order linked list");
UnblockWaitingRemovalOperations(info);
@@ -838,34 +879,39 @@ namespace NKikimr::NDataShard {
}
void TVolatileTxManager::AddPendingCommit(ui64 txId) {
- PendingCommits.push_back(txId);
- RunPendingCommitTx();
+ if (auto* info = FindByTxId(txId)) {
+ PendingCommits.PushBack(info);
+ RunPendingCommitTx();
+ }
}
void TVolatileTxManager::AddPendingAbort(ui64 txId) {
- PendingAborts.push_back(txId);
- RunPendingAbortTx();
+ if (auto* info = FindByTxId(txId)) {
+ PendingAborts.PushBack(info);
+ RunPendingAbortTx();
+ }
}
void TVolatileTxManager::RunPendingCommitTx() {
- if (!PendingCommitTxScheduled && !PendingCommits.empty()) {
+ if (!PendingCommitTxScheduled && !PendingCommits.Empty()) {
PendingCommitTxScheduled = true;
Self->Execute(new TDataShard::TTxVolatileTxCommit(Self));
}
}
void TVolatileTxManager::RunPendingAbortTx() {
- if (!PendingAbortTxScheduled && !PendingAborts.empty()) {
+ if (!PendingAbortTxScheduled && !PendingAborts.Empty()) {
PendingAbortTxScheduled = true;
Self->EnqueueExecute(new TDataShard::TTxVolatileTxAbort(Self));
}
}
void TVolatileTxManager::RemoveFromCommitOrder(TVolatileTxInfo* info) {
- Y_VERIFY(!info->Empty(), "Volatile transaction is not in a commit order linked list");
+ Y_VERIFY(info->IsInList<TVolatileTxInfoCommitOrderListTag>(),
+ "Volatile transaction is not in a commit order linked list");
Y_VERIFY(!VolatileTxByCommitOrder.Empty(), "Commit order linked list is unexpectedly empty");
const bool wasFirst = VolatileTxByCommitOrder.Front() == info;
- info->Unlink();
+ info->UnlinkFromList<TVolatileTxInfoCommitOrderListTag>();
if (wasFirst && !VolatileTxByCommitOrder.Empty()) {
auto* next = VolatileTxByCommitOrder.Front();
if (next->CommitOrdered && ReadyToDbCommit(next)) {
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index 627c0c24236..b9991a07ee9 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -41,8 +41,14 @@ namespace NKikimr::NDataShard {
virtual void OnAbort(ui64 txId) = 0;
};
+ struct TVolatileTxInfoCommitOrderListTag {};
+ struct TVolatileTxInfoPendingCommitListTag {};
+ struct TVolatileTxInfoPendingAbortListTag {};
+
struct TVolatileTxInfo
- : public TIntrusiveListItem<TVolatileTxInfo>
+ : public TIntrusiveListItem<TVolatileTxInfo, TVolatileTxInfoCommitOrderListTag>
+ , public TIntrusiveListItem<TVolatileTxInfo, TVolatileTxInfoPendingCommitListTag>
+ , public TIntrusiveListItem<TVolatileTxInfo, TVolatileTxInfoPendingAbortListTag>
{
ui64 CommitOrder;
ui64 TxId;
@@ -58,6 +64,18 @@ namespace NKikimr::NDataShard {
absl::flat_hash_set<ui64> BlockedOperations;
absl::flat_hash_set<ui64> WaitingRemovalOperations;
TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks;
+
+ template<class TTag>
+ bool IsInList() const {
+ using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>;
+ return !static_cast<const TItem*>(this)->Empty();
+ }
+
+ template<class TTag>
+ void UnlinkFromList() {
+ using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>;
+ static_cast<TItem*>(this)->Unlink();
+ }
};
class TVolatileTxManager {
@@ -179,7 +197,7 @@ namespace NKikimr::NDataShard {
void PersistAddVolatileTx(
ui64 txId, const TRowVersion& version,
TConstArrayRef<ui64> commitTxIds,
- TConstArrayRef<ui64> dependencies,
+ const absl::flat_hash_set<ui64>& dependencies,
TConstArrayRef<ui64> participants,
std::optional<ui64> changeGroup,
bool commitOrdered,
@@ -212,6 +230,7 @@ namespace NKikimr::NDataShard {
}
private:
+ void RollbackAddVolatileTx(ui64 txId);
void PersistRemoveVolatileTx(ui64 txId, TTransactionContext& txc);
void RemoveVolatileTx(ui64 txId);
@@ -236,11 +255,11 @@ namespace NKikimr::NDataShard {
absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info
absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info
TVolatileTxByVersion VolatileTxByVersion;
- TIntrusiveList<TVolatileTxInfo> VolatileTxByCommitOrder;
+ TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoCommitOrderListTag> VolatileTxByCommitOrder;
std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents;
TIntrusivePtr<TTxMap> TxMap;
- std::deque<ui64> PendingCommits;
- std::deque<ui64> PendingAborts;
+ TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoPendingCommitListTag> PendingCommits;
+ TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoPendingAbortListTag> PendingAborts;
ui64 NextCommitOrder = 1;
bool PendingCommitTxScheduled = false;
bool PendingAbortTxScheduled = false;