summary refs log tree commit diff stats
diff options
context:
space:
mode:
author snaury <[email protected]> 2023-01-13 15:01:20 +0300
committer snaury <[email protected]> 2023-01-13 15:01:20 +0300
commit 0e7b1b31fe5c1f508c8df26cb7e4e9fb1c13baab (patch)
tree 99918677e53617095661ed01b66147f7ba890a03
parent be6f1b53be888b4e18c1840d289c914b5bbd0f66 (diff)
Implement readset expectations for volatile transactions
-rw-r--r-- ydb/core/tablet_flat/flat_executor.cpp 12
-rw-r--r-- ydb/core/tablet_flat/flat_executor.h 2
-rw-r--r-- ydb/core/tablet_flat/tablet_flat_executed.cpp 5
-rw-r--r-- ydb/core/tablet_flat/tablet_flat_executed.h 1
-rw-r--r-- ydb/core/tablet_flat/tablet_flat_executor.h 1
-rw-r--r-- ydb/core/tx/datashard/complete_data_tx_unit.cpp 22
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 177
-rw-r--r-- ydb/core/tx/datashard/datashard__readset.cpp 71
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 8
-rw-r--r-- ydb/core/tx/datashard/datashard_kqp.cpp 3
-rw-r--r-- ydb/core/tx/datashard/datashard_outreadset.cpp 43
-rw-r--r-- ydb/core/tx/datashard/datashard_outreadset.h 10
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.cpp 6
-rw-r--r-- ydb/core/tx/datashard/datashard_pipeline.h 4
-rw-r--r-- ydb/core/tx/datashard/datashard_txs.h 2
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_common.h 16
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_common_kqp.h 8
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_volatile.cpp 231
-rw-r--r-- ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp 12
-rw-r--r-- ydb/core/tx/datashard/operation.h 5
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.cpp 60
-rw-r--r-- ydb/core/tx/datashard/volatile_tx.h 8
-rw-r--r-- ydb/core/tx/tx_processing.h 9
23 files changed, 641 insertions, 75 deletions
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index d73ebbb707e..d88c2c9b9cc 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1515,7 +1515,7 @@ bool TExecutor::CanExecuteTransaction() const {
return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction;
}
-void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
+void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx) {
Y_VERIFY(ActivationQueue, "attempt to execute transaction before activation");
TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self);
@@ -1560,7 +1560,7 @@ void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
return;
}
- if (ActiveTransaction || ActivateTransactionWaiting) {
+ if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) {
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
ActivationQueue->Push(seat.Release());
ActivateTransactionWaiting++;
@@ -1571,6 +1571,14 @@ void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
ExecuteTransaction(seat, ctx);
}
+void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
+ DoExecute(self, true, ctx);
+}
+
+void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
+ DoExecute(self, false, ctx);
+}
+
void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ctx) {
Y_VERIFY_DEBUG(!ActiveTransaction);
diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h
index ab6116c05b0..a2426e6c4bc 100644
--- a/ydb/core/tablet_flat/flat_executor.h
+++ b/ydb/core/tablet_flat/flat_executor.h
@@ -615,7 +615,9 @@ public:
void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override;
void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override;
void DetachTablet(const TActorContext &ctx) override;
+ void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx);
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
+ void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false);
TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);
diff --git a/ydb/core/tablet_flat/tablet_flat_executed.cpp b/ydb/core/tablet_flat/tablet_flat_executed.cpp
index 3d46f6ffda3..2e09ec6a882 100644
--- a/ydb/core/tablet_flat/tablet_flat_executed.cpp
+++ b/ydb/core/tablet_flat/tablet_flat_executed.cpp
@@ -39,6 +39,11 @@ void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction) {
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext));
}
+void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction) {
+ if (transaction)
+ static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext));
+}
+
const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept {
return static_cast<TExecutor*>(Executor())->Scheme();
}
diff --git a/ydb/core/tablet_flat/tablet_flat_executed.h b/ydb/core/tablet_flat/tablet_flat_executed.h
index 117d7532bb3..c88281b25b6 100644
--- a/ydb/core/tablet_flat/tablet_flat_executed.h
+++ b/ydb/core/tablet_flat/tablet_flat_executed.h
@@ -25,6 +25,7 @@ protected:
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx);
void Execute(TAutoPtr<ITransaction> transaction);
+ void EnqueueExecute(TAutoPtr<ITransaction> transaction);
const NTable::TScheme& Scheme() const noexcept;
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index 35d584e38f4..eb452378f97 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -523,6 +523,7 @@ namespace NFlatExecutorSetup {
virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0;
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
+ virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
virtual void ConfirmReadOnlyLease(TMonotonic at) = 0;
virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0;
diff --git a/ydb/core/tx/datashard/complete_data_tx_unit.cpp b/ydb/core/tx/datashard/complete_data_tx_unit.cpp
index c086fa70119..60bb9764597 100644
--- a/ydb/core/tx/datashard/complete_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/complete_data_tx_unit.cpp
@@ -117,6 +117,28 @@ void TCompleteOperationUnit::Complete(TOperation::TPtr op,
DataShard.NotifySchemeshard(ctx, op->GetTxId());
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
+
+ if (op->HasOutputData()) {
+ const auto& outReadSets = op->OutReadSets();
+ const auto& expectedReadSets = op->ExpectedReadSets();
+ auto itOut = outReadSets.begin();
+ auto itExpected = expectedReadSets.begin();
+ while (itExpected != expectedReadSets.end()) {
+ while (itOut != outReadSets.end() && itOut->first < itExpected->first) {
+ ++itOut;
+ }
+ if (itOut != outReadSets.end() && itOut->first == itExpected->first) {
+ ++itOut;
+ ++itExpected;
+ continue;
+ }
+ // We have an expected readset without a corresponding out readset
+ for (const auto& recipient : itExpected->second) {
+ DataShard.SendReadSetNoData(ctx, recipient, op->GetStep(), op->GetTxId(), itExpected->first.first, itExpected->first.second);
+ }
+ ++itExpected;
+ }
+ }
}
THolder<TExecutionUnit> CreateCompleteOperationUnit(TDataShard &dataShard,
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index e9ed4443848..19b32a42908 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -399,7 +399,7 @@ void TDataShard::SwitchToWork(const TActorContext &ctx) {
Execute(new TTxCleanupRemovedSnapshots(this), ctx);
if (State != TShardState::Offline) {
- VolatileTxManager.Start();
+ VolatileTxManager.Start(ctx);
}
SignalTabletActive(ctx);
@@ -514,7 +514,10 @@ public:
}
void OnAbort() override {
- Y_FAIL("TODO");
+ Result->Record.ClearTxResult();
+ Result->Record.SetStatus(NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED);
+ Result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Distributed transaction aborted due to commit failure");
+ OnCommit();
}
private:
@@ -2797,6 +2800,11 @@ void TDataShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActo
void TDataShard::RestartPipeRS(ui64 tabletId, const TActorContext& ctx) {
for (auto seqno : ResendReadSetPipeTracker.FindTx(tabletId)) {
+ if (seqno == Max<ui64>()) {
+ OutReadSets.ResendExpectations(tabletId, ctx);
+ continue;
+ }
+
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to tablet %" PRIu64 " caused resend of readset %" PRIu64
" at tablet %" PRIu64, tabletId, seqno, TabletID());
@@ -2805,7 +2813,14 @@ void TDataShard::RestartPipeRS(ui64 tabletId, const TActorContext& ctx) {
}
void TDataShard::AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx) {
+ bool detachExpectations = false;
for (auto seqno : ResendReadSetPipeTracker.FindTx(tabletId)) {
+ if (seqno == Max<ui64>()) {
+ AbortExpectationsFromDeletedTablet(tabletId, OutReadSets.RemoveExpectations(tabletId));
+ detachExpectations = true;
+ continue;
+ }
+
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to dead tablet %" PRIu64 " caused ack of readset %" PRIu64
" at tablet %" PRIu64, tabletId, seqno, TabletID());
@@ -2818,9 +2833,23 @@ void TDataShard::AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx) {
PlanQueue.Progress(ctx);
}
}
+
+ if (detachExpectations) {
+ ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), tabletId, 0, ctx);
+ }
+
CheckStateChange(ctx);
}
+void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64, ui64>&& expectations) {
+ for (auto& pr : expectations) {
+ auto* info = VolatileTxManager.FindByTxId(pr.first);
+ if (info && info->State == EVolatileTxState::Waiting && info->Participants.contains(tabletId)) {
+ VolatileTxManager.AbortWaitingTransaction(info);
+ }
+ }
+}
+
void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) {
Y_UNUSED(ev); Y_UNUSED(ctx);
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Server connected at tablet %s %" PRIu64 ,
@@ -3007,41 +3036,133 @@ TDataShard::PrepareReadSet(ui64 step, ui64 txId, ui64 source, ui64 target,
return ev;
}
-void TDataShard::SendReadSet(const TActorContext& ctx, ui64 step,
- ui64 txId, ui64 source, ui64 target,
- const TString& body, ui64 seqno)
+THolder<TEvTxProcessing::TEvReadSet>
+TDataShard::PrepareReadSetExpectation(ui64 step, ui64 txId, ui64 source, ui64 target)
{
+ // We want to notify the target that we expect a readset, there's no data and no ack needed so no seqno
+ auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
+ ev->Record.SetFlags(
+ NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET |
+ NKikimrTx::TEvReadSet::FLAG_NO_DATA |
+ NKikimrTx::TEvReadSet::FLAG_NO_ACK);
+ if (source != TabletID())
+ FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList());
+ return ev;
+}
+
+void TDataShard::SendReadSet(
+ const TActorContext& ctx,
+ THolder<TEvTxProcessing::TEvReadSet>&& rs)
+{
+ ui64 txId = rs->Record.GetTxId();
+ ui64 source = rs->Record.GetTabletSource();
+ ui64 target = rs->Record.GetTabletDest();
+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Send RS at " << TabletID() << " from " << source << " to " << target << " txId " << txId);
+ IncCounter(COUNTER_READSET_SENT_COUNT);
+ IncCounter(COUNTER_READSET_SENT_SIZE, rs->Record.GetReadSet().size());
+
+ PipeClientCache->Send(ctx, target, rs.Release());
+}
+
+void TDataShard::SendReadSet(const TActorContext& ctx, ui64 step,
+ ui64 txId, ui64 source, ui64 target,
+ const TString& body, ui64 seqno)
+{
auto ev = PrepareReadSet(step, txId, source, target, body, seqno);
+ SendReadSet(ctx, std::move(ev));
+}
- IncCounter(COUNTER_READSET_SENT_COUNT);
- IncCounter(COUNTER_READSET_SENT_SIZE, body.size());
+bool TDataShard::AddExpectation(ui64 target, ui64 step, ui64 txId) {
+ bool hadExpectations = OutReadSets.HasExpectations(target);
+ bool added = OutReadSets.AddExpectation(target, step, txId);
+ if (!hadExpectations) {
+ ResendReadSetPipeTracker.AttachTablet(Max<ui64>(), target);
+ }
+ return added;
+}
+bool TDataShard::RemoveExpectation(ui64 target, ui64 txId) {
+ bool removed = OutReadSets.RemoveExpectation(target, txId);
+ if (removed && !OutReadSets.HasExpectations(target)) {
+ auto ctx = TActivationContext::ActorContextFor(SelfId());
+ ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), target, 0, ctx);
+ }
+ return removed;
+}
+
+void TDataShard::SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId,
+ ui64 source, ui64 target)
+{
+ auto ev = PrepareReadSetExpectation(step, txId, source, target);
PipeClientCache->Send(ctx, target, ev.Release());
}
+void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
+{
+ Y_UNUSED(ctx);
+ auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
+ ev->Record.SetFlags(
+ NKikimrTx::TEvReadSet::FLAG_NO_DATA |
+ NKikimrTx::TEvReadSet::FLAG_NO_ACK);
+ if (source != TabletID()) {
+ FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList());
+ }
+
+ struct TSendState : public TThrRefBase {
+ TDataShard* Self;
+ TActorId Recipient;
+ THolder<TEvTxProcessing::TEvReadSet> Event;
+
+ TSendState(TDataShard* self, const TActorId& recipient, THolder<TEvTxProcessing::TEvReadSet>&& event)
+ : Self(self)
+ , Recipient(recipient)
+ , Event(std::move(event))
+ { }
+ };
+
+ // FIXME: we can probably avoid lease confirmation here
+ Executor()->ConfirmReadOnlyLease(
+ [state = MakeIntrusive<TSendState>(this, recipient, std::move(ev))] {
+ state->Self->Send(state->Recipient, state->Event.Release());
+ });
+}
+
+bool TDataShard::ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev) {
+ const auto& record = ev->Get()->Record;
+
+ // Check if we already have a pending readset from dest to source
+ TReadSetKey rsKey(record.GetTxId(), TabletID(), record.GetTabletDest(), record.GetTabletSource());
+ if (OutReadSets.Has(rsKey)) {
+ return true;
+ }
+
+ if (IsStateActive()) {
+ // When we have a pending op, remember that readset from dest to source is expected
+ if (auto op = Pipeline.FindOp(record.GetTxId())) {
+ auto key = std::make_pair(record.GetTabletDest(), record.GetTabletSource());
+ op->ExpectedReadSets()[key].push_back(ev->Sender);
+ return true;
+ }
+ }
+
+ // In all other cases we want to reply with no data
+ return false;
+}
+
void TDataShard::SendReadSets(const TActorContext& ctx,
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets)
{
TPendingPipeTrackerCommands pendingPipeTrackerCommands;
for (auto &rs : readsets) {
- ui64 txId = rs->Record.GetTxId();
- ui64 source = rs->Record.GetTabletSource();
ui64 target = rs->Record.GetTabletDest();
ui64 seqno = rs->Record.GetSeqno();
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
- "Send RS at " << TabletID() << " from " << source
- << " to " << target << " txId " << txId);
-
- IncCounter(COUNTER_READSET_SENT_COUNT);
- IncCounter(COUNTER_READSET_SENT_SIZE, rs->Record.GetReadSet().size());
-
pendingPipeTrackerCommands.AttachTablet(seqno, target);
- PipeClientCache->Send(ctx, target, rs.Release());
+ SendReadSet(ctx, std::move(rs));
}
pendingPipeTrackerCommands.Apply(ResendReadSetPipeTracker, ctx);
@@ -3387,16 +3508,18 @@ void TDataShard::Handle(TEvDataShard::TEvGetRSInfoRequest::TPtr &ev,
}
for (auto &pr : Pipeline.GetDelayedAcks()) {
- auto *ev = dynamic_cast<TEvTxProcessing::TEvReadSetAck*>(pr.second->GetBase());
- if (ev) {
- auto &rec = ev->Record;
- auto &ack = *response->Record.AddDelayedRSAcks();
- ack.SetTxId(rec.GetTxId());
- ack.SetStep(rec.GetStep());
- ack.SetOrigin(rec.GetTabletConsumer());
- ack.SetSource(rec.GetTabletSource());
- ack.SetDestination(rec.GetTabletDest());
- ack.SetSeqNo(rec.GetSeqno());
+ for (auto &ack : pr.second) {
+ auto *ev = dynamic_cast<TEvTxProcessing::TEvReadSetAck*>(ack->GetBase());
+ if (ev) {
+ auto &rec = ev->Record;
+ auto &ack = *response->Record.AddDelayedRSAcks();
+ ack.SetTxId(rec.GetTxId());
+ ack.SetStep(rec.GetStep());
+ ack.SetOrigin(rec.GetTabletConsumer());
+ ack.SetSource(rec.GetTabletSource());
+ ack.SetDestination(rec.GetTabletDest());
+ ack.SetSeqNo(rec.GetSeqno());
+ }
}
}
diff --git a/ydb/core/tx/datashard/datashard__readset.cpp b/ydb/core/tx/datashard/datashard__readset.cpp
index 24d992e9c7e..4d8a63ecb32 100644
--- a/ydb/core/tx/datashard/datashard__readset.cpp
+++ b/ydb/core/tx/datashard/datashard__readset.cpp
@@ -1,8 +1,6 @@
#include "datashard_txs.h"
-namespace NKikimr {
-
-namespace NDataShard {
+namespace NKikimr::NDataShard {
TDataShard::TTxReadSet::TTxReadSet(TDataShard *self, TEvTxProcessing::TEvReadSet::TPtr ev)
: TBase(self)
@@ -16,11 +14,14 @@ namespace NDataShard {
DoExecute(txc, ctx);
- if (Ack) {
+ if (Ack || NoDataReply) {
// Only leader is allowed to ack readsets, start confirming as early
// as possible, so we handle both read-only and read-write acks.
AckTs = AppData(ctx)->MonotonicTimeProvider->Now();
Self->Executor()->ConfirmReadOnlyLease(AckTs);
+ } else {
+ // We won't need the event in Complete
+ Ev.Reset();
}
return true;
@@ -33,25 +34,36 @@ namespace NDataShard {
&& state != TShardState::Readonly,
"State %" PRIu32 " event %s", state, Ev->Get()->ToString().data());
- if (!(Ev->Get()->Record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) {
+ const auto& msg = *Ev->Get();
+ const auto& record = msg.Record;
+
+ if (!(record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_ACK)) {
Ack = MakeAck(ctx);
}
- // TODO: handle FLAG_EXPECT_READSET for missing transactions and inactive states
+ if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET) {
+ if (!Self->ProcessReadSetExpectation(Ev)) {
+ NoDataReply = MakeNoDataReply(ctx);
+ }
+
+ // Note: expect + no data is pure notification, avoid further processing
+ if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) {
+ return;
+ }
+ }
if (!Self->IsStateActive()) {
/// @warning Ack and allow sender to forget readset.
/// It's possible till readsets can't pass through splits-merges or other shard mutations.
LOG_WARN(ctx, NKikimrServices::TX_DATASHARD,
"Allow sender to lose readset, state %" PRIu32 " at %" PRIu64 " %s",
- state, Self->TabletID(), Ev->Get()->ToString().data());
+ state, Self->TabletID(), msg.ToString().data());
return;
}
- bool saved = Self->Pipeline.SaveInReadSet(*Ev->Get(), Ack, txc, ctx);
+ bool saved = Self->Pipeline.SaveInReadSet(msg, Ack, txc, ctx);
if (!saved) { // delayed. Do not ack
Y_VERIFY(!Ack);
- Ev.Reset();
}
}
@@ -60,33 +72,56 @@ namespace NDataShard {
new TEvTxProcessing::TEvReadSetAck(*Ev->Get(), Self->TabletID())));
}
+ THolder<IEventHandle> TDataShard::TTxReadSet::MakeNoDataReply(const TActorContext& ctx) {
+ const auto& record = Ev->Get()->Record;
+ auto event = MakeHolder<TEvTxProcessing::TEvReadSet>(
+ record.GetStep(),
+ record.GetTxId(),
+ record.GetTabletDest(),
+ record.GetTabletSource(),
+ Self->TabletID());
+ event->Record.SetFlags(NKikimrTx::TEvReadSet::FLAG_NO_DATA | NKikimrTx::TEvReadSet::FLAG_NO_ACK);
+ return THolder(new IEventHandle(Ev->Sender, ctx.SelfID, event.Release()));
+ }
+
void TDataShard::TTxReadSet::Complete(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"TTxReadSet::Complete at " << Self->TabletID());
// If it was read set for non-active tx we should send ACK back after successful save in DB
// Note that, active tx will send "delayed" ACK after tx complete
- if (Ack) {
+ if (Ack || NoDataReply) {
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD,
- "Send RS Ack at %" PRIu64 " %s",
+ "Send RS %s at %" PRIu64 " %s",
+ Ack && NoDataReply ? "Ack+Reply" : Ack ? "Ack" : "Reply",
Self->TabletID(), Ev->Get()->ToString().data());
struct TSendState : public TThrRefBase {
TDataShard* Self;
THolder<IEventHandle> Ack;
+ THolder<IEventHandle> NoDataReply;
- TSendState(TDataShard* self, THolder<IEventHandle>&& ack)
+ TSendState(TDataShard* self,
+ THolder<IEventHandle>&& ack,
+ THolder<IEventHandle>&& noDataReply)
: Self(self)
, Ack(std::move(ack))
+ , NoDataReply(std::move(noDataReply))
{ }
};
- Self->Executor()->ConfirmReadOnlyLease(AckTs, [state = MakeIntrusive<TSendState>(Self, std::move(Ack))] {
- TActivationContext::Send(std::move(state->Ack));
- state->Self->IncCounter(COUNTER_ACK_SENT);
- });
+ Self->Executor()->ConfirmReadOnlyLease(
+ AckTs,
+ [state = MakeIntrusive<TSendState>(Self, std::move(Ack), std::move(NoDataReply))] {
+ if (state->Ack) {
+ state->Self->IncCounter(COUNTER_ACK_SENT);
+ TActivationContext::Send(std::move(state->Ack));
+ }
+ if (state->NoDataReply) {
+ TActivationContext::Send(std::move(state->NoDataReply));
+ }
+ });
}
}
-}
-}
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index e91bf4ee289..d7e554f826c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1206,6 +1206,7 @@ class TDataShard
void RestartPipeRS(ui64 tabletId, const TActorContext& ctx);
void AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx);
+ void AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64, ui64>&& expectations);
void DefaultSignalTabletActive(const TActorContext &ctx) override {
// This is overridden in order to postpone SignalTabletActive until TxInit completes
@@ -1255,7 +1256,14 @@ public:
const TActorContext& ctx);
THolder<TEvTxProcessing::TEvReadSet> PrepareReadSet(ui64 step, ui64 txId, ui64 source, ui64 target,
const TString& body, ui64 seqno);
+ THolder<TEvTxProcessing::TEvReadSet> PrepareReadSetExpectation(ui64 step, ui64 txId, ui64 source, ui64 target);
+ void SendReadSet(const TActorContext& ctx, THolder<TEvTxProcessing::TEvReadSet>&& rs);
void SendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
+ bool AddExpectation(ui64 target, ui64 step, ui64 txId);
+ bool RemoveExpectation(ui64 target, ui64 txId);
+ void SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target);
+ void SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target);
+ bool ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev);
void SendReadSets(const TActorContext& ctx,
TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets);
void ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target, const TString& body, ui64 seqno);
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index cb81c8f8710..1a36cd6d05a 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -819,6 +819,9 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
}
if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) {
+ Y_VERIFY(!(record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET),
+ "Unexpected FLAG_EXPECT_READSET + FLAG_NO_DATA in delayed readsets");
+
// No readset data: participant aborted the transaction
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Processed readset without data from"
<< srcTabletId << " to " << dstTabletId << " will abort txId# " << tx->GetTxId());
diff --git a/ydb/core/tx/datashard/datashard_outreadset.cpp b/ydb/core/tx/datashard/datashard_outreadset.cpp
index 36ded0e8972..71c2fbfdcc5 100644
--- a/ydb/core/tx/datashard/datashard_outreadset.cpp
+++ b/ydb/core/tx/datashard/datashard_outreadset.cpp
@@ -179,4 +179,47 @@ bool TOutReadSets::ResendRS(NTabletFlatExecutor::TTransactionContext &txc, const
return true;
}
+bool TOutReadSets::AddExpectation(ui64 target, ui64 step, ui64 txId) {
+ auto res = Expectations[target].emplace(txId, step);
+ return res.second;
+}
+
+bool TOutReadSets::RemoveExpectation(ui64 target, ui64 txId) {
+ auto it = Expectations.find(target);
+ if (it != Expectations.end()) {
+ auto itTxId = it->second.find(txId);
+ if (itTxId != it->second.end()) {
+ it->second.erase(itTxId);
+ if (it->second.empty()) {
+ Expectations.erase(it);
+ }
+ return true;
+ }
+ }
+ return false;
+}
+
+bool TOutReadSets::HasExpectations(ui64 target) {
+ return Expectations.contains(target);
+}
+
+void TOutReadSets::ResendExpectations(ui64 target, const TActorContext& ctx) {
+ auto it = Expectations.find(target);
+ if (it != Expectations.end()) {
+ for (const auto& pr : it->second) {
+ Self->SendReadSetExpectation(ctx, pr.second, pr.first, Self->TabletID(), target);
+ }
+ }
+}
+
+THashMap<ui64, ui64> TOutReadSets::RemoveExpectations(ui64 target) {
+ THashMap<ui64, ui64> result;
+ auto it = Expectations.find(target);
+ if (it != Expectations.end()) {
+ result = std::move(it->second);
+ Expectations.erase(it);
+ }
+ return result;
+}
+
}}
diff --git a/ydb/core/tx/datashard/datashard_outreadset.h b/ydb/core/tx/datashard/datashard_outreadset.h
index 901fd4d827a..3cf4c8db01a 100644
--- a/ydb/core/tx/datashard/datashard_outreadset.h
+++ b/ydb/core/tx/datashard/datashard_outreadset.h
@@ -61,13 +61,19 @@ public:
void ResendAll(const TActorContext& ctx);
void Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx);
- bool Empty() const { return CurrentReadSets.empty(); }
+ bool Empty() const { return CurrentReadSets.empty() && Expectations.empty(); }
bool HasAcks() const { return ! ReadSetAcks.empty(); }
bool Has(const TReadSetKey& rsKey) const { return CurrentReadSetInfos.contains(rsKey); }
ui64 CountReadSets() const { return CurrentReadSets.size(); }
ui64 CountAcks() const { return ReadSetAcks.size(); }
+ bool AddExpectation(ui64 target, ui64 step, ui64 txId);
+ bool RemoveExpectation(ui64 target, ui64 txId);
+ bool HasExpectations(ui64 target);
+ void ResendExpectations(ui64 target, const TActorContext& ctx);
+ THashMap<ui64, ui64> RemoveExpectations(ui64 target);
+
private:
void UpdateMonCounter() const;
@@ -77,6 +83,8 @@ private:
THashMap<TReadSetKey, ui64> CurrentReadSetInfos; // Info -> SeqNo
THashSet<ui64> AckedSeqno;
TVector<TIntrusivePtr<TEvTxProcessing::TEvReadSetAck>> ReadSetAcks;
+ // Target -> TxId -> Step
+ THashMap<ui64, THashMap<ui64, ui64>> Expectations;
};
}}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 660bba43b11..753066d0e0c 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -638,7 +638,7 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs,
"Unexpected readset in state %" PRIu32 " for %" PRIu64 ":%" PRIu64 " at %" PRIu64,
Self->State, step, txId, Self->TabletID());
if (ack) {
- DelayedAcks[TStepOrder(step, txId)] = std::move(ack);
+ DelayedAcks[TStepOrder(step, txId)].push_back(std::move(ack));
}
return false;
}
@@ -934,7 +934,9 @@ void TPipeline::CompleteTx(const TOperation::TPtr op, TTransactionContext& txc,
"Will send outdated delayed readset ack for %" PRIu64 ":%" PRIu64 " at %" PRIu64,
pr.first.Step, pr.first.TxId, Self->TabletID());
- op->AddDelayedAck(std::move(pr.second));
+ for (auto& ack : pr.second) {
+ op->AddDelayedAck(std::move(ack));
+ }
DelayedAcks.erase(DelayedAcks.begin());
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index b2199949f7d..f4c235d8edc 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -298,7 +298,7 @@ public:
}
ui64 GetDataTxCacheSize() const { return DataTxCache.size(); }
- const TMap<TStepOrder, THolder<IEventHandle>> &GetDelayedAcks() const
+ const TMap<TStepOrder, TStackVec<THolder<IEventHandle>, 1>> &GetDelayedAcks() const
{
return DelayedAcks;
}
@@ -453,7 +453,7 @@ private:
TSortedOps::iterator ActivePlannedOpsLogicallyCompleteEnd;
TSortedOps::iterator ActivePlannedOpsLogicallyIncompleteEnd;
THashMap<ui64, TValidatedDataTx::TPtr> DataTxCache;
- TMap<TStepOrder, THolder<IEventHandle>> DelayedAcks;
+ TMap<TStepOrder, TStackVec<THolder<IEventHandle>, 1>> DelayedAcks;
TStepOrder LastPlannedTx;
TStepOrder LastCompleteTx;
TStepOrder UtmostCompleteTx;
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index 1f159a0c0c1..43815707b6b 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -121,10 +121,12 @@ public:
private:
TEvTxProcessing::TEvReadSet::TPtr Ev;
THolder<IEventHandle> Ack;
+ THolder<IEventHandle> NoDataReply;
TMonotonic AckTs;
void DoExecute(TTransactionContext &txc, const TActorContext &ctx);
THolder<IEventHandle> MakeAck(const TActorContext &ctx);
+ THolder<IEventHandle> MakeNoDataReply(const TActorContext &ctx);
};
class TDataShard::TTxProgressResendRS : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 2fffedefadd..0e8c4a14413 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -694,4 +694,20 @@ private:
const bool PrevValue;
};
+template<class TCondition>
+void WaitFor(TTestActorRuntime& runtime, TCondition&& condition, const TString& description = "condition", size_t maxAttempts = 1) {
+ for (size_t attempt = 0; attempt < maxAttempts; ++attempt) {
+ if (condition()) {
+ return;
+ }
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ }
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+}
+
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index 9761f4cbcc4..01827e50c52 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -128,6 +128,14 @@ namespace NKqpHelpers {
return JoinSeq(", ", result.result_sets(0).rows());
}
+ inline TString FormatResult(const Ydb::Table::ExecuteDataQueryResponse& response) {
+ if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.operation().status();
+ }
+ Ydb::Table::ExecuteQueryResult result;
+ response.operation().result().UnpackTo(&result);
+ return FormatResult(result);
+ }
inline TString KqpSimpleExec(TTestActorRuntime& runtime, const TString& query, bool staleRo = false, const TString& database = {}) {
TString sessionId = CreateSessionRPC(runtime, database);
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index efec2da9e31..54987228d5e 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -36,7 +36,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
- Cerr << "!!! distributed write start" << Endl;
+ Cerr << "!!! distributed write begin" << Endl;
ExecSQL(server, sender, R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
@@ -74,6 +74,235 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
"{ items { uint32_value: 20 } items { uint32_value: 20 } }");
}
+ Y_UNIT_TEST(DistributedWriteBrokenLock) { // A two-table commit whose read lock was broken by a concurrent write must report ABORTED and leave none of its writes visible
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); // seed one row per table
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ // Start transaction that reads from table-1 and table-2
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }");
+
+ // Break lock using a blind write to table-1
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);");
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true); // the feature flag is switched on only around the commit under test
+ runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (30, 30);
+ )"),
+ "ERROR: ABORTED"); // the commit is expected to fail rather than apply rows 3 and 30
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false); // switched off again before the verification read
+
+ // Verify transaction was not committed
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }");
+ }
+
+ Y_UNIT_TEST(DistributedWriteShardRestartBeforePlan) { // Holds back plan steps of a two-table write, reboots the table-1 shard, re-sends the plans, then expects UNDETERMINED and no new rows
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); // seed one row per table
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+ runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
+
+ TVector<THolder<IEventHandle>> capturedPlans; // plan step events intercepted by the observer below
+ auto capturePlans = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvPlanStep::EventType: {
+ Cerr << "... captured TEvPlanStep" << Endl;
+ capturedPlans.emplace_back(ev.Release()); // take ownership so the event can be re-sent after the reboot
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(capturePlans);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ Cerr << "!!! distributed write begin" << Endl;
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedPlans.size() >= 2; }, "both captured plans");
+
+ runtime.SetObserverFunc(prevObserverFunc); // put the previous observer back so no further plan steps are captured
+
+ // Restart the first table shard
+ auto shards1 = GetTableShards(server, sender, "/Root/table-1");
+ RebootTablet(runtime, shards1.at(0), sender);
+
+ // Unblock captured plan messages
+ for (auto& ev : capturedPlans) {
+ runtime.Send(ev.Release(), 0, true);
+ }
+ capturedPlans.clear(); // holders were released in the loop above
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "ERROR: UNDETERMINED");
+ Cerr << "!!! distributed write end" << Endl;
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Verify transaction was not committed
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }");
+ }
+
+ Y_UNIT_TEST(DistributedWriteShardRestartAfterExpectation) { // Holds back the plan step and readsets addressed to the table-1 shard, reboots that shard, then expects UNDETERMINED and no new rows
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(1000);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1);"); // seed one row per table
+ ExecSQL(server, sender, "UPSERT INTO `/Root/table-2` (key, value) VALUES (10, 10);");
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(true);
+ runtime.SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NLog::PRI_DEBUG);
+
+ auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); // the shard whose incoming events are intercepted and which is rebooted later
+
+ TVector<THolder<IEventHandle>> capturedPlans;
+ TVector<THolder<IEventHandle>> capturedReadSets;
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvTxProcessing::TEvPlanStep::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvPlanStep>();
+ if (msg->Record.GetTabletID() == shard1) { // plan steps for other tablets fall through to PROCESS
+ Cerr << "... captured TEvPlanStep for " << msg->Record.GetTabletID() << Endl;
+ capturedPlans.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ case TEvTxProcessing::TEvReadSet::EventType: {
+ const auto* msg = ev->Get<TEvTxProcessing::TEvReadSet>();
+ if (msg->Record.GetTabletDest() == shard1) { // readsets destined for other tablets fall through to PROCESS
+ Cerr << "... captured TEvReadSet for " << msg->Record.GetTabletDest() << Endl;
+ capturedReadSets.emplace_back(ev.Release());
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(captureEvents);
+
+ TString sessionId = CreateSessionRPC(runtime, "/Root");
+
+ Cerr << "!!! distributed write begin" << Endl;
+ auto future = SendRequest(runtime, MakeSimpleRequestRPC(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2);
+ UPSERT INTO `/Root/table-2` (key, value) VALUES (20, 20);
+ )", sessionId, "", true /* commitTx */), "/Root");
+
+ WaitFor(runtime, [&]{ return capturedPlans.size() >= 1 && capturedReadSets.size() >= 2; }, "captured plan and readsets");
+
+ runtime.SetObserverFunc(prevObserverFunc); // put the previous observer back; the captured events are not re-sent in this test
+
+ // Restart the first table shard
+ RebootTablet(runtime, shard1, sender);
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ FormatResult(AwaitResponse(runtime, std::move(future))),
+ "ERROR: UNDETERMINED");
+ Cerr << "!!! distributed write end" << Endl;
+
+ runtime.GetAppData(0).FeatureFlags.SetEnableDataShardVolatileTransactions(false);
+
+ // Verify transaction was not committed
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, R"(
+ SELECT key, value FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value FROM `/Root/table-2`
+ ORDER BY key
+ )"),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 10 } items { uint32_value: 10 } }");
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 54365b8bef2..34fbed08884 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -294,8 +294,16 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
txc);
}
- if (op->HasVolatilePrepareFlag() && !op->OutReadSets().empty()) {
- DataShard.PrepareAndSaveOutReadSets(op->GetStep(), op->GetTxId(), op->OutReadSets(), op->PreparedOutReadSets(), txc, ctx);
+ if (op->HasVolatilePrepareFlag()) {
+ // Notify other shards about our expectations as soon as possible, even before we commit
+ for (ui64 target : op->AwaitingDecisions()) {
+ if (DataShard.AddExpectation(target, op->GetStep(), op->GetTxId())) {
+ DataShard.SendReadSetExpectation(ctx, op->GetStep(), op->GetTxId(), DataShard.TabletID(), target);
+ }
+ }
+ if (!op->OutReadSets().empty()) {
+ DataShard.PrepareAndSaveOutReadSets(op->GetStep(), op->GetTxId(), op->OutReadSets(), op->PreparedOutReadSets(), txc, ctx);
+ }
}
// Note: may erase persistent locks, must be after we persist volatile tx
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index ca43b25cabe..1a575f02e9b 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -445,11 +445,13 @@ struct TOutputOpData {
using TDelayedAcks = TVector<THolder<IEventHandle>>;
using TOutReadSets = TMap<std::pair<ui64, ui64>, TString>; // source:target -> body
using TChangeRecord = NMiniKQL::IChangeCollector::TChange;
+ using TExpectedReadSets = TMap<std::pair<ui64, ui64>, TStackVec<TActorId, 1>>;
TResultPtr Result;
// ACKs to send on successful operation completion.
TDelayedAcks DelayedAcks;
TOutReadSets OutReadSets;
+ TExpectedReadSets ExpectedReadSets;
TVector<THolder<TEvTxProcessing::TEvReadSet>> PreparedOutReadSets;
// Access log of checked locks
TLocksCache LocksAccessLog;
@@ -598,6 +600,7 @@ public:
////////////////////////////////////////
// OUTPUT DATA //
////////////////////////////////////////
+ bool HasOutputData() { return bool(OutputData); } // Reports whether the OutputData holder currently converts to true; unlike OutputDataRef() it does not assign OutputData
TOutputOpData::TResultPtr &Result() { return OutputDataRef().Result; }
TOutputOpData::TDelayedAcks &DelayedAcks() { return OutputDataRef().DelayedAcks; }
@@ -607,6 +610,7 @@ public:
}
TOutputOpData::TOutReadSets &OutReadSets() { return OutputDataRef().OutReadSets; }
+ TOutputOpData::TExpectedReadSets &ExpectedReadSets() { return OutputDataRef().ExpectedReadSets; } // Mutable access to the ExpectedReadSets map, obtained through OutputDataRef()
TVector<THolder<TEvTxProcessing::TEvReadSet>> &PreparedOutReadSets()
{
return OutputDataRef().PreparedOutReadSets;
@@ -798,7 +802,6 @@ protected:
OutputData = MakeHolder<TOutputOpData>();
return *OutputData;
}
- void ClearOutputData() { OutputData = nullptr; }
TInputOpData &InputDataRef()
{
diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp
index 5edc151fe7c..d73639cf76f 100644
--- a/ydb/core/tx/datashard/volatile_tx.cpp
+++ b/ydb/core/tx/datashard/volatile_tx.cpp
@@ -158,13 +158,18 @@ namespace NKikimr::NDataShard {
return true;
}
- void TVolatileTxManager::Start() {
+ void TVolatileTxManager::Start(const TActorContext& ctx) {
for (auto& pr : VolatileTxs) {
if (!pr.second->Dependencies.empty()) {
continue;
}
switch (pr.second->State) {
case EVolatileTxState::Waiting:
+ for (ui64 target : pr.second->Participants) {
+ if (Self->AddExpectation(target, pr.second->Version.Step, pr.second->TxId)) {
+ Self->SendReadSetExpectation(ctx, pr.second->Version.Step, pr.second->TxId, Self->TabletID(), target);
+ }
+ }
break;
case EVolatileTxState::Committed:
PendingCommits.push_back(pr.first);
@@ -422,6 +427,7 @@ namespace NKikimr::NDataShard {
for (ui64 shardId : info->Participants) {
db.Table<Schema::TxVolatileParticipants>().Key(info->TxId, shardId).Delete();
+ Self->RemoveExpectation(shardId, info->TxId);
}
db.Table<Schema::TxVolatileDetails>().Key(info->TxId).Delete();
}
@@ -470,6 +476,29 @@ namespace NKikimr::NDataShard {
return true;
}
+ void TVolatileTxManager::AbortWaitingTransaction(TVolatileTxInfo* info) { // Flips a Waiting volatile tx to Aborted in memory, unlinks it from the tx map and its dependencies, and queues it as a pending abort
+ Y_VERIFY(info && info->State == EVolatileTxState::Waiting); // callers must pass a non-null tx that is still in the Waiting state
+
+ ui64 txId = info->TxId; // used below when unlinking from dependencies and when queuing the abort
+
+ // Move tx to aborted, but don't persist yet, we need a separate transaction for that
+ info->State = EVolatileTxState::Aborted;
+
+ // We don't need aborted transactions in tx map
+ RemoveFromTxMap(info);
+
+ // Aborted transactions don't have dependencies
+ for (ui64 dependencyTxId : info->Dependencies) {
+ auto* dependency = FindByTxId(dependencyTxId);
+ Y_VERIFY(dependency);
+ dependency->Dependents.erase(txId); // drop the back-reference kept by the tx we depended on
+ }
+ info->Dependencies.clear();
+
+ // We will unblock operations when we persist the abort
+ AddPendingAbort(txId);
+ }
+
void TVolatileTxManager::ProcessReadSet(
const TEvTxProcessing::TEvReadSet& rs,
TTransactionContext& txc)
@@ -503,6 +532,8 @@ namespace NKikimr::NDataShard {
bool committed = [&]() {
if (record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_NO_DATA) {
+ Y_VERIFY(!(record.GetFlags() & NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET),
+ "Unexpected FLAG_EXPECT_READSET + FLAG_NO_DATA in ProcessReadSet");
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
"Processed readset without data from " << srcTabletId << " to " << dstTabletId
<< " at tablet " << Self->TabletID());
@@ -520,26 +551,19 @@ namespace NKikimr::NDataShard {
return false;
}
+ if (record.GetStep() != info->Version.Step) {
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+ "Processed readset from " << srcTabletId << " to " << dstTabletId
+ << " with step " << record.GetStep() << " expecting " << info->Version.Step
+ << ", treating like abort due to divergence at tablet " << Self->TabletID());
+ return false;
+ }
+
return true;
}();
if (!committed) {
- // Move tx to aborted, but don't persist yet, we need a separate transaction for that
- info->State = EVolatileTxState::Aborted;
-
- // We don't need aborted transactions in tx map
- RemoveFromTxMap(info);
-
- // Aborted transactions don't have dependencies
- for (ui64 dependencyTxId : info->Dependencies) {
- auto* dependency = FindByTxId(dependencyTxId);
- Y_VERIFY(dependency);
- dependency->Dependents.erase(txId);
- }
- info->Dependencies.clear();
-
- // We will unblock operations when we persist the abort
- AddPendingAbort(txId);
+ AbortWaitingTransaction(info);
return;
}
@@ -623,7 +647,7 @@ namespace NKikimr::NDataShard {
void TVolatileTxManager::RunPendingAbortTx() { // Schedules one TTxVolatileTxAbort when PendingAborts is non-empty and no abort tx is already marked as scheduled
if (!PendingAbortTxScheduled && !PendingAborts.empty()) {
PendingAbortTxScheduled = true; // while this flag stays set, further calls skip scheduling
- Self->Execute(new TDataShard::TTxVolatileTxAbort(Self));
+ Self->EnqueueExecute(new TDataShard::TTxVolatileTxAbort(Self));
}
}
diff --git a/ydb/core/tx/datashard/volatile_tx.h b/ydb/core/tx/datashard/volatile_tx.h
index 9ca8aae9343..613843e51c7 100644
--- a/ydb/core/tx/datashard/volatile_tx.h
+++ b/ydb/core/tx/datashard/volatile_tx.h
@@ -121,7 +121,7 @@ namespace NKikimr::NDataShard {
void Clear();
bool Load(NIceDb::TNiceDb& db);
- void Start();
+ void Start(const TActorContext& ctx);
TVolatileTxInfo* FindByTxId(ui64 txId) const;
TVolatileTxInfo* FindByCommitTxId(ui64 txId) const;
@@ -136,10 +136,16 @@ namespace NKikimr::NDataShard {
bool AttachVolatileTxCallback(
ui64 txId, IVolatileTxCallback::TPtr callback);
+ void AbortWaitingTransaction(TVolatileTxInfo* info);
+
void ProcessReadSet(
const TEvTxProcessing::TEvReadSet& rs,
TTransactionContext& txc);
+ void ProcessReadSetMissing(
+ ui64 source, ui64 txId,
+ TTransactionContext& txc);
+
TTxMapAccess GetTxMap() const {
return TTxMapAccess(TxMap);
}
diff --git a/ydb/core/tx/tx_processing.h b/ydb/core/tx/tx_processing.h
index d1cc771fcbc..d992dd71474 100644
--- a/ydb/core/tx/tx_processing.h
+++ b/ydb/core/tx/tx_processing.h
@@ -109,6 +109,15 @@ struct TEvTxProcessing {
TEvReadSet()
{}
+ TEvReadSet(ui64 step, ui64 orderId, ui64 tabletSource, ui64 tabletDest, ui64 tabletProducer) // Overload that fills only the step, tx id and tablet routing fields; it takes no readSet body or seqno
+ {
+ Record.SetStep(step);
+ Record.SetTxId(orderId); // orderId is stored as the record's TxId
+ Record.SetTabletSource(tabletSource);
+ Record.SetTabletDest(tabletDest);
+ Record.SetTabletProducer(tabletProducer);
+ }
+
TEvReadSet(ui64 step, ui64 orderId, ui64 tabletSource, ui64 tabletDest, ui64 tabletProducer, const TString &readSet, ui64 seqno = 0)
{
Record.SetStep(step);