diff options
author | snaury <[email protected]> | 2023-01-13 15:01:20 +0300 |
---|---|---|
committer | snaury <[email protected]> | 2023-01-13 15:01:20 +0300 |
commit | 0e7b1b31fe5c1f508c8df26cb7e4e9fb1c13baab (patch) | |
tree | 99918677e53617095661ed01b66147f7ba890a03 | |
parent | be6f1b53be888b4e18c1840d289c914b5bbd0f66 (diff) |
Implement readset expectations for volatile transactions
23 files changed, 641 insertions, 75 deletions
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index d73ebbb707e..d88c2c9b9cc 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1515,7 +1515,7 @@ bool TExecutor::CanExecuteTransaction() const { return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction; } -void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) { +void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx) { Y_VERIFY(ActivationQueue, "attempt to execute transaction before activation"); TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self); @@ -1560,7 +1560,7 @@ void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) { return; } - if (ActiveTransaction || ActivateTransactionWaiting) { + if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) { LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID); ActivationQueue->Push(seat.Release()); ActivateTransactionWaiting++; @@ -1571,6 +1571,14 @@ void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) { ExecuteTransaction(seat, ctx); } +void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) { + DoExecute(self, true, ctx); +} + +void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx) { + DoExecute(self, false, ctx); +} + void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ctx) { Y_VERIFY_DEBUG(!ActiveTransaction); diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index ab6116c05b0..a2426e6c4bc 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -615,7 +615,9 @@ public: void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override; void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override; void DetachTablet(const TActorContext &ctx) override; + void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx); void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override; + void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override; TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false); TLeaseCommit* EnsureReadOnlyLease(TMonotonic at); diff --git a/ydb/core/tablet_flat/tablet_flat_executed.cpp b/ydb/core/tablet_flat/tablet_flat_executed.cpp index 3d46f6ffda3..2e09ec6a882 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.cpp +++ b/ydb/core/tablet_flat/tablet_flat_executed.cpp @@ -39,6 +39,11 @@ void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction) { static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext)); } +void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction) { + if (transaction) + static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext)); +} + const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept { return static_cast<TExecutor*>(Executor())->Scheme(); } diff --git a/ydb/core/tablet_flat/tablet_flat_executed.h b/ydb/core/tablet_flat/tablet_flat_executed.h index 117d7532bb3..c88281b25b6 100644 --- a/ydb/core/tablet_flat/tablet_flat_executed.h +++ b/ydb/core/tablet_flat/tablet_flat_executed.h @@ -25,6 +25,7 @@ protected: void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx); void Execute(TAutoPtr<ITransaction> transaction); + void EnqueueExecute(TAutoPtr<ITransaction> transaction); const NTable::TScheme& Scheme() const noexcept; diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index 35d584e38f4..eb452378f97 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -523,6 +523,7 @@ namespace NFlatExecutorSetup { virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0; virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0; + virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at) = 0; virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0; diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp index c086fa70119..60bb9764597 100644 --- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp @@ -117,6 +117,28 @@ void TCompleteOperationUnit::Complete(TOperation::TPtr op, DataShard.NotifySchemeshard(ctx, op->GetTxId()); DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords())); + + if (op->HasOutputData()) { + const auto& outReadSets = op->OutReadSets(); + const auto& expectedReadSets = op->ExpectedReadSets(); + auto itOut = outReadSets.begin(); + auto itExpected = expectedReadSets.begin(); + while (itExpected != expectedReadSets.end()) { + while (itOut != outReadSets.end() && itOut->first < itExpected->first) { + ++itOut; + } + if (itOut != outReadSets.end() && itOut->first == itExpected->first) { + ++itOut; + ++itExpected; + continue; + } + // We have an expected readset without a corresponding out readset + for (const auto& recipient : itExpected->second) { + DataShard.SendReadSetNoData(ctx, recipient, op->GetStep(), op->GetTxId(), itExpected->first.first, itExpected->first.second); + } + ++itExpected; + } + } } THolder<TExecutionUnit> CreateCompleteOperationUnit(TDataShard &dataShard, diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index e9ed4443848..19b32a42908 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -399,7 +399,7 @@ void TDataShard::SwitchToWork(const TActorContext &ctx) { Execute(new TTxCleanupRemovedSnapshots(this), ctx); if (State != TShardState::Offline) { - VolatileTxManager.Start(); + VolatileTxManager.Start(ctx); } SignalTabletActive(ctx); @@ -514,7 +514,10 @@ public: } void OnAbort() override { - Y_FAIL("TODO"); + Result->Record.ClearTxResult(); + Result->Record.SetStatus(NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED); + Result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Distributed transaction aborted due to commit failure"); + OnCommit(); } private: @@ -2797,6 +2800,11 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActo void TDataShard::RestartPipeRS(ui64 tabletId, const TActorContext& ctx) { for (auto seqno : ResendReadSetPipeTracker.FindTx(tabletId)) { + if (seqno == Max<ui64>()) { + OutReadSets.ResendExpectations(tabletId, ctx); + continue; + } + LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to tablet %" PRIu64 " caused resend of readset %" PRIu64 " at tablet %" PRIu64, tabletId, seqno, TabletID()); @@ -2805,7 +2813,14 @@ void TDataShard::RestartPipeRS(ui64 tabletId, const TActorContext& ctx) { } void TDataShard::AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx) { + bool detachExpectations = false; for (auto seqno : ResendReadSetPipeTracker.FindTx(tabletId)) { + if (seqno == Max<ui64>()) { + AbortExpectationsFromDeletedTablet(tabletId, OutReadSets.RemoveExpectations(tabletId)); + detachExpectations = true; + continue; + } + LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to dead tablet %" PRIu64 " caused ack of readset %" PRIu64 " at tablet %" PRIu64, tabletId, seqno, TabletID()); @@ -2818,9 +2833,23 @@ void TDataShard::AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx) { PlanQueue.Progress(ctx); } } + + if (detachExpectations) { + ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), tabletId, 0, ctx); + } + CheckStateChange(ctx); } +void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64, ui64>&& expectations) { + for (auto& pr : expectations) { + auto* info = VolatileTxManager.FindByTxId(pr.first); + if (info && info->State == EVolatileTxState::Waiting && info->Participants.contains(tabletId)) { + VolatileTxManager.AbortWaitingTransaction(info); + } + } +} + void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) { Y_UNUSED(ev); Y_UNUSED(ctx); LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Server connected at tablet %s %" PRIu64 , @@ -3007,41 +3036,133 @@ TDataShard::PrepareReadSet(ui64 step, ui64 txId, ui64 source, ui64 target, return ev; } -void TDataShard::SendReadSet(const TActorContext& ctx, ui64 step, - ui64 txId, ui64 source, ui64 target, - const TString& body, ui64 seqno) +THolder<TEvTxProcessing::TEvReadSet> +TDataShard::PrepareReadSetExpectation(ui64 step, ui64 txId, ui64 source, ui64 target) { + // We want to notify the target that we expect a readset, there's no data and no ack needed so no seqno + auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID()); + ev->Record.SetFlags( + NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET | + NKikimrTx::TEvReadSet::FLAG_NO_DATA | + NKikimrTx::TEvReadSet::FLAG_NO_ACK); + if (source != TabletID()) + FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList()); + return ev; +} + +void TDataShard::SendReadSet( + const TActorContext& ctx, + THolder<TEvTxProcessing::TEvReadSet>&& rs) +{ + ui64 txId = rs->Record.GetTxId(); + ui64 source = rs->Record.GetTabletSource(); + ui64 target = rs->Record.GetTabletDest(); + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Send RS at " << TabletID() << " from " << source << " to " << target << " txId " << txId); + IncCounter(COUNTER_READSET_SENT_COUNT); + IncCounter(COUNTER_READSET_SENT_SIZE, rs->Record.GetReadSet().size()); + + PipeClientCache->Send(ctx, target, rs.Release()); +} + +void TDataShard::SendReadSet(const TActorContext& ctx, ui64 step, + ui64 txId, ui64 source, ui64 target, + const TString& body, ui64 seqno) +{ auto ev = PrepareReadSet(step, txId, source, target, body, seqno); + SendReadSet(ctx, std::move(ev)); +} - IncCounter(COUNTER_READSET_SENT_COUNT); - IncCounter(COUNTER_READSET_SENT_SIZE, body.size()); +bool TDataShard::AddExpectation(ui64 target, ui64 step, ui64 txId) { + bool hadExpectations = OutReadSets.HasExpectations(target); + bool added = OutReadSets.AddExpectation(target, step, txId); + if (!hadExpectations) { + ResendReadSetPipeTracker.AttachTablet(Max<ui64>(), target); + } + return added; +} +bool TDataShard::RemoveExpectation(ui64 target, ui64 txId) { + bool removed = OutReadSets.RemoveExpectation(target, txId); + if (removed && !OutReadSets.HasExpectations(target)) { + auto ctx = TActivationContext::ActorContextFor(SelfId()); + ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), target, 0, ctx); + } + return removed; +} + +void TDataShard::SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId, + ui64 source, ui64 target) +{ + auto ev = PrepareReadSetExpectation(step, txId, source, target); PipeClientCache->Send(ctx, target, ev.Release()); } +void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target) +{ + Y_UNUSED(ctx); + auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID()); + ev->Record.SetFlags( + NKikimrTx::TEvReadSet::FLAG_NO_DATA | + NKikimrTx::TEvReadSet::FLAG_NO_ACK); + if (source != TabletID()) { + FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList()); + } + + struct TSendState : public TThrRefBase { + TDataShard* Self; + TActorId Recipient; + THolder<TEvTxProcessing::TEvReadSet> Event; + + TSendState(TDataShard* self, const TActorId& recipient, THolder<TEvTxProcessing::TEvReadSet>&& event) + : Self(self) + , Recipient(recipient) + , Event(std::move(event)) + { } + }; + + // FIXME: we can probably avoid lease confirmation here + Executor()->ConfirmReadOnlyLease( + [state = MakeIntrusive<TSendState>(this, recipient, std::move(ev))] { + state->Self->Send(state->Recipient, state->Event.Release()); + }); +} + +bool TDataShard::ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev) { + const auto& record = ev->Get()->Record; + + // Check if we already have a pending readset from dest to source + TReadSetKey rsKey(record.GetTxId(), TabletID(), record.GetTabletDest(), record.GetTabletSource()); + if (OutReadSets.Has(rsKey)) { + return true; + } + + if (IsStateActive()) { + // When we have a pending op, remember that readset from dest to source is expected + if (auto op = Pipeline.FindOp(record.GetTxId())) { + auto key = std::make_pair(record.GetTabletDest(), record.GetTabletSource()); + op->ExpectedReadSets()[key].push_back(ev->Sender); + return true; + } + } + + // In all other cases we want to reply with no data + return false; +} + void TDataShard::SendReadSets(const TActorContext& ctx, TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets) { TPendingPipeTrackerCommands pendingPipeTrackerCommands; for (auto &rs : readsets) { - ui64 txId = rs->Record.GetTxId(); - ui64 source = rs->Record.GetTabletSource(); ui64 target = rs->Record.GetTabletDest(); ui64 seqno = rs->Record.GetSeqno(); - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, - "Send RS at " << TabletID() << " from " << source - << " to " << target << " txId " << txId); - - IncCounter(COUNTER_READSET_SENT_COUNT); - IncCounter(COUNTER_READSET_SENT_SIZE, rs->Record.GetReadSet().size()); - pendingPipeTrackerCommands.AttachTablet(seqno, target); - PipeClientCache->Send(ctx, target, rs.Release()); + SendReadSet(ctx, std::move(rs)); } pendingPipeTrackerCommands.Apply(ResendReadSetPipeTracker, ctx); @@ -3387,16 +3508,18 @@ void TDataShard::Handle(TEvDataShard::TEvGetRSInfoRequest::TPtr &ev, } for (auto &pr : Pipeline.GetDelayedAcks()) { - auto *ev = dynamic_cast<TEvTxProcessing::TEvReadSetAck*>(pr.second->GetBase()); - if (ev) { - auto &rec = ev->Record; - auto &ack = *response->Record.AddDelayedRSAcks(); - ack.SetTxId(rec.GetTxId()); - ack.SetStep(rec.GetStep()); - ack.SetOrigin(rec.GetTabletConsumer()); - ack.SetSource(rec.GetTabletSource()); - ack.SetDestination(rec.GetTabletDest()); - ack.SetSeqNo(rec.GetSeqno()); + for (auto &ack : pr.second) { + auto *ev = dynamic_cast<TEvTxProcessing::TEvReadSetAck*>(ack->GetBase()); + if (ev) { + auto &rec = ev->Record; + auto &ack = *response->Record.AddDelayedRSAcks(); + ack.SetTxId(rec.GetTxId()); + ack.SetStep(rec.GetStep()); + ack.SetOrigin(rec.GetTabletConsumer()); + ack.SetSource(rec.GetTabletSource()); + ack.SetDestination(rec.GetTabletDest()); + ack.SetSeqNo(rec.GetSeqno()); + } } } diff --git a/ydb/core/tx/datashard/datashard__readset.cpp b/ydb/core/tx/datashard/datashard__readset.cpp index 24d992e9c7e..4d8a63ecb32 100644 --- a/ydb/core/tx/datashard/datashard__readset.cpp +++ b/ydb/core/tx/datashard/datashard__readset.cpp @@ -1,8 +1,6 @@ #include "datashard_txs.h" -namespace NKikimr { - -namespace NDataShard { +namespace NKikimr::NDataShard { TDataShard::TTxReadSet::TTxReadSet(TDataShard *self, TEvTxProcessing::TEvReadSet::TPtr ev) : TBase(self) @@ -16,11 +14,14 @@ namespace NDataShard { DoExecute(txc, ctx); - if (Ack) { + if (Ack || NoDataReply) { // Only leader is allowed to ack readsets, start confirming as early // as possible, so we handle both read-only and read-write acks. AckTs = AppData(ctx)->MonotonicTimeProvider->Now(); Self->Executor()->ConfirmReadOnlyLease(AckTs); + } else { + // We won't need the event in Complete + Ev.Reset(); } return true; @@ -33,25 +34,36 @@ namespace NDataShard { && state != TShardState::Readonly, "State %" PRIu32 " event %s", state, Ev->Get()->ToString().data()); - if (!(Ev->Get()->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) { + const auto& msg = *Ev->Get(); + const auto& record = msg.Record; + + if (!(record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) { Ack = MakeAck(ctx); } - // TODO: handle FLAG_EXPECT_READSET for missing transactions and inactive states + if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) { + if (!Self->ProcessReadSetExpectation(Ev)) { + NoDataReply = MakeNoDataReply(ctx); + } + + // Note: expect + no data is pure notification, avoid further processing + if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) { + return; + } + } if (!Self->IsStateActive()) { /// @warning Ack and allow sender to forget readset. /// It's possible till readsets can't passwthough splits-merges or other shard mutations. LOG_WARN(ctx, NKikimrServices::TX_DATASHARD, "Allow sender to lose readset, state %" PRIu32 " at %" PRIu64 " %s", - state, Self->TabletID(), Ev->Get()->ToString().data()); + state, Self->TabletID(), msg.ToString().data()); return; } - bool saved = Self->Pipeline.SaveInReadSet(*Ev->Get(), Ack, txc, ctx); + bool saved = Self->Pipeline.SaveInReadSet(msg, Ack, txc, ctx); if (!saved) { // delayed. Do not ack Y_VERIFY(!Ack); - Ev.Reset(); } } @@ -60,33 +72,56 @@ namespace NDataShard { new TEvTxProcessing::TEvReadSetAck(*Ev->Get(), Self->TabletID()))); } + THolder<IEventHandle> TDataShard::TTxReadSet::MakeNoDataReply(const TActorContext& ctx) { + const auto& record = Ev->Get()->Record; + auto event = MakeHolder<TEvTxProcessing::TEvReadSet>( + record.GetStep(), + record.GetTxId(), + record.GetTabletDest(), + record.GetTabletSource(), + Self->TabletID()); + event->Record.SetFlags(NKikimrTx::TEvReadSet::FLAG_NO_DATA | NKikimrTx::TEvReadSet::FLAG_NO_ACK); + return THolder(new IEventHandle(Ev->Sender, ctx.SelfID, event.Release())); + } + void TDataShard::TTxReadSet::Complete(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "TTxReadSet::Complete at " << Self->TabletID()); // If it was read set for non-active tx we should send ACK back after successful save in DB // Note that, active tx will send "delayed" ACK after tx complete - if (Ack) { + if (Ack || NoDataReply) { LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, - "Send RS Ack at %" PRIu64 " %s", + "Send RS %s at %" PRIu64 " %s", + Ack && NoDataReply ? "Ack+Reply" : Ack ? "Ack" : "Reply", Self->TabletID(), Ev->Get()->ToString().data()); struct TSendState : public TThrRefBase { TDataShard* Self; THolder<IEventHandle> Ack; + THolder<IEventHandle> NoDataReply; - TSendState(TDataShard* self, THolder<IEventHandle>&& ack) + TSendState(TDataShard* self, + THolder<IEventHandle>&& ack, + THolder<IEventHandle>&& noDataReply) : Self(self) , Ack(std::move(ack)) + , NoDataReply(std::move(noDataReply)) { } }; - Self->Executor()->ConfirmReadOnlyLease(AckTs, [state = MakeIntrusive<TSendState>(Self, std::move(Ack))] { - TActivationContext::Send(std::move(state->Ack)); - state->Self->IncCounter(COUNTER_ACK_SENT); - }); + Self->Executor()->ConfirmReadOnlyLease( + AckTs, + [state = MakeIntrusive<TSendState>(Self, std::move(Ack), std::move(NoDataReply))] { + if (state->Ack) { + state->Self->IncCounter(COUNTER_ACK_SENT); + TActivationContext::Send(std::move(state->Ack)); + } + if (state->NoDataReply) { + TActivationContext::Send(std::move(state->NoDataReply)); + } + }); } } -} -} +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index e91bf4ee289..d7e554f826c 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1206,6 +1206,7 @@ class TDataShard void RestartPipeRS(ui64 tabletId, const TActorContext& ctx); void AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx); + void AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64, ui64>&& expectations); void DefaultSignalTabletActive(const TActorContext &ctx) override { // This overriden in order to pospone SignalTabletActive until TxInit completes @@ -1255,7 +1256,14 @@ public: const TActorContext& ctx); THolder<TEvTxProcessing::TEvReadSet> PrepareReadSet(ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno); + THolder<TEvTxProcessing::TEvReadSet> PrepareReadSetExpectation(ui64 step, ui64 txId, ui64 source, ui64 target); + void SendReadSet(const TActorContext& ctx, THolder<TEvTxProcessing::TEvReadSet>&& rs); void SendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno); + bool AddExpectation(ui64 target, ui64 step, ui64 txId); + bool RemoveExpectation(ui64 target, ui64 txId); + void SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target); + void SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target); + bool ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev); void SendReadSets(const TActorContext& ctx, TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets); void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno); diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index cb81c8f8710..1a36cd6d05a 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -819,6 +819,9 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo } if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) { + Y_VERIFY(!(record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET), + "Unexpected FLAG_EXPECT_READSET + FLAG_NO_DATA in delayed readsets"); + // No readset data: participant aborted the transaction LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed readset without data from" << srcTabletId << " to " << dstTabletId << " will abort txId# " << tx->GetTxId()); diff --git a/ydb/core/tx/datashard/datashard_outreadset.cpp b/ydb/core/tx/datashard/datashard_outreadset.cpp index 36ded0e8972..71c2fbfdcc5 100644 --- a/ydb/core/tx/datashard/datashard_outreadset.cpp +++ b/ydb/core/tx/datashard/datashard_outreadset.cpp @@ -179,4 +179,47 @@ bool TOutReadSets::ResendRS(NTabletFlatExecutor::TTransactionContext &txc, const return true; } +bool TOutReadSets::AddExpectation(ui64 target, ui64 step, ui64 txId) { + auto res = Expectations[target].emplace(txId, step); + return res.second; +} + +bool TOutReadSets::RemoveExpectation(ui64 target, ui64 txId) { + auto it = Expectations.find(target); + if (it != Expectations.end()) { + auto itTxId = it->second.find(txId); + if (itTxId != it->second.end()) { + it->second.erase(itTxId); + if (it->second.empty()) { + Expectations.erase(it); + } + return true; + } + } + return false; +} + +bool TOutReadSets::HasExpectations(ui64 target) { + return Expectations.contains(target); +} + +void TOutReadSets::ResendExpectations(ui64 target, const TActorContext& ctx) { + auto it = Expectations.find(target); + if (it != Expectations.end()) { + for (const auto& pr : it->second) { + Self->SendReadSetExpectation(ctx, pr.second, pr.first, Self->TabletID(), target); + } + } +} + +THashMap<ui64, ui64> TOutReadSets::RemoveExpectations(ui64 target) { + THashMap<ui64, ui64> result; + auto it = Expectations.find(target); + if (it != Expectations.end()) { + result = std::move(it->second); + Expectations.erase(it); + } + return result; +} + }} diff --git a/ydb/core/tx/datashard/datashard_outreadset.h b/ydb/core/tx/datashard/datashard_outreadset.h index 901fd4d827a..3cf4c8db01a 100644 --- a/ydb/core/tx/datashard/datashard_outreadset.h +++ b/ydb/core/tx/datashard/datashard_outreadset.h @@ -61,13 +61,19 @@ public: void ResendAll(const TActorContext& ctx); void Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx); - bool Empty() const { return CurrentReadSets.empty(); } + bool Empty() const { return CurrentReadSets.empty() && Expectations.empty(); } bool HasAcks() const { return ! ReadSetAcks.empty(); } bool Has(const TReadSetKey& rsKey) const { return CurrentReadSetInfos.contains(rsKey); } ui64 CountReadSets() const { return CurrentReadSets.size(); } ui64 CountAcks() const { return ReadSetAcks.size(); } + bool AddExpectation(ui64 target, ui64 step, ui64 txId); + bool RemoveExpectation(ui64 target, ui64 txId); + bool HasExpectations(ui64 target); + void ResendExpectations(ui64 target, const TActorContext& ctx); + THashMap<ui64, ui64> RemoveExpectations(ui64 target); + private: void UpdateMonCounter() const; @@ -77,6 +83,8 @@ private: THashMap<TReadSetKey, ui64> CurrentReadSetInfos; // Info -> SeqNo THashSet<ui64> AckedSeqno; TVector<TIntrusivePtr<TEvTxProcessing::TEvReadSetAck>> ReadSetAcks; + // Target -> TxId -> Step + THashMap<ui64, THashMap<ui64, ui64>> Expectations; }; }} diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 660bba43b11..753066d0e0c 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -638,7 +638,7 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs, "Unexpected readset in state %" PRIu32 " for %" PRIu64 ":%" PRIu64 " at %" PRIu64, Self->State, step, txId, Self->TabletID()); if (ack) { - DelayedAcks[TStepOrder(step, txId)] = std::move(ack); + DelayedAcks[TStepOrder(step, txId)].push_back(std::move(ack)); } return false; } @@ -934,7 +934,9 @@ void TPipeline::CompleteTx(const TOperation::TPtr op, TTransactionContext& txc, "Will send outdated delayed readset ack for %" PRIu64 ":%" PRIu64 " at %" PRIu64, pr.first.Step, pr.first.TxId, Self->TabletID()); - op->AddDelayedAck(std::move(pr.second)); + for (auto& ack : pr.second) { + op->AddDelayedAck(std::move(ack)); + } DelayedAcks.erase(DelayedAcks.begin()); } diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index b2199949f7d..f4c235d8edc 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -298,7 +298,7 @@ public: } ui64 GetDataTxCacheSize() const { return DataTxCache.size(); } - const TMap<TStepOrder, THolder<IEventHandle>> &GetDelayedAcks() const + const TMap<TStepOrder, TStackVec<THolder<IEventHandle>, 1>> &GetDelayedAcks() const { return DelayedAcks; } @@ -453,7 +453,7 @@ private: TSortedOps::iterator ActivePlannedOpsLogicallyCompleteEnd; TSortedOps::iterator ActivePlannedOpsLogicallyIncompleteEnd; THashMap<ui64, TValidatedDataTx::TPtr> DataTxCache; - TMap<TStepOrder, THolder<IEventHandle>> DelayedAcks; + TMap<TStepOrder, TStackVec<THolder<IEventHandle>, 1>> DelayedAcks; TStepOrder LastPlannedTx; TStepOrder LastCompleteTx; TStepOrder UtmostCompleteTx; diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index 1f159a0c0c1..43815707b6b 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -121,10 +121,12 @@ public: private: TEvTxProcessing::TEvReadSet::TPtr Ev; THolder<IEventHandle> Ack; + THolder<IEventHandle> NoDataReply; TMonotonic AckTs; void DoExecute(TTransactionContext &txc, const TActorContext &ctx); THolder<IEventHandle> MakeAck(const TActorContext &ctx); + THolder<IEventHandle> MakeNoDataReply(const TActorContext &ctx); }; class TDataShard::TTxProgressResendRS : public NTabletFlatExecutor::TTransactionBase<TDataShard> { diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h index 2fffedefadd..0e8c4a14413 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.h +++ b/ydb/core/tx/datashard/datashard_ut_common.h @@ -694,4 +694,20 @@ private: const bool PrevValue; }; +template<class TCondition> +void WaitFor(TTestActorRuntime& runtime, TCondition&& condition, const TString& description = "condition", size_t maxAttempts = 1) { + for (size_t attempt = 0; attempt < maxAttempts; ++attempt) { + if (condition()) { + return; + } + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + } + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); +} + } diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index 9761f4cbcc4..01827e50c52 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -128,6 +128,14 @@ namespace NKqpHelpers { return JoinSeq(", ", result.result_sets(0).rows()); } + inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) { + if (response.operation().status() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.operation().status(); + } + Ydb::Table::ExecuteQueryResult result; + response.operation().result().UnpackTo(&result); + return FormatResult(result); + } inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) { TString sessionId = CreateSessionRPC(runtime, database); diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index efec2da9e31..54987228d5e 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -36,7 +36,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); - Cerr << "!!! distributed write start" << Endl; + Cerr << "!!! distributed write begin" << Endl; ExecSQL(server, sender, R"( UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); @@ -74,6 +74,235 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { "{ items { uint32_value: 20 } items { uint32_value: 20 } }"); } + Y_UNIT_TEST(DistributedWriteBrokenLock) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + // Start transaction that reads from table-1 and table-2 + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }"); + + // Break lock using a blind write to table-1 + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);"); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3); + UPSERT INTO `/Root/table-2` (key, value) VALUES (30, 30); + )"), + "ERROR: ABORTED"); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Verify transaction was not committed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 2 } items { uint32_value: 2 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }"); + } + + Y_UNIT_TEST(DistributedWriteShardRestartBeforePlan) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); + + TVector<THolder<IEventHandle>> capturedPlans; + auto capturePlans = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvPlanStep::EventType: { + Cerr << "... captured TEvPlanStep" << Endl; + capturedPlans.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(capturePlans); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + Cerr << "!!! distributed write begin" << Endl; + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedPlans.size() >= 2; }, "both captured plans"); + + runtime.SetObserverFunc(prevObserverFunc); + + // Restart the first table shard + auto shards1 = GetTableShards(server, sender, "/Root/table-1"); + RebootTablet(runtime, shards1.at(0), sender); + + // Unblock captured plan messages + for (auto& ev : capturedPlans) { + runtime.Send(ev.Release(), 0, true); + } + capturedPlans.clear(); + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "ERROR: UNDETERMINED"); + Cerr << "!!! distributed write end" << Endl; + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Verify transaction was not committed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }"); + } + + Y_UNIT_TEST(DistributedWriteShardRestartAfterExpectation) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetDomainPlanResolution(1000); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + CreateShardedTable(server, sender, "/Root", "table-1", 1); + CreateShardedTable(server, sender, "/Root", "table-2", 1); + + ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); + ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);"); + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); + runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG); + + auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + + TVector<THolder<IEventHandle>> capturedPlans; + TVector<THolder<IEventHandle>> capturedReadSets; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvTxProcessing::TEvPlanStep::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>(); + if (msg->Record.GetTabletID() == shard1) { + Cerr << "... captured TEvPlanStep for " << msg->Record.GetTabletID() << Endl; + capturedPlans.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + case TEvTxProcessing::TEvReadSet::EventType: { + const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>(); + if (msg->Record.GetTabletDest() == shard1) { + Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl; + capturedReadSets.emplace_back(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(captureEvents); + + TString sessionId = CreateSessionRPC(runtime, "/Root"); + + Cerr << "!!! distributed write begin" << Endl; + auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2); + UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20); + )", sessionId, "", true /* commitTx */), "/Root"); + + WaitFor(runtime, [&]{ return capturedPlans.size() >= 1 && capturedReadSets.size() >= 2; }, "captured plan and readsets"); + + runtime.SetObserverFunc(prevObserverFunc); + + // Restart the first table shard + RebootTablet(runtime, shard1, sender); + + UNIT_ASSERT_VALUES_EQUAL( + FormatResult(AwaitResponse(runtime, std::move(future))), + "ERROR: UNDETERMINED"); + Cerr << "!!! distributed write end" << Endl; + + runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); + + // Verify transaction was not committed + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, R"( + SELECT key, value FROM `/Root/table-1` + UNION ALL + SELECT key, value FROM `/Root/table-2` + ORDER BY key + )"), + "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " + "{ items { uint32_value: 10 } items { uint32_value: 10 } }"); + } + } // Y_UNIT_TEST_SUITE(DataShardVolatile) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 54365b8bef2..34fbed08884 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -294,8 +294,16 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio txc); } - if (op->HasVolatilePrepareFlag() && !op->OutReadSets().empty()) { - DataShard.PrepareAndSaveOutReadSets(op->GetStep(), op->GetTxId(), op->OutReadSets(), op->PreparedOutReadSets(), txc, ctx); + if (op->HasVolatilePrepareFlag()) { + // Notify other shards about our expectations as soon as possible, even before we commit + for (ui64 target : op->AwaitingDecisions()) { + if (DataShard.AddExpectation(target, op->GetStep(), op->GetTxId())) { + DataShard.SendReadSetExpectation(ctx, op->GetStep(), op->GetTxId(), DataShard.TabletID(), target); + } + } + if (!op->OutReadSets().empty()) { + DataShard.PrepareAndSaveOutReadSets(op->GetStep(), op->GetTxId(), op->OutReadSets(), op->PreparedOutReadSets(), txc, ctx); + } } // Note: may erase persistent locks, must be after we persist volatile tx diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index ca43b25cabe..1a575f02e9b 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -445,11 +445,13 @@ struct TOutputOpData { using TDelayedAcks = TVector<THolder<IEventHandle>>; using TOutReadSets = TMap<std::pair<ui64, ui64>, TString>; // source:target -> body using TChangeRecord = NMiniKQL::IChangeCollector::TChange; + using TExpectedReadSets = TMap<std::pair<ui64, ui64>, TStackVec<TActorId, 1>>; TResultPtr Result; // ACKs to send on successful operation completion. TDelayedAcks DelayedAcks; TOutReadSets OutReadSets; + TExpectedReadSets ExpectedReadSets; TVector<THolder<TEvTxProcessing::TEvReadSet>> PreparedOutReadSets; // Access log of checked locks TLocksCache LocksAccessLog; @@ -598,6 +600,7 @@ public: //////////////////////////////////////// // OUTPUT DATA // //////////////////////////////////////// + bool HasOutputData() { return bool(OutputData); } TOutputOpData::TResultPtr &Result() { return OutputDataRef().Result; } TOutputOpData::TDelayedAcks &DelayedAcks() { return OutputDataRef().DelayedAcks; } @@ -607,6 +610,7 @@ public: } TOutputOpData::TOutReadSets &OutReadSets() { return OutputDataRef().OutReadSets; } + TOutputOpData::TExpectedReadSets &ExpectedReadSets() { return OutputDataRef().ExpectedReadSets; } TVector<THolder<TEvTxProcessing::TEvReadSet>> &PreparedOutReadSets() { return OutputDataRef().PreparedOutReadSets; @@ -798,7 +802,6 @@ protected: OutputData = MakeHolder<TOutputOpData>(); return *OutputData; } - void ClearOutputData() { OutputData = nullptr; } TInputOpData &InputDataRef() { diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 5edc151fe7c..d73639cf76f 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -158,13 +158,18 @@ namespace NKikimr::NDataShard { return true; } - void TVolatileTxManager::Start() { + void TVolatileTxManager::Start(const TActorContext& ctx) { for (auto& pr : VolatileTxs) { if (!pr.second->Dependencies.empty()) { continue; } switch (pr.second->State) { case EVolatileTxState::Waiting: + for (ui64 target : pr.second->Participants) { + if (Self->AddExpectation(target, pr.second->Version.Step, pr.second->TxId)) { + Self->SendReadSetExpectation(ctx, pr.second->Version.Step, pr.second->TxId, Self->TabletID(), target); + } + } break; case EVolatileTxState::Committed: PendingCommits.push_back(pr.first); @@ -422,6 +427,7 @@ namespace NKikimr::NDataShard { for (ui64 shardId : info->Participants) { db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Delete(); + Self->RemoveExpectation(shardId, info->TxId); } db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Delete(); } @@ -470,6 +476,29 @@ namespace NKikimr::NDataShard { return true; } + void TVolatileTxManager::AbortWaitingTransaction(TVolatileTxInfo* info) { + Y_VERIFY(info && info->State == EVolatileTxState::Waiting); + + ui64 txId = info->TxId; + + // Move tx to aborted, but don't persist yet, we need a separate transaction for that + info->State = EVolatileTxState::Aborted; + + // We don't need aborted transactions in tx map + RemoveFromTxMap(info); + + // Aborted transactions don't have dependencies + for (ui64 dependencyTxId : info->Dependencies) { + auto* dependency = FindByTxId(dependencyTxId); + Y_VERIFY(dependency); + dependency->Dependents.erase(txId); + } + info->Dependencies.clear(); + + // We will unblock operations when we persist the abort + AddPendingAbort(txId); + } + void TVolatileTxManager::ProcessReadSet( const TEvTxProcessing::TEvReadSet& rs, TTransactionContext& txc) @@ -503,6 +532,8 @@ namespace NKikimr::NDataShard { bool committed = [&]() { if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) { + Y_VERIFY(!(record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET), + "Unexpected FLAG_EXPECT_READSET + FLAG_NO_DATA in ProcessReadSet"); LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed readset without data from " << srcTabletId << " to " << dstTabletId << " at tablet " << Self->TabletID()); @@ -520,26 +551,19 @@ namespace NKikimr::NDataShard { return false; } + if (record.GetStep() != info->Version.Step) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Processed readset from " << srcTabletId << " to " << dstTabletId + << " with step " << record.GetStep() << " expecting " << info->Version.Step + << ", treating like abort due to divergence at tablet " << Self->TabletID()); + return false; + } + return true; }(); if (!committed) { - // Move tx to aborted, but don't persist yet, we need a separate transaction for that - info->State = EVolatileTxState::Aborted; - - // We don't need aborted transactions in tx map - RemoveFromTxMap(info); - - // Aborted transactions don't have dependencies - for (ui64 dependencyTxId : info->Dependencies) { - auto* dependency = FindByTxId(dependencyTxId); - Y_VERIFY(dependency); - dependency->Dependents.erase(txId); - } - info->Dependencies.clear(); - - // We will unblock operations when we persist the abort - AddPendingAbort(txId); + AbortWaitingTransaction(info); return; } @@ -623,7 +647,7 @@ namespace NKikimr::NDataShard { void TVolatileTxManager::RunPendingAbortTx() { if (!PendingAbortTxScheduled && !PendingAborts.empty()) { PendingAbortTxScheduled = true; - Self->Execute(new TDataShard::TTxVolatileTxAbort(Self)); + Self->EnqueueExecute(new TDataShard::TTxVolatileTxAbort(Self)); } } diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h index 9ca8aae9343..613843e51c7 100644 --- a/ydb/core/tx/datashard/volatile_tx.h +++ b/ydb/core/tx/datashard/volatile_tx.h @@ -121,7 +121,7 @@ namespace NKikimr::NDataShard { void Clear(); bool Load(NIceDb::TNiceDb& db); - void Start(); + void Start(const TActorContext& ctx); TVolatileTxInfo* FindByTxId(ui64 txId) const; TVolatileTxInfo* FindByCommitTxId(ui64 txId) const; @@ -136,10 +136,16 @@ namespace NKikimr::NDataShard { bool AttachVolatileTxCallback( ui64 txId, IVolatileTxCallback::TPtr callback); + void AbortWaitingTransaction(TVolatileTxInfo* info); + void ProcessReadSet( const TEvTxProcessing::TEvReadSet& rs, TTransactionContext& txc); + void ProcessReadSetMissing( + ui64 source, ui64 txId, + TTransactionContext& txc); + TTxMapAccess GetTxMap() const { return TTxMapAccess(TxMap); } diff --git a/ydb/core/tx/tx_processing.h b/ydb/core/tx/tx_processing.h index d1cc771fcbc..d992dd71474 100644 --- a/ydb/core/tx/tx_processing.h +++ b/ydb/core/tx/tx_processing.h @@ -109,6 +109,15 @@ struct TEvTxProcessing { TEvReadSet() {} + TEvReadSet(ui64 step, ui64 orderId, ui64 tabletSource, ui64 tabletDest, ui64 tabletProducer) + { + Record.SetStep(step); + Record.SetTxId(orderId); + Record.SetTabletSource(tabletSource); + Record.SetTabletDest(tabletDest); + Record.SetTabletProducer(tabletProducer); + } + TEvReadSet(ui64 step, ui64 orderId, ui64 tabletSource, ui64 tabletDest, ui64 tabletProducer, const TString &readSet, ui64 seqno = 0) { Record.SetStep(step); |