diff options
authorsnaury <snaury@ydb.tech>2023-08-15 23:54:21 +0300
committersnaury <snaury@ydb.tech>2023-08-16 00:23:54 +0300
commitd1a4fffc6c0a74a57deba7c00d8a67abe182d9dd (patch)
parent9d6d0a0973985de0970d53639143a6d1f9048912 (diff)
Fix stuck volatile transactions on lost volatile plans KIKIMR-18580
10 files changed, 359 insertions, 60 deletions
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 7f105b0bf8..0f4dca03a2 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -494,6 +494,57 @@ void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEven
+void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
+ if (!op->HasOutputData()) {
+ // There are no replies
+ return;
+ }
+ auto& delayedAcks = op->DelayedAcks();
+ for (auto& x : delayedAcks) {
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+ "Cleanup TxId# " << op->GetTxId() << " at " << TabletID() << " Ack RS " << x->ToString());
+ cleanupReplies.emplace_back(x.Release());
+ }
+ delayedAcks.clear();
+ auto& expectedReadSets = op->ExpectedReadSets();
+ for (auto& x : expectedReadSets) {
+ for (const auto& recipient : x.second) {
+ cleanupReplies.push_back(GenerateReadSetNoData(recipient, op->GetStep(), op->GetTxId(), x.first.first, x.first.second));
+ }
+ }
+ expectedReadSets.clear();
+void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
+ if (replies.empty()) {
+ return;
+ }
+ struct TState : public TThrRefBase {
+ std::vector<std::unique_ptr<IEventHandle>> Replies;
+ TState(std::vector<std::unique_ptr<IEventHandle>>&& replies)
+ : Replies(std::move(replies))
+ {}
+ };
+ Executor()->ConfirmReadOnlyLease(ts,
+ [state = MakeIntrusive<TState>(std::move(replies))] {
+ for (auto& ev : state->Replies) {
+ TActivationContext::Send(std::move(ev));
+ }
+ });
+void TDataShard::SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies) {
+ for (auto& ev : replies) {
+ TActivationContext::Send(std::move(ev));
+ }
class TDataShard::TWaitVolatileDependencies final : public IVolatileTxCallback {
@@ -3110,18 +3161,6 @@ bool TDataShard::CheckChangesQueueOverflow() const {
return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;
-void TDataShard::Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx) {
- ui64 txId = ev->Get()->Record.GetTxId();
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Got TEvDataShard::TEvCancelTransactionProposal " << TabletID()
- << " txId " << txId);
- // Mark any queued proposals as cancelled
- ProposeQueue.Cancel(txId);
- // Cancel transactions that have already been proposed
- Execute(new TTxCancelTransactionProposal(this, txId), ctx);
void TDataShard::DoPeriodicTasks(const TActorContext &ctx) {
@@ -3251,33 +3290,36 @@ void TDataShard::SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui6
PipeClientCache->Send(ctx, target, ev.Release());
-void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
+std::unique_ptr<IEventHandle> TDataShard::GenerateReadSetNoData(const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
- Y_UNUSED(ctx);
- auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
- ev->Record.SetFlags(
+ auto msg = std::make_unique<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
+ msg->Record.SetFlags(
NKikimrTx::TEvReadSet::FLAG_NO_DATA |
if (source != TabletID()) {
- FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList());
+ FillSplitTrajectory(source, *msg->Record.MutableBalanceTrackList());
+ return std::make_unique<IEventHandle>(recipient, SelfId(), msg.release());
+void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
+ Y_UNUSED(ctx);
+ auto ev = GenerateReadSetNoData(recipient, step, txId, source, target);
struct TSendState : public TThrRefBase {
- TDataShard* Self;
- TActorId Recipient;
- THolder<TEvTxProcessing::TEvReadSet> Event;
- TSendState(TDataShard* self, const TActorId& recipient, THolder<TEvTxProcessing::TEvReadSet>&& event)
- : Self(self)
- , Recipient(recipient)
- , Event(std::move(event))
+ std::unique_ptr<IEventHandle> Event;
+ TSendState(std::unique_ptr<IEventHandle>&& event)
+ : Event(std::move(event))
{ }
// FIXME: we can probably avoid lease confirmation here
- [state = MakeIntrusive<TSendState>(this, recipient, std::move(ev))] {
- state->Self->Send(state->Recipient, state->Event.Release());
+ [state = MakeIntrusive<TSendState>(std::move(ev))] {
+ TActivationContext::Send(std::move(state->Event));
diff --git a/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp b/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp
index ff950f6a59..12b7525b91 100644
--- a/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp
+++ b/ydb/core/tx/datashard/datashard__cancel_tx_proposal.cpp
@@ -3,6 +3,18 @@
namespace NKikimr {
namespace NDataShard {
+class TDataShard::TTxCancelTransactionProposal : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+ TTxCancelTransactionProposal(TDataShard *self, ui64 txId);
+ bool Execute(TTransactionContext &txc, const TActorContext &ctx) override;
+ void Complete(const TActorContext &ctx) override;
+ TTxType GetTxType() const override { return TXTYPE_CANCEL_TX_PROPOSAL; }
+ const ui64 TxId;
+ std::vector<std::unique_ptr<IEventHandle>> Replies;
+ TMonotonic ReplyTs;
TDataShard::TTxCancelTransactionProposal::TTxCancelTransactionProposal(TDataShard *self,
ui64 txId)
: TBase(self)
@@ -32,14 +44,41 @@ bool TDataShard::TTxCancelTransactionProposal::Execute(TTransactionContext &txc,
<< " txId " << TxId);
NIceDb::TNiceDb db(txc.DB);
- return Self->Pipeline.CancelPropose(db, ctx, TxId);
+ if (!Self->Pipeline.CancelPropose(db, ctx, TxId, Replies)) {
+ // Page fault, try again
+ return false;
+ }
+ if (!Replies.empty() && !txc.DB.HasChanges()) {
+ // We want to send confirmed replies when cleaning up volatile transactions
+ ReplyTs = Self->ConfirmReadOnlyLease();
+ }
+ return true;
void TDataShard::TTxCancelTransactionProposal::Complete(const TActorContext &ctx)
+ if (ReplyTs) {
+ Self->SendConfirmedReplies(ReplyTs, std::move(Replies));
+ } else {
+ Self->SendCommittedReplies(std::move(Replies));
+ }
+void TDataShard::Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx) {
+ ui64 txId = ev->Get()->Record.GetTxId();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Got TEvDataShard::TEvCancelTransactionProposal " << TabletID()
+ << " txId " << txId);
+ // Mark any queued proposals as cancelled
+ ProposeQueue.Cancel(txId);
+ // Cancel transactions that have already been proposed
+ Execute(new TTxCancelTransactionProposal(this, txId), ctx);
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
index 7bb3681418..520e3d0868 100644
--- a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
+++ b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
@@ -21,7 +21,7 @@ public:
NIceDb::TNiceDb db(txc.DB);
- auto cleanupStatus = Self->Pipeline.Cleanup(db, ctx);
+ auto cleanupStatus = Self->Pipeline.Cleanup(db, ctx, Replies);
switch (cleanupStatus) {
case ECleanupStatus::None:
@@ -29,6 +29,10 @@ public:
return false;
case ECleanupStatus::Success:
+ if (!Replies.empty() && !txc.DB.HasChanges()) {
+ // We want to send confirmed replies when cleaning up volatile transactions
+ ReplyTs = Self->ConfirmReadOnlyLease();
+ }
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
"Cleaned up old txs at " << Self->TabletID()
<< " TxInFly " << Self->TxInFly());
@@ -71,9 +75,18 @@ public:
void Complete(const TActorContext& ctx) override {
+ if (ReplyTs) {
+ Self->SendConfirmedReplies(ReplyTs, std::move(Replies));
+ } else {
+ Self->SendCommittedReplies(std::move(Replies));
+ }
+ std::vector<std::unique_ptr<IEventHandle>> Replies;
+ TMonotonic ReplyTs;
void TDataShard::ExecuteCleanupTx(const TActorContext& ctx) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index e20a188bdd..533d59bc55 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1394,12 +1394,16 @@ public:
bool AddExpectation(ui64 target, ui64 step, ui64 txId);
bool RemoveExpectation(ui64 target, ui64 txId);
void SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target);
+ std::unique_ptr<IEventHandle> GenerateReadSetNoData(const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target);
void SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target);
bool ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev);
void SendReadSets(const TActorContext& ctx,
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
void SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const;
+ void GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies);
+ void SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies);
+ void SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies);
void WaitVolatileDependenciesThenSend(
const absl::flat_hash_set<ui64>& dependencies,
@@ -1495,6 +1499,7 @@ public:
bool CanDrop() const {
Y_VERIFY(State != TShardState::Offline, "Unexpexted repeated drop");
+ // FIXME: why are we waiting for OutReadSets.Empty()?
return (TxInFly() == 1) && OutReadSets.Empty() && (State != TShardState::PreOffline);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 8a6e822d01..60f89cedd2 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -133,7 +133,9 @@ TDuration TPipeline::CleanupTimeout() const {
return TDuration::Zero();
-ECleanupStatus TPipeline::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
+ECleanupStatus TPipeline::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx,
+ std::vector<std::unique_ptr<IEventHandle>>& replies)
bool foundExpired = false;
TOperation::TPtr op;
ui64 step = 0;
@@ -174,7 +176,7 @@ ECleanupStatus TPipeline::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx)
// cleaunup outdated
ui64 outdatedStep = Self->GetOutdatedCleanupStep();
- auto status = CleanupOutdated(db, ctx, outdatedStep);
+ auto status = CleanupOutdated(db, ctx, outdatedStep, replies);
switch (status) {
case ECleanupStatus::None:
if (!op || !CanRunOp(*op)) {
@@ -1076,20 +1078,32 @@ void TPipeline::ProposeSchemeTx(const TSchemaOperation &op,
Self->TransQueue.ProposeSchemaTx(db, op);
-bool TPipeline::CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId) {
- ForgetTx(txId);
- bool cancelled = Self->TransQueue.CancelPropose(db, txId);
- if (cancelled) {
- Self->CheckDelayedProposeQueue(ctx);
+bool TPipeline::CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId,
+ std::vector<std::unique_ptr<IEventHandle>>& replies)
+ auto op = Self->TransQueue.FindTxInFly(txId);
+ if (!op || op->GetStep()) {
+ // Operation either doesn't exist, or already planned and cannot be cancelled
+ return true;
+ }
+ if (!Self->TransQueue.CancelPropose(db, txId, replies)) {
+ // Page fault, try again
+ return false;
+ ForgetTx(txId);
+ Self->CheckDelayedProposeQueue(ctx);
- return cancelled;
+ return true;
-ECleanupStatus TPipeline::CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep) {
+ECleanupStatus TPipeline::CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep,
+ std::vector<std::unique_ptr<IEventHandle>>& replies)
const ui32 OUTDATED_BATCH_SIZE = 100;
TVector<ui64> outdatedTxs;
- auto status = Self->TransQueue.CleanupOutdated(db, outdatedStep, OUTDATED_BATCH_SIZE, outdatedTxs);
+ auto status = Self->TransQueue.CleanupOutdated(db, outdatedStep, OUTDATED_BATCH_SIZE, outdatedTxs, replies);
switch (status) {
case ECleanupStatus::None:
case ECleanupStatus::Restart:
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 46211553bc..23be12a5f3 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -130,7 +130,8 @@ public:
bool PlanTxs(ui64 step, TVector<ui64> &txIds, TTransactionContext &txc, const TActorContext &ctx);
void PreserveSchema(NIceDb::TNiceDb& db, ui64 step);
TDuration CleanupTimeout() const;
- ECleanupStatus Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx);
+ ECleanupStatus Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx,
+ std::vector<std::unique_ptr<IEventHandle>>& replies);
// times
@@ -191,8 +192,10 @@ public:
void PersistTxFlags(TOperation::TPtr op, TTransactionContext &txc);
void UpdateSchemeTxBody(ui64 txId, const TStringBuf &txBody, TTransactionContext &txc);
void ProposeSchemeTx(const TSchemaOperation &op, TTransactionContext &txc);
- bool CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId);
- ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep);
+ bool CancelPropose(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 txId,
+ std::vector<std::unique_ptr<IEventHandle>>& replies);
+ ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep,
+ std::vector<std::unique_ptr<IEventHandle>>& replies);
ui64 PlannedTxInFly() const;
const TSet<TStepOrder> &GetPlan() const;
bool HasProposeDelayers() const;
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp
index c19243f55a..314a026e55 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.cpp
+++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp
@@ -21,9 +21,12 @@ void TTransQueue::AddTxInFly(TOperation::TPtr op) {
Self->SetCounter(COUNTER_TX_IN_FLY, TxsInFly.size());
-void TTransQueue::RemoveTxInFly(ui64 txId) {
+void TTransQueue::RemoveTxInFly(ui64 txId, std::vector<std::unique_ptr<IEventHandle>> *cleanupReplies) {
auto it = TxsInFly.find(txId);
if (it != TxsInFly.end()) {
+ if (cleanupReplies) {
+ Self->GetCleanupReplies(it->second, *cleanupReplies);
+ }
if (!it->second->GetStep()) {
@@ -414,7 +417,7 @@ bool TTransQueue::ClearTxDetails(NIceDb::TNiceDb& db, ui64 txId) {
return true;
-bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
+bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies) {
using Schema = TDataShard::Schema;
auto it = TxsInFly.find(txId);
@@ -433,7 +436,7 @@ bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
DeadlineQueue.erase(std::make_pair(maxStep, txId));
- RemoveTxInFly(txId);
+ RemoveTxInFly(txId, &replies);
return true;
@@ -442,7 +445,9 @@ bool TTransQueue::CancelPropose(NIceDb::TNiceDb& db, ui64 txId) {
// The argument outdatedStep specifies the maximum step for which we received
// all planned transactions.
// NOTE: DeadlineQueue no longer contains planned transactions.
-ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize, TVector<ui64>& outdatedTxs) {
+ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize,
+ TVector<ui64>& outdatedTxs, std::vector<std::unique_ptr<IEventHandle>>& replies)
using Schema = TDataShard::Schema;
@@ -482,7 +487,7 @@ ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedSt
for (ui64 txId : outdatedTxs) {
- RemoveTxInFly(txId);
+ RemoveTxInFly(txId, &replies);
Self->IncCounter(COUNTER_TX_PROGRESS_OUTDATED, outdatedTxs.size());
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.h b/ydb/core/tx/datashard/datashard_trans_queue.h
index 1d392676da..a76014b030 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.h
+++ b/ydb/core/tx/datashard/datashard_trans_queue.h
@@ -45,7 +45,7 @@ public:
const THashMap<ui64, TOperation::TPtr> &GetTxsInFly() const { return TxsInFly; }
ui64 TxInFly() const { return TxsInFly.size(); }
void AddTxInFly(TOperation::TPtr op);
- void RemoveTxInFly(ui64 txId);
+ void RemoveTxInFly(ui64 txId, std::vector<std::unique_ptr<IEventHandle>> *cleanupReplies = nullptr);
TOperation::TPtr FindTxInFly(ui64 txId) const
auto it = TxsInFly.find(txId);
@@ -85,8 +85,9 @@ private: // for pipeline only
void UpdateTxFlags(NIceDb::TNiceDb& db, ui64 txId, ui64 flags);
void UpdateTxBody(NIceDb::TNiceDb& db, ui64 txId, const TStringBuf& txBody);
void ProposeSchemaTx(NIceDb::TNiceDb& db, const TSchemaOperation& op);
- bool CancelPropose(NIceDb::TNiceDb& db, ui64 txId);
- ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize, TVector<ui64>& outdatedTxs);
+ bool CancelPropose(NIceDb::TNiceDb& db, ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies);
+ ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize,
+ TVector<ui64>& outdatedTxs, std::vector<std::unique_ptr<IEventHandle>>& replies);
// Plan
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index 7b31059f89..a38b4bd0cd 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -140,16 +140,6 @@ private:
const ui64 Seqno;
-class TDataShard::TTxCancelTransactionProposal : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
- TTxCancelTransactionProposal(TDataShard *self, ui64 txId);
- bool Execute(TTransactionContext &txc, const TActorContext &ctx) override;
- void Complete(const TActorContext &ctx) override;
- TTxType GetTxType() const override { return TXTYPE_CANCEL_TX_PROPOSAL; }
- const ui64 TxId;
inline bool MaybeRequestMoreTxMemory(ui64 usage, NTabletFlatExecutor::TTransactionContext &txc) {
if (usage > txc.GetMemoryLimit()) {
ui64 request = Max(usage - txc.GetMemoryLimit(), txc.GetMemoryLimit() * MEMORY_REQUEST_FACTOR);
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 36d60abbea..4bd7ee2a07 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1807,6 +1807,193 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
+ Y_UNIT_TEST(DistributedWriteLostPlanThenDrop) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetEnableDataShardVolatileTransactions(true);
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ InitRoot(server, sender);
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
+ bool removeTransactions = true;
+ size_t removedTransactions = 0;
+ size_t receivedReadSets = 0;
+ auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvPlanStep::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
+ auto step = msg->Record.GetStep();
+ auto tabletId = msg->Record.GetTabletID();
+ auto recipient = ev->GetRecipientRewrite();
+ Cerr << "... observed step " << step << " at tablet " << tabletId << Endl;
+ if (removeTransactions && tabletId == shards1.at(0)) {
+ THashMap<TActorId, TVector<ui64>> acks;
+ for (auto& tx : msg->Record.GetTransactions()) {
+ // Acknowledge transaction to coordinator
+ auto ackTo = ActorIdFromProto(tx.GetAckTo());
+ acks[ackTo].push_back(tx.GetTxId());
+ ++removedTransactions;
+ }
+ // Acknowledge transactions to coordinator and remove them
+ // It would be as if shard missed them for some reason
+ for (auto& pr : acks) {
+ auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end());
+ runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true);
+ }
+ auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step);
+ runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true);
+ msg->Record.ClearTransactions();
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ auto tabletId = msg->Record.GetTabletDest();
+ Cerr << "... observed readset at " << tabletId << Endl;
+ if (tabletId == shards1.at(0)) {
+ ++receivedReadSets;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(observer);
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", /* commitTx */ true), "/Root");
+ WaitFor(runtime, [&]{ return removedTransactions > 0 && receivedReadSets >= 2; }, "readset exchange start");
+ UNIT_ASSERT_VALUES_EQUAL(removedTransactions, 1u);
+ UNIT_ASSERT_VALUES_EQUAL(receivedReadSets, 2u);
+ removeTransactions = false;
+ auto dropStartTs = runtime.GetCurrentTime();
+ Cerr << "... dropping table" << Endl;
+ ui64 txId = AsyncDropTable(server, sender, "/Root", "table-2");
+ Cerr << "... drop table txId# " << txId << " started" << Endl;
+ WaitTxNotification(server, sender, txId);
+ auto dropLatency = runtime.GetCurrentTime() - dropStartTs;
+ Cerr << "... drop finished in " << dropLatency << Endl;
+ // TODO: we need to use neighbor readset hints to cancel earlier
+ // UNIT_ASSERT(dropLatency < TDuration::Seconds(5));
+ }
+ Y_UNIT_TEST(DistributedWriteLostPlanThenSplit) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000)
+ .SetEnableDataShardVolatileTransactions(true);
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+ InitRoot(server, sender);
+ SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+ const auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ UNIT_ASSERT_VALUES_EQUAL(shards1.size(), 1u);
+ bool removeTransactions = true;
+ size_t removedTransactions = 0;
+ size_t receivedReadSets = 0;
+ auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvPlanStep::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
+ auto step = msg->Record.GetStep();
+ auto tabletId = msg->Record.GetTabletID();
+ auto recipient = ev->GetRecipientRewrite();
+ Cerr << "... observed step " << step << " at tablet " << tabletId << Endl;
+ if (removeTransactions && tabletId == shards1.at(0)) {
+ THashMap<TActorId, TVector<ui64>> acks;
+ for (auto& tx : msg->Record.GetTransactions()) {
+ // Acknowledge transaction to coordinator
+ auto ackTo = ActorIdFromProto(tx.GetAckTo());
+ acks[ackTo].push_back(tx.GetTxId());
+ ++removedTransactions;
+ }
+ // Acknowledge transactions to coordinator and remove them
+ // It would be as if shard missed them for some reason
+ for (auto& pr : acks) {
+ auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end());
+ runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true);
+ }
+ auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step);
+ runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true);
+ msg->Record.ClearTransactions();
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ auto tabletId = msg->Record.GetTabletDest();
+ Cerr << "... observed readset at " << tabletId << Endl;
+ if (tabletId == shards1.at(0)) {
+ ++receivedReadSets;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(observer);
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", /* commitTx */ true), "/Root");
+ WaitFor(runtime, [&]{ return removedTransactions > 0 && receivedReadSets >= 2; }, "readset exchange start");
+ UNIT_ASSERT_VALUES_EQUAL(removedTransactions, 1u);
+ UNIT_ASSERT_VALUES_EQUAL(receivedReadSets, 2u);
+ removeTransactions = false;
+ auto splitStartTs = runtime.GetCurrentTime();
+ Cerr << "... splitting table" << Endl;
+ ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1.at(0), 2);
+ Cerr << "... split txId# " << txId << " started" << Endl;
+ WaitTxNotification(server, sender, txId);
+ auto splitLatency = runtime.GetCurrentTime() - splitStartTs;
+ Cerr << "... split finished in " << splitLatency << Endl;
+ // TODO: we need to use neighbor readset hints to cancel earlier
+ // UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
+ }
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr