author | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-03-14 15:22:23 +0300
---|---|---
committer | Alexey Borzenkov <snaury@yandex-team.ru> | 2022-03-14 15:22:23 +0300
commit | 6fa2b9df455f828d333446424ecf0bf29960e90a (patch) |
tree | bc2a7f56a0b6e8dcdd2b0dc8be67d2c34a17eb4f |
parent | 03eec4e7e455d44d111cdffcaa3ee791074e2f86 (diff) |
download | ydb-6fa2b9df455f828d333446424ecf0bf29960e90a.tar.gz |
Support prioritized and unprotected snapshot reads in datashard, KIKIMR-13910
ref:b7023db4fab5b619fb7edc70205f036de6bad013
27 files changed, 1029 insertions, 72 deletions
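The mechanics below revolve around a handful of monotonic MVCC "edges" (complete, incomplete, immediate-write, unprotected-read), each ordered as a (step, txId) pair, with transaction versions chosen by taking maximums over them. A minimal standalone sketch of that version arithmetic, with hypothetical simplified types (the real TRowVersion and edge bookkeeping live in ydb/core/tx/datashard):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <tuple>

// Simplified stand-in for TRowVersion: ordered lexicographically by (Step, TxId).
struct TVersion {
    uint64_t Step = 0;
    uint64_t TxId = 0;
    bool operator<(const TVersion& o) const {
        return std::tie(Step, TxId) < std::tie(o.Step, o.TxId);
    }
    TVersion Next() const {
        // First version strictly above this one.
        return TxId == UINT64_MAX ? TVersion{Step + 1, 0} : TVersion{Step, TxId + 1};
    }
};

int main() {
    // Edges a shard might track (values are illustrative only).
    TVersion mediatorEdge{100, UINT64_MAX};        // everything planned up to step 100 is visible
    TVersion immediateWriteEdgeReplied{100, 5};    // last immediate write whose reply was sent
    TVersion unprotectedReadEdge{101, UINT64_MAX}; // a snapshot read ran "ahead" at step 101

    // Read-only: include everything potentially confirmed to users.
    TVersion readVersion = std::max({mediatorEdge, immediateWriteEdgeReplied, unprotectedReadEdge});

    // Read-write: never write at or below a version an unprotected read already observed.
    TVersion writeVersion = std::max(mediatorEdge, unprotectedReadEdge.Next());

    std::cout << "read at (" << readVersion.Step << "," << readVersion.TxId << ")\n"
              << "write at (" << writeVersion.Step << "," << writeVersion.TxId << ")\n";
}
```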
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 770b3cb6bf2..e307cc5bb7e 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1177,6 +1177,17 @@ message TImmediateControlsConfig {
                                                  MinValue: 0,
                                                  MaxValue: 134217728,
                                                  DefaultValue: 0 }];
+
+    optional uint64 PrioritizedMvccSnapshotReads = 8 [(ControlOptions) = {
+        Description: "Enables prioritized mvcc snapshot reads over immediate writes",
+        MinValue: 0,
+        MaxValue: 1,
+        DefaultValue: 0 }];
+    optional uint64 UnprotectedMvccSnapshotReads = 9 [(ControlOptions) = {
+        Description: "Enables unprotected (fully readonly) mvcc snapshot reads",
+        MinValue: 0,
+        MaxValue: 1,
+        DefaultValue: 0 }];
 }
 
 message TTxLimitControls
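Both knobs default to off and are ordinary immediate controls, so they can be flipped at runtime without a restart. In tests this commit drives them through TServerSettings; the following is an excerpt from the new unit test added in datashard_ut_order.cpp further down (1 enables a control, 0 disables it):

```cpp
TServerSettings::TControls controls;
controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);

TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
    .SetEnableMvcc(true)
    .SetEnableMvccSnapshotReads(true)
    .SetControls(controls);
```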
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a3fcc458681..f3a811a2a6f 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -140,6 +140,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
     , ReadColumnsScanInUserPool(0, 0, 1)
     , BackupReadAheadLo(0, 0, 64*1024*1024)
     , BackupReadAheadHi(0, 0, 128*1024*1024)
+    , EnablePrioritizedMvccSnapshotReads(0, 0, 1)
+    , EnableUnprotectedMvccSnapshotReads(0, 0, 1)
     , DataShardSysTables(InitDataShardSysTables(this))
     , ChangeSenderActivator(info->TabletID)
     , ChangeExchangeSplitter(this)
@@ -265,6 +267,7 @@ void TDataShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorCo
 void TDataShard::Cleanup(const TActorContext& ctx) {
     //PipeClientCache->Detach(ctx);
     if (RegistrationSended) {
+        ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnsubscribeReadStep());
         ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnregisterTablet(TabletID()));
     }
 
@@ -301,6 +304,9 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
     AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
     AppData(ctx)->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");
 
+    AppData(ctx)->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
+    AppData(ctx)->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
+
     // OnActivateExecutor might be called multiple times for a follower
     // but the counters should be initialized only once
     if (TabletCountersPtr) {
@@ -324,6 +330,16 @@ void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
 }
 
 void TDataShard::SwitchToWork(const TActorContext &ctx) {
+    if (IsMvccEnabled() && (
+        SnapshotManager.GetPerformedUnprotectedReads() ||
+        SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step))
+    {
+        // We will need to wait until mediator state is fully restored before
+        // processing new immediate transactions.
+        MediatorStateWaiting = true;
+        CheckMediatorStateRestored();
+    }
+
     SyncConfig();
     PlanQueue.Progress(ctx);
     OutReadSets.ResendAll(ctx);
@@ -360,10 +376,23 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
     LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
                << DatashardStateName(State) << " tabletId " << TabletID()
                << " mediators count is " << ProcessingParams->MediatorsSize()
+               << " coordinators count is " << ProcessingParams->CoordinatorsSize()
                << " buckets per mediator " << ProcessingParams->GetTimeCastBucketsPerMediator());
 
     RegistrationSended = true;
     ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams));
+
+    // Subscribe to all known coordinators
+    for (ui64 coordinatorId : ProcessingParams->GetCoordinators()) {
+        size_t index = CoordinatorSubscriptions.size();
+        auto res = CoordinatorSubscriptionById.emplace(coordinatorId, index);
+        if (res.second) {
+            auto& subscription = CoordinatorSubscriptions.emplace_back();
+            subscription.CoordinatorId = coordinatorId;
+            ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvSubscribeReadStep(coordinatorId));
+            ++CoordinatorSubscriptionsPending;
+        }
+    }
 }
 
 void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
@@ -1267,7 +1296,8 @@ TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
     if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
         return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
 
-    return TRowVersion((++edge).Step, ::Max<ui64>());
+    TRowVersion candidate = TRowVersion((++edge).Step, ::Max<ui64>());
+    return Max(candidate, SnapshotManager.GetImmediateWriteEdge());
 }
 
 TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const {
@@ -1291,6 +1321,9 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
             // With read-only transactions we don't need reads to include
             // changes made at the incomplete edge, as that is a point where
             // distributed transactions performed some reads, not writes.
+            // Since incomplete transactions are still inflight, the actual
+            // version will stick to the first incomplete transaction in the queue,
+            // effectively reading non-repeatable state before that transaction.
             edge = readEdge;
             break;
         case EMvccTxMode::ReadWrite:
@@ -1308,12 +1341,41 @@ TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const
     if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
         return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());
 
-    // This is currently active step for immediate writes, not that when
-    // writeEdge is equal to some (PlanStep, Max<ui64>()) that means everything
-    // up to this point is "fixed" and cannot be changed. In that case we
-    // choose at least PlanStep + 1 for new writes.
-    ui64 writeStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, (++writeEdge).Step);
-    return TRowVersion(writeStep, ::Max<ui64>());
+    // Normally we stick transactions to the end of the last known mediator step.
+    // Note these calculations only happen when we don't have distributed
+    // transactions left in queue, and we won't have any more transactions
+    // up to the current mediator time. The mediator time itself may be stale,
+    // in which case we may have evidence of its higher value via complete and
+    // incomplete edges above.
+    const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
+    TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
+
+    switch (mode) {
+        case EMvccTxMode::ReadOnly: {
+            // We want to include everything that was potentially confirmed to
+            // users, but we don't want to include anything that is not replied
+            // at the start of this read.
+            // Note it's only possible to have ImmediateWriteEdge > mediatorEdge
+            // when ImmediateWriteEdge == mediatorEdge + 1
+            return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied(), SnapshotManager.GetUnprotectedReadEdge());
+        }
+
+        case EMvccTxMode::ReadWrite: {
+            // We must use at least a previously used immediate write edge.
+            // But we must also avoid trampling over any unprotected mvcc
+            // snapshot reads that have occurred.
+            // Note the only way to go past the last known mediator step
+            // is when we had an unprotected read, which itself happens at the
+            // last mediator step. So we may only ever have a +1 step, never
+            // anything more.
+            TRowVersion postReadEdge = SnapshotManager.GetPerformedUnprotectedReads()
+                ? SnapshotManager.GetUnprotectedReadEdge().Next()
+                : TRowVersion::Min();
+            return Max(mediatorEdge, writeEdge.Next(), postReadEdge, SnapshotManager.GetImmediateWriteEdge());
+        }
+    }
+
+    Y_FAIL("unreachable");
 }
 
 TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
@@ -1333,6 +1395,195 @@ TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
     return mvccVersion;
 }
 
+TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges(
+        const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc)
+{
+    TPromotePostExecuteEdges res;
+
+    res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc);
+
+    switch (mode) {
+        case EPromotePostExecuteEdges::ReadOnly:
+            // We want read-only immediate transactions to be readonly, thus
+            // don't promote the complete edge unnecessarily. On restarts we
+            // will assume anything written is potentially replied anyway,
+            // even if it has never been read.
+            break;
+
+        case EPromotePostExecuteEdges::RepeatableRead: {
+            bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads();
+            if (unprotectedReads) {
+                // We want to use unprotected reads, but we need to make sure it's properly marked first
+                if (!SnapshotManager.GetPerformedUnprotectedReads()) {
+                    SnapshotManager.SetPerformedUnprotectedReads(true, txc);
+                    res.HadWrites = true;
+                }
+                if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) {
+                    // We need to wait for completion until the flag is committed
+                    res.WaitCompletion = true;
+                }
+                SnapshotManager.PromoteUnprotectedReadEdge(version);
+            } else if (SnapshotManager.GetPerformedUnprotectedReads()) {
+                // We want to drop the flag as soon as possible
+                SnapshotManager.SetPerformedUnprotectedReads(false, txc);
+                res.HadWrites = true;
+            }
+
+            // We want to promote the complete edge when protected reads are
+            // used or when we're already writing something anyway.
+            if (res.HadWrites || !unprotectedReads) {
+                res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc);
+                if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) {
+                    // We need to wait for completion because some other transaction
+                    // has moved complete edge, but it's not committed yet.
+                    res.WaitCompletion = true;
+                }
+            }
+
+            break;
+        }
+
+        case EPromotePostExecuteEdges::ReadWrite: {
+            if (version.Step <= GetMaxObservedStep()) {
+                res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc);
+            }
+            res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc);
+            break;
+        }
+    }
+
+    return res;
+}
+
+ui64 TDataShard::GetMaxObservedStep() const {
+    return Max(
+        Pipeline.GetLastPlannedTx().Step,
+        SnapshotManager.GetCompleteEdge().Step,
+        SnapshotManager.GetIncompleteEdge().Step,
+        SnapshotManager.GetUnprotectedReadEdge().Step,
+        MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0);
+}
+
+void TDataShard::SendImmediateWriteResult(
+        const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie)
+{
+    const ui64 step = version.Step;
+    const ui64 observedStep = GetMaxObservedStep();
+    if (step <= observedStep) {
+        SnapshotManager.PromoteImmediateWriteEdgeReplied(version);
+        Send(target, event, 0, cookie);
+        return;
+    }
+
+    MediatorDelayedReplies.emplace(
+        std::piecewise_construct,
+        std::forward_as_tuple(version),
+        std::forward_as_tuple(target, THolder<IEventBase>(event), cookie));
+
+    // Try to subscribe to the next step, when needed
+    if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
+        MediatorTimeCastWaitingSteps.insert(step);
+        Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+    }
+}
+
+void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep) {
+    for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
+        const ui64 step = it->first.Step;
+
+        if (step <= mediatorStep) {
+            SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
+            Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
+            it = MediatorDelayedReplies.erase(it);
+            continue;
+        }
+
+        // Try to subscribe to the next step, when needed
+        if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
+            MediatorTimeCastWaitingSteps.insert(step);
+            Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+            LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+        }
+        break;
+    }
+}
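SendImmediateWriteResult and SendAfterMediatorStepActivate together implement a simple park-and-flush protocol: replies above the observed mediator step wait in a version-ordered map and are released as the step advances. A self-contained model of just that protocol, with hypothetical simplified types (std::multimap stands in for the real TMultiMap of delayed replies):

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Replies for immediate writes at a step above the observed mediator step are
// parked until the mediator time cast reaches that step.
struct TShardModel {
    uint64_t ObservedStep = 0;
    std::multimap<uint64_t, std::string> Delayed; // step -> pending reply

    void SendImmediateWriteResult(uint64_t step, std::string reply) {
        if (step <= ObservedStep) {
            std::cout << "reply now: " << reply << "\n";
            return;
        }
        Delayed.emplace(step, std::move(reply)); // park until the step is observed
    }

    void OnMediatorStep(uint64_t step) { // mirrors SendAfterMediatorStepActivate
        ObservedStep = std::max(ObservedStep, step);
        for (auto it = Delayed.begin(); it != Delayed.end() && it->first <= ObservedStep;) {
            std::cout << "reply at step " << it->first << ": " << it->second << "\n";
            it = Delayed.erase(it);
        }
    }
};

int main() {
    TShardModel shard;
    shard.ObservedStep = 100;
    shard.SendImmediateWriteResult(101, "write(5,5)"); // parked
    shard.SendImmediateWriteResult(100, "write(3,3)"); // replied immediately
    shard.OnMediatorStep(101);                         // flushes the parked reply
}
```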
+void TDataShard::CheckMediatorStateRestored() {
+    if (!MediatorStateWaiting ||
+        !RegistrationSended ||
+        !MediatorTimeCastEntry ||
+        CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max<ui64>())
+    {
+        // We are not waiting or not ready to make a decision
+        if (MediatorStateWaiting &&
+            MediatorTimeCastEntry &&
+            CoordinatorPrevReadStepMax == Max<ui64>() &&
+            !MediatorStateBackupInitiated)
+        {
+            // It is possible we don't have coordinators with new protocol support
+            // Use a backup plan of acquiring a read snapshot for restoring the read step
+            Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup);
+            MediatorStateBackupInitiated = true;
+        }
+        return;
+    }
+
+    // CoordinatorPrevReadStepMax shows us what is the next minimum step that
+    // may be acquired as a snapshot. This tells us that no previous read
+    // could have happened after this step, even if it has been acquired.
+    // CoordinatorPrevReadStepMin shows us the maximum step that could have
+    // been acquired before we subscribed. Even if the next step is very
+    // large it may be used to infer an earlier step, as the previous generation
+    // could not have read any step that was not acquired.
+    // When some coordinators are still pending we use CoordinatorPrevReadStepMax
+    // as a worst case read step in the future, hoping to make a tighter
+    // prediction while we wait for that.
+    const ui64 step = CoordinatorSubscriptionsPending == 0
+        ? Min(CoordinatorPrevReadStepMax, CoordinatorPrevReadStepMin)
+        : CoordinatorPrevReadStepMax;
+
+    // WARNING: we must perform this check BEFORE we update unprotected read edge
+    // We may enter this code path multiple times, and we expect that the above
+    // read step may be refined while we wait based on pessimistic backup step.
+    if (GetMaxObservedStep() < step) {
+        // We need to wait until we observe mediator step that is at least
+        // as large as the step we found.
+        if (MediatorTimeCastWaitingSteps.insert(step).second) {
+            Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+            LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+        }
+        return;
+    }
+
+    // Using the inferred last read step we restore the pessimistic unprotected
+    // read edge. Note we only need to do so if there have actually been any
+    // unprotected reads in this datashard history.
+    const TRowVersion lastReadEdge(step, Max<ui64>());
+    if (SnapshotManager.GetPerformedUnprotectedReads()) {
+        SnapshotManager.PromoteUnprotectedReadEdge(lastReadEdge);
+    }
+
+    // Promote the replied immediate write edge up to the currently observed step
+    // This is needed to make sure we read any potentially replied immediate
+    // writes before the restart, and conversely don't accidentally read any
+    // data that is definitely not replied yet.
+    if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
+        const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
+        SnapshotManager.PromoteImmediateWriteEdgeReplied(
+            Min(edge, SnapshotManager.GetImmediateWriteEdge()));
+    }
+
+    MediatorStateWaiting = false;
+
+    // Resend all waiting messages
+    TVector<THolder<IEventHandle>> msgs;
+    msgs.swap(MediatorStateWaitingMsgs);
+    for (auto& ev : msgs) {
+        TActivationContext::Send(ev.Release());
+    }
+}
+
 NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code) {
 
     using EResult = NMiniKQL::IEngineFlat::EResult;
@@ -1483,7 +1734,7 @@ bool TDataShard::CheckDataTxReject(const TString& opDescr,
         rejectReasons.push_back("decided to reject due to given RejectProbability");
     }
 
-    size_t totalInFly = (TxInFly() + ImmediateInFly() + ProposeQueue.Size() + TxWaiting());
+    size_t totalInFly = (TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + TxWaiting());
     if (totalInFly > GetMaxTxInFly()) {
         reject = true;
         rejectReasons.push_back("MaxTxInFly was exceeded");
@@ -1554,10 +1805,21 @@ bool TDataShard::CheckDataTxRejectAndReply(TEvDataShard::TEvProposeTransaction*
 }
 
 void TDataShard::UpdateProposeQueueSize() const {
-    SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
+    SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
 }
 
 void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
+    // Check if we need to delay an immediate transaction
+    if (MediatorStateWaiting &&
+        (ev->Get()->GetFlags() & TTxFlags::Immediate) &&
+        !(ev->Get()->GetFlags() & TTxFlags::ForceOnline))
+    {
+        // We cannot calculate correct version until we restore mediator state
+        MediatorStateWaitingMsgs.emplace_back(ev.Release());
+        UpdateProposeQueueSize();
+        return;
+    }
+
     if (Pipeline.HasProposeDelayers()) {
         LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                     "Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored");
@@ -1970,7 +2232,30 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev,
     MediatorTimeCastEntry = ev->Get()->Entry;
     Y_VERIFY(MediatorTimeCastEntry);
 
+    SendAfterMediatorStepActivate(MediatorTimeCastEntry->Get(TabletID()));
+    Pipeline.ActivateWaitingTxOps(ctx);
+
+    CheckMediatorStateRestored();
+}
+
+void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx) {
+    auto* msg = ev->Get();
+    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+        "Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID()
+        << " coordinator " << msg->CoordinatorId
+        << " last step " << msg->LastReadStep
+        << " next step " << msg->ReadStep->Get());
+    auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
+    Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
+        "Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
+    size_t index = it->second;
+    auto& subscription = CoordinatorSubscriptions.at(index);
+    subscription.ReadStep = msg->ReadStep;
+    CoordinatorPrevReadStepMin = Max(CoordinatorPrevReadStepMin, msg->LastReadStep);
+    CoordinatorPrevReadStepMax = Min(CoordinatorPrevReadStepMax, msg->NextReadStep);
+    --CoordinatorSubscriptionsPending;
+    CheckMediatorStateRestored();
 }
 
 void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx) {
@@ -1984,13 +2269,27 @@ void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const
     for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end() && *it <= step;)
         it = MediatorTimeCastWaitingSteps.erase(it);
 
+    SendAfterMediatorStepActivate(step);
+    Pipeline.ActivateWaitingTxOps(ctx);
+
+    CheckMediatorStateRestored();
+}
+
+void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) {
+    if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max<ui64>()) {
+        // We are still waiting for new protocol coordinator state
+        // TODO: send an old snapshot request to coordinators
+    }
 }
 
 bool TDataShard::WaitPlanStep(ui64 step) {
     if (step <= Pipeline.GetLastPlannedTx().Step)
         return false;
 
+    if (step <= SnapshotManager.GetCompleteEdge().Step)
+        return false;
+
     if (MediatorTimeCastEntry && step <= MediatorTimeCastEntry->Get(TabletID()))
         return false;
 
@@ -2016,7 +2315,7 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
     auto &rec = ev->Get()->Record;
     if (rec.HasMvccSnapshot()) {
         TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
-        TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge();
+        TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
         if (rowVersion >= unreadableEdge) {
             LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge);
             return true;
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 3b3c9d9d6f9..c55cf209f5e 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -132,6 +132,11 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c
 }
 
 void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx) {
+    if (MediatorStateWaiting) {
+        MediatorStateWaitingMsgs.emplace_back(ev.Release());
+        UpdateProposeQueueSize();
+        return;
+    }
     if (!MaybeReject<TEvDataShard::TEvUploadRowsResponse>(this, ev, ctx, "bulk upsert", true)) {
         Executor()->Execute(new TTxUploadRows(this, ev), ctx);
     } else {
@@ -140,6 +145,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
 }
 
 void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx) {
+    if (MediatorStateWaiting) {
+        MediatorStateWaitingMsgs.emplace_back(ev.Release());
+        UpdateProposeQueueSize();
+        return;
+    }
     if (!MaybeReject<TEvDataShard::TEvEraseRowsResponse>(this, ev, ctx, "erase", false)) {
         Executor()->Execute(new TTxEraseRows(this, ev), ctx);
     } else {
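The arithmetic in CheckMediatorStateRestored is easier to follow with numbers. A tiny standalone calculation (illustrative values only) of the read step the shard restores once the coordinator subscriptions above have answered:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    struct TCoordinatorReply { uint64_t LastReadStep, NextReadStep; };
    // Hypothetical replies from two coordinators after a tablet restart:
    // the last step each allowed for reads before our subscription, and the
    // next step it will hand out.
    std::vector<TCoordinatorReply> replies = {{100, 150}, {120, 140}};

    uint64_t prevReadStepMin = 0;          // max of LastReadStep seen so far
    uint64_t prevReadStepMax = UINT64_MAX; // min of NextReadStep seen so far
    for (const auto& r : replies) {
        prevReadStepMin = std::max(prevReadStepMin, r.LastReadStep);
        prevReadStepMax = std::min(prevReadStepMax, r.NextReadStep);
    }

    // All subscriptions answered: the previous generation cannot have served
    // a read above either bound, so take the tighter of the two.
    uint64_t step = std::min(prevReadStepMax, prevReadStepMin);
    std::cout << "restore unprotected read edge at step " << step << "\n"; // 120
}
```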
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 088da2b5213..4daee2067ad 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -209,6 +209,21 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
 }
 
 template <typename TEvRequest, typename TEvResponse>
+void TCommonUploadOps<TEvRequest, TEvResponse>::GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie) {
+    Y_VERIFY(Result);
+
+    if (Result->Record.GetStatus() == NKikimrTxDataShard::TError::OK) {
+        self->IncCounter(COUNTER_BULK_UPSERT_SUCCESS);
+    } else {
+        self->IncCounter(COUNTER_BULK_UPSERT_ERROR);
+    }
+
+    target = Ev->Sender;
+    event = std::move(Result);
+    cookie = 0;
+}
+
+template <typename TEvRequest, typename TEvResponse>
 void TCommonUploadOps<TEvRequest, TEvResponse>::SendResult(TDataShard* self, const TActorContext& ctx) {
     Y_VERIFY(Result);
 
diff --git a/ydb/core/tx/datashard/datashard_common_upload.h b/ydb/core/tx/datashard/datashard_common_upload.h
index 262027b14fe..928558de2d9 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.h
+++ b/ydb/core/tx/datashard/datashard_common_upload.h
@@ -23,6 +23,7 @@ public:
 protected:
     bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion);
+    void GetResult(TDataShard* self, TActorId& target, THolder<IEventBase>& event, ui64& cookie);
     void SendResult(TDataShard* self, const TActorContext& ctx);
 
     TVector<IChangeCollector::TChange> GetCollectedChanges() const;
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 826819596fa..574c5ff6473 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -204,7 +204,7 @@ bool TDirectTxErase::Execute(TDataShard* self, TTransactionContext& txc,
     return true;
 }
 
-void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) {
+TDirectTxResult TDirectTxErase::GetResult(TDataShard* self) {
     Y_VERIFY(Result);
 
     if (Result->Record.GetStatus() == NKikimrTxDataShard::TEvEraseRowsResponse::OK) {
@@ -213,7 +213,11 @@ void TDirectTxErase::SendResult(TDataShard* self, const TActorContext& ctx) {
         self->IncCounter(COUNTER_ERASE_ROWS_ERROR);
     }
 
-    ctx.Send(Ev->Sender, std::move(Result));
+    TDirectTxResult res;
+    res.Target = Ev->Sender;
+    res.Event = std::move(Result);
+    res.Cookie = 0;
+    return res;
 }
 
 TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxErase::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.h b/ydb/core/tx/datashard/datashard_direct_erase.h
index 689b868a36f..7663b47d38f 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.h
+++ b/ydb/core/tx/datashard/datashard_direct_erase.h
@@ -66,7 +66,7 @@ public:
         NKikimrTxDataShard::TEvEraseRowsResponse::EStatus& status, TString& error);
 
     bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
-    void SendResult(TDataShard* self, const TActorContext& ctx) override;
+    TDirectTxResult GetResult(TDataShard* self) override;
     TVector<IChangeCollector::TChange> GetCollectedChanges() const override;
 };
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.cpp b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
index 145f0936cd4..858247f36af 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.cpp
@@ -31,16 +31,25 @@ void TDirectTransaction::BuildExecutionPlan(bool loaded)
 }
 
 bool TDirectTransaction::Execute(TDataShard* self, TTransactionContext& txc) {
-    auto [readVersion, writeVersion] = self->GetReadWriteVersions();
+    auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
 
     if (!Impl->Execute(self, txc, readVersion, writeVersion))
         return false;
 
-    self->PromoteCompleteEdge(writeVersion.Step, txc);
+    if (self->IsMvccEnabled()) {
+        // Note: we always wait for completion, so we can ignore the result
+        self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+    }
+
     return true;
 }
 
 void TDirectTransaction::SendResult(TDataShard* self, const TActorContext& ctx) {
-    Impl->SendResult(self, ctx);
+    auto result = Impl->GetResult(self);
+    if (MvccReadWriteVersion) {
+        self->SendImmediateWriteResult(*MvccReadWriteVersion, result.Target, result.Event.Release(), result.Cookie);
+    } else {
+        ctx.Send(result.Target, result.Event.Release(), 0, result.Cookie);
+    }
 }
 
 TVector<NMiniKQL::IChangeCollector::TChange> TDirectTransaction::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_transaction.h b/ydb/core/tx/datashard/datashard_direct_transaction.h
index 5b42e1f1d71..e4d83188d5a 100644
--- a/ydb/core/tx/datashard/datashard_direct_transaction.h
+++ b/ydb/core/tx/datashard/datashard_direct_transaction.h
@@ -11,11 +11,17 @@ namespace NKikimr {
 namespace NDataShard {
 
+struct TDirectTxResult {
+    TActorId Target;
+    THolder<IEventBase> Event;
+    ui64 Cookie;
+};
+
 class IDirectTx {
 public:
     virtual ~IDirectTx() = default;
     virtual bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion) = 0;
-    virtual void SendResult(TDataShard* self, const TActorContext& ctx) = 0;
+    virtual TDirectTxResult GetResult(TDataShard* self) = 0;
     virtual TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const = 0;
 };
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.cpp b/ydb/core/tx/datashard/datashard_direct_upload.cpp
index 6173c0e218f..abb3eda7633 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_upload.cpp
@@ -12,8 +12,10 @@ bool TDirectTxUpload::Execute(TDataShard* self, TTransactionContext& txc, const
     return TCommonUploadOps::Execute(self, txc, readVersion, writeVersion);
 }
 
-void TDirectTxUpload::SendResult(TDataShard* self, const TActorContext& ctx) {
-    TCommonUploadOps::SendResult(self, ctx);
+TDirectTxResult TDirectTxUpload::GetResult(TDataShard* self) {
+    TDirectTxResult res;
+    TCommonUploadOps::GetResult(self, res.Target, res.Event, res.Cookie);
+    return res;
 }
 
 TVector<NMiniKQL::IChangeCollector::TChange> TDirectTxUpload::GetCollectedChanges() const {
diff --git a/ydb/core/tx/datashard/datashard_direct_upload.h b/ydb/core/tx/datashard/datashard_direct_upload.h
index 7ca84f19a7b..a26226bd79a 100644
--- a/ydb/core/tx/datashard/datashard_direct_upload.h
+++ b/ydb/core/tx/datashard/datashard_direct_upload.h
@@ -14,7 +14,7 @@ public:
     explicit TDirectTxUpload(TEvDataShard::TEvUploadRowsRequest::TPtr& ev);
 
     bool Execute(TDataShard* self, TTransactionContext& txc,
         const TRowVersion& readVersion, const TRowVersion& writeVersion) override;
-    void SendResult(TDataShard* self, const TActorContext& ctx) override;
+    TDirectTxResult GetResult(TDataShard* self) override;
     TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const override;
 };
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 17eebf7693f..c144311a4e7 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -297,6 +297,7 @@ class TDataShard
         EvRequestChangeRecords,
         EvRemoveChangeRecords,
         EvReplicationSourceOffsets,
+        EvMediatorRestoreBackup,
        EvEnd
    };
@@ -436,6 +437,8 @@ class TDataShard
        // Note that keys are NOT sorted in any way
        THashMap<TString, TVector<TSplitKey>> SourceOffsets;
    };
+
+    struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {};
 };
 
 struct Schema : NIceDb::Schema {
@@ -819,6 +822,10 @@ class TDataShard
        Sys_NextChangeRecordOrder, // 36 Next order of change record
        Sys_LastChangeRecordGroup, // 37 Last group number of change records
+
+        SysMvcc_UnprotectedReads, // 38 Shard may have performed unprotected mvcc reads when non-zero
+        SysMvcc_ImmediateWriteEdgeStep, // 39 Maximum step of immediate writes with mvcc enabled
+        SysMvcc_ImmediateWriteEdgeTxId, // 40 Maximum txId of immediate writes with mvcc enabled
 
        // reserved
        SysPipeline_Flags = 1000,
        SysPipeline_LimitActiveTx,
@@ -828,6 +835,9 @@ class TDataShard
    static_assert(ESysTableKeys::Sys_SubDomainOwnerId == 33, "Sys_SubDomainOwnerId changed its value");
    static_assert(ESysTableKeys::Sys_SubDomainLocalPathId == 34, "Sys_SubDomainLocalPathId changed its value");
    static_assert(ESysTableKeys::Sys_SubDomainOutOfSpace == 35, "Sys_SubDomainOutOfSpace changed its value");
+    static_assert(ESysTableKeys::SysMvcc_UnprotectedReads == 38, "SysMvcc_UnprotectedReads changed its value");
+    static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeStep == 39, "SysMvcc_ImmediateWriteEdgeStep changed its value");
+    static_assert(ESysTableKeys::SysMvcc_ImmediateWriteEdgeTxId == 40, "SysMvcc_ImmediateWriteEdgeTxId changed its value");
 
    static constexpr ui64 MinLocalTid = TSysTables::SysTableMAX + 1; // 1000
@@ -918,7 +928,9 @@ class TDataShard
    void Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx);
    void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx);
    void Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx);
+    void Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx);
    void Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx);
+    void Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr& ev, const TActorContext& ctx);
    void Handle(TEvDataShard::TEvCancelTransactionProposal::TPtr &ev, const TActorContext &ctx);
    void Handle(TEvDataShard::TEvReturnBorrowedPart::TPtr& ev, const TActorContext& ctx);
    void Handle(TEvDataShard::TEvReturnBorrowedPartAck::TPtr& ev, const TActorContext& ctx);
@@ -1242,6 +1254,16 @@ public:
        return BackupReadAheadHi;
    }
 
+    bool GetEnablePrioritizedMvccSnapshotReads() const {
+        ui64 value = EnablePrioritizedMvccSnapshotReads;
+        return value != 0;
+    }
+
+    bool GetEnableUnprotectedMvccSnapshotReads() const {
+        ui64 value = EnableUnprotectedMvccSnapshotReads;
+        return value != 0;
+    }
+
    template <typename T>
    void ReleaseCache(T& tx) {
        ReleaseTxCache(tx.GetTxCacheUsage());
@@ -1436,7 +1458,26 @@ public:
    // Returns a suitable row version for performing a transaction
    TRowVersion GetMvccTxVersion(EMvccTxMode mode, TOperation* op = nullptr) const;
 
+    enum class EPromotePostExecuteEdges {
+        ReadOnly,
+        RepeatableRead,
+        ReadWrite,
+    };
+
+    struct TPromotePostExecuteEdges {
+        bool HadWrites = false;
+        bool WaitCompletion = false;
+    };
+
    TReadWriteVersions GetReadWriteVersions(TOperation* op = nullptr) const;
+    TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(
+            const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc);
+    ui64 GetMaxObservedStep() const;
+    void SendImmediateWriteResult(
+            const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie = 0);
+    void SendAfterMediatorStepActivate(ui64 mediatorStep);
+
+    void CheckMediatorStateRestored();
 
    void FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const;
@@ -1986,8 +2027,36 @@ private:
    TS3UploadsManager S3Uploads;
    TS3DownloadsManager S3Downloads;
 
+    struct TMediatorDelayedReply {
+        TActorId Target;
+        THolder<IEventBase> Event;
+        ui64 Cookie;
+
+        TMediatorDelayedReply(const TActorId& target, THolder<IEventBase> event, ui64 cookie)
+            : Target(target)
+            , Event(std::move(event))
+            , Cookie(cookie)
+        { }
+    };
+
    TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
    TSet<ui64> MediatorTimeCastWaitingSteps;
+    TMultiMap<TRowVersion, TMediatorDelayedReply> MediatorDelayedReplies;
+
+    struct TCoordinatorSubscription {
+        ui64 CoordinatorId;
+        TMediatorTimecastReadStep::TCPtr ReadStep;
+    };
+
+    TVector<TCoordinatorSubscription> CoordinatorSubscriptions;
+    THashMap<ui64, size_t> CoordinatorSubscriptionById;
+    size_t CoordinatorSubscriptionsPending = 0;
+    ui64 CoordinatorPrevReadStepMin = 0;
+    ui64 CoordinatorPrevReadStepMax = Max<ui64>();
+
+    TVector<THolder<IEventHandle>> MediatorStateWaitingMsgs;
+    bool MediatorStateWaiting = false;
+    bool MediatorStateBackupInitiated = false;
 
    TControlWrapper DisableByKeyFilter;
    TControlWrapper MaxTxInFly;
@@ -2009,6 +2078,9 @@ private:
    TControlWrapper BackupReadAheadLo;
    TControlWrapper BackupReadAheadHi;
 
+    TControlWrapper EnablePrioritizedMvccSnapshotReads;
+    TControlWrapper EnableUnprotectedMvccSnapshotReads;
+
    // Set of InRS keys to remove from local DB.
    THashSet<TReadSetKey> InRSToRemove;
    TIntrusivePtr<TThrRefBase> DataShardSysTables;
@@ -2153,6 +2225,9 @@ protected:
        TRACE_EVENT(NKikimrServices::TX_DATASHARD);
        switch (ev->GetTypeRewrite()) {
            HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+            HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
+            HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
+            HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
            HFuncTraced(TEvents::TEvPoisonPill, Handle);
        default:
            if (!HandleDefaultEvents(ev, ctx)) {
@@ -2202,7 +2277,9 @@ protected:
            HFuncTraced(TEvTabletPipe::TEvServerConnected, Handle);
            HFuncTraced(TEvTabletPipe::TEvServerDisconnected, Handle);
            HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
+            HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
            HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
+            HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
            HFuncTraced(TEvDataShard::TEvCancelTransactionProposal, Handle);
            HFuncTraced(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
            HFunc(TEvDataShard::TEvReturnBorrowedPart, Handle);
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 3498162c17e..faf90e403b6 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1581,19 +1581,21 @@ bool TPipeline::AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, co
    if (Self->MvccSwitchState == TSwitchState::SWITCHING) {
        WaitingDataTxOps.emplace(TRowVersion::Min(), std::move(ev));  // postpone tx processing till mvcc state switch is finished
    } else {
+        bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
        Y_VERIFY_DEBUG(ev->Get()->Record.HasMvccSnapshot());
        TRowVersion snapshot(ev->Get()->Record.GetMvccSnapshot().GetStep(), ev->Get()->Record.GetMvccSnapshot().GetTxId());
        WaitingDataTxOps.emplace(snapshot, std::move(ev));
+        const ui64 waitStep = prioritizedReads ? snapshot.Step : snapshot.Step + 1;
        TRowVersion unreadableEdge;
-        if (!Self->WaitPlanStep(snapshot.Step + 1) && snapshot < (unreadableEdge = GetUnreadableEdge())) {
-            ActivateWaitingTxOps(unreadableEdge, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op
+        if (!Self->WaitPlanStep(waitStep) && snapshot < (unreadableEdge = GetUnreadableEdge(prioritizedReads))) {
+            ActivateWaitingTxOps(unreadableEdge, prioritizedReads, ctx); // Async MediatorTimeCastEntry update, need to reschedule the op
        }
    }
 
    return true;
 }
 
-void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx) {
+void TPipeline::ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx) {
    if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING)
        return;
 
@@ -1611,8 +1613,12 @@ void TPipeline::ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx)
            activated = true;
        }
 
-        if (minWait == TRowVersion::Max() || Self->WaitPlanStep(minWait.Step + 1) || minWait >= (edge = GetUnreadableEdge()))
+        if (minWait == TRowVersion::Max() ||
+            Self->WaitPlanStep(prioritizedReads ? minWait.Step : minWait.Step + 1) ||
+            minWait >= (edge = GetUnreadableEdge(prioritizedReads)))
+        {
            break;
+        }
 
        // Async MediatorTimeCastEntry update, need to rerun activation
    }
@@ -1626,7 +1632,8 @@ void TPipeline::ActivateWaitingTxOps(const TActorContext& ctx) {
    if (WaitingDataTxOps.empty() || Self->MvccSwitchState == TSwitchState::SWITCHING)
        return;
 
-    ActivateWaitingTxOps(GetUnreadableEdge(), ctx);
+    bool prioritizedReads = Self->GetEnablePrioritizedMvccSnapshotReads();
+    ActivateWaitingTxOps(GetUnreadableEdge(prioritizedReads), prioritizedReads, ctx);
 }
 
 TRowVersion TPipeline::GetReadEdge() const {
@@ -1645,29 +1652,51 @@ TRowVersion TPipeline::GetReadEdge() const {
    return TRowVersion(step, Max<ui64>());
 }
 
-TRowVersion TPipeline::GetUnreadableEdge() const {
-    auto last = TRowVersion(
+TRowVersion TPipeline::GetUnreadableEdge(bool prioritizeReads) const {
+    const auto last = TRowVersion(
        GetLastActivePlannedOpStep(),
        GetLastActivePlannedOpId());
    auto it = Self->TransQueue.PlannedTxs.upper_bound(TStepOrder(last.Step, last.TxId));
    while (it != Self->TransQueue.PlannedTxs.end()) {
-        last = TRowVersion(it->Step, it->TxId);
-        if (!Self->TransQueue.FindTxInFly(last.TxId)->IsReadOnly()) {
+        const auto next = TRowVersion(it->Step, it->TxId);
+        if (!Self->TransQueue.FindTxInFly(next.TxId)->IsReadOnly()) {
            // If there's any non-read-only planned tx we don't have in the
            // dependency tracker yet, we absolutely cannot read from that
            // version.
-            return last;
+            return next;
        }
        ++it;
    }
 
-    // It looks like we have an empty plan queue (or it's read-only), so we
-    // use a rough estimate of a point we would use for immediate writes in
-    // the far future. That point in time is not complete yet and cannot be
-    // used for snapshot reads.
-    last = TRowVersion(LastPlannedTx.Step, LastPlannedTx.TxId);
-    ui64 step = Max(Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0, (++last).Step);
-    return TRowVersion(step, Max<ui64>());
+    // It looks like we have an empty plan queue (or it's read-only), so we use
+    // a rough estimate of a point in time we would use for immediate writes
+    // in the distant future. That point in time possibly has some unfinished
+    // transactions, but they would be resolved using the dependency tracker. Here
+    // we use an estimate of the observed mediator step (including possible past
+    // generations). Note that we also update CompleteEdge when the distributed
+    // queue is empty, but we have been performing immediate writes and thus
+    // observing an updated mediator timecast step.
+    const ui64 mediatorStep = Max(
+        Self->MediatorTimeCastEntry ? Self->MediatorTimeCastEntry->Get(Self->TabletID()) : 0,
+        Self->SnapshotManager.GetIncompleteEdge().Step,
+        Self->SnapshotManager.GetCompleteEdge().Step,
+        LastPlannedTx.Step);
+
+    // Using an observed mediator step we conclude that we have observed all
+    // distributed transactions up to the end of that step.
+    const TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());
+
+    if (prioritizeReads) {
+        // We are prioritizing reads, and we are ok with blocking immediate writes
+        // in the current step. So the first unreadable version is actually in
+        // the next step.
+        return mediatorEdge.Next();
+    } else {
+        // We cannot block immediate writes up to this edge, thus we actually
+        // need to wait until the edge progresses above this version. This
+        // would happen when mediator timecast moves to the next step.
+        return mediatorEdge;
+    }
 }
 
 void TPipeline::AddCompletingOp(const TOperation::TPtr& op) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index 67c35260b28..2d0d76b1000 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -329,11 +329,11 @@ public:
    ui64 WaitingTxs() const { return WaitingDataTxOps.size(); }
    bool AddWaitingTxOp(TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
-    void ActivateWaitingTxOps(TRowVersion edge, const TActorContext& ctx);
+    void ActivateWaitingTxOps(TRowVersion edge, bool prioritizedReads, const TActorContext& ctx);
    void ActivateWaitingTxOps(const TActorContext& ctx);
 
    TRowVersion GetReadEdge() const;
-    TRowVersion GetUnreadableEdge() const;
+    TRowVersion GetUnreadableEdge(bool prioritizedReads) const;
 
    void AddCompletingOp(const TOperation::TPtr& op);
    void RemoveCompletingOp(const TOperation::TPtr& op);
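The effect of the prioritizeReads flag on GetUnreadableEdge is easiest to see with concrete numbers. A simplified sketch (hypothetical types; the real function also consults the plan queue, as shown above):

```cpp
#include <cstdint>
#include <iostream>

struct TVersion { uint64_t Step, TxId; };

// Simplified model of TPipeline::GetUnreadableEdge with an empty plan queue:
// everything strictly below the returned version is readable.
TVersion GetUnreadableEdge(uint64_t mediatorStep, bool prioritizeReads) {
    TVersion mediatorEdge{mediatorStep, UINT64_MAX};
    if (prioritizeReads) {
        // Reads win: snapshots at the current step are readable immediately,
        // and conflicting immediate writes are pushed to the next step.
        return TVersion{mediatorEdge.Step + 1, 0}; // mediatorEdge.Next()
    }
    // Writes win: a snapshot at the current step must wait until the
    // mediator time cast moves past it.
    return mediatorEdge;
}

int main() {
    const uint64_t step = 100;
    auto e1 = GetUnreadableEdge(step, true);
    auto e2 = GetUnreadableEdge(step, false);
    // A snapshot read at (100, Max) is readable right away only when prioritized.
    std::cout << "prioritized: first unreadable at step " << e1.Step << "\n"; // 101
    std::cout << "default:     first unreadable at step " << e2.Step << "\n"; // 100
}
```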
diff --git a/ydb/core/tx/datashard/datashard_snapshots.cpp b/ydb/core/tx/datashard/datashard_snapshots.cpp
index da1219bb9e6..39426396f39 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.cpp
+++ b/ydb/core/tx/datashard/datashard_snapshots.cpp
@@ -30,9 +30,11 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
    TRowVersion completeEdge = TRowVersion::Min();
    TRowVersion incompleteEdge = TRowVersion::Min();
    TRowVersion lowWatermark = TRowVersion::Min();
+    TRowVersion immediateWriteEdge = TRowVersion::Min();
 
    ui32 mvccState = 0;
    ui64 keepSnapshotTimeout = 0;
+    ui64 unprotectedReads = 0;
 
    TSnapshotMap snapshots;
 
@@ -43,12 +45,15 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
        // We don't currently support mvcc on the follower
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_State, mvccState);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_KeepSnapshotTimeout, keepSnapshotTimeout);
+        ready &= Self->SysGetUi64(db, Schema::SysMvcc_UnprotectedReads, unprotectedReads);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeStep, completeEdge.Step);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_CompleteEdgeTxId, completeEdge.TxId);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeStep, incompleteEdge.Step);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_IncompleteEdgeTxId, incompleteEdge.TxId);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkStep, lowWatermark.Step);
        ready &= Self->SysGetUi64(db, Schema::SysMvcc_LowWatermarkTxId, lowWatermark.TxId);
+        ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeStep, immediateWriteEdge.Step);
+        ready &= Self->SysGetUi64(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, immediateWriteEdge.TxId);
    }
 
    {
@@ -86,9 +91,19 @@ bool TSnapshotManager::Reload(NIceDb::TNiceDb& db) {
        MinWriteVersion = minWriteVersion;
 
        MvccState = static_cast<EMvccState>(mvccState);
        KeepSnapshotTimeout = keepSnapshotTimeout;
+        PerformedUnprotectedReads = (unprotectedReads != 0);
        CompleteEdge = completeEdge;
        IncompleteEdge = incompleteEdge;
        LowWatermark = lowWatermark;
+        ImmediateWriteEdge = immediateWriteEdge;
+        if (ImmediateWriteEdge.Step <= Max(CompleteEdge.Step, IncompleteEdge.Step)) {
+            ImmediateWriteEdgeReplied = immediateWriteEdge;
+        } else {
+            // We cannot be sure which writes we have replied to
+            // Datashard will restore mediator state and decide
+            ImmediateWriteEdgeReplied.Step = Max(CompleteEdge.Step, IncompleteEdge.Step);
+            ImmediateWriteEdgeReplied.TxId = Max<ui64>();
+        }
 
        CommittedCompleteEdge = completeEdge;
 
        Snapshots = std::move(snapshots);
    }
@@ -208,6 +223,82 @@ bool TSnapshotManager::PromoteIncompleteEdge(TOperation* op, TTransactionContext
    return false;
 }
 
+TRowVersion TSnapshotManager::GetImmediateWriteEdge() const {
+    return ImmediateWriteEdge;
+}
+
+TRowVersion TSnapshotManager::GetImmediateWriteEdgeReplied() const {
+    return ImmediateWriteEdgeReplied;
+}
+
+void TSnapshotManager::SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) {
+    using Schema = TDataShard::Schema;
+
+    NIceDb::TNiceDb db(txc.DB);
+    Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeStep, version.Step);
+    Self->PersistSys(db, Schema::SysMvcc_ImmediateWriteEdgeTxId, version.TxId);
+    ImmediateWriteEdge = version;
+}
+
+bool TSnapshotManager::PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc) {
+    if (!IsMvccEnabled())
+        return false;
+
+    if (version > ImmediateWriteEdge) {
+        SetImmediateWriteEdge(version, txc);
+
+        return true;
+    }
+
+    return false;
+}
+
+bool TSnapshotManager::PromoteImmediateWriteEdgeReplied(const TRowVersion& version) {
+    if (!IsMvccEnabled())
+        return false;
+
+    if (version > ImmediateWriteEdgeReplied) {
+        ImmediateWriteEdgeReplied = version;
+        return true;
+    }
+
+    return false;
+}
+
+TRowVersion TSnapshotManager::GetUnprotectedReadEdge() const {
+    return UnprotectedReadEdge;
+}
+
+bool TSnapshotManager::PromoteUnprotectedReadEdge(const TRowVersion& version) {
+    if (IsMvccEnabled() && UnprotectedReadEdge < version) {
+        UnprotectedReadEdge = version;
+        return true;
+    }
+
+    return false;
+}
+
+bool TSnapshotManager::GetPerformedUnprotectedReads() const {
+    return PerformedUnprotectedReads;
+}
+
+bool TSnapshotManager::IsPerformedUnprotectedReadsCommitted() const {
+    return PerformedUnprotectedReadsUncommitted == 0;
+}
+
+void TSnapshotManager::SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc) {
+    using Schema = TDataShard::Schema;
+
+    NIceDb::TNiceDb db(txc.DB);
+    Self->PersistSys(db, Schema::SysMvcc_UnprotectedReads, ui64(performedUnprotectedReads ? 1 : 0));
+    PerformedUnprotectedReads = performedUnprotectedReads;
+    PerformedUnprotectedReadsUncommitted++;
+
+    txc.OnCommitted([this] {
+        this->PerformedUnprotectedReadsUncommitted--;
+    });
+}
+
 void TSnapshotManager::SetKeepSnapshotTimeout(NIceDb::TNiceDb& db, ui64 keepSnapshotTimeout) {
    using Schema = TDataShard::Schema;
 
@@ -275,7 +366,7 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
    const TRowVersion opVersion(step, txId);
 
    // We need to choose a version that is at least as large as all previous edges
-    TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge);
+    TRowVersion nextVersion = Max(opVersion, MinWriteVersion, CompleteEdge, IncompleteEdge, ImmediateWriteEdge);
 
    // This must be a version that we may have previously written to, and which
    // must not be a snapshot. We don't know if there have been any immediate
@@ -300,7 +391,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
 
            SetCompleteEdge(nicedb, nextVersion);
            SetIncompleteEdge(nicedb, nextVersion);
+            SetImmediateWriteEdge(nextVersion, txc);
            SetLowWatermark(nicedb, nextVersion);
+            ImmediateWriteEdgeReplied = ImmediateWriteEdge;
 
            break;
        }
@@ -316,7 +409,9 @@ bool TSnapshotManager::ChangeMvccState(ui64 step, ui64 txId, TTransactionContext
            const auto minVersion = TRowVersion::Min();
            SetCompleteEdge(nicedb, minVersion);
            SetIncompleteEdge(nicedb, minVersion);
+            SetImmediateWriteEdge(minVersion, txc);
            SetLowWatermark(nicedb, minVersion);
+            ImmediateWriteEdgeReplied = ImmediateWriteEdge;
 
            break;
        }
diff --git a/ydb/core/tx/datashard/datashard_snapshots.h b/ydb/core/tx/datashard/datashard_snapshots.h
index 87468341120..362fa13b923 100644
--- a/ydb/core/tx/datashard/datashard_snapshots.h
+++ b/ydb/core/tx/datashard/datashard_snapshots.h
@@ -131,6 +131,19 @@ public:
 
    bool PromoteIncompleteEdge(TOperation* op, TTransactionContext& txc);
 
+    TRowVersion GetImmediateWriteEdge() const;
+    TRowVersion GetImmediateWriteEdgeReplied() const;
+    void SetImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc);
+    bool PromoteImmediateWriteEdge(const TRowVersion& version, TTransactionContext& txc);
+    bool PromoteImmediateWriteEdgeReplied(const TRowVersion& version);
+
+    TRowVersion GetUnprotectedReadEdge() const;
+    bool PromoteUnprotectedReadEdge(const TRowVersion& version);
+
+    bool GetPerformedUnprotectedReads() const;
+    bool IsPerformedUnprotectedReadsCommitted() const;
+    void SetPerformedUnprotectedReads(bool performedUnprotectedReads, TTransactionContext& txc);
+
    EMvccState GetMvccState() const {
        return MvccState;
    }
@@ -203,9 +216,14 @@ private:
 
    EMvccState MvccState = EMvccState::MvccUnspecified;
    ui64 KeepSnapshotTimeout = 0;
+    bool PerformedUnprotectedReads = false;
+    ui64 PerformedUnprotectedReadsUncommitted = 0;
    TRowVersion IncompleteEdge = TRowVersion::Min();
    TRowVersion CompleteEdge = TRowVersion::Min();
    TRowVersion LowWatermark = TRowVersion::Min();
+    TRowVersion ImmediateWriteEdge = TRowVersion::Min();
+    TRowVersion ImmediateWriteEdgeReplied = TRowVersion::Min();
+    TRowVersion UnprotectedReadEdge = TRowVersion::Min();
 
    TRowVersion CommittedCompleteEdge = TRowVersion::Min();
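The recovery rule in Reload above is worth spelling out with numbers. A small standalone calculation (illustrative values) of where ImmediateWriteEdgeReplied starts after a crash:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
    // Hypothetical persisted state after a crash.
    uint64_t completeEdgeStep = 100, incompleteEdgeStep = 100;
    uint64_t immediateWriteEdgeStep = 105; // immediate writes went past the edges

    uint64_t repliedStep;
    if (immediateWriteEdgeStep <= std::max(completeEdgeStep, incompleteEdgeStep)) {
        // Safe: every persisted immediate write is at or below a distributed
        // edge, so treating all of them as replied cannot surface data that
        // was never acknowledged.
        repliedStep = immediateWriteEdgeStep;
    } else {
        // Unsafe: writes at steps 101..105 may or may not have been replied
        // to. Start conservatively and let mediator state restoration
        // (CheckMediatorStateRestored above) promote the replied edge later.
        repliedStep = std::max(completeEdgeStep, incompleteEdgeStep);
    }
    std::cout << "ImmediateWriteEdgeReplied starts at step " << repliedStep << "\n"; // 100
}
```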
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index c221c4dfeb0..a3168bd3ef2 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -282,6 +282,9 @@ public:
    bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
    void Complete(const TActorContext& ctx) override;
    TTxType GetTxType() const override { return TXTYPE_UNSAFE_UPLOAD_ROWS; }
+
+private:
+    TRowVersion MvccVersion = TRowVersion::Min();
 };
 
 class TDataShard::TTxExecuteMvccStateChange: public NTabletFlatExecutor::TTransactionBase<TDataShard> {
diff --git a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
index 273952a06d1..cb085ad4805 100644
--- a/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
@@ -14,12 +14,26 @@ bool TDataShard::TTxUnsafeUploadRows::Execute(TTransactionContext& txc, const TA
    if (!TCommonUploadOps::Execute(Self, txc, readVersion, writeVersion))
        return false;
 
-    Self->PromoteCompleteEdge(writeVersion.Step, txc);
+    if (Self->IsMvccEnabled()) {
+        // Note: we always wait for completion, so we can ignore the result
+        Self->PromoteImmediatePostExecuteEdges(writeVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc);
+        MvccVersion = writeVersion;
+    }
 
    return true;
 }
 
 void TDataShard::TTxUnsafeUploadRows::Complete(const TActorContext& ctx) {
-    TCommonUploadOps::SendResult(Self, ctx);
+    TActorId target;
+    THolder<IEventBase> event;
+    ui64 cookie;
+    TCommonUploadOps::GetResult(Self, target, event, cookie);
+
+    if (MvccVersion) {
+        Self->SendImmediateWriteResult(MvccVersion, target, event.Release(), cookie);
+    } else {
+        ctx.Send(target, event.Release(), 0, cookie);
+    }
 }
 
 } // NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index ba8d92e6e2b..3ed441d3963 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1724,6 +1724,10 @@ void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId) {
 
 void SimulateSleep(Tests::TServer::TPtr server, TDuration duration) {
    auto &runtime = *server->GetRuntime();
+    SimulateSleep(runtime, duration);
+}
+
+void SimulateSleep(TTestActorRuntime& runtime, TDuration duration) {
    auto sender = runtime.AllocateEdgeActor();
    runtime.Schedule(new IEventHandle(sender, sender, new TEvents::TEvWakeup()), duration);
    runtime.GrabEdgeEventRethrow<TEvents::TEvWakeup>(sender);
diff --git a/ydb/core/tx/datashard/datashard_ut_common.h b/ydb/core/tx/datashard/datashard_ut_common.h
index 0bf3c889420..79794005400 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.h
+++ b/ydb/core/tx/datashard/datashard_ut_common.h
@@ -606,6 +606,7 @@ void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId)
 void WaitTxNotification(Tests::TServer::TPtr server, ui64 txId);
 
 void SimulateSleep(Tests::TServer::TPtr server, TDuration duration);
+void SimulateSleep(TTestActorRuntime& runtime, TDuration duration);
 
 void SendSQL(Tests::TServer::TPtr server,
             TActorId sender,
@@ -631,4 +632,21 @@ struct IsTxResultComplete {
 
 void WaitTabletBecomesOffline(Tests::TServer::TPtr server, ui64 tabletId);
 
+///
+class TDisableDataShardLogBatching : public TNonCopyable {
+public:
+    TDisableDataShardLogBatching()
+        : PrevValue(NDataShard::gAllowLogBatchingDefaultValue)
+    {
+        NDataShard::gAllowLogBatchingDefaultValue = false;
+    }
+
+    ~TDisableDataShardLogBatching() {
+        NDataShard::gAllowLogBatchingDefaultValue = PrevValue;
+    }
+
+private:
+    const bool PrevValue;
+};
+
 }
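TDisableDataShardLogBatching is an RAII guard over the global gAllowLogBatchingDefaultValue: batching is turned off for the guard's scope and the previous default is restored on destruction. The new test uses it, presumably so that each commit lands in its own log batch and the mediator-time effects it asserts on stay deterministic. Usage, as in the test below:

```cpp
// Inside a Y_UNIT_TEST body, before creating tables:
TDisableDataShardLogBatching disableDataShardLogBatching;
CreateShardedTable(server, sender, "/Root", "table-1", 1);
// The previous default is restored when the guard goes out of scope.
```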
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index b8a78736526..e5c976dd352 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -124,6 +124,14 @@ namespace NKqpHelpers {
        return ev->Get()->Record.GetResponse().GetSessionId();
    }
 
+    inline void CloseSession(TTestActorRuntime& runtime, TActorId sender, const TString& sessionId) {
+        auto request = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+        request->Record.MutableRequest()->SetSessionId(sessionId);
+        runtime.Send(
+            new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()),
+            0, /* via actor system */ true);
+    }
+
    inline THolder<NKqp::TEvKqp::TEvQueryRequest> MakeStreamRequest(
        const TActorId sender,
        const TString& sql,
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 32472dce59e..e7ca6bd002f 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -4623,6 +4623,323 @@ Y_UNIT_TEST_TWIN(TestSnapshotReadAfterStuckRW, UseNewEngine) {
    }
 }
 
+Y_UNIT_TEST_QUAD(TestSnapshotReadPriority, UnprotectedReads, UseNewEngine) {
+    TPortManager pm;
+    TServerSettings::TControls controls;
+    controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+    controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(UnprotectedReads ? 1 : 0);
+
+    TServerSettings serverSettings(pm.GetPort(2134));
+    serverSettings.SetDomainName("Root")
+        .SetEnableMvcc(true)
+        .SetEnableMvccSnapshotReads(true)
+        .SetControls(controls)
+        .SetUseRealThreads(false);
+
+    Tests::TServer::TPtr server = new TServer(serverSettings);
+    auto &runtime = *server->GetRuntime();
+    auto sender = runtime.AllocateEdgeActor();
+
+    runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+    runtime.SetLogPriority(NKikimrServices::TX_MEDIATOR_TIMECAST, NLog::PRI_TRACE);
+
+    InitRoot(server, sender);
+
+    TDisableDataShardLogBatching disableDataShardLogBatching;
+    CreateShardedTable(server, sender, "/Root", "table-1", 1);
+    CreateShardedTable(server, sender, "/Root", "table-2", 1);
+
+    auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+    auto table2shards = GetTableShards(server, sender, "/Root/table-2");
+
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 2)"));
+
+    SimulateSleep(server, TDuration::Seconds(1));
+
+    // Perform an immediate write
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 3)"));
+
+    auto execSimpleRequest = [&](const TString& query) -> TString {
+        auto reqSender = runtime.AllocateEdgeActor();
+        auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+    };
+
+    auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+        auto reqSender = runtime.AllocateEdgeActor();
+        sessionId = CreateSession(runtime, reqSender);
+        auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+        auto& response = ev->Get()->Record.GetRef();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        txId = response.GetResponse().GetTxMeta().id();
+        UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+        return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+    };
continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto execSnapshotRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + TString sessionId, txId; + TString result = beginSnapshotRequest(sessionId, txId, query); + CloseSession(runtime, reqSender, sessionId); + return result; + }; + + // Perform an immediate read; we should observe the write above + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Spam schedules in the runtime to prevent mediator time jumping prematurely + { + Cerr << "!!! Setting up wakeup spam" << Endl; + auto senderWakeupSpam = runtime.AllocateEdgeActor(); + for (int i = 1; i <= 10; ++i) { + runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250)); + } + } + + // Send an immediate write transaction, but don't wait for the result + auto senderImmediateWrite = runtime.AllocateEdgeActor(); + SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (5, 5) + )"))); + + // Sleep very briefly, so the datashard commits its changes but mediator time doesn't advance + SimulateSleep(runtime, TDuration::MicroSeconds(1)); + + // Perform an immediate read again; it should NOT observe the write above + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + + // Wait for the write to finish + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + + // Perform an immediate read again; it should observe the write above + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM
`/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Same when using a fresh snapshot read + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Start a new write and sleep again + SendRequest(runtime, senderImmediateWrite, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (7, 7) + )"))); + SimulateSleep(runtime, TDuration::MicroSeconds(1)); + + // Verify this write is not observed yet + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Spam schedules in the runtime to prevent mediator time jumping prematurely + { + Cerr << "!!! Setting up wakeup spam" << Endl; + auto senderWakeupSpam = runtime.AllocateEdgeActor(); + for (int i = 1; i <= 10; ++i) { + runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250)); + } + } + + // Reboot the tablet + RebootTablet(runtime, table1shards.at(0), sender); + + // Verify the write above cannot be observed after restart as well + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + + // Send one more write and sleep again + auto senderImmediateWrite2 = runtime.AllocateEdgeActor(); + SendRequest(runtime, senderImmediateWrite2, MakeSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (9, 9) + )"))); + SimulateSleep(runtime, TDuration::MicroSeconds(1)); + + // Verify it is also hidden at the moment + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false }"); + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "} Struct { Bool: false 
}"); + + // Wait for result of the second write + { + auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(senderImmediateWrite2); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + + // We should finally observe both writes + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "} Struct { Bool: false }"); + + TString snapshotSessionId, snapshotTxId; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "} Struct { Bool: false }"); + + // Spam schedules in the runtime to prevent mediator time jumping prematurely + { + Cerr << "!!! Setting up wakeup spam" << Endl; + auto senderWakeupSpam = runtime.AllocateEdgeActor(); + for (int i = 1; i <= 10; ++i) { + runtime.Schedule(new IEventHandle(senderWakeupSpam, senderWakeupSpam, new TEvents::TEvWakeup()), TDuration::MicroSeconds(i * 250)); + } + } + + // Reboot the tablet + RebootTablet(runtime, table1shards.at(0), sender); + + // Upsert new data after reboot + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (11, 11)")); + + // Make sure datashard state is restored correctly and snapshot is not corrupted + UNIT_ASSERT_VALUES_EQUAL( + continueSnapshotRequest(snapshotSessionId, snapshotTxId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "} Struct { Bool: false }"); + + // Make sure new snapshot will actually observe new data + UNIT_ASSERT_VALUES_EQUAL( + execSnapshotRequest(Q_(R"( + SELECT key, value FROM `/Root/table-1` + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "List { Struct { Optional { Uint32: 5 } } Struct { Optional { Uint32: 5 } } } " + "List { Struct { Optional { Uint32: 7 } } Struct { Optional { Uint32: 7 } } } " + "List { Struct { Optional { Uint32: 9 } } Struct { Optional { Uint32: 9 } } } " + "List { Struct { Optional { Uint32: 11 } } Struct { Optional { Uint32: 11 } } } " + "} Struct { Bool: false }"); +} + } // 
Y_UNIT_TEST_SUITE(DataShardOutOfOrder) } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index a34599d2eac..9f6c59189b1 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -23,7 +23,6 @@ public: private: void ExecuteDataTx(TOperation::TPtr op, - TTransactionContext& txc, const TActorContext& ctx); void AddLocksToResult(TOperation::TPtr op); }; @@ -124,7 +123,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, try { try { - ExecuteDataTx(op, txc, ctx); + ExecuteDataTx(op, ctx); } catch (const TNotReadyTabletException&) { // We want to try pinning (actually precharging) all required pages // before restarting the transaction, to minimize future restarts. @@ -169,7 +168,6 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, - TTransactionContext& txc, const TActorContext& ctx) { TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); IEngineFlat* engine = tx->GetDataTx()->GetEngine(); @@ -237,9 +235,6 @@ void TExecuteDataTxUnit::ExecuteDataTx(TOperation::TPtr op, } else { result->SetTxResult(engine->GetShardReply(DataShard.TabletID())); - if (op->IsImmediate() && !op->IsReadOnly()) - DataShard.PromoteCompleteEdge(writeVersion.Step, txc); - op->ChangeRecords() = std::move(tx->GetDataTx()->GetCollectedChanges()); } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 16e109e6b9e..78bfb34c32b 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -152,10 +152,6 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio AddLocksToResult(op, ctx); - if (op->IsImmediate() && !op->IsReadOnly()) { - DataShard.PromoteCompleteEdge(writeVersion.Step, txc); - } - op->ChangeRecords() = std::move(dataTx->GetCollectedChanges()); KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters()); diff --git a/ydb/core/tx/datashard/finish_propose_unit.cpp b/ydb/core/tx/datashard/finish_propose_unit.cpp index de0e9e4ad2e..daac9225b23 100644 --- a/ydb/core/tx/datashard/finish_propose_unit.cpp +++ b/ydb/core/tx/datashard/finish_propose_unit.cpp @@ -20,6 +20,7 @@ public: const TActorContext &ctx) override; private: + TDataShard::TPromotePostExecuteEdges PromoteImmediatePostExecuteEdges(TOperation* op, TTransactionContext& txc); void CompleteRequest(TOperation::TPtr op, const TActorContext &ctx); void AddDiagnosticsResult(TOutputOpData::TResultPtr &res); @@ -43,6 +44,27 @@ bool TFinishProposeUnit::IsReadyToExecute(TOperation::TPtr) const return true; } +TDataShard::TPromotePostExecuteEdges TFinishProposeUnit::PromoteImmediatePostExecuteEdges( + TOperation* op, + TTransactionContext& txc) +{ + if (op->IsMvccSnapshotRead()) { + if (op->IsMvccSnapshotRepeatable()) { + return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::RepeatableRead, txc); + } else { + return DataShard.PromoteImmediatePostExecuteEdges(op->GetMvccSnapshot(), TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + } + } else if (op->MvccReadWriteVersion) { + if (op->IsReadOnly()) { + return DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadOnly, txc); + } else { + return 
DataShard.PromoteImmediatePostExecuteEdges(*op->MvccReadWriteVersion, TDataShard::EPromotePostExecuteEdges::ReadWrite, txc); + } else { + return { }; + } +} + EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, TTransactionContext &txc, const TActorContext &ctx) @@ -53,24 +75,18 @@ EExecutionStatus TFinishProposeUnit::Execute(TOperation::TPtr op, bool hadWrites = false; // When mvcc is enabled we perform marking after transaction is executed - if (DataShard.IsMvccEnabled() && op->IsImmediate()) { - if (op->IsMvccSnapshotRead()) { - hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(op->GetMvccSnapshot(), txc); - if (op->IsMvccSnapshotRepeatable()) { - hadWrites |= DataShard.PromoteCompleteEdge(op.Get(), txc); - if (!hadWrites && DataShard.GetSnapshotManager().GetCommittedCompleteEdge() < op->GetMvccSnapshot()) { - // We need to wait for completion because some other transaction - // has moved complete edge, but it's not committed yet. - op->SetWaitCompletionFlag(true); - } - } - } else if (op->MvccReadWriteVersion) { - hadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(*op->MvccReadWriteVersion, txc); + if (op->IsAborted()) { + // Make sure we confirm aborts with a commit + op->SetWaitCompletionFlag(true); + } else if (DataShard.IsMvccEnabled() && op->IsImmediate()) { + auto res = PromoteImmediatePostExecuteEdges(op.Get(), txc); + + if (res.HadWrites) { + hadWrites = true; + res.WaitCompletion = true; } - if (hadWrites) { - // FIXME: even if transaction itself didn't promote, we may still need to - // wait for completion, when current in-memory state is not actually committed + if (res.WaitCompletion) { + op->SetWaitCompletionFlag(true); } } @@ -159,8 +175,13 @@ void TFinishProposeUnit::CompleteRequest(TOperation::TPtr op, DataShard.IncCounter(COUNTER_TX_RESULT_SIZE, res->Record.GetTxResult().size()); - if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) - ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie()); + if (!gSkipRepliesFailPoint.Check(DataShard.TabletID(), op->GetTxId())) { + if (op->IsImmediate() && !op->IsReadOnly() && !op->IsAborted() && op->MvccReadWriteVersion) { + DataShard.SendImmediateWriteResult(*op->MvccReadWriteVersion, op->GetTarget(), res.Release(), op->GetCookie()); + } else { + ctx.Send(op->GetTarget(), res.Release(), 0, op->GetCookie()); + } + } } void TFinishProposeUnit::AddDiagnosticsResult(TOutputOpData::TResultPtr &res) diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp index c2e538caa07..7dc3a464204 100644 --- a/ydb/core/tx/time_cast/time_cast.cpp +++ b/ydb/core/tx/time_cast/time_cast.cpp @@ -324,13 +324,13 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co default: { const ui64 step = record.GetTimeBarrier(); bucket.Entry->Update(step, nullptr, 0); - THashSet<ui64> processed; // a set of processed tablets + THashSet<std::pair<TActorId, ui64>> processed; // a set of processed (sender, tablet) pairs while (!bucket.Waiters.empty()) { const auto& top = bucket.Waiters.top(); if (step < top.PlanStep) { break; } - if (processed.insert(top.TabletId).second) { + if (processed.insert(std::make_pair(top.Sender, top.TabletId)).second) { ctx.Send(top.Sender, new TEvMediatorTimecast::TEvNotifyPlanStep(top.TabletId, step)); } bucket.Waiters.pop(); @@ -495,6 +495,7 @@ void TMediatorTimecastProxy::Handle(TEvTxProxy::TEvSubscribeReadStepResult::TPtr new TEvMediatorTimecast::TEvSubscribeReadStepResult( coordinatorId, lastReadStep, + nextReadStep, coordinator.ReadStep), 0,
cookie); } diff --git a/ydb/core/tx/time_cast/time_cast.h b/ydb/core/tx/time_cast/time_cast.h index 9771a462c6e..0765eae9c4c 100644 --- a/ydb/core/tx/time_cast/time_cast.h +++ b/ydb/core/tx/time_cast/time_cast.h @@ -199,14 +199,17 @@ struct TEvMediatorTimecast { struct TEvSubscribeReadStepResult : public TEventLocal<TEvSubscribeReadStepResult, EvSubscribeReadStepResult> { const ui64 CoordinatorId; const ui64 LastReadStep; + const ui64 NextReadStep; const TMediatorTimecastReadStep::TCPtr ReadStep; TEvSubscribeReadStepResult( ui64 coordinatorId, ui64 lastReadStep, + ui64 nextReadStep, TMediatorTimecastReadStep::TCPtr readStep) : CoordinatorId(coordinatorId) , LastReadStep(lastReadStep) + , NextReadStep(nextReadStep) , ReadStep(std::move(readStep)) { Y_VERIFY(ReadStep); @@ -217,6 +220,7 @@ struct TEvMediatorTimecast { << ToStringHeader() << "{" << " CoordinatorId# " << CoordinatorId << " LastReadStep# " << LastReadStep + << " NextReadStep# " << NextReadStep << " ReadStep# " << ReadStep->Get() << " }"; }
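The branch ladder in the new TFinishProposeUnit::PromoteImmediatePostExecuteEdges above reduces to a four-way mode choice. Restated as a standalone function for readability (a toy restatement of the logic in the diff; the real method takes the operation and txc and returns TDataShard::TPromotePostExecuteEdges):

    #include <cstdio>

    // Promotion mode chosen by finish_propose_unit.cpp above, as plain booleans.
    enum class EPromote { None, ReadOnly, RepeatableRead, ReadWrite };

    EPromote ChoosePromotion(bool isMvccSnapshotRead, bool isRepeatable,
                             bool hasMvccReadWriteVersion, bool isReadOnly) {
        if (isMvccSnapshotRead) {
            // Repeatable snapshot reads must pin the snapshot; one-shot reads only promote read edges.
            return isRepeatable ? EPromote::RepeatableRead : EPromote::ReadOnly;
        }
        if (hasMvccReadWriteVersion) {
            // Immediate transactions with a resolved mvcc version promote according to their rw-ness.
            return isReadOnly ? EPromote::ReadOnly : EPromote::ReadWrite;
        }
        return EPromote::None; // nothing executed at an mvcc version
    }

    int main() {
        // An immediate UPSERT: not a snapshot read, has a version, not read-only.
        std::printf("%d\n", static_cast<int>(ChoosePromotion(false, false, true, false))); // prints 3 == ReadWrite
    }

This also explains why PromoteCompleteEdge calls disappear from execute_data_tx_unit.cpp and execute_kqp_data_tx_unit.cpp: the promotion now happens once, in the finish-propose step, for every immediate operation.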
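On the reply path: TTxUnsafeUploadRows::Complete and TFinishProposeUnit::CompleteRequest both route write acknowledgements through TDataShard::SendImmediateWriteResult instead of replying directly, and the TestSnapshotReadPriority test never observes a write before its reply arrives. A toy model of version-gated replies consistent with that behavior (TReplyGate and this TRowVersion are illustrative stand-ins, not the actual datashard implementation, which tracks mediator time rather than a manually advanced edge):

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <utility>
    #include <vector>

    // Toy row version: (Step, TxId) ordered lexicographically.
    struct TRowVersion {
        uint64_t Step = 0;
        uint64_t TxId = 0;
        bool operator<=(const TRowVersion& rhs) const {
            return Step < rhs.Step || (Step == rhs.Step && TxId <= rhs.TxId);
        }
    };

    // Holds a reply to an immediate write until the readable edge reaches the
    // version the write was committed at.
    class TReplyGate {
    public:
        void SendWhenVisible(TRowVersion version, std::function<void()> reply) {
            if (version <= Edge) {
                reply(); // already readable, acknowledge right away
            } else {
                Pending.emplace_back(version, std::move(reply));
            }
        }

        void AdvanceEdge(TRowVersion edge) {
            Edge = edge;
            for (auto it = Pending.begin(); it != Pending.end();) {
                if (it->first <= Edge) {
                    it->second();
                    it = Pending.erase(it);
                } else {
                    ++it;
                }
            }
        }

    private:
        TRowVersion Edge;
        std::vector<std::pair<TRowVersion, std::function<void()>>> Pending;
    };

    int main() {
        TReplyGate gate;
        gate.SendWhenVisible({5, 1}, [] { std::cout << "write at (5,1) acked\n"; });
        gate.AdvanceEdge({4, ~0ull}); // mediator time at step 4: reply still held
        gate.AdvanceEdge({5, ~0ull}); // step 5 fully readable: reply released
    }

Under this reading, prioritized reads can keep using versions at or below the current edge without waiting on in-flight writes, which is why the interleaved SELECTs in the test never see keys 5, 7 or 9 before the corresponding write acknowledgement.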
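The TMediatorTimecastProxy change also fixes TEvNotifyPlanStep deduplication: waiters used to be keyed by tablet id alone, so when two different actors waited on the same tablet only the first received a notification. A self-contained sketch of the keying difference (std::set and integer ids stand in for THashSet and the real TActorId):

    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <utility>
    #include <vector>

    using TActorId = std::uint64_t; // stand-in; the real TActorId is not an integer

    struct TWaiter {
        TActorId Sender;
        std::uint64_t TabletId;
        std::uint64_t PlanStep;
    };

    int main() {
        // Two different actors wait for the same tablet to reach plan step 100.
        const std::vector<TWaiter> waiters = {
            {/*Sender*/ 1, /*TabletId*/ 42, /*PlanStep*/ 100},
            {/*Sender*/ 2, /*TabletId*/ 42, /*PlanStep*/ 100},
        };
        const std::uint64_t step = 100; // time barrier reached

        // Old keying by tablet id: the second waiter is dropped as a "duplicate".
        std::set<std::uint64_t> processedByTablet;
        int notifiedOld = 0;
        for (const auto& w : waiters) {
            if (step >= w.PlanStep && processedByTablet.insert(w.TabletId).second) {
                ++notifiedOld; // in the real code: ctx.Send(w.Sender, new TEvNotifyPlanStep(...))
            }
        }

        // New keying by (sender, tablet): each waiter is notified exactly once.
        std::set<std::pair<TActorId, std::uint64_t>> processedBySender;
        int notifiedNew = 0;
        for (const auto& w : waiters) {
            if (step >= w.PlanStep && processedBySender.insert({w.Sender, w.TabletId}).second) {
                ++notifiedNew;
            }
        }

        std::cout << "old: " << notifiedOld << ", new: " << notifiedNew << "\n"; // old: 1, new: 2
    }

The real handler pops a priority queue of waiters instead of scanning a vector, but the deduplication key is the only behavioral change in this hunk.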