diff options
author | snaury <snaury@ydb.tech> | 2023-10-12 13:37:34 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-10-12 14:21:00 +0300 |
commit | 4d2a150fadea8cf58a2d98bf37f1549f2e076772 (patch) | |
tree | 03246d60390bdbc388aa7c815f691ede45e401ed | |
parent | cb22bc788fb52ee6aee5ac0e397e9e2dc6bd670a (diff) | |
download | ydb-4d2a150fadea8cf58a2d98bf37f1549f2e076772.tar.gz |
Predict volatile plan steps based on neighbor readsets and either plan or cancel them when they are lost KIKIMR-16343
-rw-r--r-- | ydb/core/protos/counters_datashard.proto | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__cleanup_tx.cpp | 51 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__plan_step.cpp | 43 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 14 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_locks.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 175 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_trans_queue.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_trans_queue.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_order.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_volatile.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 3 |
13 files changed, 323 insertions, 49 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index ea662bf4247..00b451ca6e4 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -462,4 +462,6 @@ enum ETxTypes { TXTYPE_FIND_WRITE_CONFLICTS = 77 [(TxTypeOpts) = {Name: "TTxFindWriteConflicts"}]; TXTYPE_UPDATE_FOLLOWER_READ_EDGE = 78 [(TxTypeOpts) = {Name: "TxUpdateFollowerReadEdge"}]; TXTYPE_CDC_STREAM_EMIT_HEARTBEATS = 79 [(TxTypeOpts) = {Name: "TTxCdcStreamEmitHeartbeats"}]; + TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}]; + TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}]; } diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 6ef262ffeef..5197f9db13c 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -2267,6 +2267,15 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo break; } + if (Pipeline.HasPredictedPlan()) { + ui64 nextStep = Pipeline.NextPredictedPlanStep(); + if (nextStep <= mediatorStep) { + SchedulePlanPredictedTxs(); + } else { + WaitPredictedPlanStep(nextStep); + } + } + if (IsMvccEnabled()) { PromoteFollowerReadEdge(); } @@ -3173,6 +3182,24 @@ bool TDataShard::WaitPlanStep(ui64 step) { return false; } +void TDataShard::WaitPredictedPlanStep(ui64 step) { + if (!MediatorTimeCastEntry) { + return; + } + + if (step <= MediatorTimeCastEntry->Get(TabletID())) { + // This step is ready, schedule a transaction plan + SchedulePlanPredictedTxs(); + return; + } + + if (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin()) { + MediatorTimeCastWaitingSteps.insert(step); + Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step)); + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast"); + } +} + bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const { auto* msg = ev->Get(); diff --git a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp index 520e3d0868c..fb0fbbe793d 100644 --- a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp +++ b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp @@ -98,5 +98,56 @@ void TDataShard::Handle(TEvPrivate::TEvCleanupTransaction::TPtr&, const TActorCo ExecuteCleanupTx(ctx); } +class TDataShard::TTxCleanupVolatileTransaction : public NTabletFlatExecutor::TTransactionBase<TDataShard> { +public: + TTxCleanupVolatileTransaction(TDataShard* self, ui64 txId) + : TTransactionBase(self) + , TxId(txId) + {} + + TTxType GetTxType() const override { return TXTYPE_CLEANUP_VOLATILE; } + + bool Execute(TTransactionContext&, const TActorContext& ctx) override { + if (!Self->IsStateActive()) { + LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, + "Cleanup volatile tx at non-ready tablet " << Self->TabletID() << " state " << Self->State); + return true; + } + + if (Self->Pipeline.CleanupVolatile(TxId, ctx, Replies)) { + LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, + "Cleaned up volatile tx " << TxId << " at " << Self->TabletID() + << " TxInFly " << Self->TxInFly()); + Self->IncCounter(COUNTER_TX_PROGRESS_CLEANUP); + + if (!Replies.empty()) { + // We want to send confirmed replies when cleaning up volatile transactions + ReplyTs = Self->ConfirmReadOnlyLease(); + } + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + if (ReplyTs) { + Self->SendConfirmedReplies(ReplyTs, std::move(Replies)); + } else { + Self->SendCommittedReplies(std::move(Replies)); + } + Self->CheckSplitCanStart(ctx); + Self->CheckMvccStateChangeCanStart(ctx); + } + +private: + const ui64 TxId; + std::vector<std::unique_ptr<IEventHandle>> Replies; + TMonotonic ReplyTs; +}; + +void TDataShard::ExecuteCleanupVolatileTx(ui64 txId, const TActorContext& ctx) { + Execute(new TTxCleanupVolatileTransaction(this, txId), ctx); +} + } // namespace NDataShard } // namespace NKikimr diff --git a/ydb/core/tx/datashard/datashard__plan_step.cpp b/ydb/core/tx/datashard/datashard__plan_step.cpp index 88c39ad16b0..40c8887e05b 100644 --- a/ydb/core/tx/datashard/datashard__plan_step.cpp +++ b/ydb/core/tx/datashard/datashard__plan_step.cpp @@ -92,4 +92,47 @@ void TDataShard::TTxPlanStep::Complete(const TActorContext &ctx) { } } +class TDataShard::TTxPlanPredictedTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> { +public: + TTxPlanPredictedTxs(TDataShard* self) + : TTransactionBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_PLAN_PREDICTED_TXS; } + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + Self->ScheduledPlanPredictedTxs = false; + + ui64 step = Self->MediatorTimeCastEntry->Get(Self->TabletID()); + bool planned = Self->Pipeline.PlanPredictedTxs(step, txc, ctx); + + if (Self->Pipeline.HasPredictedPlan()) { + ui64 nextStep = Self->Pipeline.NextPredictedPlanStep(); + Y_ABORT_UNLESS(step < nextStep); + Self->WaitPredictedPlanStep(nextStep); + } + + if (planned) { + Self->PlanQueue.Progress(ctx); + } + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } +}; + +void TDataShard::Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr&, const TActorContext& ctx) { + Y_ABORT_UNLESS(ScheduledPlanPredictedTxs); + Execute(new TTxPlanPredictedTxs(this), ctx); +} + +void TDataShard::SchedulePlanPredictedTxs() { + if (!ScheduledPlanPredictedTxs) { + ScheduledPlanPredictedTxs = true; + Send(SelfId(), new TEvPrivate::TEvPlanPredictedTxs()); + } +} + }} diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 94b137bcccc..5895e33597d 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -166,9 +166,11 @@ class TDataShard class TTxInitSchema; class TTxInitSchemaDefaults; class TTxPlanStep; + class TTxPlanPredictedTxs; class TTxProgressResendRS; class TTxProgressTransaction; class TTxCleanupTransaction; + class TTxCleanupVolatileTransaction; class TTxProposeDataTransaction; class TTxProposeSchemeTransaction; class TTxCancelTransactionProposal; @@ -351,6 +353,7 @@ class TDataShard EvChangeExchangeExecuteHandshakes, EvConfirmReadonlyLease, EvReadonlyLeaseConfirmation, + EvPlanPredictedTxs, EvEnd }; @@ -544,6 +547,8 @@ class TDataShard }; struct TEvReadonlyLeaseConfirmation: public TEventLocal<TEvReadonlyLeaseConfirmation, EvReadonlyLeaseConfirmation> {}; + + struct TEvPlanPredictedTxs : public TEventLocal<TEvPlanPredictedTxs, EvPlanPredictedTxs> {}; }; struct Schema : NIceDb::Schema { @@ -1320,6 +1325,8 @@ class TDataShard void Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1937,6 +1944,7 @@ public: // Executes TTxCleanupTransaction void ExecuteCleanupTx(const TActorContext& ctx); + void ExecuteCleanupVolatileTx(ui64 txId, const TActorContext& ctx); void StopFindSubDomainPathId(); void StartFindSubDomainPathId(bool delayFirstRequest = true); @@ -1947,6 +1955,9 @@ public: bool WaitPlanStep(ui64 step); bool CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const; + void WaitPredictedPlanStep(ui64 step); + void SchedulePlanPredictedTxs(); + bool CheckChangesQueueOverflow() const; void CheckChangesQueueNoOverflow(); @@ -2769,6 +2780,8 @@ private: bool UpdateFollowerReadEdgePending = false; + bool ScheduledPlanPredictedTxs = false; + public: auto& GetLockChangeRecords() { return LockChangeRecords; @@ -2940,6 +2953,7 @@ protected: HFunc(TEvDataShard::TEvGetOpenTxs, Handle); HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle); + HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { ALOG_WARN(NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp index dec59d87d99..a7e5aecbaa5 100644 --- a/ydb/core/tx/datashard/datashard_locks.cpp +++ b/ydb/core/tx/datashard/datashard_locks.cpp @@ -17,6 +17,10 @@ namespace NDataShard { struct TLockLoggerContext { TLockLoggerContext() = default; + TInstant Timestamp() const { + return TInstant::Now(); + } + NLog::TSettings* LoggerSettings() const { return TlsActivationContext ? TlsActivationContext->LoggerSettings() : nullptr; } diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 94ba6006fb9..9f819269afa 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -664,15 +664,20 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs, active = false; } if (op) { + if (!op->GetStep() && !op->GetPredictedStep() && !active) { + op->SetPredictedStep(step); + AddPredictedPlan(step, txId, ctx); + } // If input read sets are not loaded yet then // it will be added at load. if (op->HasLoadedInRSFlag()) { op->AddInReadSet(rs.Record); + } else { + op->AddDelayedInReadSet(rs.Record); } if (ack) { op->AddDelayedAck(THolder(ack.Release())); } - op->AddDelayedInReadSet(rs.Record); if (active) { AddCandidateOp(op); @@ -783,9 +788,11 @@ bool TPipeline::LoadInReadSets(TOperation::TPtr op, // Add read sets not stored to DB. for (auto &rs : op->DelayedInReadSets()) { - if (! loadedReadSets.contains(TReadSetKey(rs))) - op->AddInReadSet(rs); + if (!loadedReadSets.contains(TReadSetKey(rs))) { + op->AddInReadSet(std::move(rs)); + } } + op->DelayedInReadSets().clear(); LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Remain " << op->GetRemainReadSets() << " read sets for " << *op @@ -856,47 +863,34 @@ bool TPipeline::PlanTxs(ui64 step, if (step <= LastPlannedTx.Step) return false; - ui64 lastTxId = txIds.empty()? 0: txIds.back(); + auto it = PredictedPlan.begin(); + while (it != PredictedPlan.end() && it->Step < step) { + PlanTxImpl(it->Step, it->TxId, txc, ctx); + PredictedPlan.erase(it++); + } - NIceDb::TNiceDb db(txc.DB); - SaveLastPlannedTx(db, TStepOrder(step, lastTxId)); + ui64 lastTxId = 0; for (ui64 txId : txIds) { - if (SchemaTx && SchemaTx->TxId == txId && SchemaTx->MinStep > step) { - TString explain = TStringBuilder() << "Scheme transaction has come too early" - << ", only after particular step this shema tx is allowed" - << ", txId: " << txId - << ", expected min step: " << SchemaTx->MinStep - << ", actual step: " << step; - Y_VERIFY_DEBUG_S(SchemaTx->MinStep <= step, explain); - LOG_ALERT_S(ctx, NKikimrServices::TX_DATASHARD, explain); - } - - auto op = Self->TransQueue.FindTxInFly(txId); - if (!op) { - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, - "Ignoring PlanStep " << step << " for unknown txId " - << txId << " at tablet " << Self->TabletID()); - continue; + while (it != PredictedPlan.end() && it->Step == step && it->TxId < txId) { + PlanTxImpl(step, it->TxId, txc, ctx); + PredictedPlan.erase(it++); } - - if (op->GetStep() && op->GetStep() != step) { - LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, - "Ignoring PlanStep " << step << " for txId " << txId - << " which already has PlanStep " << op->GetStep() - << " at tablet " << Self->TabletID()); - continue; + if (it != PredictedPlan.end() && it->Step == step && it->TxId == txId) { + PredictedPlan.erase(it++); } + PlanTxImpl(step, txId, txc, ctx); + lastTxId = txId; + } - Self->TransQueue.PlanTx(op, step, db); - // Execute WaitForPlan unit here to correctly compute operation - // profile. Otherwise profile might count time spent in plan - // queue as 'wait for plan' time. - if (op->GetCurrentUnit() == EExecutionUnitKind::WaitForPlan) { - auto status = RunExecutionUnit(op, txc, ctx); - Y_ABORT_UNLESS(status == EExecutionStatus::Executed); - } + while (it != PredictedPlan.end() && it->Step == step) { + PlanTxImpl(step, it->TxId, txc, ctx); + lastTxId = it->TxId; + PredictedPlan.erase(it++); } + NIceDb::TNiceDb db(txc.DB); + SaveLastPlannedTx(db, TStepOrder(step, lastTxId)); + AddCandidateUnit(EExecutionUnitKind::PlanQueue); MaybeActivateWaitingSchemeOps(ctx); ActivateWaitingTxOps(ctx); @@ -904,6 +898,90 @@ bool TPipeline::PlanTxs(ui64 step, return true; } +bool TPipeline::PlanPredictedTxs(ui64 step, TTransactionContext &txc, const TActorContext &ctx) { + if (step <= LastPlannedTx.Step) { + return false; + } + + ui64 lastStep = 0; + ui64 lastTxId = 0; + size_t planned = 0; + + auto it = PredictedPlan.begin(); + while (it != PredictedPlan.end() && it->Step <= step) { + PlanTxImpl(it->Step, it->TxId, txc, ctx); + lastStep = it->Step; + lastTxId = it->TxId; + PredictedPlan.erase(it++); + ++planned; + } + + if (planned == 0) { + return false; + } + + NIceDb::TNiceDb db(txc.DB); + SaveLastPlannedTx(db, TStepOrder(lastStep, lastTxId)); + + AddCandidateUnit(EExecutionUnitKind::PlanQueue); + MaybeActivateWaitingSchemeOps(ctx); + ActivateWaitingTxOps(ctx); + return true; +} + +void TPipeline::PlanTxImpl(ui64 step, ui64 txId, TTransactionContext &txc, const TActorContext &ctx) +{ + if (SchemaTx && SchemaTx->TxId == txId && SchemaTx->MinStep > step) { + TString explain = TStringBuilder() << "Scheme transaction has come too early" + << ", only after particular step this shema tx is allowed" + << ", txId: " << txId + << ", expected min step: " << SchemaTx->MinStep + << ", actual step: " << step; + Y_VERIFY_DEBUG_S(SchemaTx->MinStep <= step, explain); + LOG_ALERT_S(ctx, NKikimrServices::TX_DATASHARD, explain); + } + + auto op = Self->TransQueue.FindTxInFly(txId); + if (!op) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, + "Ignoring PlanStep " << step << " for unknown txId " + << txId << " at tablet " << Self->TabletID()); + return; + } + + if (op->GetStep() && op->GetStep() != step) { + LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, + "Ignoring PlanStep " << step << " for txId " << txId + << " which already has PlanStep " << op->GetStep() + << " at tablet " << Self->TabletID()); + return; + } + + NIceDb::TNiceDb db(txc.DB); + Self->TransQueue.PlanTx(op, step, db); + + // Execute WaitForPlan unit here to correctly compute operation + // profile. Otherwise profile might count time spent in plan + // queue as 'wait for plan' time. + if (op->GetCurrentUnit() == EExecutionUnitKind::WaitForPlan) { + auto status = RunExecutionUnit(op, txc, ctx); + Y_ABORT_UNLESS(status == EExecutionStatus::Executed); + } +} + +void TPipeline::AddPredictedPlan(ui64 step, ui64 txId, const TActorContext &ctx) +{ + if (step <= LastPlannedTx.Step) { + // Trying to add a predicted step to transaction that is in the past + // We cannot plan for the past, so we need to clean it up as soon as possible + Self->ExecuteCleanupVolatileTx(txId, ctx); + return; + } + + PredictedPlan.emplace(step, txId); + Self->WaitPredictedPlanStep(step); +} + void TPipeline::MarkOpAsUsingSnapshot(TOperation::TPtr op) { UnblockNormalDependencies(op); @@ -1128,6 +1206,29 @@ ECleanupStatus TPipeline::CleanupOutdated(NIceDb::TNiceDb& db, const TActorConte return status; } +bool TPipeline::CleanupVolatile(ui64 txId, const TActorContext& ctx, + std::vector<std::unique_ptr<IEventHandle>>& replies) +{ + if (Self->TransQueue.CleanupVolatile(txId, replies)) { + ForgetTx(txId); + + Self->CheckDelayedProposeQueue(ctx); + + // Outdated op removal might enable scheme op proposal. + MaybeActivateWaitingSchemeOps(ctx); + + // Outdated op removal might enable another op execution. + // N.B.: actually this happens only for DROP which waits + // for TxInFly having only the DROP. + // N.B.: compatibility with older datashard versions. + AddCandidateUnit(EExecutionUnitKind::PlanQueue); + Self->PlanQueue.Progress(ctx); + return true; + } + + return false; +} + ui64 TPipeline::PlannedTxInFly() const { return Self->TransQueue.TxInFly(); diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index ac0e2e0fb5e..85cf95c6b84 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -127,7 +127,13 @@ public: void CompleteSchemaTx(NIceDb::TNiceDb& db, ui64 txId); void MarkOpAsUsingSnapshot(TOperation::TPtr op); + bool HasPredictedPlan() const { return !PredictedPlan.empty(); } + ui64 NextPredictedPlanStep() const { return PredictedPlan.begin()->Step; } + bool PlanTxs(ui64 step, TVector<ui64> &txIds, TTransactionContext &txc, const TActorContext &ctx); + bool PlanPredictedTxs(ui64 step, TTransactionContext &txc, const TActorContext &ctx); + void PlanTxImpl(ui64 step, ui64 txId, TTransactionContext &txc, const TActorContext &ctx); + void AddPredictedPlan(ui64 step, ui64 txId, const TActorContext &ctx); void PreserveSchema(NIceDb::TNiceDb& db, ui64 step); TDuration CleanupTimeout() const; ECleanupStatus Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx, @@ -196,6 +202,8 @@ public: std::vector<std::unique_ptr<IEventHandle>>& replies); ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep, std::vector<std::unique_ptr<IEventHandle>>& replies); + bool CleanupVolatile(ui64 txId, const TActorContext& ctx, + std::vector<std::unique_ptr<IEventHandle>>& replies); ui64 PlannedTxInFly() const; const TSet<TStepOrder> &GetPlan() const; bool HasProposeDelayers() const; @@ -492,6 +500,7 @@ private: // Slow operation profiles. TList<TStoredExecutionProfile> SlowOpProfiles; TMap<ui64, ui32> ActiveStreamingTxs; + TSet<TStepOrder> PredictedPlan; typedef TList<TOperation::TPtr> TWaitingSchemeOpsOrder; typedef THashMap<TOperation::TPtr, TWaitingSchemeOpsOrder::iterator> TWaitingSchemeOps; diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp index 6eb12e379ec..5a41f0c1bec 100644 --- a/ydb/core/tx/datashard/datashard_trans_queue.cpp +++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp @@ -9,14 +9,14 @@ const TSet<TStepOrder> TTransQueue::EMPTY_PLAN; void TTransQueue::AddTxInFly(TOperation::TPtr op) { const ui64 txId = op->GetTxId(); - const ui64 maxStep = op->GetMaxStep(); Y_VERIFY_S(!TxsInFly.contains(txId), "Adding duplicate txId " << txId); TxsInFly[txId] = op; if (Y_LIKELY(!op->GetStep())) { ++PlanWaitingTxCount; - } - if (maxStep != Max<ui64>()) { - DeadlineQueue.emplace(std::make_pair(maxStep, txId)); + const ui64 maxStep = op->GetMaxStep(); + if (maxStep != Max<ui64>()) { + DeadlineQueue.emplace(std::make_pair(maxStep, txId)); + } } Self->SetCounter(COUNTER_TX_IN_FLY, TxsInFly.size()); } @@ -28,6 +28,10 @@ void TTransQueue::RemoveTxInFly(ui64 txId, std::vector<std::unique_ptr<IEventHan Self->GetCleanupReplies(it->second, *cleanupReplies); } if (!it->second->GetStep()) { + const ui64 maxStep = it->second->GetMaxStep(); + if (maxStep != Max<ui64>()) { + DeadlineQueue.erase(std::make_pair(maxStep, txId)); + } --PlanWaitingTxCount; } TxsInFly.erase(it); @@ -494,6 +498,21 @@ ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedSt return ECleanupStatus::Success; } +bool TTransQueue::CleanupVolatile(ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies) { + auto it = TxsInFly.find(txId); + if (it != TxsInFly.end() && it->second->HasVolatilePrepareFlag() && !it->second->GetStep()) { + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Cleaning up volatile tx " << txId << " ahead of time"); + + RemoveTxInFly(txId, &replies); + + Self->IncCounter(COUNTER_TX_PROGRESS_OUTDATED, 1); + return true; + } + + return false; +} + void TTransQueue::PlanTx(TOperation::TPtr op, ui64 step, NIceDb::TNiceDb &db) diff --git a/ydb/core/tx/datashard/datashard_trans_queue.h b/ydb/core/tx/datashard/datashard_trans_queue.h index a76014b030d..3295285ba48 100644 --- a/ydb/core/tx/datashard/datashard_trans_queue.h +++ b/ydb/core/tx/datashard/datashard_trans_queue.h @@ -88,6 +88,7 @@ private: // for pipeline only bool CancelPropose(NIceDb::TNiceDb& db, ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies); ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize, TVector<ui64>& outdatedTxs, std::vector<std::unique_ptr<IEventHandle>>& replies); + bool CleanupVolatile(ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies); // Plan diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index ffc712ebae2..1f35f29a5a2 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -4090,7 +4090,8 @@ Y_UNIT_TEST(TestSnapshotReadAfterBrokenLock) { auto result = KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( SELECT * FROM `/Root/table-1` WHERE key = 1 UNION ALL - SELECT * FROM `/Root/table-2` WHERE key = 2)")); + SELECT * FROM `/Root/table-2` WHERE key = 2 + ORDER BY key)")); UNIT_ASSERT_VALUES_EQUAL( result, "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " @@ -4169,7 +4170,8 @@ Y_UNIT_TEST(TestSnapshotReadAfterBrokenLockOutOfOrder) { auto result = KqpSimpleBegin(runtime, sessionIdBlocker, txIdBlocker, Q_(R"( SELECT * FROM `/Root/table-1` UNION ALL - SELECT * FROM `/Root/table-2`)")); + SELECT * FROM `/Root/table-2` + ORDER BY key)")); UNIT_ASSERT_VALUES_EQUAL( result, "{ items { uint32_value: 1 } items { uint32_value: 1 } }, " diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 4bd7ee2a070..cb176859757 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1853,7 +1853,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { // It would be as if shard missed them for some reason for (auto& pr : acks) { auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end()); - runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true); + runtime.Send(new IEventHandle(pr.first, recipient, ack), 0, true); } auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step); runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true); @@ -1896,8 +1896,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { WaitTxNotification(server, sender, txId); auto dropLatency = runtime.GetCurrentTime() - dropStartTs; Cerr << "... drop finished in " << dropLatency << Endl; - // TODO: we need to use neighbor readset hints to cancel earlier - // UNIT_ASSERT(dropLatency < TDuration::Seconds(5)); + UNIT_ASSERT(dropLatency < TDuration::Seconds(5)); } Y_UNIT_TEST(DistributedWriteLostPlanThenSplit) { @@ -1947,7 +1946,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { // It would be as if shard missed them for some reason for (auto& pr : acks) { auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end()); - runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true); + runtime.Send(new IEventHandle(pr.first, recipient, ack), 0, true); } auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step); runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true); @@ -1990,8 +1989,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { WaitTxNotification(server, sender, txId); auto splitLatency = runtime.GetCurrentTime() - splitStartTs; Cerr << "... split finished in " << splitLatency << Endl; - // TODO: we need to use neighbor readset hints to cancel earlier - // UNIT_ASSERT(splitLatency < TDuration::Seconds(5)); + UNIT_ASSERT(splitLatency < TDuration::Seconds(5)); } } // Y_UNIT_TEST_SUITE(DataShardVolatile) diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 4b11d83907f..40870d8c0b9 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -371,6 +371,8 @@ public: ui64 GetStep() const { return Step; } void SetStep(ui64 step) { Step = step; } + ui64 GetPredictedStep() const { return PredictedStep; } + void SetPredictedStep(ui64 step) { PredictedStep = step; } TStepOrder GetStepOrder() const { return TStepOrder(GetStep(), GetTxId()); } @@ -415,6 +417,7 @@ protected: ui64 Flags; ui64 GlobalTxId; ui64 Step; + ui64 PredictedStep = 0; TInstant ReceivedAt; ui64 MinStep; |