diff options
author | snaury <snaury@ydb.tech> | 2023-03-20 15:17:56 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-03-20 15:17:56 +0300 |
commit | 3949f900d72edad109de1ea0e6213f2092f86790 (patch) | |
tree | 3c4cd16841ab452104bf58afd2b935e1e5fc54f5 | |
parent | e4db80492edac29772697c7d2663a2f10ee94ecf (diff) | |
download | ydb-3949f900d72edad109de1ea0e6213f2092f86790.tar.gz |
Handle volatile conflicts in bulk operations
27 files changed, 560 insertions, 111 deletions
diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h index 2b9b034a827..b8171933ea6 100644 --- a/ydb/core/tx/datashard/change_collector.h +++ b/ydb/core/tx/datashard/change_collector.h @@ -16,7 +16,7 @@ protected: ~IDataShardChangeGroupProvider() = default; public: - virtual bool HasChangeGroup() const = 0; + virtual std::optional<ui64> GetCurrentChangeGroup() const = 0; virtual ui64 GetChangeGroup() = 0; }; @@ -31,8 +31,8 @@ public: , Group(group) { } - bool HasChangeGroup() const override { - return bool(Group); + std::optional<ui64> GetCurrentChangeGroup() const override { + return Group; } ui64 GetChangeGroup() override; diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 206585a7ce1..1797b482798 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -3879,6 +3879,8 @@ void SendViaSession(const TActorId& sessionId, } class TBreakWriteConflictsTxObserver : public NTable::ITransactionObserver { + friend class TBreakWriteConflictsTxObserverVolatileDependenciesGuard; + public: TBreakWriteConflictsTxObserver(TDataShard* self) : Self(self) @@ -3886,7 +3888,14 @@ public: } void OnSkipUncommitted(ui64 txId) override { - Self->SysLocksTable().BreakLock(txId); + if (auto* info = Self->GetVolatileTxManager().FindByCommitTxId(txId)) { + if (info->State != EVolatileTxState::Aborting) { + Y_VERIFY(VolatileDependencies); + VolatileDependencies->insert(txId); + } + } else { + Self->SysLocksTable().BreakLock(txId); + } } void OnSkipCommitted(const TRowVersion&) override { @@ -3907,9 +3916,31 @@ public: private: TDataShard* Self; + absl::flat_hash_set<ui64>* VolatileDependencies = nullptr; }; -bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells) { +class TBreakWriteConflictsTxObserverVolatileDependenciesGuard { +public: + TBreakWriteConflictsTxObserverVolatileDependenciesGuard( + TBreakWriteConflictsTxObserver* observer, + absl::flat_hash_set<ui64>& volatileDependencies) + : Observer(observer) + { + Y_VERIFY(!Observer->VolatileDependencies); + Observer->VolatileDependencies = &volatileDependencies; + } + + ~TBreakWriteConflictsTxObserverVolatileDependenciesGuard() { + Observer->VolatileDependencies = nullptr; + } + +private: + TBreakWriteConflictsTxObserver* const Observer; +}; + +bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, + TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies) +{ const auto localTid = GetLocalTableId(tableId); Y_VERIFY(localTid); const NTable::TScheme& scheme = db.GetScheme(); @@ -3921,6 +3952,10 @@ bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tabl BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this); } + TBreakWriteConflictsTxObserverVolatileDependenciesGuard guard( + static_cast<TBreakWriteConflictsTxObserver*>(BreakWriteConflictsTxObserver.Get()), + volatileDependencies); + // We are not actually interested in the row version, we only need to // detect uncommitted transaction skips on the path to that version. auto res = db.SelectRowVersion( @@ -3966,6 +4001,15 @@ void TDataShard::Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorConte Execute(new TTxGetOpenTxs(this, std::move(ev)), ctx); } +void TDataShard::Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev, const TActorContext& ctx) { + auto op = Pipeline.FindOp(ev->Cookie); + if (op && op->HasWaitingForGlobalTxIdFlag()) { + Pipeline.ProvideGlobalTxId(op, ev->Get()->TxId); + Pipeline.AddCandidateOp(op); + PlanQueue.Progress(ctx); + } +} + } // NDataShard diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index b45c0a0affa..5b3a9fb36a7 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -122,6 +122,8 @@ namespace NDataShard { WaitingForAsyncJob = 1ULL << 43, // Operation must complete before results sending WaitCompletion = 1ULL << 44, + // Waiting for global tx id allocation + WaitingForGlobalTxId = 1ULL << 45, LastFlag = WaitCompletion, diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 819affd1509..485e7562493 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -275,8 +275,8 @@ public: IsRepeatableSnapshot = true; } - bool HasChangeGroup() const override { - return bool(ChangeGroup); + std::optional<ui64> GetCurrentChangeGroup() const override { + return ChangeGroup; } ui64 GetChangeGroup() override { @@ -321,7 +321,7 @@ public: return; } - if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min())) { + if (auto lock = Self->SysLocksTable().GetRawLock(lockId, TRowVersion::Min()); lock && !VolatileCommitOrdered) { lock->ForAllVolatileDependencies([this](ui64 txId) { if (VolatileDependencies.insert(txId).second && !VolatileTxId) { VolatileTxId = EngineBay.GetTxId(); @@ -390,17 +390,8 @@ public: return commitTxIds; } - TVector<ui64> GetVolatileDependencies() const { - TVector<ui64> dependencies; - - if (!VolatileDependencies.empty() && !VolatileCommitOrdered) { - dependencies.reserve(VolatileDependencies.size()); - for (ui64 dependency : VolatileDependencies) { - dependencies.push_back(dependency); - } - } - - return dependencies; + const absl::flat_hash_set<ui64>& GetVolatileDependencies() const { + return VolatileDependencies; } std::optional<ui64> GetVolatileChangeGroup() const { @@ -842,6 +833,7 @@ public: VolatileTxId = EngineBay.GetTxId(); } VolatileCommitOrdered = true; + VolatileDependencies.clear(); } return; } @@ -956,7 +948,7 @@ public: // even though they may not be persistent yet, since this tx will // also perform writes, and either it fails, or future generation // could not have possibly committed it already. - if (info->State != EVolatileTxState::Aborting) { + if (info->State != EVolatileTxState::Aborting && !VolatileCommitOrdered) { if (!VolatileTxId) { // All further writes will use this VolatileTxId and will // add it to VolatileCommitTxIds, forcing it to be committed @@ -1220,7 +1212,7 @@ TVector<ui64> TEngineBay::GetVolatileCommitTxIds() const { return host->GetVolatileCommitTxIds(); } -TVector<ui64> TEngineBay::GetVolatileDependencies() const { +const absl::flat_hash_set<ui64>& TEngineBay::GetVolatileDependencies() const { Y_VERIFY(EngineHost); auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index f6d6446fd29..0c83a561869 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -113,7 +113,7 @@ public: void ResetCollectedChanges(); TVector<ui64> GetVolatileCommitTxIds() const; - TVector<ui64> GetVolatileDependencies() const; + const absl::flat_hash_set<ui64>& GetVolatileDependencies() const; std::optional<ui64> GetVolatileChangeGroup() const; bool GetVolatileCommitOrdered() const; diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index c55cf209f5e..7f19015743a 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -12,6 +12,7 @@ class TTxDirectBase : public TTransactionBase<TDataShard> { TOperation::TPtr Op; TVector<EExecutionUnitKind> CompleteList; + bool WaitComplete = false; public: TTxDirectBase(TDataShard* ds, TEvRequest ev) @@ -30,29 +31,68 @@ public: if (Ev) { const ui64 tieBreaker = Self->NextTieBreakerIndex++; - Op = new TDirectTransaction(tieBreaker, ctx.Now(), tieBreaker, Ev); + Op = new TDirectTransaction(ctx.Now(), tieBreaker, Ev); Op->BuildExecutionPlan(false); Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); Ev = nullptr; + Op->IncrementInProgress(); } + Y_VERIFY(Op && Op->IsInProgress() && !Op->GetExecutionPlan().empty()); + auto status = Self->Pipeline.RunExecutionPlan(Op, CompleteList, txc, ctx); - if (!CompleteList.empty()) { - return true; - } else if (status == EExecutionStatus::Restart) { - return false; + + switch (status) { + case EExecutionStatus::Restart: + return false; + + case EExecutionStatus::Reschedule: + Y_FAIL("Unexpected Reschedule status while handling a direct operation"); + + case EExecutionStatus::Executed: + case EExecutionStatus::Continue: + Op->DecrementInProgress(); + break; + + case EExecutionStatus::WaitComplete: + WaitComplete = true; + break; + + case EExecutionStatus::ExecutedNoMoreRestarts: + case EExecutionStatus::DelayComplete: + case EExecutionStatus::DelayCompleteNoMoreRestarts: + Y_FAIL_S("unexpected execution status " << status << " for operation " + << *Op << " " << Op->GetKind() << " at " << Self->TabletID()); + } + + if (WaitComplete || !CompleteList.empty()) { + // Keep current operation } else { Op = nullptr; - return true; } + + return true; } void Complete(const TActorContext& ctx) override { LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TTxDirectBase(" << GetTxType() << ") Complete" << ": at tablet# " << Self->TabletID()); - Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); + if (Op) { + if (!CompleteList.empty()) { + Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); + } + + if (WaitComplete) { + Op->DecrementInProgress(); + + if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) { + Self->Pipeline.AddCandidateOp(Op); + } + } + } + if (Self->Pipeline.CanRunAnotherOp()) { Self->PlanQueue.Progress(ctx); } diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index be2ba9a37a5..d641c4086cf 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -897,8 +897,8 @@ class TDataShard::TReadOperation : public TOperation, public IReadOperation { static constexpr ui32 Flags = NTxDataShard::TTxFlags::ReadOnly | NTxDataShard::TTxFlags::Immediate; public: - TReadOperation(TDataShard* ds, ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev) - : TOperation(TBasicOpInfo(txId, EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex)) + TReadOperation(TDataShard* ds, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvRead::TPtr ev) + : TOperation(TBasicOpInfo(EOperationKind::ReadTx, Flags, 0, receivedAt, tieBreakerIndex)) , Self(ds) , Sender(ev->Sender) , Request(ev->Release().Release()) @@ -1723,7 +1723,7 @@ public: if (Ev) { const ui64 tieBreaker = Self->NextTieBreakerIndex++; - Op = new TReadOperation(Self, tieBreaker, ctx.Now(), tieBreaker, Ev); + Op = new TReadOperation(Self, ctx.Now(), tieBreaker, Ev); Op->BuildExecutionPlan(false); Self->Pipeline.GetExecutionUnit(Op->GetCurrentUnit()).AddOperation(Op); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 1ffdab103bd..7ddf8478bb1 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -189,7 +189,7 @@ public: void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); } TVector<ui64> GetVolatileCommitTxIds() const { return EngineBay.GetVolatileCommitTxIds(); } - TVector<ui64> GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); } + const absl::flat_hash_set<ui64>& GetVolatileDependencies() const { return EngineBay.GetVolatileDependencies(); } std::optional<ui64> GetVolatileChangeGroup() const { return EngineBay.GetVolatileChangeGroup(); } bool GetVolatileCommitOrdered() const { return EngineBay.GetVolatileCommitOrdered(); } diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 6cd4da5e67c..61d132b9e94 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -15,7 +15,7 @@ TCommonUploadOps<TEvRequest, TEvResponse>::TCommonUploadOps(typename TEvRequest: template <typename TEvRequest, typename TEvResponse> bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTransactionContext& txc, - const TRowVersion& readVersion, const TRowVersion& writeVersion) + const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId) { const auto& record = Ev->Get()->Record; Result = MakeHolder<TEvResponse>(self->TabletID()); @@ -58,7 +58,9 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans const bool readForTableShadow = writeToTableShadow && !shadowTableId; const ui32 writeTableId = writeToTableShadow && shadowTableId ? shadowTableId : localTableId; - const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId); + const bool breakWriteConflicts = BreakLocks && ( + self->SysLocksTable().HasWriteLocks(fullTableId) || + self->GetVolatileTxManager().GetTxMap()); TDataShardUserDb userDb(*self, txc.DB, readVersion); TDataShardChangeGroupProvider groupProvider(*self, txc.DB); @@ -91,6 +93,8 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans bool pageFault = false; NTable::TRowState rowState; + absl::flat_hash_set<ui64> volatileDependencies; + ui64 bytes = 0; for (const auto& r : record.GetRows()) { // TODO: use safe parsing! @@ -188,7 +192,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans if (BreakLocks) { if (breakWriteConflicts) { - if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) { + if (!self->BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { pageFault = true; } @@ -201,7 +205,14 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans } } - txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion); + if (!volatileDependencies.empty()) { + if (!globalTxId) { + throw TNeedGlobalTxId(); + } + txc.DB.UpdateTx(writeTableId, NTable::ERowOp::Upsert, key, value, globalTxId); + } else { + txc.DB.Update(writeTableId, NTable::ERowOp::Upsert, key, value, writeVersion); + } } if (pageFault) { @@ -212,6 +223,19 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans return false; } + if (!volatileDependencies.empty()) { + self->GetVolatileTxManager().PersistAddVolatileTx( + globalTxId, + writeVersion, + /* commitTxIds */ { globalTxId }, + volatileDependencies, + /* participants */ { }, + groupProvider.GetCurrentChangeGroup(), + /* ordered */ false, + txc); + // Note: transaction is already committed, no additional waiting needed + } + self->IncCounter(COUNTER_UPLOAD_ROWS, record.GetRows().size()); self->IncCounter(COUNTER_UPLOAD_ROWS_BYTES, bytes); diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h index 7c7f79c99ee..50284f42979 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.h +++ b/ydb/core/tx/datashard/datashard_common_upload.h @@ -21,7 +21,7 @@ public: explicit TCommonUploadOps(typename TEvRequest::TPtr& ev, bool breakLocks, bool collectChanges); protected: - bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion); + bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion, ui64 globalTxId); void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie); const TEvRequest* GetRequest() const; TEvResponse* GetResult(); diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index e27bec92afd..2d7c033bb4a 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -74,7 +74,11 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, *groupProvider, params.Txc->DB, tableInfo)); } - const bool breakWriteConflicts = self->SysLocksTable().HasWriteLocks(fullTableId); + const bool breakWriteConflicts = ( + self->SysLocksTable().HasWriteLocks(fullTableId) || + self->GetVolatileTxManager().GetTxMap()); + + absl::flat_hash_set<ui64> volatileDependencies; bool pageFault = false; for (const auto& serializedKey : request.GetKeyColumns()) { @@ -144,7 +148,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } if (breakWriteConflicts) { - if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells())) { + if (!self->BreakWriteConflicts(params.Txc->DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { pageFault = true; } } @@ -154,7 +158,15 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } self->SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); - params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion); + + if (!volatileDependencies.empty()) { + if (!params.GlobalTxId) { + throw TNeedGlobalTxId(); + } + params.Txc->DB.UpdateTx(localTableId, NTable::ERowOp::Erase, key, {}, params.GlobalTxId); + } else { + params.Txc->DB.Update(localTableId, NTable::ERowOp::Erase, key, {}, params.WriteVersion); + } } if (pageFault) { @@ -165,6 +177,19 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( return EStatus::PageFault; } + if (!volatileDependencies.empty()) { + self->GetVolatileTxManager().PersistAddVolatileTx( + params.GlobalTxId, + params.WriteVersion, + /* commitTxIds */ { params.GlobalTxId }, + volatileDependencies, + /* participants */ { }, + groupProvider ? groupProvider->GetCurrentChangeGroup() : std::nullopt, + /* ordered */ false, + *params.Txc); + // Note: transaction is already committed, no additional waiting needed + } + status = NKikimrTxDataShard::TEvEraseRowsResponse::OK; return EStatus::Success; } @@ -184,14 +209,15 @@ bool TDirectTxErase::CheckRequest(TDataShard* self, const NKikimrTxDataShard::TE } bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc, - const TRowVersion& readVersion, const TRowVersion& writeVersion) + const TRowVersion& readVersion, const TRowVersion& writeVersion, + ui64 globalTxId) { const auto& record = Ev->Get()->Record; Result = MakeHolder<TEvDataShard::TEvEraseRowsResponse>(); Result->Record.SetTabletID(self->TabletID()); - const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion); + const auto params = TExecuteParams::ForExecute(this, &txc, readVersion, writeVersion, globalTxId); NKikimrTxDataShard::TEvEraseRowsResponse::EStatus status; TString error; diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h index c79ef35bd17..9904fd404e9 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.h +++ b/ydb/core/tx/datashard/datashard_direct_erase.h @@ -22,20 +22,23 @@ class TDirectTxErase : public IDirectTx { TTransactionContext* const Txc; const TRowVersion ReadVersion; const TRowVersion WriteVersion; + const ui64 GlobalTxId; private: explicit TExecuteParams(TDirectTxErase* tx, TTransactionContext* txc, - const TRowVersion& readVersion, const TRowVersion& writeVersion) + const TRowVersion& readVersion, const TRowVersion& writeVersion, + const ui64 globalTxId) : Tx(tx) , Txc(txc) , ReadVersion(readVersion) , WriteVersion(writeVersion) + , GlobalTxId(globalTxId) { } public: static TExecuteParams ForCheck() { - return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion()); + return TExecuteParams(nullptr, nullptr, TRowVersion(), TRowVersion(), 0); } template <typename... Args> @@ -68,7 +71,9 @@ public: static bool CheckRequest(TDataShard* self, const NKikimrTxDataShard::TEvEraseRowsRequest& request, NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error); - bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override; + bool Execute(TDataShard* self, TTransactionContext& txc, + const TRowVersion& readVersion, const TRowVersion& writeVersion, + ui64 globalTxId) override; TDirectTxResult GetResult(TDataShard* self) override; TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override; }; diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp index f83c8ef5504..3b228035e64 100644 --- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp @@ -5,14 +5,14 @@ namespace NKikimr { namespace NDataShard { -TDirectTransaction::TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev) - : TOperation(TBasicOpInfo(txId, EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex)) +TDirectTransaction::TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev) + : TOperation(TBasicOpInfo(EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex)) , Impl(new TDirectTxUpload(ev)) { } -TDirectTransaction::TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev) - : TOperation(TBasicOpInfo(txId, EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex)) +TDirectTransaction::TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev) + : TOperation(TBasicOpInfo(EOperationKind::DirectTx, Flags, 0, receivedAt, tieBreakerIndex)) , Impl(new TDirectTxErase(ev)) { } @@ -32,7 +32,9 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded) bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) { auto [readVersion, writeVersion] = self->GetReadWriteVersions(this); - if (!Impl->Execute(self, txc, readVersion, writeVersion)) + + // NOTE: may throw TNeedGlobalTxId exception, which is handled in direct tx unit + if (!Impl->Execute(self, txc, readVersion, writeVersion, GetGlobalTxId())) return false; if (self->IsMvccEnabled()) { diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h index 99933431cad..f2bfbc94f28 100644 --- a/ydb/core/tx/datashard/datashard_direct_transaction.h +++ b/ydb/core/tx/datashard/datashard_direct_transaction.h @@ -20,15 +20,17 @@ struct TDirectTxResult { class IDirectTx { public: virtual ~IDirectTx() = default; - virtual bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) = 0; + virtual bool Execute(TDataShard* self, TTransactionContext& txc, + const TRowVersion& readVersion, const TRowVersion& writeVersion, + ui64 globalTxId) = 0; virtual TDirectTxResult GetResult(TDataShard* self) = 0; virtual TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const = 0; }; class TDirectTransaction : public TOperation { public: - TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev); - TDirectTransaction(ui64 txId, TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev); + TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvUploadRowsRequest::TPtr& ev); + TDirectTransaction(TInstant receivedAt, ui64 tieBreakerIndex, TEvDataShard::TEvEraseRowsRequest::TPtr& ev); void BuildExecutionPlan(bool) override; diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp index 61b586f0970..21dd40d7b6d 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.cpp +++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp @@ -8,8 +8,11 @@ TDirectTxUpload::TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev) { } -bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) { - return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion); +bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, + const TRowVersion& readVersion, const TRowVersion& writeVersion, + ui64 globalTxId) +{ + return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion, globalTxId); } TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) { diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h index 17a94b2b06b..4a59bdeb806 100644 --- a/ydb/core/tx/datashard/datashard_direct_upload.h +++ b/ydb/core/tx/datashard/datashard_direct_upload.h @@ -14,7 +14,9 @@ class TDirectTxUpload : public IDirectTx public: explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev); - bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override; + bool Execute(TDataShard* self, TTransactionContext& txc, + const TRowVersion& readVersion, const TRowVersion& writeVersion, + ui64 globalTxId) override; TDirectTxResult GetResult(TDataShard* self) override; TVector<IDataShardChangeCollector::TChange> GetCollectedChanges() const override; }; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index c977cbb7209..1cf51f6fd2e 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -23,6 +23,7 @@ #include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> +#include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/base/appdata.h> @@ -149,6 +150,8 @@ enum class TSwitchState { class TDataShardEngineHost; struct TSetupSysLocks; +class TNeedGlobalTxId : public yexception {}; + /// class TDataShard : public TActor<TDataShard> @@ -1204,6 +1207,7 @@ class TDataShard void Handle(TEvTxProcessing::TEvInterruptTransaction::TPtr &ev, const TActorContext &ctx) { ForwardEventToOperation(ev, ctx); } + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr &ev, const TActorContext &ctx); void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr ev, const TActorContext &ctx); void Handle(TEvents::TEvUndelivered::TPtr& ev, const TActorContext& ctx); @@ -1849,12 +1853,13 @@ public: * * Prerequisites: TSetupSysLocks is active and caller does not have any * uncommitted write locks. - * Note: the specified table should have some write locks, otherwise - * this call is a very expensive no-op. + * Note: the specified table should have potential conflicting changes, + * otherwise this call is a very expensive no-op. * * Returns true on success and false on page fault. */ - bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, TArrayRef<const TCell> keyCells); + bool BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId, + TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies); private: /// @@ -2697,6 +2702,7 @@ protected: HFuncTraced(TEvTxProcessing::TEvStreamClearanceResponse, Handle); HFuncTraced(TEvTxProcessing::TEvStreamClearancePending, Handle); HFuncTraced(TEvTxProcessing::TEvInterruptTransaction, Handle); + HFuncTraced(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); HFuncTraced(TEvPrivate::TEvProgressTransaction, Handle); HFuncTraced(TEvPrivate::TEvCleanupTransaction, Handle); HFuncTraced(TEvPrivate::TEvDelayedProposeTransaction, Handle); diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 655e898effc..f8e8e543219 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -2000,4 +2000,33 @@ bool TPipeline::AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& gu return addedDependencies; } +void TPipeline::ProvideGlobalTxId(const TOperation::TPtr& op, ui64 globalTxId) { + Y_VERIFY(op->HasWaitingForGlobalTxIdFlag()); + ui64 localTxId = op->GetTxId(); + + auto itImmediate = ImmediateOps.find(localTxId); + Y_VERIFY(itImmediate != ImmediateOps.end()); + ImmediateOps.erase(itImmediate); + auto itActive = ActiveOps.find(op->GetStepOrder()); + Y_VERIFY(itActive != ActiveOps.end()); + ActiveOps.erase(itActive); + bool removedCandidate = false; + auto itCandidate = CandidateOps.find(op->GetStepOrder()); + if (itCandidate != CandidateOps.end()) { + CandidateOps.erase(itCandidate); + removedCandidate = true; + } + + op->SetGlobalTxId(globalTxId); + op->SetWaitingForGlobalTxIdFlag(false); + auto resImmediate = ImmediateOps.emplace(op->GetTxId(), op); + Y_VERIFY(resImmediate.second); + auto resActive = ActiveOps.emplace(op->GetStepOrder(), op); + Y_VERIFY(resActive.second); + if (removedCandidate) { + auto resCandidate = CandidateOps.emplace(op->GetStepOrder(), op); + Y_VERIFY(resCandidate.second); + } +} + }} diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index e8c06dea8a1..fcfc94059ce 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -374,6 +374,13 @@ public: */ bool AddLockDependencies(const TOperation::TPtr& op, TLocksUpdate& guardLocks); + /** + * Provides a global txId for the waiting operation + * + * The operation must have a WaitingForGlobalTxId flag. + */ + void ProvideGlobalTxId(const TOperation::TPtr& op, ui64 globalTxId); + private: struct TStoredExecutionProfile { TBasicOpInfo OpInfo; diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp index 1aec8f60214..16d4766971b 100644 --- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp +++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp @@ -11,7 +11,9 @@ TDataShard::TTxUnsafeUploadRows::TTxUnsafeUploadRows(TDataShard* ds, TEvDataShar bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TActorContext&) { auto [readVersion, writeVersion] = Self->GetReadWriteVersions(); - if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion)) { + + // NOTE: will not throw TNeedGlobalTxId since we set breakLocks to false + if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion, /* globalTxId */ 0)) { return false; } diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index b8d9145f426..be43f27767c 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -2,6 +2,9 @@ #include "datashard_ut_common_kqp.h" #include "datashard_active_transaction.h" +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/tx_proxy/upload_rows.h> + #include <ydb/core/kqp/executer_actor/kqp_executer.h> namespace NKikimr { @@ -1412,6 +1415,126 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 20 } items { uint32_value: 30 } }"); } + Y_UNIT_TEST(DistributedWriteThenBulkUpsert) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + {"value2", "Uint32", false, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + CreateShardedTable(server, sender, "/Root", "table-2", opts); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + ui64 maxReadSetStep = 0; + bool captureReadSets = true; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + maxReadSetStep = Max(maxReadSetStep, msg->Record.GetStep()); + if (captureReadSets) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (2, 2, 42); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedReadSets.size() >= 4; }, "captured readsets"); + UNIT_ASSERT_VALUES_EQUAL(capturedReadSets.size(), 4u); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Write to key 2 using bulk upsert + { + using TRows = TVector<std::pair<TSerializedCellVec, TString>>; + using TRowTypes = TVector<std::pair<TString, Ydb::Type>>; + + auto types = std::make_shared<TRowTypes>(); + + Ydb::Type type; + type.set_type_id(Ydb::Type::UINT32); + types->emplace_back("key", type); + types->emplace_back("value", type); + + auto rows = std::make_shared<TRows>(); + + TVector<TCell> key{ TCell::Make(ui32(2)) }; + TVector<TCell> values{ TCell::Make(ui32(22)) }; + TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key)); + TString serializedValues(TSerializedCellVec::Serialize(values)); + rows->emplace_back(serializedKey, serializedValues); + + auto upsertSender = runtime.AllocateEdgeActor(); + auto actor = NTxProxy::CreateUploadRowsInternal(upsertSender, "/Root/table-1", types, rows); + runtime.Register(actor); + + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvUploadRowsResponse>(upsertSender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, Ydb::StatusIds::SUCCESS); + } + + // This compaction verifies there's no commit race with the waiting + // distributed transaction. If commits happen in incorrect order we + // would observe unexpected results. + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + CompactTable(runtime, shard1, tableId1, false); + + runtime.SetObserverFunc(prevObserverFunc); + for (auto& ev : capturedReadSets) { + runtime.Send(ev.Release(), 0, true); + } + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "<empty>"); + + // Verify the result + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value, value2 FROM `/Root/table-1` + UNION ALL + SELECT key, value, value2 FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 2 } items { uint32_value: 22 } items { uint32_value: 42 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } items { null_flag_value: NULL_VALUE } }, " + "{ items { uint32_value: 20 } items { uint32_value: 20 } items { null_flag_value: NULL_VALUE } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/direct_tx_unit.cpp b/ydb/core/tx/datashard/direct_tx_unit.cpp index 1d0ea8ad17d..d469f83e849 100644 --- a/ydb/core/tx/datashard/direct_tx_unit.cpp +++ b/ydb/core/tx/datashard/direct_tx_unit.cpp @@ -19,12 +19,16 @@ public: } bool IsReadyToExecute(TOperation::TPtr op) const override { - return !op->HasRuntimeConflicts(); + return !op->HasRuntimeConflicts() && !op->HasWaitingForGlobalTxIdFlag(); } EExecutionStatus Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { Y_UNUSED(ctx); + if (op->HasWaitingForGlobalTxIdFlag()) { + return EExecutionStatus::Continue; + } + if (op->IsImmediate()) { // Every time we execute immediate transaction we may choose a new mvcc version op->MvccReadWriteVersion.reset(); @@ -36,8 +40,26 @@ public: TDirectTransaction* tx = dynamic_cast<TDirectTransaction*>(op.Get()); Y_VERIFY(tx != nullptr); - if (!tx->Execute(&DataShard, txc)) { - return EExecutionStatus::Restart; + try { + if (!tx->Execute(&DataShard, txc)) { + return EExecutionStatus::Restart; + } + } catch (const TNeedGlobalTxId&) { + Y_VERIFY_S(op->GetGlobalTxId() == 0, + "Unexpected TNeedGlobalTxId exception for direct operation with TxId# " << op->GetGlobalTxId()); + Y_VERIFY_S(op->IsImmediate(), + "Unexpected TNeedGlobalTxId exception for a non-immediate operation with TxId# " << op->GetTxId()); + + ctx.Send(MakeTxProxyID(), + new TEvTxUserProxy::TEvAllocateTxId(), + 0, op->GetTxId()); + op->SetWaitingForGlobalTxIdFlag(); + + if (txc.DB.HasChanges()) { + txc.Reschedule(); + return EExecutionStatus::Restart; + } + return EExecutionStatus::Continue; } if (Pipeline.AddLockDependencies(op, guardLocks)) { diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index b57b346ecf2..5c73693fce7 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -50,7 +50,9 @@ public: THolder<IDataShardChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, groupProvider, txc.DB, request.GetTableId())}; auto presentRows = TDynBitMap().Set(0, request.KeyColumnsSize()); - if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, changeCollector.Get())) { + if (!Execute(txc, request, presentRows, eraseTx->GetConfirmedRows(), writeVersion, op->GetGlobalTxId(), + &groupProvider, changeCollector.Get())) + { return EExecutionStatus::Restart; } @@ -75,7 +77,8 @@ public: Y_VERIFY(body.ParseFromArray(rs.Body.data(), rs.Body.size())); Y_VERIFY(presentRows.contains(rs.Origin)); - const bool ok = Execute(txc, request, presentRows.at(rs.Origin), DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), writeVersion); + const bool ok = Execute(txc, request, presentRows.at(rs.Origin), + DeserializeBitMap<TDynBitMap>(body.GetConfirmedRows()), writeVersion, op->GetGlobalTxId()); Y_VERIFY(ok); } } @@ -98,6 +101,8 @@ public: bool Execute(TTransactionContext& txc, const NKikimrTxDataShard::TEvEraseRowsRequest& request, const TDynBitMap& presentRows, const TDynBitMap& confirmedRows, const TRowVersion& writeVersion, + ui64 globalTxId, + TDataShardChangeGroupProvider* groupProvider = nullptr, IDataShardChangeCollector* changeCollector = nullptr) { const ui64 tableId = request.GetTableId(); @@ -107,6 +112,10 @@ public: const TUserTable& tableInfo = *DataShard.GetUserTables().at(tableId); const bool breakWriteConflicts = DataShard.SysLocksTable().HasWriteLocks(fullTableId); + bool checkVolatileDependencies = bool(DataShard.GetVolatileTxManager().GetTxMap()); + + absl::flat_hash_set<ui64> volatileDependencies; + bool volatileOrdered = false; size_t row = 0; bool pageFault = false; @@ -128,15 +137,27 @@ public: key.emplace_back(TRawTypeValue(cell.AsRef(), kt)); } - if (changeCollector) { - if (!changeCollector->OnUpdate(fullTableId, tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion)) { - pageFault = true; + if (breakWriteConflicts || checkVolatileDependencies) { + if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells(), volatileDependencies)) { + if (breakWriteConflicts) { + pageFault = true; + } else if (checkVolatileDependencies) { + checkVolatileDependencies = false; + volatileDependencies.clear(); + volatileOrdered = true; + } } } - if (breakWriteConflicts) { - if (!DataShard.BreakWriteConflicts(txc.DB, fullTableId, keyCells.GetCells())) { - pageFault = true; + if (changeCollector) { + if (!volatileDependencies.empty() || volatileOrdered) { + if (!changeCollector->OnUpdateTx(fullTableId, tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId)) { + pageFault = true; + } + } else { + if (!changeCollector->OnUpdate(fullTableId, tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion)) { + pageFault = true; + } } } @@ -145,13 +166,31 @@ public: } DataShard.SysLocksTable().BreakLocks(fullTableId, keyCells.GetCells()); - txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion); + + if (!volatileDependencies.empty() || volatileOrdered) { + txc.DB.UpdateTx(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, globalTxId); + } else { + txc.DB.Update(tableInfo.LocalTid, NTable::ERowOp::Erase, key, {}, writeVersion); + } } if (pageFault && changeCollector) { changeCollector->OnRestart(); } + if (!volatileDependencies.empty() || volatileOrdered) { + DataShard.GetVolatileTxManager().PersistAddVolatileTx( + globalTxId, + writeVersion, + /* commitTxIds */ { globalTxId }, + volatileDependencies, + /* participants */ { }, + groupProvider ? groupProvider->GetCurrentChangeGroup() : std::nullopt, + volatileOrdered, + txc); + // Note: transaction is already committed, no additional waiting needed + } + return !pageFault; } diff --git a/ydb/core/tx/datashard/operation.cpp b/ydb/core/tx/datashard/operation.cpp index 1b09efee406..3cd1c432576 100644 --- a/ydb/core/tx/datashard/operation.cpp +++ b/ydb/core/tx/datashard/operation.cpp @@ -33,7 +33,7 @@ void PrintDeps(const TOperation *op, void TBasicOpInfo::Serialize(NKikimrTxDataShard::TBasicOpInfo &info) const { - info.SetTxId(TxId); + info.SetTxId(GetTxId()); info.SetStep(Step); info.SetKind(ToString(Kind)); info.SetReceivedAt(ReceivedAt.GetValue()); diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 004d10a406d..1569832efbb 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -115,7 +115,7 @@ public: TBasicOpInfo() : Kind(EOperationKind::Unknown) , Flags(0) - , TxId(0) + , GlobalTxId(0) , Step(0) , MinStep(0) , MaxStep(0) @@ -123,6 +123,15 @@ public: { } + TBasicOpInfo(EOperationKind kind, + ui64 flags, + ui64 maxStep, + TInstant receivedAt, + ui64 tieBreakerIndex) + : TBasicOpInfo(0, kind, flags, maxStep, receivedAt, tieBreakerIndex) + { + } + TBasicOpInfo(ui64 txId, EOperationKind kind, ui64 flags, @@ -131,7 +140,7 @@ public: ui64 tieBreakerIndex) : Kind(kind) , Flags(flags) - , TxId(txId) + , GlobalTxId(txId) , Step(0) , ReceivedAt(receivedAt) , MinStep(0) @@ -342,10 +351,15 @@ public: bool HasWaitCompletionFlag() const { return HasFlag(TTxFlags::WaitCompletion); } void SetWaitCompletionFlag(bool val = true) { SetFlag(TTxFlags::WaitCompletion, val); } + bool HasWaitingForGlobalTxIdFlag() const { return HasFlag(TTxFlags::WaitingForGlobalTxId); } + void SetWaitingForGlobalTxIdFlag(bool val = true) { SetFlag(TTxFlags::WaitingForGlobalTxId, val); } + /////////////////////////////////// // OPERATION ID AND PLAN // /////////////////////////////////// - ui64 GetTxId() const { return TxId; } + ui64 GetTxId() const { return GlobalTxId ? GlobalTxId : TieBreakerIndex; } + ui64 GetGlobalTxId() const { return GlobalTxId; } + void SetGlobalTxId(ui64 txId) { GlobalTxId = txId; } ui64 GetStep() const { return Step; } void SetStep(ui64 step) { Step = step; } @@ -391,7 +405,7 @@ protected: EOperationKind Kind; // See TTxFlags. ui64 Flags; - ui64 TxId; + ui64 GlobalTxId; ui64 Step; TInstant ReceivedAt; diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index aebb0516cd1..5587c77b1e2 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -19,16 +19,19 @@ namespace NKikimr::NDataShard { Y_VERIFY(Self->VolatileTxManager.PendingCommitTxScheduled); Self->VolatileTxManager.PendingCommitTxScheduled = false; - Y_VERIFY(!Self->VolatileTxManager.PendingCommits.empty()); - TxId = Self->VolatileTxManager.PendingCommits.front(); - Self->VolatileTxManager.PendingCommits.pop_front(); + // We may have changed our mind + if (Self->VolatileTxManager.PendingCommits.Empty()) { + TxId = 0; + return true; + } + + auto* info = Self->VolatileTxManager.PendingCommits.PopFront(); + Y_VERIFY(info && info->State == EVolatileTxState::Committed); + TxId = info->TxId; // Schedule another transaction if needed Self->VolatileTxManager.RunPendingCommitTx(); - auto* info = Self->VolatileTxManager.FindByTxId(TxId); - Y_VERIFY(info && info->State == EVolatileTxState::Committed); - for (auto& pr : Self->GetUserTables()) { auto tid = pr.second->LocalTid; for (ui64 commitTxId : info->CommitTxIds) { @@ -77,9 +80,14 @@ namespace NKikimr::NDataShard { } void Complete(const TActorContext& ctx) override { + if (TxId == 0) { + return; + } + if (Delayed) { OnCommitted(ctx); } + if (Collected) { Self->EnqueueChangeRecords(std::move(Collected)); } @@ -119,16 +127,19 @@ namespace NKikimr::NDataShard { Y_VERIFY(Self->VolatileTxManager.PendingAbortTxScheduled); Self->VolatileTxManager.PendingAbortTxScheduled = false; - Y_VERIFY(!Self->VolatileTxManager.PendingAborts.empty()); - TxId = Self->VolatileTxManager.PendingAborts.front(); - Self->VolatileTxManager.PendingAborts.pop_front(); + // We may have changed our mind + if (Self->VolatileTxManager.PendingAborts.Empty()) { + TxId = 0; + return true; + } + + auto* info = Self->VolatileTxManager.PendingAborts.PopFront(); + Y_VERIFY(info && info->State == EVolatileTxState::Aborting); + TxId = info->TxId; // Schedule another transaction if needed Self->VolatileTxManager.RunPendingAbortTx(); - auto* info = Self->VolatileTxManager.FindByTxId(TxId); - Y_VERIFY(info && info->State == EVolatileTxState::Aborting); - for (auto& pr : Self->GetUserTables()) { auto tid = pr.second->LocalTid; for (ui64 commitTxId : info->CommitTxIds) { @@ -143,6 +154,10 @@ namespace NKikimr::NDataShard { } void Complete(const TActorContext& ctx) override { + if (TxId == 0) { + return; + } + auto* info = Self->VolatileTxManager.FindByTxId(TxId); Y_VERIFY(info && info->State == EVolatileTxState::Aborting); Y_VERIFY(info->AddCommitted); @@ -237,11 +252,11 @@ namespace NKikimr::NDataShard { break; case EVolatileTxState::Committed: if (ReadyToDbCommit(pr.second.get())) { - PendingCommits.push_back(pr.first); + PendingCommits.PushBack(pr.second.get()); } break; case EVolatileTxState::Aborting: - PendingAborts.push_back(pr.first); + PendingAborts.PushBack(pr.second.get()); Y_FAIL("FIXME: unexpected persistent aborting state"); break; } @@ -417,7 +432,7 @@ namespace NKikimr::NDataShard { void TVolatileTxManager::PersistAddVolatileTx( ui64 txId, const TRowVersion& version, TConstArrayRef<ui64> commitTxIds, - TConstArrayRef<ui64> dependencies, + const absl::flat_hash_set<ui64>& dependencies, TConstArrayRef<ui64> participants, std::optional<ui64> changeGroup, bool commitOrdered, @@ -437,7 +452,7 @@ namespace NKikimr::NDataShard { info->TxId = txId; info->Version = version; info->CommitTxIds.insert(commitTxIds.begin(), commitTxIds.end()); - info->Dependencies.insert(dependencies.begin(), dependencies.end()); + info->Dependencies = dependencies; info->Participants.insert(participants.begin(), participants.end()); info->ChangeGroup = changeGroup; info->CommitOrder = NextCommitOrder++; @@ -504,6 +519,9 @@ namespace NKikimr::NDataShard { db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Update(); } + txc.DB.OnRollback([this, txId]() { + RollbackAddVolatileTx(txId); + }); txc.OnCommitted([this, txId]() { auto* info = FindByTxId(txId); Y_VERIFY_S(info, "Unexpected failure to find volatile txId# " << txId); @@ -513,15 +531,37 @@ namespace NKikimr::NDataShard { RunCommitCallbacks(info); } }); - txc.OnRollback([txId]() { - Y_VERIFY_S(false, "Unexpected rollback of volatile txId# " << txId); - }); if (ReadyToDbCommit(info)) { AddPendingCommit(info->TxId); } } + void TVolatileTxManager::RollbackAddVolatileTx(ui64 txId) { + auto* info = FindByTxId(txId); + Y_VERIFY_S(info, "Rollback cannot find volatile txId# " << txId); + + // Unlink dependencies + for (ui64 dependencyTxId : info->Dependencies) { + if (auto* dependency = FindByTxId(dependencyTxId)) { + dependency->Dependents.erase(txId); + } + } + + // Unlink commits + for (ui64 commitTxId : info->CommitTxIds) { + TxMap->Remove(commitTxId); + VolatileTxByCommitTxId.erase(commitTxId); + } + + VolatileTxByVersion.erase(info); + + // FIXME: do we need to handle WaitingSnapshotEvents somehow? + + // This will also unlink from linked lists + VolatileTxs.erase(txId); + } + void TVolatileTxManager::PersistRemoveVolatileTx(ui64 txId, TTransactionContext& txc) { using Schema = TDataShard::Schema; @@ -543,7 +583,8 @@ namespace NKikimr::NDataShard { Y_VERIFY_S(info->Dependencies.empty(), "Unexpected remove of volatile tx " << txId << " with dependencies"); Y_VERIFY_S(info->Dependents.empty(), "Unexpected remove of volatile tx " << txId << " with dependents"); - Y_VERIFY_S(info->Empty(), "Unexpected remove of volatile tx " << txId << " which is in commit order linked list"); + Y_VERIFY_S(!info->IsInList<TVolatileTxInfoCommitOrderListTag>(), + "Unexpected remove of volatile tx " << txId << " which is in commit order linked list"); UnblockWaitingRemovalOperations(info); @@ -838,34 +879,39 @@ namespace NKikimr::NDataShard { } void TVolatileTxManager::AddPendingCommit(ui64 txId) { - PendingCommits.push_back(txId); - RunPendingCommitTx(); + if (auto* info = FindByTxId(txId)) { + PendingCommits.PushBack(info); + RunPendingCommitTx(); + } } void TVolatileTxManager::AddPendingAbort(ui64 txId) { - PendingAborts.push_back(txId); - RunPendingAbortTx(); + if (auto* info = FindByTxId(txId)) { + PendingAborts.PushBack(info); + RunPendingAbortTx(); + } } void TVolatileTxManager::RunPendingCommitTx() { - if (!PendingCommitTxScheduled && !PendingCommits.empty()) { + if (!PendingCommitTxScheduled && !PendingCommits.Empty()) { PendingCommitTxScheduled = true; Self->Execute(new TDataShard::TTxVolatileTxCommit(Self)); } } void TVolatileTxManager::RunPendingAbortTx() { - if (!PendingAbortTxScheduled && !PendingAborts.empty()) { + if (!PendingAbortTxScheduled && !PendingAborts.Empty()) { PendingAbortTxScheduled = true; Self->EnqueueExecute(new TDataShard::TTxVolatileTxAbort(Self)); } } void TVolatileTxManager::RemoveFromCommitOrder(TVolatileTxInfo* info) { - Y_VERIFY(!info->Empty(), "Volatile transaction is not in a commit order linked list"); + Y_VERIFY(info->IsInList<TVolatileTxInfoCommitOrderListTag>(), + "Volatile transaction is not in a commit order linked list"); Y_VERIFY(!VolatileTxByCommitOrder.Empty(), "Commit order linked list is unexpectedly empty"); const bool wasFirst = VolatileTxByCommitOrder.Front() == info; - info->Unlink(); + info->UnlinkFromList<TVolatileTxInfoCommitOrderListTag>(); if (wasFirst && !VolatileTxByCommitOrder.Empty()) { auto* next = VolatileTxByCommitOrder.Front(); if (next->CommitOrdered && ReadyToDbCommit(next)) { diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index 627c0c24236..b9991a07ee9 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -41,8 +41,14 @@ namespace NKikimr::NDataShard { virtual void OnAbort(ui64 txId) = 0; }; + struct TVolatileTxInfoCommitOrderListTag {}; + struct TVolatileTxInfoPendingCommitListTag {}; + struct TVolatileTxInfoPendingAbortListTag {}; + struct TVolatileTxInfo - : public TIntrusiveListItem<TVolatileTxInfo> + : public TIntrusiveListItem<TVolatileTxInfo, TVolatileTxInfoCommitOrderListTag> + , public TIntrusiveListItem<TVolatileTxInfo, TVolatileTxInfoPendingCommitListTag> + , public TIntrusiveListItem<TVolatileTxInfo, TVolatileTxInfoPendingAbortListTag> { ui64 CommitOrder; ui64 TxId; @@ -58,6 +64,18 @@ namespace NKikimr::NDataShard { absl::flat_hash_set<ui64> BlockedOperations; absl::flat_hash_set<ui64> WaitingRemovalOperations; TStackVec<IVolatileTxCallback::TPtr, 2> Callbacks; + + template<class TTag> + bool IsInList() const { + using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>; + return !static_cast<const TItem*>(this)->Empty(); + } + + template<class TTag> + void UnlinkFromList() { + using TItem = TIntrusiveListItem<TVolatileTxInfo, TTag>; + static_cast<TItem*>(this)->Unlink(); + } }; class TVolatileTxManager { @@ -179,7 +197,7 @@ namespace NKikimr::NDataShard { void PersistAddVolatileTx( ui64 txId, const TRowVersion& version, TConstArrayRef<ui64> commitTxIds, - TConstArrayRef<ui64> dependencies, + const absl::flat_hash_set<ui64>& dependencies, TConstArrayRef<ui64> participants, std::optional<ui64> changeGroup, bool commitOrdered, @@ -212,6 +230,7 @@ namespace NKikimr::NDataShard { } private: + void RollbackAddVolatileTx(ui64 txId); void PersistRemoveVolatileTx(ui64 txId, TTransactionContext& txc); void RemoveVolatileTx(ui64 txId); @@ -236,11 +255,11 @@ namespace NKikimr::NDataShard { absl::flat_hash_map<ui64, std::unique_ptr<TVolatileTxInfo>> VolatileTxs; // TxId -> Info absl::flat_hash_map<ui64, TVolatileTxInfo*> VolatileTxByCommitTxId; // CommitTxId -> Info TVolatileTxByVersion VolatileTxByVersion; - TIntrusiveList<TVolatileTxInfo> VolatileTxByCommitOrder; + TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoCommitOrderListTag> VolatileTxByCommitOrder; std::vector<TWaitingSnapshotEvent> WaitingSnapshotEvents; TIntrusivePtr<TTxMap> TxMap; - std::deque<ui64> PendingCommits; - std::deque<ui64> PendingAborts; + TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoPendingCommitListTag> PendingCommits; + TIntrusiveList<TVolatileTxInfo, TVolatileTxInfoPendingAbortListTag> PendingAborts; ui64 NextCommitOrder = 1; bool PendingCommitTxScheduled = false; bool PendingAbortTxScheduled = false; |