author    | snaury <snaury@ydb.tech> | 2023-08-15 23:54:21 +0300
committer | snaury <snaury@ydb.tech> | 2023-08-16 00:23:54 +0300
commit    | d1a4fffc6c0a74a57deba7c00d8a67abe182d9dd (patch)
tree      | ac1869398afc2fe15fbde1763a5f4a3e95eb89ab
parent    | 9d6d0a0973985de0970d53639143a6d1f9048912 (diff)
download  | ydb-d1a4fffc6c0a74a57deba7c00d8a67abe182d9dd.tar.gz
Fix stuck volatile transactions on lost volatile plans KIKIMR-18580
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp                     |  96
-rw-r--r-- | ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp |  41
-rw-r--r-- | ydb/core/tx/datashard/datashard__cleanup_tx.cpp         |  15
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h                  |   5
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp            |  34
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h              |   9
-rw-r--r-- | ydb/core/tx/datashard/datashard_trans_queue.cpp         |  15
-rw-r--r-- | ydb/core/tx/datashard/datashard_trans_queue.h           |   7
-rw-r--r-- | ydb/core/tx/datashard/datashard_txs.h                   |  10
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp         | 187
10 files changed, 359 insertions, 60 deletions
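
The heart of the change, before the full diff: when a datashard cancels or cleans up a transaction whose plan step it never received, it now collects the replies it owes its peers (delayed acks, plus TEvReadSet messages flagged FLAG_NO_DATA | FLAG_NO_ACK for every expected readset) and releases them only after the tablet executor confirms its read-only lease, so a stale leader can never tell other shards the plan was lost. A minimal sketch of that collect-then-confirm pattern, assuming simplified stand-in types (TReplyEvent, TLeaseExecutor and Deliver are illustrative, not YDB APIs):

    #include <functional>
    #include <memory>
    #include <utility>
    #include <vector>

    // Stand-in for IEventHandle; in the patch these are delayed acks and
    // no-data readsets produced by TDataShard::GetCleanupReplies.
    struct TReplyEvent {};

    // Hypothetical send primitive; the real code uses TActivationContext::Send.
    void Deliver(std::unique_ptr<TReplyEvent> ev) { /* actor send elided */ (void)ev; }

    // Stand-in for the tablet executor. The real ConfirmReadOnlyLease defers
    // the callback until this tablet generation proves it still holds the
    // leader lease; here it runs immediately to keep the sketch runnable.
    struct TLeaseExecutor {
        void ConfirmReadOnlyLease(std::function<void()> callback) { callback(); }
    };

    // Mirrors TDataShard::SendConfirmedReplies: keep the gathered replies
    // alive via shared ownership and flush them only after confirmation.
    void SendConfirmedReplies(TLeaseExecutor& executor,
                              std::vector<std::unique_ptr<TReplyEvent>>&& replies) {
        if (replies.empty()) {
            return;
        }
        auto state = std::make_shared<std::vector<std::unique_ptr<TReplyEvent>>>(std::move(replies));
        executor.ConfirmReadOnlyLease([state] {
            for (auto& ev : *state) {
                Deliver(std::move(ev));
            }
        });
    }

If the enclosing local transaction also wrote to the tablet's database, committing that write already proves leadership, which is why the diff below only calls ConfirmReadOnlyLease when !txc.DB.HasChanges() and otherwise sends the replies directly from Complete().
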
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 7f105b0bf8..0f4dca03a2 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -494,6 +494,57 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
     delayedAcks.clear();
 }
 
+void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
+    if (!op->HasOutputData()) {
+        // There are no replies
+        return;
+    }
+
+    auto& delayedAcks = op->DelayedAcks();
+    for (auto& x : delayedAcks) {
+        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+                "Cleanup TxId# " << op->GetTxId() << " at " << TabletID() << " Ack RS " << x->ToString());
+        cleanupReplies.emplace_back(x.Release());
+        IncCounter(COUNTER_ACK_SENT_DELAYED);
+    }
+    delayedAcks.clear();
+
+    auto& expectedReadSets = op->ExpectedReadSets();
+    for (auto& x : expectedReadSets) {
+        for (const auto& recipient : x.second) {
+            cleanupReplies.push_back(GenerateReadSetNoData(recipient, op->GetStep(), op->GetTxId(), x.first.first, x.first.second));
+        }
+    }
+    expectedReadSets.clear();
+}
+
+void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
+    if (replies.empty()) {
+        return;
+    }
+
+    struct TState : public TThrRefBase {
+        std::vector<std::unique_ptr<IEventHandle>> Replies;
+
+        TState(std::vector<std::unique_ptr<IEventHandle>>&& replies)
+            : Replies(std::move(replies))
+        {}
+    };
+
+    Executor()->ConfirmReadOnlyLease(ts,
+        [state = MakeIntrusive<TState>(std::move(replies))] {
+            for (auto& ev : state->Replies) {
+                TActivationContext::Send(std::move(ev));
+            }
+        });
+}
+
+void TDataShard::SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies) {
+    for (auto& ev : replies) {
+        TActivationContext::Send(std::move(ev));
+    }
+}
+
 class TDataShard::TWaitVolatileDependencies final : public IVolatileTxCallback {
 public:
     TWaitVolatileDependencies(
@@ -3110,18 +3161,6 @@ bool TDataShard::CheckChangesQueueOverflow() const {
     return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;
 }
 
-void TDataShard::Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx) {
-    ui64 txId = ev->Get()->Record.GetTxId();
-    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Got TEvDataShard::TEvCancelTransactionProposal " << TabletID()
-                << " txId " << txId);
-
-    // Mark any queued proposals as cancelled
-    ProposeQueue.Cancel(txId);
-
-    // Cancel transactions that have already been proposed
-    Execute(new TTxCancelTransactionProposal(this, txId), ctx);
-}
-
 void TDataShard::DoPeriodicTasks(const TActorContext &ctx) {
     UpdateLagCounters(ctx);
     UpdateTableStats(ctx);
@@ -3251,33 +3290,36 @@ void TDataShard::SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui6
     PipeClientCache->Send(ctx, target, ev.Release());
 }
 
-void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
+std::unique_ptr<IEventHandle> TDataShard::GenerateReadSetNoData(const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
 {
-    Y_UNUSED(ctx);
-    auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
-    ev->Record.SetFlags(
+    auto msg = std::make_unique<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
+    msg->Record.SetFlags(
         NKikimrTx::TEvReadSet::FLAG_NO_DATA |
         NKikimrTx::TEvReadSet::FLAG_NO_ACK);
     if (source != TabletID()) {
-        FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList());
+        FillSplitTrajectory(source, *msg->Record.MutableBalanceTrackList());
    }
 
+    return std::make_unique<IEventHandle>(recipient, SelfId(), msg.release());
+}
+
+void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
+{
+    Y_UNUSED(ctx);
+    auto ev = GenerateReadSetNoData(recipient, step, txId, source, target);
+
     struct TSendState : public TThrRefBase {
-        TDataShard* Self;
-        TActorId Recipient;
-        THolder<TEvTxProcessing::TEvReadSet> Event;
-
-        TSendState(TDataShard* self, const TActorId& recipient, THolder<TEvTxProcessing::TEvReadSet>&& event)
-            : Self(self)
-            , Recipient(recipient)
-            , Event(std::move(event))
+        std::unique_ptr<IEventHandle> Event;
+
+        TSendState(std::unique_ptr<IEventHandle>&& event)
+            : Event(std::move(event))
         { }
     };
 
     // FIXME: we can probably avoid lease confirmation here
     Executor()->ConfirmReadOnlyLease(
-        [state = MakeIntrusive<TSendState>(this, recipient, std::move(ev))] {
-            state->Self->Send(state->Recipient, state->Event.Release());
+        [state = MakeIntrusive<TSendState>(std::move(ev))] {
+            TActivationContext::Send(std::move(state->Event));
         });
 }
 
diff --git a/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp b/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp
index ff950f6a59..12b7525b91 100644
--- a/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp
+++ b/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp
@@ -3,6 +3,18 @@
 namespace NKikimr {
 namespace NDataShard {
 
+class TDataShard::TTxCancelTransactionProposal : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+    TTxCancelTransactionProposal(TDataShard *self, ui64 txId);
+    bool Execute(TTransactionContext &txc, const TActorContext &ctx) override;
+    void Complete(const TActorContext &ctx) override;
+    TTxType GetTxType() const override { return TXTYPE_CANCEL_TX_PROPOSAL; }
+private:
+    const ui64 TxId;
+    std::vector<std::unique_ptr<IEventHandle>> Replies;
+    TMonotonic ReplyTs;
+};
+
 TDataShard::TTxCancelTransactionProposal::TTxCancelTransactionProposal(TDataShard *self,
                                                                        ui64 txId)
     : TBase(self)
@@ -32,14 +44,41 @@ bool TDataShard::TTxCancelTransactionProposal::Execute(TTransactionContext &txc,
                 << " txId " << TxId);
 
     NIceDb::TNiceDb db(txc.DB);
-    return Self->Pipeline.CancelPropose(db, ctx, TxId);
+    if (!Self->Pipeline.CancelPropose(db, ctx, TxId, Replies)) {
+        // Page fault, try again
+        return false;
+    }
+
+    if (!Replies.empty() && !txc.DB.HasChanges()) {
+        // We want to send confirmed replies when cleaning up volatile transactions
+        ReplyTs = Self->ConfirmReadOnlyLease();
+    }
+
+    return true;
 }
 
 void TDataShard::TTxCancelTransactionProposal::Complete(const TActorContext &ctx) {
+    if (ReplyTs) {
+        Self->SendConfirmedReplies(ReplyTs, std::move(Replies));
+    } else {
+        Self->SendCommittedReplies(std::move(Replies));
+    }
     Self->CheckSplitCanStart(ctx);
     Self->CheckMvccStateChangeCanStart(ctx);
 }
 
+void TDataShard::Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx) {
+    ui64 txId = ev->Get()->Record.GetTxId();
+    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Got TEvDataShard::TEvCancelTransactionProposal " << TabletID()
+                << " txId " << txId);
+
+    // Mark any queued proposals as cancelled
+    ProposeQueue.Cancel(txId);
+
+    // Cancel transactions that have already been proposed
+    Execute(new TTxCancelTransactionProposal(this, txId), ctx);
+}
+
 } // namespace NDataShard
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
index 7bb3681418..520e3d0868 100644
--- a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
+++ b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
@@ -21,7 +21,7 @@ public:
 
         NIceDb::TNiceDb db(txc.DB);
 
-        auto cleanupStatus = Self->Pipeline.Cleanup(db, ctx);
+        auto cleanupStatus = Self->Pipeline.Cleanup(db, ctx, Replies);
         switch (cleanupStatus) {
             case ECleanupStatus::None:
                 break;
@@ -29,6 +29,10 @@ public:
                 Self->IncCounter(COUNTER_TX_WAIT_DATA);
                 return false;
             case ECleanupStatus::Success:
+                if (!Replies.empty() && !txc.DB.HasChanges()) {
+                    // We want to send confirmed replies when cleaning up volatile transactions
+                    ReplyTs = Self->ConfirmReadOnlyLease();
+                }
                 LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
                            "Cleaned up old txs at " << Self->TabletID()
                            << " TxInFly " << Self->TxInFly());
@@ -71,9 +75,18 @@ public:
     }
 
     void Complete(const TActorContext& ctx) override {
+        if (ReplyTs) {
+            Self->SendConfirmedReplies(ReplyTs, std::move(Replies));
+        } else {
+            Self->SendCommittedReplies(std::move(Replies));
+        }
         Self->CheckSplitCanStart(ctx);
         Self->CheckMvccStateChangeCanStart(ctx);
     }
+
+private:
+    std::vector<std::unique_ptr<IEventHandle>> Replies;
+    TMonotonic ReplyTs;
 };
 
 void TDataShard::ExecuteCleanupTx(const TActorContext& ctx) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index e20a188bdd..533d59bc55 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1394,12 +1394,16 @@ public:
     bool AddExpectation(ui64 target, ui64 step, ui64 txId);
     bool RemoveExpectation(ui64 target, ui64 txId);
     void SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target);
+    std::unique_ptr<IEventHandle> GenerateReadSetNoData(const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target);
     void SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target);
     bool ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev);
     void SendReadSets(const TActorContext& ctx, TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
     void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
     void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
+    void GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
+    void SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies);
+    void SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies);
 
     void WaitVolatileDependenciesThenSend(
             const absl::flat_hash_set<ui64>& dependencies,
@@ -1495,6 +1499,7 @@ public:
 
     bool CanDrop() const {
         Y_VERIFY(State != TShardState::Offline, "Unexpexted repeated drop");
+        // FIXME: why are we waiting for OutReadSets.Empty()?
        return (TxInFly() == 1) && OutReadSets.Empty() && (State != TShardState::PreOffline);
     }
 
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 8a6e822d01..60f89cedd2 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -133,7 +133,9 @@ TDuration TPipeline::CleanupTimeout() const {
     return TDuration::Zero();
 }
 
-ECleanupStatus TPipeline::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
+ECleanupStatus TPipeline::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx,
+        std::vector<std::unique_ptr<IEventHandle>>& replies)
+{
     bool foundExpired = false;
     TOperation::TPtr op;
     ui64 step = 0;
@@ -174,7 +176,7 @@ ECleanupStatus TPipeline::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx)
 
     // cleaunup outdated
     ui64 outdatedStep = Self->GetOutdatedCleanupStep();
-    auto status = CleanupOutdated(db, ctx, outdatedStep);
+    auto status = CleanupOutdated(db, ctx, outdatedStep, replies);
     switch (status) {
         case ECleanupStatus::None:
             if (!op || !CanRunOp(*op)) {
@@ -1076,20 +1078,32 @@ void TPipeline::ProposeSchemeTx(const TSchemaOperation &op,
     Self->TransQueue.ProposeSchemaTx(db, op);
 }
 
-bool TPipeline::CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId) {
-    ForgetTx(txId);
-    bool cancelled = Self->TransQueue.CancelPropose(db, txId);
-    if (cancelled) {
-        Self->CheckDelayedProposeQueue(ctx);
+bool TPipeline::CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId,
+        std::vector<std::unique_ptr<IEventHandle>>& replies)
+{
+    auto op = Self->TransQueue.FindTxInFly(txId);
+    if (!op || op->GetStep()) {
+        // Operation either doesn't exist, or already planned and cannot be cancelled
+        return true;
+    }
+
+    if (!Self->TransQueue.CancelPropose(db, txId, replies)) {
+        // Page fault, try again
+        return false;
     }
+
+    ForgetTx(txId);
+    Self->CheckDelayedProposeQueue(ctx);
     MaybeActivateWaitingSchemeOps(ctx);
-    return cancelled;
+    return true;
 }
 
-ECleanupStatus TPipeline::CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep) {
+ECleanupStatus TPipeline::CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep,
+        std::vector<std::unique_ptr<IEventHandle>>& replies)
+{
     const ui32 OUTDATED_BATCH_SIZE = 100;
     TVector<ui64> outdatedTxs;
-    auto status = Self->TransQueue.CleanupOutdated(db, outdatedStep, OUTDATED_BATCH_SIZE, outdatedTxs);
+    auto status = Self->TransQueue.CleanupOutdated(db, outdatedStep, OUTDATED_BATCH_SIZE, outdatedTxs, replies);
     switch (status) {
         case ECleanupStatus::None:
         case ECleanupStatus::Restart:
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 46211553bc..23be12a5f3 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -130,7 +130,8 @@ public:
     bool PlanTxs(ui64 step, TVector<ui64> &txIds, TTransactionContext &txc, const TActorContext &ctx);
     void PreserveSchema(NIceDb::TNiceDb& db, ui64 step);
     TDuration CleanupTimeout() const;
-    ECleanupStatus Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx);
+    ECleanupStatus Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx,
+            std::vector<std::unique_ptr<IEventHandle>>& replies);
 
     // times
 
@@ -191,8 +192,10 @@ public:
     void PersistTxFlags(TOperation::TPtr op, TTransactionContext &txc);
     void UpdateSchemeTxBody(ui64 txId, const TStringBuf &txBody, TTransactionContext &txc);
     void ProposeSchemeTx(const TSchemaOperation &op, TTransactionContext &txc);
-    bool CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId);
-    ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep);
+    bool CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId,
+            std::vector<std::unique_ptr<IEventHandle>>& replies);
+    ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep,
+            std::vector<std::unique_ptr<IEventHandle>>& replies);
     ui64 PlannedTxInFly() const;
     const TSet<TStepOrder> &GetPlan() const;
     bool HasProposeDelayers() const;
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp
index c19243f55a..314a026e55 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.cpp
+++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp
@@ -21,9 +21,12 @@ void TTransQueue::AddTxInFly(TOperation::TPtr op) {
     Self->SetCounter(COUNTER_TX_IN_FLY, TxsInFly.size());
 }
 
-void TTransQueue::RemoveTxInFly(ui64 txId) {
+void TTransQueue::RemoveTxInFly(ui64 txId, std::vector<std::unique_ptr<IEventHandle>> *cleanupReplies) {
     auto it = TxsInFly.find(txId);
     if (it != TxsInFly.end()) {
+        if (cleanupReplies) {
+            Self->GetCleanupReplies(it->second, *cleanupReplies);
+        }
         if (!it->second->GetStep()) {
             --PlanWaitingTxCount;
         }
@@ -414,7 +417,7 @@ bool TTransQueue::ClearTxDetails(NIceDb::TNiceDb& db, ui64 txId) {
     return true;
 }
 
-bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
+bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies) {
     using Schema = TDataShard::Schema;
 
     auto it = TxsInFly.find(txId);
@@ -433,7 +436,7 @@ bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
     }
 
     DeadlineQueue.erase(std::make_pair(maxStep, txId));
-    RemoveTxInFly(txId);
+    RemoveTxInFly(txId, &replies);
     Self->IncCounter(COUNTER_PREPARE_CANCELLED);
     return true;
 }
@@ -442,7 +445,9 @@ bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
 // The argument outdatedStep specifies the maximum step for which we received
 // all planned transactions.
 // NOTE: DeadlineQueue no longer contains planned transactions.
-ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize, TVector<ui64>& outdatedTxs) {
+ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize,
+        TVector<ui64>& outdatedTxs, std::vector<std::unique_ptr<IEventHandle>>& replies)
+{
     using Schema = TDataShard::Schema;
 
     outdatedTxs.reserve(batchSize);
@@ -482,7 +487,7 @@ ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedSt
         DeadlineQueue.erase(pr);
     }
     for (ui64 txId : outdatedTxs) {
-        RemoveTxInFly(txId);
+        RemoveTxInFly(txId, &replies);
     }
 
     Self->IncCounter(COUNTER_TX_PROGRESS_OUTDATED, outdatedTxs.size());
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.h b/ydb/core/tx/datashard/datashard_trans_queue.h
index 1d392676da..a76014b030 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.h
+++ b/ydb/core/tx/datashard/datashard_trans_queue.h
@@ -45,7 +45,7 @@ public:
     const THashMap<ui64, TOperation::TPtr> &GetTxsInFly() const { return TxsInFly; }
     ui64 TxInFly() const { return TxsInFly.size(); }
     void AddTxInFly(TOperation::TPtr op);
-    void RemoveTxInFly(ui64 txId);
+    void RemoveTxInFly(ui64 txId, std::vector<std::unique_ptr<IEventHandle>> *cleanupReplies = nullptr);
 
     TOperation::TPtr FindTxInFly(ui64 txId) const {
         auto it = TxsInFly.find(txId);
@@ -85,8 +85,9 @@ private: // for pipeline only
     void UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags);
     void UpdateTxBody(NIceDb::TNiceDb& db, ui64 txId, const TStringBuf& txBody);
     void ProposeSchemaTx(NIceDb::TNiceDb& db, const TSchemaOperation& op);
-    bool CancelPropose(NIceDb::TNiceDb& db, ui64 txId);
-    ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize, TVector<ui64>& outdatedTxs);
+    bool CancelPropose(NIceDb::TNiceDb& db, ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies);
+    ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize,
+            TVector<ui64>& outdatedTxs, std::vector<std::unique_ptr<IEventHandle>>& replies);
 
     // Plan
 
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index 7b31059f89..a38b4bd0cd 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -140,16 +140,6 @@ private:
     const ui64 Seqno;
 };
 
-class TDataShard::TTxCancelTransactionProposal : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
-public:
-    TTxCancelTransactionProposal(TDataShard *self, ui64 txId);
-    bool Execute(TTransactionContext &txc, const TActorContext &ctx) override;
-    void Complete(const TActorContext &ctx) override;
-    TTxType GetTxType() const override { return TXTYPE_CANCEL_TX_PROPOSAL; }
-private:
-    const ui64 TxId;
-};
-
 inline bool MaybeRequestMoreTxMemory(ui64 usage, NTabletFlatExecutor::TTransactionContext &txc) {
     if (usage > txc.GetMemoryLimit()) {
         ui64 request = Max(usage - txc.GetMemoryLimit(), txc.GetMemoryLimit() * MEMORY_REQUEST_FACTOR);
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 36d60abbea..4bd7ee2a07 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1807,6 +1807,193 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
             "{ items { uint32_value: 20 } items { uint32_value: 20 } }");
     }
 
+    Y_UNIT_TEST(DistributedWriteLostPlanThenDrop) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(1000)
+            .SetEnableDataShardVolatileTransactions(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+        UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
+
+        bool removeTransactions = true;
+        size_t removedTransactions = 0;
+        size_t receivedReadSets = 0;
+        auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+            switch (ev->GetTypeRewrite()) {
+                case TEvTxProcessing::TEvPlanStep::EventType: {
+                    auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
+                    auto step = msg->Record.GetStep();
+                    auto tabletId = msg->Record.GetTabletID();
+                    auto recipient = ev->GetRecipientRewrite();
+                    Cerr << "... observed step " << step << " at tablet " << tabletId << Endl;
+                    if (removeTransactions && tabletId == shards1.at(0)) {
+                        THashMap<TActorId, TVector<ui64>> acks;
+                        for (auto& tx : msg->Record.GetTransactions()) {
+                            // Acknowledge transaction to coordinator
+                            auto ackTo = ActorIdFromProto(tx.GetAckTo());
+                            acks[ackTo].push_back(tx.GetTxId());
+                            ++removedTransactions;
+                        }
+                        // Acknowledge transactions to coordinator and remove them
+                        // It would be as if shard missed them for some reason
+                        for (auto& pr : acks) {
+                            auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end());
+                            runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true);
+                        }
+                        auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step);
+                        runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true);
+                        msg->Record.ClearTransactions();
+                        return TTestActorRuntime::EEventAction::DROP;
+                    }
+                    break;
+                }
+                case TEvTxProcessing::TEvReadSet::EventType: {
+                    auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+                    auto tabletId = msg->Record.GetTabletDest();
+                    Cerr << "... observed readset at " << tabletId << Endl;
+                    if (tabletId == shards1.at(0)) {
+                        ++receivedReadSets;
+                    }
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserverFunc = runtime.SetObserverFunc(observer);
+
+        TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+        auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+            UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+            UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+            )", sessionId, "", /* commitTx */ true), "/Root");
+
+        WaitFor(runtime, [&]{ return removedTransactions > 0 && receivedReadSets >= 2; }, "readset exchange start");
+        UNIT_ASSERT_VALUES_EQUAL(removedTransactions, 1u);
+        UNIT_ASSERT_VALUES_EQUAL(receivedReadSets, 2u);
+
+        removeTransactions = false;
+
+        auto dropStartTs = runtime.GetCurrentTime();
+        Cerr << "... dropping table" << Endl;
+        ui64 txId = AsyncDropTable(server, sender, "/Root", "table-2");
+        Cerr << "... drop table txId# " << txId << " started" << Endl;
+        WaitTxNotification(server, sender, txId);
+        auto dropLatency = runtime.GetCurrentTime() - dropStartTs;
+        Cerr << "... drop finished in " << dropLatency << Endl;
+        // TODO: we need to use neighbor readset hints to cancel earlier
+        // UNIT_ASSERT(dropLatency < TDuration::Seconds(5));
+    }
+
+    Y_UNIT_TEST(DistributedWriteLostPlanThenSplit) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetDomainPlanResolution(1000)
+            .SetEnableDataShardVolatileTransactions(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+        const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+        UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
+
+        bool removeTransactions = true;
+        size_t removedTransactions = 0;
+        size_t receivedReadSets = 0;
+        auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+            switch (ev->GetTypeRewrite()) {
+                case TEvTxProcessing::TEvPlanStep::EventType: {
+                    auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
+                    auto step = msg->Record.GetStep();
+                    auto tabletId = msg->Record.GetTabletID();
+                    auto recipient = ev->GetRecipientRewrite();
+                    Cerr << "... observed step " << step << " at tablet " << tabletId << Endl;
+                    if (removeTransactions && tabletId == shards1.at(0)) {
+                        THashMap<TActorId, TVector<ui64>> acks;
+                        for (auto& tx : msg->Record.GetTransactions()) {
+                            // Acknowledge transaction to coordinator
+                            auto ackTo = ActorIdFromProto(tx.GetAckTo());
+                            acks[ackTo].push_back(tx.GetTxId());
+                            ++removedTransactions;
+                        }
+                        // Acknowledge transactions to coordinator and remove them
+                        // It would be as if shard missed them for some reason
+                        for (auto& pr : acks) {
+                            auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end());
+                            runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true);
+                        }
+                        auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step);
+                        runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true);
+                        msg->Record.ClearTransactions();
+                        return TTestActorRuntime::EEventAction::DROP;
+                    }
+                    break;
+                }
+                case TEvTxProcessing::TEvReadSet::EventType: {
+                    auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+                    auto tabletId = msg->Record.GetTabletDest();
+                    Cerr << "... observed readset at " << tabletId << Endl;
+                    if (tabletId == shards1.at(0)) {
+                        ++receivedReadSets;
+                    }
+                    break;
+                }
+            }
+            return TTestActorRuntime::EEventAction::PROCESS;
+        };
+        auto prevObserverFunc = runtime.SetObserverFunc(observer);
+
+        TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+        auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+            UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2);
+            UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+            )", sessionId, "", /* commitTx */ true), "/Root");
+
+        WaitFor(runtime, [&]{ return removedTransactions > 0 && receivedReadSets >= 2; }, "readset exchange start");
+        UNIT_ASSERT_VALUES_EQUAL(removedTransactions, 1u);
+        UNIT_ASSERT_VALUES_EQUAL(receivedReadSets, 2u);
+
+        removeTransactions = false;
+
+        auto splitStartTs = runtime.GetCurrentTime();
+        Cerr << "... splitting table" << Endl;
+        ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1.at(0), 2);
+        Cerr << "... split txId# " << txId << " started" << Endl;
+        WaitTxNotification(server, sender, txId);
+        auto splitLatency = runtime.GetCurrentTime() - splitStartTs;
+        Cerr << "... split finished in " << splitLatency << Endl;
+        // TODO: we need to use neighbor readset hints to cancel earlier
+        // UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
+    }
+
 } // Y_UNIT_TEST_SUITE(DataShardVolatile)
 
 } // namespace NKikimr