aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-03-14 15:22:23 +0300
committerAlexey Borzenkov <snaury@yandex-team.ru>2022-03-14 15:22:23 +0300
commit6fa2b9df455f828d333446424ecf0bf29960e90a (patch)
treebc2a7f56a0b6e8dcdd2b0dc8be67d2c34a17eb4f
parent03eec4e7e455d44d111cdffcaa3ee791074e2f86 (diff)
downloadydb-6fa2b9df455f828d333446424ecf0bf29960e90a.tar.gz
Support prioritized and unprotected snapshot reads in datashard, KIKIMR-13910
ref:b7023db4fab5b619fb7edc70205f036de6bad013
-rw-r--r--ydb/core/protos/config.proto11
-rw-r--r--ydb/core/tx/datashard/datashard.cpp319
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp10
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp15
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.h1
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.h2
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.cpp15
-rw-r--r--ydb/core/tx/datashard/datashard_direct_transaction.h8
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_direct_upload.h2
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h77
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp63
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h4
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.cpp97
-rw-r--r--ydb/core/tx/datashard/datashard_snapshots.h18
-rw-r--r--ydb/core/tx/datashard/datashard_txs.h3
-rw-r--r--ydb/core/tx/datashard/datashard_unsafe_upload.cpp18
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.h18
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp317
-rw-r--r--ydb/core/tx/datashard/execute_data_tx_unit.cpp7
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/finish_propose_unit.cpp57
-rw-r--r--ydb/core/tx/time_cast/time_cast.cpp5
-rw-r--r--ydb/core/tx/time_cast/time_cast.h4
27 files changed, 1029 insertions, 72 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 770b3cb6bf2..e307cc5bb7e 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1177,6 +1177,17 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 134217728,
DefaultValue: 0 }];
+
+ optional uint64 PrioritizedMvccSnapshotReads = 8 [(ControlOptions) = {
+ Description: "Enables prioritized mvcc snapshot reads over immediate writes",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
+ optional uint64 UnprotectedMvccSnapshotReads = 9 [(ControlOptions) = {
+ Description: "Enables unprotected (fully readonly) mvcc snapshot reads",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
}
message TTxLimitControls {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a3fcc458681..f3a811a2a6f 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -140,6 +140,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, ReadColumnsScanInUserPool(0, 0, 1)
, BackupReadAheadLo(0, 0, 64*1024*1024)
, BackupReadAheadHi(0, 0, 128*1024*1024)
+ , EnablePrioritizedMvccSnapshotReads(0, 0, 1)
+ , EnableUnprotectedMvccSnapshotReads(0, 0, 1)
, DataShardSysTables(InitDataShardSysTables(this))
, ChangeSenderActivator(info->TabletID)
, ChangeExchangeSplitter(this)
@@ -265,6 +267,7 @@ void TDataShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorCo
void TDataShard::Cleanup(const TActorContext& ctx) {
//PipeClientCache->Detach(ctx);
if (RegistrationSended) {
+ ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnsubscribeReadStep());
ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnregisterTablet(TabletID()));
}
@@ -301,6 +304,9 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");
+ AppData(ctx)->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
+ AppData(ctx)->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
+
// OnActivateExecutor might be called multiple times for a follower
// but the counters should be initialized only once
if (TabletCountersPtr) {
@@ -324,6 +330,16 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
}
void TDataShard::SwitchToWork(const TActorContext &ctx) {
+ if (IsMvccEnabled() && (
+ SnapshotManager.GetPerformedUnprotectedReads() ||
+ SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step))
+ {
+ // We will need to wait until mediator state is fully restored before
+ // processing new immediate transactions.
+ MediatorStateWaiting = true;
+ CheckMediatorStateRestored();
+ }
+
SyncConfig();
PlanQueue.Progress(ctx);
OutReadSets.ResendAll(ctx);
@@ -360,10 +376,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
<< DatashardStateName(State) << " tabletId " << TabletID()
<< " mediators count is " << ProcessingParams->MediatorsSize()
+ << " coordinators count is " << ProcessingParams->CoordinatorsSize()
<< " buckets per mediator " << ProcessingParams->GetTimeCastBucketsPerMediator());
RegistrationSended = true;
ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams));
+
+ // Subscribe to all known coordinators
+ for (ui64 coordinatorId : ProcessingParams->GetCoordinators()) {
+ size_t index = CoordinatorSubscriptions.size();
+ auto res = CoordinatorSubscriptionById.emplace(coordinatorId, index);
+ if (res.second) {
+ auto& subscription = CoordinatorSubscriptions.emplace_back();
+ subscription.CoordinatorId = coordinatorId;
+ ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvSubscribeReadStep(coordinatorId));
+ ++CoordinatorSubscriptionsPending;
+ }
+ }
}
void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
@@ -1267,7 +1296,8 @@ TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
- return TRowVersion((++edge).Step, ::Max<ui64>());
+ TRowVersion candidate = TRowVersion((++edge).Step, ::Max<ui64>());
+ return Max(candidate, SnapshotManager.GetImmediateWriteEdge());
}
TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const {
@@ -1291,6 +1321,9 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
// With read-only transactions we don't need reads to include
// changes made at the incomplete edge, as that is a point where
// distributed transactions performed some reads, not writes.
+ // Since incomplete transactions are still inflight, the actual
+ // version will stick to the first incomplete transaction in queue,
+ // effectively reading non-repeatable state before that transaction.
edge = readEdge;
break;
case EMvccTxMode::ReadWrite:
@@ -1308,12 +1341,41 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
- // This is currently active step for immediate writes, not that when
- // writeEdge is equal to some (PlanStep, Max<ui64>()) that means everything
- // up to this point is "fixed" and cannot be changed. In that case we
- // choose at least PlanStep + 1 for new writes.
- ui64 writeStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, (++writeEdge).Step);
- return TRowVersion(writeStep, ::Max<ui64>());
+ // Normally we stick transactions to the end of the last known mediator step
+ // Note these calculations only happen when we don't have distributed
+ // transactions left in queue, and we won't have any more transactions
+ // up to the current mediator time. The mediator time itself may be stale,
+ // in which case we may have evidence of its higher value via complete and
+ // incomplete edges above.
+ const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
+ TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
+
+ switch (mode) {
+ case EMvccTxMode::ReadOnly: {
+ // We want to include everything that was potentially confirmed to
+ // users, but we don't want to include anything that is not replied
+ // at the start of this read.
+ // Note it's only possible to have ImmediateWriteEdge > mediatorEdge
+ // when ImmediateWriteEdge == mediatorEdge + 1
+ return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied(), SnapshotManager.GetUnprotectedReadEdge());
+ }
+
+ case EMvccTxMode::ReadWrite: {
+ // We must use at least a previously used immediate write edge
+ // But we must also avoid trampling over any unprotected mvcc
+ // snapshot reads that have occurred.
+ // Note the only way to go past the last known mediator step
+ // is when we had an unprotected read, which itself happens at the
+ // last mediator step. So we may only ever have a +1 step, never
+ // anything more.
+ TRowVersion postReadEdge = SnapshotManager.GetPerformedUnprotectedReads()
+ ? SnapshotManager.GetUnprotectedReadEdge().Next()
+ : TRowVersion::Min();
+ return Max(mediatorEdge, writeEdge.Next(), postReadEdge, SnapshotManager.GetImmediateWriteEdge());
+ }
+ }
+
+ Y_FAIL("unreachable");
}
TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
@@ -1333,6 +1395,195 @@ TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
return mvccVersion;
}
+TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges(
+ const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc)
+{
+ TPromotePostExecuteEdges res;
+
+ res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc);
+
+ switch (mode) {
+ case EPromotePostExecuteEdges::ReadOnly:
+ // We want read-only immediate transactions to be readonly, thus
+ // don't promote the complete edge unnecessarily. On restarts we
+ // will assume anything written is potentially replied anyway,
+ // even if it has never been read.
+ break;
+
+ case EPromotePostExecuteEdges::RepeatableRead: {
+ bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads();
+ if (unprotectedReads) {
+ // We want to use unprotected reads, but we need to make sure it's properly marked first
+ if (!SnapshotManager.GetPerformedUnprotectedReads()) {
+ SnapshotManager.SetPerformedUnprotectedReads(true, txc);
+ res.HadWrites = true;
+ }
+ if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) {
+ // We need to wait for completion until the flag is committed
+ res.WaitCompletion = true;
+ }
+ SnapshotManager.PromoteUnprotectedReadEdge(version);
+ } else if (SnapshotManager.GetPerformedUnprotectedReads()) {
+ // We want to drop the flag as soon as possible
+ SnapshotManager.SetPerformedUnprotectedReads(false, txc);
+ res.HadWrites = true;
+ }
+
+ // We want to promote the complete edge when protected reads are
+ // used or when we're already writing something anyway.
+ if (res.HadWrites || !unprotectedReads) {
+ res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc);
+ if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) {
+ // We need to wait for completion because some other transaction
+ // has moved complete edge, but it's not committed yet.
+ res.WaitCompletion = true;
+ }
+ }
+
+ break;
+ }
+
+ case EPromotePostExecuteEdges::ReadWrite: {
+ if (version.Step <= GetMaxObservedStep()) {
+ res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc);
+ }
+ res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc);
+ break;
+ }
+ }
+
+ return res;
+}
+
+ui64 TDataShard::GetMaxObservedStep() const {
+ return Max(
+ Pipeline.GetLastPlannedTx().Step,
+ SnapshotManager.GetCompleteEdge().Step,
+ SnapshotManager.GetIncompleteEdge().Step,
+ SnapshotManager.GetUnprotectedReadEdge().Step,
+ MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0);
+}
+
+void TDataShard::SendImmediateWriteResult(
+ const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie)
+{
+ const ui64 step = version.Step;
+ const ui64 observedStep = GetMaxObservedStep();
+ if (step <= observedStep) {
+ SnapshotManager.PromoteImmediateWriteEdgeReplied(version);
+ Send(target, event, 0, cookie);
+ return;
+ }
+
+ MediatorDelayedReplies.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(version),
+ std::forward_as_tuple(target, THolder<IEventBase>(event), cookie));
+
+ // Try to subscribe to the next step, when needed
+ if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
+ MediatorTimeCastWaitingSteps.insert(step);
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+ }
+}
+
+void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) {
+ for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
+ const ui64 step = it->first.Step;
+
+ if (step <= mediatorStep) {
+ SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
+ Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
+ it = MediatorDelayedReplies.erase(it);
+ continue;
+ }
+
+ // Try to subscribe to the next step, when needed
+ if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
+ MediatorTimeCastWaitingSteps.insert(step);
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+ }
+ break;
+ }
+}
+
+void TDataShard::CheckMediatorStateRestored() {
+ if (!MediatorStateWaiting ||
+ !RegistrationSended ||
+ !MediatorTimeCastEntry ||
+ CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max<ui64>())
+ {
+ // We are not waiting or not ready to make a decision
+ if (MediatorStateWaiting &&
+ MediatorTimeCastEntry &&
+ CoordinatorPrevReadStepMax == Max<ui64>() &&
+ !MediatorStateBackupInitiated)
+ {
+ // It is possible we don't have coordinators with new protocol support
+ // Use a backup plan of acquiring a read snapshot for restoring the read step
+ Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup);
+ MediatorStateBackupInitiated = true;
+ }
+ return;
+ }
+
+ // CoordinatorPrevReadStepMax shows us what is the next minimum step that
+ // may be acquired as a snapshot. This tells us that no previous read
+ // could have happened after this step, even if it has been acquired.
+ // CoordinatorPrevReadStepMin shows us the maximum step that could have
+ // been acquired before we subscribed. Even if the next step is very
+ // large it may be used to infer an earlier step, as previous generation
+ // could not have read any step that was not acquired.
+ // When some coordinators are still pending we use CoordinatorPrevReadStepMax
+ // as a worst case read step in the future, hoping to make a tighter
+ // prediction while we wait for that.
+ const ui64 step = CoordinatorSubscriptionsPending == 0
+ ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
+ : CoordinatorPrevReadStepMax;
+
+ // WARNING: we must perform this check BEFORE we update unprotected read edge
+ // We may enter this code path multiple times, and we expect that the above
+ // read step may be refined while we wait based on pessimistic backup step.
+ if (GetMaxObservedStep() < step) {
+ // We need to wait until we observe mediator step that is at least
+ // as large as the step we found.
+ if (MediatorTimeCastWaitingSteps.insert(step).second) {
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+ }
+ return;
+ }
+
+ // Using the inferred last read step we restore the pessimistic unprotected
+ // read edge. Note we only need to do so if there have actually been any
+ // unprotected reads in this datashard history.
+ const TRowVersion lastReadEdge(step, Max<ui64>());
+ if (SnapshotManager.GetPerformedUnprotectedReads()) {
+ SnapshotManager.PromoteUnprotectedReadEdge(lastReadEdge);
+ }
+
+ // Promote the replied immediate write edge up to the currently observed step
+ // This is needed to make sure we read any potentially replied immediate
+ // writes before the restart, and conversely don't accidentally read any
+ // data that is definitely not replied yet.
+ if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
+ const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
+ SnapshotManager.PromoteImmediateWriteEdgeReplied(
+ Min(edge, SnapshotManager.GetImmediateWriteEdge()));
+ }
+
+ MediatorStateWaiting = false;
+
+ // Resend all waiting messages
+ TVector<THolder<IEventHandle>> msgs;
+ msgs.swap(MediatorStateWaitingMsgs);
+ for (auto& ev : msgs) {
+ TActivationContext::Send(ev.Release());
+ }
+}
+
NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code) {
using EResult = NMiniKQL::IEngineFlat::EResult;
@@ -1483,7 +1734,7 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
rejectReasons.push_back("decided to reject due to given RejectProbability");
}
- size_t totalInFly = (TxInFly() + ImmediateInFly() + ProposeQueue.Size() + TxWaiting());
+ size_t totalInFly = (TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting());
if (totalInFly > GetMaxTxInFly()) {
reject = true;
rejectReasons.push_back("MaxTxInFly was exceeded");
@@ -1554,10 +1805,21 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction*
}
void TDataShard::UpdateProposeQueueSize() const {
- SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
+ SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
}
void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
+ // Check if we need to delay an immediate transaction
+ if (MediatorStateWaiting &&
+ (ev->Get()->GetFlags() & TTxFlags::Immediate) &&
+ !(ev->Get()->GetFlags() & TTxFlags::ForceOnline))
+ {
+ // We cannot calculate correct version until we restore mediator state
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
+
if (Pipeline.HasProposeDelayers()) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored");
@@ -1970,7 +2232,30 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev,
MediatorTimeCastEntry = ev->Get()->Entry;
Y_VERIFY(MediatorTimeCastEntry);
+ SendAfterMediatorStepActivate(MediatorTimeCastEntry->Get(TabletID()));
+
Pipeline.ActivateWaitingTxOps(ctx);
+
+ CheckMediatorStateRestored();
+}
+
+void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx) {
+ auto* msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID()
+ << " coordinator " << msg->CoordinatorId
+ << " last step " << msg->LastReadStep
+ << " next step " << msg->ReadStep->Get());
+ auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
+ Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
+ "Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
+ size_t index = it->second;
+ auto& subscription = CoordinatorSubscriptions.at(index);
+ subscription.ReadStep = msg->ReadStep;
+ CoordinatorPrevReadStepMin = Max(CoordinatorPrevReadStepMin, msg->LastReadStep);
+ CoordinatorPrevReadStepMax = Min(CoordinatorPrevReadStepMax, msg->NextReadStep);
+ --CoordinatorSubscriptionsPending;
+ CheckMediatorStateRestored();
}
void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx) {
@@ -1984,13 +2269,27 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const
for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end() && *it <= step;)
it = MediatorTimeCastWaitingSteps.erase(it);
+ SendAfterMediatorStepActivate(step);
+
Pipeline.ActivateWaitingTxOps(ctx);
+
+ CheckMediatorStateRestored();
+}
+
+void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) {
+ if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max<ui64>()) {
+ // We are still waiting for new protocol coordinator state
+ // TODO: send an old snapshot request to coordinators
+ }
}
bool TDataShard::WaitPlanStep(ui64 step) {
if (step <= Pipeline.GetLastPlannedTx().Step)
return false;
+ if (step <= SnapshotManager.GetCompleteEdge().Step)
+ return false;
+
if (MediatorTimeCastEntry && step <= MediatorTimeCastEntry->Get(TabletID()))
return false;
@@ -2016,7 +2315,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
auto &rec = ev->Get()->Record;
if (rec.HasMvccSnapshot()) {
TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
- TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge();
+ TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
if (rowVersion >= unreadableEdge) {
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge);
return true;
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 3b3c9d9d6f9..c55cf209f5e 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -132,6 +132,11 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c
}
void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx) {
+ if (MediatorStateWaiting) {
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) {
Executor()->Execute(new TTxUploadRows(this, ev), ctx);
} else {
@@ -140,6 +145,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
}
void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx) {
+ if (MediatorStateWaiting) {
+ MediatorStateWaitingMsgs.emplace_back(ev.Release());
+ UpdateProposeQueueSize();
+ return;
+ }
if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) {
Executor()->Execute(new TTxEraseRows(this, ev), ctx);
} else {
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 088da2b5213..4daee2067ad 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -209,6 +209,21 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
}
template <typename TEvRequest, typename TEvResponse>
+void TCommonUploadOps<TEvRequest, TEvResponse>::GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie) {
+ Y_VERIFY(Result);
+
+ if (Result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) {
+ self->IncCounter(COUNTER_BULK_UPSERT_SUCCESS);
+ } else {
+ self->IncCounter(COUNTER_BULK_UPSERT_ERROR);
+ }
+
+ target = Ev->Sender;
+ event = std::move(Result);
+ cookie = 0;
+}
+
+template <typename TEvRequest, typename TEvResponse>
void TCommonUploadOps<TEvRequest, TEvResponse>::SendResult(TDataShard* self, const TActorContext& ctx) {
Y_VERIFY(Result);
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 262027b14fe..928558de2d9 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -23,6 +23,7 @@ public:
protected:
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion);
+ void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
void SendResult(TDataShard* self, const TActorContext& ctx);
TVector<IChangeCollector::TChange> GetCollectedChanges() const;
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 826819596fa..574c5ff6473 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -204,7 +204,7 @@ bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
return true;
}
-void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) {
+TDirectTxResult TDirectTxErase::GetResult(TDataShard* self) {
Y_VERIFY(Result);
if (Result->Record.GetStatus() == NKikimrTxDataShard::TEvEraseRowsResponse::OK) {
@@ -213,7 +213,11 @@ void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) {
self->IncCounter(COUNTER_ERASE_ROWS_ERROR);
}
- ctx.Send(Ev->Sender, std::move(Result));
+ TDirectTxResult res;
+ res.Target = Ev->Sender;
+ res.Event = std::move(Result);
+ res.Cookie = 0;
+ return res;
}
TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxErase::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index 689b868a36f..7663b47d38f 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -66,7 +66,7 @@ public:
NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error);
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
- void SendResult(TDataShard* self, const TActorContext& ctx) override;
+ TDirectTxResult GetResult(TDataShard* self) override;
TVector<IChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
index 145f0936cd4..858247f36af 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
@@ -31,16 +31,25 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded)
}
bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) {
- auto [readVersion, writeVersion] = self->GetReadWriteVersions();
+ auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
if (!Impl->Execute(self, txc, readVersion, writeVersion))
return false;
- self->PromoteCompleteEdge(writeVersion.Step, txc);
+ if (self->IsMvccEnabled()) {
+ // Note: we always wait for completion, so we can ignore the result
+ self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+ }
+
return true;
}
void TDirectTransaction::SendResult(TDataShard* self, const TActorContext& ctx) {
- Impl->SendResult(self, ctx);
+ auto result = Impl->GetResult(self);
+ if (MvccReadWriteVersion) {
+ self->SendImmediateWriteResult(*MvccReadWriteVersion, result.Target, result.Event.Release(), result.Cookie);
+ } else {
+ ctx.Send(result.Target, result.Event.Release(), 0, result.Cookie);
+ }
}
TVector<NMiniKQL::IChangeCollector::TChange> TDirectTransaction::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h
index 5b42e1f1d71..e4d83188d5a 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.h
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.h
@@ -11,11 +11,17 @@
namespace NKikimr {
namespace NDataShard {
+struct TDirectTxResult {
+ TActorId Target;
+ THolder<IEventBase> Event;
+ ui64 Cookie;
+};
+
class IDirectTx {
public:
virtual ~IDirectTx() = default;
virtual bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) = 0;
- virtual void SendResult(TDataShard* self, const TActorContext& ctx) = 0;
+ virtual TDirectTxResult GetResult(TDataShard* self) = 0;
virtual TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const = 0;
};
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp
index 6173c0e218f..abb3eda7633 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp
@@ -12,8 +12,10 @@ bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const
return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion);
}
-void TDirectTxUpload::SendResult(TDataShard* self, const TActorContext& ctx) {
- TCommonUploadOps::SendResult(self, ctx);
+TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) {
+ TDirectTxResult res;
+ TCommonUploadOps::GetResult(self, res.Target, res.Event, res.Cookie);
+ return res;
}
TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxUpload::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index 7ca84f19a7b..a26226bd79a 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -14,7 +14,7 @@ public:
explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
bool Execute(TDataShard* self, TTransactionContext& txc, const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
- void SendResult(TDataShard* self, const TActorContext& ctx) override;
+ TDirectTxResult GetResult(TDataShard* self) override;
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const override;
};
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 17eebf7693f..c144311a4e7 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -297,6 +297,7 @@ class TDataShard
EvRequestChangeRecords,
EvRemoveChangeRecords,
EvReplicationSourceOffsets,
+ EvMediatorRestoreBackup,
EvEnd
};
@@ -436,6 +437,8 @@ class TDataShard
// Note that keys are NOT sorted in any way
THashMap<TString, TVector<TSplitKey>> SourceOffsets;
};
+
+ struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {};
};
struct Schema : NIceDb::Schema {
@@ -819,6 +822,10 @@ class TDataShard
Sys_NextChangeRecordOrder, // 36 Next order of change record
Sys_LastChangeRecordGroup, // 37 Last group number of change records
+ SysMvcc_UnprotectedReads, // 38 Shard may have performed unprotected mvcc reads when non-zero
+ SysMvcc_ImmediateWriteEdgeStep, // 39 Maximum step of immediate writes with mvcc enabled
+ SysMvcc_ImmediateWriteEdgeTxId, // 40 Maximum txId of immediate writes with mvcc enabled
+
// reserved
SysPipeline_Flags = 1000,
SysPipeline_LimitActiveTx,
@@ -828,6 +835,9 @@ class TDataShard
static_assert(ESysTableKeys::Sys_SubDomainOwnerId == 33, "Sys_SubDomainOwnerId changed its value");
static_assert(ESysTableKeys::Sys_SubDomainLocalPathId == 34, "Sys_SubDomainLocalPathId changed its value");
static_assert(ESysTableKeys::Sys_SubDomainOutOfSpace == 35, "Sys_SubDomainOutOfSpace changed its value");
+ static_assert(ESysTableKeys::SysMvcc_UnprotectedReads == 38, "SysMvcc_UnprotectedReads changed its value");
+ static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeStep == 39, "SysMvcc_ImmediateWriteEdgeStep changed its value");
+ static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeTxId == 40, "SysMvcc_ImmediateWriteEdgeTxId changed its value");
static constexpr ui64 MinLocalTid = TSysTables::SysTableMAX + 1; // 1000
@@ -918,7 +928,9 @@ class TDataShard
void Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx);
void Handle(TEvDataShard::TEvReturnBorrowedPart::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvReturnBorrowedPartAck::TPtr& ev, const TActorContext& ctx);
@@ -1242,6 +1254,16 @@ public:
return BackupReadAheadHi;
}
+ bool GetEnablePrioritizedMvccSnapshotReads() const {
+ ui64 value = EnablePrioritizedMvccSnapshotReads;
+ return value != 0;
+ }
+
+ bool GetEnableUnprotectedMvccSnapshotReads() const {
+ ui64 value = EnableUnprotectedMvccSnapshotReads;
+ return value != 0;
+ }
+
template <typename T>
void ReleaseCache(T& tx) {
ReleaseTxCache(tx.GetTxCacheUsage());
@@ -1436,7 +1458,26 @@ public:
// Returns a suitable row version for performing a transaction
TRowVersion GetMvccTxVersion(EMvccTxMode mode, TOperation* op = nullptr) const;
+ enum class EPromotePostExecuteEdges {
+ ReadOnly,
+ RepeatableRead,
+ ReadWrite,
+ };
+
+ struct TPromotePostExecuteEdges {
+ bool HadWrites = false;
+ bool WaitCompletion = false;
+ };
+
TReadWriteVersions GetReadWriteVersions(TOperation* op = nullptr) const;
+ TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(
+ const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc);
+ ui64 GetMaxObservedStep() const;
+ void SendImmediateWriteResult(
+ const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0);
+ void SendAfterMediatorStepActivate(ui64 mediatorStep);
+
+ void CheckMediatorStateRestored();
void FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const;
@@ -1986,8 +2027,36 @@ private:
TS3UploadsManager S3Uploads;
TS3DownloadsManager S3Downloads;
+ struct TMediatorDelayedReply {
+ TActorId Target;
+ THolder<IEventBase> Event;
+ ui64 Cookie;
+
+ TMediatorDelayedReply(const TActorId& target, THolder<IEventBase> event, ui64 cookie)
+ : Target(target)
+ , Event(std::move(event))
+ , Cookie(cookie)
+ { }
+ };
+
TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
TSet<ui64> MediatorTimeCastWaitingSteps;
+ TMultiMap<TRowVersion, TMediatorDelayedReply> MediatorDelayedReplies;
+
+ struct TCoordinatorSubscription {
+ ui64 CoordinatorId;
+ TMediatorTimecastReadStep::TCPtr ReadStep;
+ };
+
+ TVector<TCoordinatorSubscription> CoordinatorSubscriptions;
+ THashMap<ui64, size_t> CoordinatorSubscriptionById;
+ size_t CoordinatorSubscriptionsPending = 0;
+ ui64 CoordinatorPrevReadStepMin = 0;
+ ui64 CoordinatorPrevReadStepMax = Max<ui64>();
+
+ TVector<THolder<IEventHandle>> MediatorStateWaitingMsgs;
+ bool MediatorStateWaiting = false;
+ bool MediatorStateBackupInitiated = false;
TControlWrapper DisableByKeyFilter;
TControlWrapper MaxTxInFly;
@@ -2009,6 +2078,9 @@ private:
TControlWrapper BackupReadAheadLo;
TControlWrapper BackupReadAheadHi;
+ TControlWrapper EnablePrioritizedMvccSnapshotReads;
+ TControlWrapper EnableUnprotectedMvccSnapshotReads;
+
// Set of InRS keys to remove from local DB.
THashSet<TReadSetKey> InRSToRemove;
TIntrusivePtr<TThrRefBase> DataShardSysTables;
@@ -2153,6 +2225,9 @@ protected:
TRACE_EVENT(NKikimrServices::TX_DATASHARD);
switch (ev->GetTypeRewrite()) {
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
+ HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
HFuncTraced(TEvents::TEvPoisonPill, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
@@ -2202,7 +2277,9 @@ protected:
HFuncTraced(TEvTabletPipe::TEvServerConnected, Handle);
HFuncTraced(TEvTabletPipe::TEvServerDisconnected, Handle);
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+ HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
+ HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
HFuncTraced(TEvDataShard::TEvCancelTransactionProposal, Handle);
HFuncTraced(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
HFunc(TEvDataShard::TEvReturnBorrowedPart, Handle);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 3498162c17e..faf90e403b6 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1581,19 +1581,21 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co
if (Self->MvccSwitchState == TSwitchState::SWITCHING) {
WaitingDataTxOps.emplace(TRowVersion::Min(), std::move(ev)); // postpone tx processing till mvcc state switch is finished
} else {
+ bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
Y_VERIFY_DEBUG(ev->Get()->Record.HasMvccSnapshot());
TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId());
WaitingDataTxOps.emplace(snapshot, std::move(ev));
+ const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1;
TRowVersion unreadableEdge;
- if (!Self->WaitPlanStep(snapshot.Step + 1) && snapshot < (unreadableEdge = GetUnreadableEdge())) {
- ActivateWaitingTxOps(unreadableEdge, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op
+ if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) {
+ ActivateWaitingTxOps(unreadableEdge, prioritizedReads, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op
}
}
return true;
}
-void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx) {
+void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) {
if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING)
return;
@@ -1611,8 +1613,12 @@ void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx)
activated = true;
}
- if (minWait == TRowVersion::Max() || Self->WaitPlanStep(minWait.Step + 1) || minWait >= (edge = GetUnreadableEdge()))
+ if (minWait == TRowVersion::Max() ||
+ Self->WaitPlanStep(prioritizedReads ? minWait.Step : minWait.Step + 1) ||
+ minWait >= (edge = GetUnreadableEdge(prioritizedReads)))
+ {
break;
+ }
// Async MediatorTimeCastEntry update, need to rerun activation
}
@@ -1626,7 +1632,8 @@ void TPipeline::ActivateWaitingTxOps(const TActorContext& ctx) {
if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING)
return;
- ActivateWaitingTxOps(GetUnreadableEdge(), ctx);
+ bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
+ ActivateWaitingTxOps(GetUnreadableEdge(prioritizedReads), prioritizedReads, ctx);
}
TRowVersion TPipeline::GetReadEdge() const {
@@ -1645,29 +1652,51 @@ TRowVersion TPipeline::GetReadEdge() const {
return TRowVersion(step, Max<ui64>());
}
-TRowVersion TPipeline::GetUnreadableEdge() const {
- auto last = TRowVersion(
+TRowVersion TPipeline::GetUnreadableEdge(bool prioritizeReads) const {
+ const auto last = TRowVersion(
GetLastActivePlannedOpStep(),
GetLastActivePlannedOpId());
auto it = Self->TransQueue.PlannedTxs.upper_bound(TStepOrder(last.Step, last.TxId));
while (it != Self->TransQueue.PlannedTxs.end()) {
- last = TRowVersion(it->Step, it->TxId);
- if (!Self->TransQueue.FindTxInFly(last.TxId)->IsReadOnly()) {
+ const auto next = TRowVersion(it->Step, it->TxId);
+ if (!Self->TransQueue.FindTxInFly(next.TxId)->IsReadOnly()) {
// If there's any non-read-only planned tx we don't have in the
// dependency tracker yet, we absolutely cannot read from that
// version.
- return last;
+ return next;
}
++it;
}
- // It looks like we have an empty plan queue (or it's read-only), so we
- // use a rough estimate of a point we would use for immediate writes in
- // the far future. That point in time is not complete yet and cannot be
- // used for snapshot reads.
- last = TRowVersion(LastPlannedTx.Step, LastPlannedTx.TxId);
- ui64 step = Max(Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0, (++last).Step);
- return TRowVersion(step, Max<ui64>());
+ // It looks like we have an empty plan queue (or it's read-only), so we use
+ // a rough estimate of a point in time we would use for immediate writes
+ // in the distant future. That point in time possibly has some unfinished
+ // transactions, but they would be resolved using dependency tracker. Here
+ // we use an estimate of the observed mediator step (including possible past
+ // generations). Note that we also update CompleteEdge when the distributed
+ // queue is empty, but we have been performing immediate writes and thus
+ // observing an updated mediator timecast step.
+ const ui64 mediatorStep = Max(
+ Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0,
+ Self->SnapshotManager.GetIncompleteEdge().Step,
+ Self->SnapshotManager.GetCompleteEdge().Step,
+ LastPlannedTx.Step);
+
+ // Using an observed mediator step we conclude that we have observed all
+ // distributed transactions up to the end of that step.
+ const TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
+
+ if (prioritizeReads) {
+ // We are prioritizing reads, and we are ok with blocking immediate writes
+ // in the current step. So the first unreadable version is actually in
+ // the next step.
+ return mediatorEdge.Next();
+ } else {
+ // We cannot block immediate writes up to this edge, thus we actually
+ // need to wait until the edge progresses above this version. This
+ // would happen when mediator timecast moves to the next step.
+ return mediatorEdge;
+ }
}
void TPipeline::AddCompletingOp(const TOperation::TPtr& op) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 67c35260b28..2d0d76b1000 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -329,11 +329,11 @@ public:
ui64 WaitingTxs() const { return WaitingDataTxOps.size(); }
bool AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
- void ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx);
+ void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx);
void ActivateWaitingTxOps(const TActorContext& ctx);
TRowVersion GetReadEdge() const;
- TRowVersion GetUnreadableEdge() const;
+ TRowVersion GetUnreadableEdge(bool prioritizedReads) const;
void AddCompletingOp(const TOperation::TPtr& op);
void RemoveCompletingOp(const TOperation::TPtr& op);
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index da1219bb9e6..39426396f39 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -30,9 +30,11 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
TRowVersion completeEdge = TRowVersion::Min();
TRowVersion incompleteEdge = TRowVersion::Min();
TRowVersion lowWatermark = TRowVersion::Min();
+ TRowVersion immediateWriteEdge = TRowVersion::Min();
ui32 mvccState = 0;
ui64 keepSnapshotTimeout = 0;
+ ui64 unprotectedReads = 0;
TSnapshotMap snapshots;
@@ -43,12 +45,15 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
// We don't currently support mvcc on the follower
ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step);
ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step);
+ ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId);
}
{
@@ -86,9 +91,19 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
MinWriteVersion = minWriteVersion;
MvccState = static_cast<EMvccState>(mvccState);
KeepSnapshotTimeout = keepSnapshotTimeout;
+ PerformedUnprotectedReads = (unprotectedReads != 0);
CompleteEdge = completeEdge;
IncompleteEdge = incompleteEdge;
LowWatermark = lowWatermark;
+ ImmediateWriteEdge = immediateWriteEdge;
+ if (ImmediateWriteEdge.Step <= Max(CompleteEdge.Step, IncompleteEdge.Step)) {
+ ImmediateWriteEdgeReplied = immediateWriteEdge;
+ } else {
+ // We cannot be sure which writes we have replied to
+ // Datashard will restore mediator state and decide
+ ImmediateWriteEdgeReplied.Step = Max(CompleteEdge.Step, IncompleteEdge.Step);
+ ImmediateWriteEdgeReplied.TxId = Max<ui64>();
+ }
CommittedCompleteEdge = completeEdge;
Snapshots = std::move(snapshots);
}
@@ -208,6 +223,82 @@ bool TSnapshotManager::PromoteIncompleteEdge(TOperation* op, TTransactionContext
return false;
}
+TRowVersion TSnapshotManager::GetImmediateWriteEdge() const {
+ return ImmediateWriteEdge;
+}
+
+TRowVersion TSnapshotManager::GetImmediateWriteEdgeReplied() const {
+ return ImmediateWriteEdgeReplied;
+}
+
+void TSnapshotManager::SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) {
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeStep, version.Step);
+ Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, version.TxId);
+ ImmediateWriteEdge = version;
+}
+
+bool TSnapshotManager::PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) {
+ if (!IsMvccEnabled())
+ return false;
+
+ if (version > ImmediateWriteEdge) {
+ SetImmediateWriteEdge(version, txc);
+
+ return true;
+ }
+
+ return false;
+}
+
+bool TSnapshotManager::PromoteImmediateWriteEdgeReplied(const TRowVersion& version) {
+ if (!IsMvccEnabled())
+ return false;
+
+ if (version > ImmediateWriteEdgeReplied) {
+ ImmediateWriteEdgeReplied = version;
+ return true;
+ }
+
+ return false;
+}
+
+TRowVersion TSnapshotManager::GetUnprotectedReadEdge() const {
+ return UnprotectedReadEdge;
+}
+
+bool TSnapshotManager::PromoteUnprotectedReadEdge(const TRowVersion& version) {
+ if (IsMvccEnabled() && UnprotectedReadEdge < version) {
+ UnprotectedReadEdge = version;
+ return true;
+ }
+
+ return false;
+}
+
+bool TSnapshotManager::GetPerformedUnprotectedReads() const {
+ return PerformedUnprotectedReads;
+}
+
+bool TSnapshotManager::IsPerformedUnprotectedReadsCommitted() const {
+ return PerformedUnprotectedReadsUncommitted == 0; // committed only when no SetPerformedUnprotectedReads() change is still awaiting OnCommitted
+}
+
+void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc) {
+ using Schema = TDataShard::Schema;
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistSys(db, Schema::SysMvcc_UnprotectedReads, ui64(performedUnprotectedReads ? 1 : 0));
+ PerformedUnprotectedReads = performedUnprotectedReads;
+ PerformedUnprotectedReadsUncommitted++;
+
+ txc.OnCommitted([this] {
+ this->PerformedUnprotectedReadsUncommitted--;
+ });
+}
+
void TSnapshotManager::SetKeepSnapshotTimeout(NIceDb::TNiceDb& db, ui64 keepSnapshotTimeout) {
using Schema = TDataShard::Schema;
@@ -275,7 +366,7 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
const TRowVersion opVersion(step, txId);
// We need to choose a version that is at least as large as all previous edges
- TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge);
+ TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge, ImmediateWriteEdge);
// This must be a version that we may have previously written to, and which
// must not be a snapshot. We don't know if there have been any immediate
@@ -300,7 +391,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
SetCompleteEdge(nicedb, nextVersion);
SetIncompleteEdge(nicedb, nextVersion);
+ SetImmediateWriteEdge(nextVersion, txc);
SetLowWatermark(nicedb, nextVersion);
+ ImmediateWriteEdgeReplied = ImmediateWriteEdge;
break;
}
@@ -316,7 +409,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
const auto minVersion = TRowVersion::Min();
SetCompleteEdge(nicedb, minVersion);
SetIncompleteEdge(nicedb, minVersion);
+ SetImmediateWriteEdge(minVersion, txc);
SetLowWatermark(nicedb, minVersion);
+ ImmediateWriteEdgeReplied = ImmediateWriteEdge;
break;
}
diff --git a/ydb/core/tx/datashard/datashard_snapshots.h b/ydb/core/tx/datashard/datashard_snapshots.h
index 87468341120..362fa13b923 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.h
+++ b/ydb/core/tx/datashard/datashard_snapshots.h
@@ -131,6 +131,19 @@ public:
bool PromoteIncompleteEdge(TOperation* op, TTransactionContext& txc);
+ TRowVersion GetImmediateWriteEdge() const;
+ TRowVersion GetImmediateWriteEdgeReplied() const;
+ void SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc);
+ bool PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc);
+ bool PromoteImmediateWriteEdgeReplied(const TRowVersion& version);
+
+ TRowVersion GetUnprotectedReadEdge() const;
+ bool PromoteUnprotectedReadEdge(const TRowVersion& version);
+
+ bool GetPerformedUnprotectedReads() const;
+ bool IsPerformedUnprotectedReadsCommitted() const;
+ void SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc);
+
EMvccState GetMvccState() const {
return MvccState;
}
@@ -203,9 +216,14 @@ private:
EMvccState MvccState = EMvccState::MvccUnspecified;
ui64 KeepSnapshotTimeout = 0;
+ bool PerformedUnprotectedReads = false;
+ ui64 PerformedUnprotectedReadsUncommitted = 0;
TRowVersion IncompleteEdge = TRowVersion::Min();
TRowVersion CompleteEdge = TRowVersion::Min();
TRowVersion LowWatermark = TRowVersion::Min();
+ TRowVersion ImmediateWriteEdge = TRowVersion::Min();
+ TRowVersion ImmediateWriteEdgeReplied = TRowVersion::Min();
+ TRowVersion UnprotectedReadEdge = TRowVersion::Min();
TRowVersion CommittedCompleteEdge = TRowVersion::Min();
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index c221c4dfeb0..a3168bd3ef2 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -282,6 +282,9 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_UNSAFE_UPLOAD_ROWS; }
+
+private:
+ TRowVersion MvccVersion = TRowVersion::Min();
};
class TDataShard::TTxExecuteMvccStateChange: public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
index 273952a06d1..cb085ad4805 100644
--- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
@@ -14,12 +14,26 @@ bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TA
if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion))
return false;
- Self->PromoteCompleteEdge(writeVersion.Step, txc);
+ if (Self->IsMvccEnabled()) {
+ // Note: we always wait for completion, so we can ignore the result
+ Self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+ MvccVersion = writeVersion;
+ }
+
return true;
}
void TDataShard::TTxUnsafeUploadRows::Complete(const TActorContext& ctx) {
- TCommonUploadOps::SendResult(Self, ctx);
+ TActorId target;
+ THolder<IEventBase> event;
+ ui64 cookie;
+ TCommonUploadOps::GetResult(Self, target, event, cookie);
+
+ if (MvccVersion) {
+ Self->SendImmediateWriteResult(MvccVersion, target, event.Release(), cookie);
+ } else {
+ ctx.Send(target, event.Release(), 0, cookie);
+ }
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index ba8d92e6e2b..3ed441d3963 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1724,6 +1724,10 @@ void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId) {
void SimulateSleep(Tests::TServer::TPtr server, TDuration duration) {
auto &runtime = *server->GetRuntime();
+ SimulateSleep(runtime, duration);
+}
+
+void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) {
auto sender = runtime.AllocateEdgeActor();
runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration);
runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 0bf3c889420..79794005400 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -606,6 +606,7 @@ void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId)
void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId);
void SimulateSleep(Tests::TServer::TPtr server, TDuration duration);
+void SimulateSleep(TTestActorRuntime& runtime, TDuration duration);
void SendSQL(Tests::TServer::TPtr server,
TActorId sender,
@@ -631,4 +632,21 @@ struct IsTxResultComplete {
void WaitTabletBecomesOffline(Tests::TServer::TPtr server, ui64 tabletId);
+/// Scope guard: sets NDataShard::gAllowLogBatchingDefaultValue to false on construction and restores the previously stored value on destruction
+class TDisableDataShardLogBatching : public TNonCopyable {
+public:
+ TDisableDataShardLogBatching()
+ : PrevValue(NDataShard::gAllowLogBatchingDefaultValue)
+ {
+ NDataShard::gAllowLogBatchingDefaultValue = false;
+ }
+
+ ~TDisableDataShardLogBatching() {
+ NDataShard::gAllowLogBatchingDefaultValue = PrevValue;
+ }
+
+private:
+ const bool PrevValue;
+};
+
}
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index b8a78736526..e5c976dd352 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -124,6 +124,14 @@ namespace NKqpHelpers {
return ev->Get()->Record.GetResponse().GetSessionId();
}
+ inline void CloseSession(TTestActorRuntime& runtime, TActorId sender, const TString& sessionId) {
+ auto request = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ request->Record.MutableRequest()->SetSessionId(sessionId);
+ runtime.Send(
+ new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()),
+ 0, /* via actor system */ true);
+ }
+
inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
const TActorId sender,
const TString& sql,
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 32472dce59e..e7ca6bd002f 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -4623,6 +4623,323 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterStuckRW, UseNewEngine) {
}
}
+Y_UNIT_TEST_QUAD(TestSnapshotReadPriority, UnprotectedReads, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(UnprotectedReads ? 1 : 0);
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetControls(controls)
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+ auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+ auto table2shards = GetTableShards(server, sender, "/Root/table-2");
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ // Perform an immediate write
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
+
+ auto execSimpleRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto execSnapshotRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ TString sessionId, txId;
+ TString result = beginSnapshotRequest(sessionId, txId, query);
+ CloseSession(runtime, reqSender, sessionId);
+ return result;
+ };
+
+ // Perform an immediate read, we should observe the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Spam schedules in the runtime to prevent mediator time jumping prematurely
+ {
+ Cerr << "!!! Setting up wakeup spam" << Endl;
+ auto senderWakeupSpam = runtime.AllocateEdgeActor();
+ for (int i = 1; i <= 10; ++i) {
+ runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250));
+ }
+ }
+
+ // Send an immediate write transaction, but don't wait for result
+ auto senderImmediateWrite = runtime.AllocateEdgeActor();
+ SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5)
+ )")));
+
+ // We sleep for very little so datashard commits changes, but doesn't advance
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Perform an immediate read again, it should NOT observe the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+
+ // Wait for the write to finish
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // Perform an immediate read again, it should observe the write above
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Same when using a fresh snapshot read
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Start a new write and sleep again
+ SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 7)
+ )")));
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Verify this write is not observed yet
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Spam schedules in the runtime to prevent mediator time jumping prematurely
+ {
+ Cerr << "!!! Setting up wakeup spam" << Endl;
+ auto senderWakeupSpam = runtime.AllocateEdgeActor();
+ for (int i = 1; i <= 10; ++i) {
+ runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250));
+ }
+ }
+
+ // Reboot the tablet
+ RebootTablet(runtime, table1shards.at(0), sender);
+
+ // Verify the write above cannot be observed after restart as well
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Send one more write and sleep again
+ auto senderImmediateWrite2 = runtime.AllocateEdgeActor();
+ SendRequest(runtime, senderImmediateWrite2, MakeSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (9, 9)
+ )")));
+ SimulateSleep(runtime, TDuration::MicroSeconds(1));
+
+ // Verify it is also hidden at the moment
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "} Struct { Bool: false }");
+
+ // Wait for result of the second write
+ {
+ auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite2);
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+
+ // We should finally observe both writes
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "} Struct { Bool: false }");
+
+ TString snapshotSessionId, snapshotTxId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "} Struct { Bool: false }");
+
+ // Spam schedules in the runtime to prevent mediator time jumping prematurely
+ {
+ Cerr << "!!! Setting up wakeup spam" << Endl;
+ auto senderWakeupSpam = runtime.AllocateEdgeActor();
+ for (int i = 1; i <= 10; ++i) {
+ runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250));
+ }
+ }
+
+ // Reboot the tablet
+ RebootTablet(runtime, table1shards.at(0), sender);
+
+ // Upsert new data after reboot
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (11, 11)"));
+
+ // Make sure datashard state is restored correctly and snapshot is not corrupted
+ UNIT_ASSERT_VALUES_EQUAL(
+ continueSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "} Struct { Bool: false }");
+
+ // Make sure new snapshot will actually observe new data
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSnapshotRequest(Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } "
+ "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } "
+ "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } "
+ "List { Struct { Optional { Uint32: 11 } } Struct { Optional { Uint32: 11 } } } "
+ "} Struct { Bool: false }");
+}
+
} // Y_UNIT_TEST_SUITE(DataShardOutOfOrder)
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
index a34599d2eac..9f6c59189b1 100644
--- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp
@@ -23,7 +23,6 @@ public:
private:
void ExecuteDataTx(TOperation::TPtr op,
- TTransactionContext& txc,
const TActorContext& ctx);
void AddLocksToResult(TOperation::TPtr op);
};
@@ -124,7 +123,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
try {
try {
- ExecuteDataTx(op, txc, ctx);
+ ExecuteDataTx(op, ctx);
} catch (const TNotReadyTabletException&) {
// We want to try pinning (actually precharging) all required pages
// before restarting the transaction, to minimize future restarts.
@@ -169,7 +168,6 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}
void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
- TTransactionContext& txc,
const TActorContext& ctx) {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
IEngineFlat* engine = tx->GetDataTx()->GetEngine();
@@ -237,9 +235,6 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op,
} else {
result->SetTxResult(engine->GetShardReply(DataShard.TabletID()));
- if (op->IsImmediate() && !op->IsReadOnly())
- DataShard.PromoteCompleteEdge(writeVersion.Step, txc);
-
op->ChangeRecords() = std::move(tx->GetDataTx()->GetCollectedChanges());
}
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 16e109e6b9e..78bfb34c32b 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -152,10 +152,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
AddLocksToResult(op, ctx);
- if (op->IsImmediate() && !op->IsReadOnly()) {
- DataShard.PromoteCompleteEdge(writeVersion.Step, txc);
- }
-
op->ChangeRecords() = std::move(dataTx->GetCollectedChanges());
KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters());
diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp
index de0e9e4ad2e..daac9225b23 100644
--- a/ydb/core/tx/datashard/finish_propose_unit.cpp
+++ b/ydb/core/tx/datashard/finish_propose_unit.cpp
@@ -20,6 +20,7 @@ public:
const TActorContext &ctx) override;
private:
+ TDataShard::TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(TOperation* op, TTransactionContext& txc);
void CompleteRequest(TOperation::TPtr op,
const TActorContext &ctx);
void AddDiagnosticsResult(TOutputOpData::TResultPtr &res);
@@ -43,6 +44,27 @@ bool TFinishProposeUnit::IsReadyToExecute(TOperation::TPtr) const
return true;
}
+TDataShard::TPromotePostExecuteEdges TFinishProposeUnit::PromoteImmediatePostExecuteEdges(
+ TOperation* op,
+ TTransactionContext& txc)
+{
+ if (op->IsMvccSnapshotRead()) {
+ if (op->IsMvccSnapshotRepeatable()) {
+ return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::RepeatableRead, txc);
+ } else {
+ return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::ReadOnly, txc);
+ }
+ } else if (op->MvccReadWriteVersion) {
+ if (op->IsReadOnly()) {
+ return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc);
+ } else {
+ return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+ }
+ } else {
+ return { };
+ }
+}
+
EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
TTransactionContext &txc,
const TActorContext &ctx)
@@ -53,24 +75,18 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op,
bool hadWrites = false;
// When mvcc is enabled we perform marking after transaction is executed
- if (DataShard.IsMvccEnabled() && op->IsImmediate()) {
- if (op->IsMvccSnapshotRead()) {
- hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(op->GetMvccSnapshot(), txc);
- if (op->IsMvccSnapshotRepeatable()) {
- hadWrites |= DataShard.PromoteCompleteEdge(op.Get(), txc);
- if (!hadWrites && DataShard.GetSnapshotManager().GetCommittedCompleteEdge() < op->GetMvccSnapshot()) {
- // We need to wait for completion because some other transaction
- // has moved complete edge, but it's not committed yet.
- op->SetWaitCompletionFlag(true);
- }
- }
- } else if (op->MvccReadWriteVersion) {
- hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(*op->MvccReadWriteVersion, txc);
+ if (op->IsAborted()) {
+ // Make sure we confirm aborts with a commit
+ op->SetWaitCompletionFlag(true);
+ } else if (DataShard.IsMvccEnabled() && op->IsImmediate()) {
+ auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc);
+
+ if (res.HadWrites) {
+ hadWrites = true;
+ res.WaitCompletion = true;
}
- if (hadWrites) {
- // FIXME: even if transaction itself didn't promote, we may still need to
- // wait for completion, when current in-memory state is not actually committed
+ if (res.WaitCompletion) {
op->SetWaitCompletionFlag(true);
}
}
@@ -159,8 +175,13 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op,
DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size());
- if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId()))
- ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie());
+ if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) {
+ if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) {
+ DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie());
+ } else {
+ ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie());
+ }
+ }
}
void TFinishProposeUnit::AddDiagnosticsResult(TOutputOpData::TResultPtr &res)
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp
index c2e538caa07..7dc3a464204 100644
--- a/ydb/core/tx/time_cast/time_cast.cpp
+++ b/ydb/core/tx/time_cast/time_cast.cpp
@@ -324,13 +324,13 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
default: {
const ui64 step = record.GetTimeBarrier();
bucket.Entry->Update(step, nullptr, 0);
- THashSet<ui64> processed; // a set of processed tablets
+ THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
while (!bucket.Waiters.empty()) {
const auto& top = bucket.Waiters.top();
if (step < top.PlanStep) {
break;
}
- if (processed.insert(top.TabletId).second) {
+ if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) {
ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step));
}
bucket.Waiters.pop();
@@ -495,6 +495,7 @@ void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr
new TEvMediatorTimecast::TEvSubscribeReadStepResult(
coordinatorId,
lastReadStep,
+ nextReadStep,
coordinator.ReadStep),
0, cookie);
}
diff --git a/ydb/core/tx/time_cast/time_cast.h b/ydb/core/tx/time_cast/time_cast.h
index 9771a462c6e..0765eae9c4c 100644
--- a/ydb/core/tx/time_cast/time_cast.h
+++ b/ydb/core/tx/time_cast/time_cast.h
@@ -199,14 +199,17 @@ struct TEvMediatorTimecast {
struct TEvSubscribeReadStepResult : public TEventLocal<TEvSubscribeReadStepResult, EvSubscribeReadStepResult> {
const ui64 CoordinatorId;
const ui64 LastReadStep;
+ const ui64 NextReadStep;
const TMediatorTimecastReadStep::TCPtr ReadStep;
TEvSubscribeReadStepResult(
ui64 coordinatorId,
ui64 lastReadStep,
+ ui64 nextReadStep,
TMediatorTimecastReadStep::TCPtr readStep)
: CoordinatorId(coordinatorId)
, LastReadStep(lastReadStep)
+ , NextReadStep(nextReadStep)
, ReadStep(std::move(readStep))
{
Y_VERIFY(ReadStep);
@@ -217,6 +220,7 @@ struct TEvMediatorTimecast {
<< ToStringHeader() << "{"
<< " CoordinatorId# " << CoordinatorId
<< " LastReadStep# " << LastReadStep
+ << " NextReadStep# " << NextReadStep
<< " ReadStep# " << ReadStep->Get()
<< " }";
}