author     snaury <snaury@ydb.tech>  2023-10-12 13:37:34 +0300
committer  snaury <snaury@ydb.tech>  2023-10-12 14:21:00 +0300
commit     4d2a150fadea8cf58a2d98bf37f1549f2e076772 (patch)
tree       03246d60390bdbc388aa7c815f691ede45e401ed
parent     cb22bc788fb52ee6aee5ac0e397e9e2dc6bd670a (diff)
download   ydb-4d2a150fadea8cf58a2d98bf37f1549f2e076772.tar.gz
Predict volatile plan steps based on neighbor readsets and either plan or cancel them when they are lost KIKIMR-16343
-rw-r--r--  ydb/core/protos/counters_datashard.proto         |   2
-rw-r--r--  ydb/core/tx/datashard/datashard.cpp              |  27
-rw-r--r--  ydb/core/tx/datashard/datashard__cleanup_tx.cpp  |  51
-rw-r--r--  ydb/core/tx/datashard/datashard__plan_step.cpp   |  43
-rw-r--r--  ydb/core/tx/datashard/datashard_impl.h           |  14
-rw-r--r--  ydb/core/tx/datashard/datashard_locks.cpp        |   4
-rw-r--r--  ydb/core/tx/datashard/datashard_pipeline.cpp     | 175
-rw-r--r--  ydb/core/tx/datashard/datashard_pipeline.h       |   9
-rw-r--r--  ydb/core/tx/datashard/datashard_trans_queue.cpp  |  27
-rw-r--r--  ydb/core/tx/datashard/datashard_trans_queue.h    |   1
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_order.cpp     |   6
-rw-r--r--  ydb/core/tx/datashard/datashard_ut_volatile.cpp  |  10
-rw-r--r--  ydb/core/tx/datashard/operation.h                |   3
13 files changed, 323 insertions(+), 49 deletions(-)
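
In outline: when TPipeline::SaveInReadSet receives a readset for a volatile transaction that has no plan step yet, the step carried by that readset becomes the transaction's predicted step (SetPredictedStep/AddPredictedPlan). A prediction that is already in the past cannot be planned, so the transaction is cancelled through TTxCleanupVolatileTransaction; otherwise the shard waits for the mediator time cast to reach the predicted step and plans the queued transactions in TTxPlanPredictedTxs. What follows is a minimal, self-contained sketch of that bookkeeping; PredictedPlanModel and its members are illustrative stand-ins, not the actual TPipeline/TDataShard API.

    // Sketch of the predicted-plan bookkeeping added by this patch.
    // PredictedPlanModel is a hypothetical stand-in for TPipeline state:
    // Plan mirrors PredictedPlan, LastPlannedStep mirrors LastPlannedTx.Step.
    #include <cstdint>
    #include <iostream>
    #include <set>
    #include <utility>
    #include <vector>

    using ui64 = uint64_t;

    struct PredictedPlanModel {
        ui64 LastPlannedStep = 0;
        std::set<std::pair<ui64, ui64>> Plan;   // (step, txId), ordered like TSet<TStepOrder>
        std::vector<ui64> Cancelled;            // txIds handed to volatile cleanup

        // Mirrors TPipeline::AddPredictedPlan: a step at or before the last
        // planned step is unplannable, so the tx is cancelled right away.
        void AddPredicted(ui64 step, ui64 txId) {
            if (step <= LastPlannedStep) {
                Cancelled.push_back(txId);      // ExecuteCleanupVolatileTx in the patch
                return;
            }
            Plan.emplace(step, txId);
        }

        // Mirrors TPipeline::PlanPredictedTxs: drain every prediction whose
        // step the mediator time cast has reached, in (step, txId) order.
        std::vector<std::pair<ui64, ui64>> PlanUpTo(ui64 mediatorStep) {
            std::vector<std::pair<ui64, ui64>> planned;
            auto it = Plan.begin();
            while (it != Plan.end() && it->first <= mediatorStep) {
                planned.push_back(*it);
                LastPlannedStep = it->first;    // SaveLastPlannedTx in the patch
                it = Plan.erase(it);
            }
            return planned;
        }
    };

    int main() {
        PredictedPlanModel model;
        model.LastPlannedStep = 100;

        model.AddPredicted(90, 1);   // lost plan step already in the past: cancelled
        model.AddPredicted(110, 2);  // queued until the mediator clock reaches 110
        model.AddPredicted(120, 3);  // still pending after the drain below

        for (auto [step, txId] : model.PlanUpTo(/*mediatorStep=*/115)) {
            std::cout << "planned txId " << txId << " at step " << step << "\n";
        }
        std::cout << "cancelled: " << model.Cancelled.size() << "\n";  // txId 1
    }
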
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index ea662bf4247..00b451ca6e4 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -462,4 +462,6 @@ enum ETxTypes {
TXTYPE_FIND_WRITE_CONFLICTS = 77 [(TxTypeOpts) = {Name: "TTxFindWriteConflicts"}];
TXTYPE_UPDATE_FOLLOWER_READ_EDGE = 78 [(TxTypeOpts) = {Name: "TxUpdateFollowerReadEdge"}];
TXTYPE_CDC_STREAM_EMIT_HEARTBEATS = 79 [(TxTypeOpts) = {Name: "TTxCdcStreamEmitHeartbeats"}];
+ TXTYPE_CLEANUP_VOLATILE = 80 [(TxTypeOpts) = {Name: "TxCleanupVolatile"}];
+ TXTYPE_PLAN_PREDICTED_TXS = 81 [(TxTypeOpts) = {Name: "TxPlanPredictedTxs"}];
}
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 6ef262ffeef..5197f9db13c 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2267,6 +2267,15 @@ void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorCo
break;
}
+ if (Pipeline.HasPredictedPlan()) {
+ ui64 nextStep = Pipeline.NextPredictedPlanStep();
+ if (nextStep <= mediatorStep) {
+ SchedulePlanPredictedTxs();
+ } else {
+ WaitPredictedPlanStep(nextStep);
+ }
+ }
+
if (IsMvccEnabled()) {
PromoteFollowerReadEdge();
}
@@ -3173,6 +3182,24 @@ bool TDataShard::WaitPlanStep(ui64 step) {
return false;
}
+void TDataShard::WaitPredictedPlanStep(ui64 step) {
+ if (!MediatorTimeCastEntry) {
+ return;
+ }
+
+ if (step <= MediatorTimeCastEntry->Get(TabletID())) {
+ // This step is ready, schedule a transaction plan
+ SchedulePlanPredictedTxs();
+ return;
+ }
+
+ if (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin()) {
+ MediatorTimeCastWaitingSteps.insert(step);
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
+ }
+}
+
bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
auto* msg = ev->Get();
diff --git a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
index 520e3d0868c..fb0fbbe793d 100644
--- a/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
+++ b/ydb/core/tx/datashard/datashard__cleanup_tx.cpp
@@ -98,5 +98,56 @@ void TDataShard::Handle(TEvPrivate::TEvCleanupTransaction::TPtr&, const TActorCo
ExecuteCleanupTx(ctx);
}
+class TDataShard::TTxCleanupVolatileTransaction : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+ TTxCleanupVolatileTransaction(TDataShard* self, ui64 txId)
+ : TTransactionBase(self)
+ , TxId(txId)
+ {}
+
+ TTxType GetTxType() const override { return TXTYPE_CLEANUP_VOLATILE; }
+
+ bool Execute(TTransactionContext&, const TActorContext& ctx) override {
+ if (!Self->IsStateActive()) {
+ LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Cleanup volatile tx at non-ready tablet " << Self->TabletID() << " state " << Self->State);
+ return true;
+ }
+
+ if (Self->Pipeline.CleanupVolatile(TxId, ctx, Replies)) {
+ LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Cleaned up volatile tx " << TxId << " at " << Self->TabletID()
+ << " TxInFly " << Self->TxInFly());
+ Self->IncCounter(COUNTER_TX_PROGRESS_CLEANUP);
+
+ if (!Replies.empty()) {
+ // We want to send confirmed replies when cleaning up volatile transactions
+ ReplyTs = Self->ConfirmReadOnlyLease();
+ }
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ if (ReplyTs) {
+ Self->SendConfirmedReplies(ReplyTs, std::move(Replies));
+ } else {
+ Self->SendCommittedReplies(std::move(Replies));
+ }
+ Self->CheckSplitCanStart(ctx);
+ Self->CheckMvccStateChangeCanStart(ctx);
+ }
+
+private:
+ const ui64 TxId;
+ std::vector<std::unique_ptr<IEventHandle>> Replies;
+ TMonotonic ReplyTs;
+};
+
+void TDataShard::ExecuteCleanupVolatileTx(ui64 txId, const TActorContext& ctx) {
+ Execute(new TTxCleanupVolatileTransaction(this, txId), ctx);
+}
+
} // namespace NDataShard
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/datashard__plan_step.cpp b/ydb/core/tx/datashard/datashard__plan_step.cpp
index 88c39ad16b0..40c8887e05b 100644
--- a/ydb/core/tx/datashard/datashard__plan_step.cpp
+++ b/ydb/core/tx/datashard/datashard__plan_step.cpp
@@ -92,4 +92,47 @@ void TDataShard::TTxPlanStep::Complete(const TActorContext &ctx) {
}
}
+class TDataShard::TTxPlanPredictedTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
+public:
+ TTxPlanPredictedTxs(TDataShard* self)
+ : TTransactionBase(self)
+ {}
+
+ TTxType GetTxType() const override { return TXTYPE_PLAN_PREDICTED_TXS; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ Self->ScheduledPlanPredictedTxs = false;
+
+ ui64 step = Self->MediatorTimeCastEntry->Get(Self->TabletID());
+ bool planned = Self->Pipeline.PlanPredictedTxs(step, txc, ctx);
+
+ if (Self->Pipeline.HasPredictedPlan()) {
+ ui64 nextStep = Self->Pipeline.NextPredictedPlanStep();
+ Y_ABORT_UNLESS(step < nextStep);
+ Self->WaitPredictedPlanStep(nextStep);
+ }
+
+ if (planned) {
+ Self->PlanQueue.Progress(ctx);
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+};
+
+void TDataShard::Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr&, const TActorContext& ctx) {
+ Y_ABORT_UNLESS(ScheduledPlanPredictedTxs);
+ Execute(new TTxPlanPredictedTxs(this), ctx);
+}
+
+void TDataShard::SchedulePlanPredictedTxs() {
+ if (!ScheduledPlanPredictedTxs) {
+ ScheduledPlanPredictedTxs = true;
+ Send(SelfId(), new TEvPrivate::TEvPlanPredictedTxs());
+ }
+}
+
}}
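
A side note on the scheduling above: planning is triggered through a coalesced self-send. SchedulePlanPredictedTxs sends TEvPlanPredictedTxs only while ScheduledPlanPredictedTxs is false, and the handler resets the flag before executing the local transaction, so repeated mediator-step activations collapse into a single TTxPlanPredictedTxs run. A minimal sketch of the pattern, using a plain callback queue as a hypothetical stand-in for the actor mailbox:

    // Coalesced self-send: many Schedule() calls, one handler execution
    // per scheduled event. The queue stands in for the actor mailbox.
    #include <functional>
    #include <iostream>
    #include <queue>

    struct Shard {
        bool Scheduled = false;                       // mirrors ScheduledPlanPredictedTxs
        std::queue<std::function<void()>>* Mailbox = nullptr;

        void SchedulePlanPredicted() {
            if (!Scheduled) {                         // collapse repeated triggers
                Scheduled = true;
                Mailbox->push([this] { HandlePlanPredicted(); });
            }
        }

        void HandlePlanPredicted() {
            Scheduled = false;                        // the next trigger may schedule again
            std::cout << "run TTxPlanPredictedTxs once\n";
        }
    };

    int main() {
        std::queue<std::function<void()>> mailbox;
        Shard shard;
        shard.Mailbox = &mailbox;

        shard.SchedulePlanPredicted();
        shard.SchedulePlanPredicted();                // coalesced: no second event queued

        while (!mailbox.empty()) {                    // drain the "mailbox"
            mailbox.front()();
            mailbox.pop();
        }
        // prints "run TTxPlanPredictedTxs once" exactly once
    }
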
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 94b137bcccc..5895e33597d 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -166,9 +166,11 @@ class TDataShard
class TTxInitSchema;
class TTxInitSchemaDefaults;
class TTxPlanStep;
+ class TTxPlanPredictedTxs;
class TTxProgressResendRS;
class TTxProgressTransaction;
class TTxCleanupTransaction;
+ class TTxCleanupVolatileTransaction;
class TTxProposeDataTransaction;
class TTxProposeSchemeTransaction;
class TTxCancelTransactionProposal;
@@ -351,6 +353,7 @@ class TDataShard
EvChangeExchangeExecuteHandshakes,
EvConfirmReadonlyLease,
EvReadonlyLeaseConfirmation,
+ EvPlanPredictedTxs,
EvEnd
};
@@ -544,6 +547,8 @@ class TDataShard
};
struct TEvReadonlyLeaseConfirmation: public TEventLocal<TEvReadonlyLeaseConfirmation, EvReadonlyLeaseConfirmation> {};
+
+ struct TEvPlanPredictedTxs : public TEventLocal<TEvPlanPredictedTxs, EvPlanPredictedTxs> {};
};
struct Schema : NIceDb::Schema {
@@ -1320,6 +1325,8 @@ class TDataShard
void Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvPlanPredictedTxs::TPtr& ev, const TActorContext& ctx);
+
void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
void DoPeriodicTasks(const TActorContext &ctx);
@@ -1937,6 +1944,7 @@ public:
// Executes TTxCleanupTransaction
void ExecuteCleanupTx(const TActorContext& ctx);
+ void ExecuteCleanupVolatileTx(ui64 txId, const TActorContext& ctx);
void StopFindSubDomainPathId();
void StartFindSubDomainPathId(bool delayFirstRequest = true);
@@ -1947,6 +1955,9 @@ public:
bool WaitPlanStep(ui64 step);
bool CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const;
+ void WaitPredictedPlanStep(ui64 step);
+ void SchedulePlanPredictedTxs();
+
bool CheckChangesQueueOverflow() const;
void CheckChangesQueueNoOverflow();
@@ -2769,6 +2780,8 @@ private:
bool UpdateFollowerReadEdgePending = false;
+ bool ScheduledPlanPredictedTxs = false;
+
public:
auto& GetLockChangeRecords() {
return LockChangeRecords;
@@ -2940,6 +2953,7 @@ protected:
HFunc(TEvDataShard::TEvGetOpenTxs, Handle);
HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
HFunc(TEvPrivate::TEvConfirmReadonlyLease, Handle);
+ HFunc(TEvPrivate::TEvPlanPredictedTxs, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
ALOG_WARN(NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/datashard_locks.cpp b/ydb/core/tx/datashard/datashard_locks.cpp
index dec59d87d99..a7e5aecbaa5 100644
--- a/ydb/core/tx/datashard/datashard_locks.cpp
+++ b/ydb/core/tx/datashard/datashard_locks.cpp
@@ -17,6 +17,10 @@ namespace NDataShard {
struct TLockLoggerContext {
TLockLoggerContext() = default;
+ TInstant Timestamp() const {
+ return TInstant::Now();
+ }
+
NLog::TSettings* LoggerSettings() const {
return TlsActivationContext ? TlsActivationContext->LoggerSettings() : nullptr;
}
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 94ba6006fb9..9f819269afa 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -664,15 +664,20 @@ bool TPipeline::SaveInReadSet(const TEvTxProcessing::TEvReadSet &rs,
active = false;
}
if (op) {
+ if (!op->GetStep() && !op->GetPredictedStep() && !active) {
+ op->SetPredictedStep(step);
+ AddPredictedPlan(step, txId, ctx);
+ }
// If input read sets are not loaded yet then
// it will be added at load.
if (op->HasLoadedInRSFlag()) {
op->AddInReadSet(rs.Record);
+ } else {
+ op->AddDelayedInReadSet(rs.Record);
}
if (ack) {
op->AddDelayedAck(THolder(ack.Release()));
}
- op->AddDelayedInReadSet(rs.Record);
if (active) {
AddCandidateOp(op);
@@ -783,9 +788,11 @@ bool TPipeline::LoadInReadSets(TOperation::TPtr op,
// Add read sets not stored to DB.
for (auto &rs : op->DelayedInReadSets()) {
- if (! loadedReadSets.contains(TReadSetKey(rs)))
- op->AddInReadSet(rs);
+ if (!loadedReadSets.contains(TReadSetKey(rs))) {
+ op->AddInReadSet(std::move(rs));
+ }
}
+ op->DelayedInReadSets().clear();
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Remain " << op->GetRemainReadSets() << " read sets for " << *op
@@ -856,47 +863,34 @@ bool TPipeline::PlanTxs(ui64 step,
if (step <= LastPlannedTx.Step)
return false;
- ui64 lastTxId = txIds.empty()? 0: txIds.back();
+ auto it = PredictedPlan.begin();
+ while (it != PredictedPlan.end() && it->Step < step) {
+ PlanTxImpl(it->Step, it->TxId, txc, ctx);
+ PredictedPlan.erase(it++);
+ }
- NIceDb::TNiceDb db(txc.DB);
- SaveLastPlannedTx(db, TStepOrder(step, lastTxId));
+ ui64 lastTxId = 0;
for (ui64 txId : txIds) {
- if (SchemaTx && SchemaTx->TxId == txId && SchemaTx->MinStep > step) {
- TString explain = TStringBuilder() << "Scheme transaction has come too early"
- << ", only after particular step this shema tx is allowed"
- << ", txId: " << txId
- << ", expected min step: " << SchemaTx->MinStep
- << ", actual step: " << step;
- Y_VERIFY_DEBUG_S(SchemaTx->MinStep <= step, explain);
- LOG_ALERT_S(ctx, NKikimrServices::TX_DATASHARD, explain);
- }
-
- auto op = Self->TransQueue.FindTxInFly(txId);
- if (!op) {
- LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
- "Ignoring PlanStep " << step << " for unknown txId "
- << txId << " at tablet " << Self->TabletID());
- continue;
+ while (it != PredictedPlan.end() && it->Step == step && it->TxId < txId) {
+ PlanTxImpl(step, it->TxId, txc, ctx);
+ PredictedPlan.erase(it++);
}
-
- if (op->GetStep() && op->GetStep() != step) {
- LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
- "Ignoring PlanStep " << step << " for txId " << txId
- << " which already has PlanStep " << op->GetStep()
- << " at tablet " << Self->TabletID());
- continue;
+ if (it != PredictedPlan.end() && it->Step == step && it->TxId == txId) {
+ PredictedPlan.erase(it++);
}
+ PlanTxImpl(step, txId, txc, ctx);
+ lastTxId = txId;
+ }
- Self->TransQueue.PlanTx(op, step, db);
- // Execute WaitForPlan unit here to correctly compute operation
- // profile. Otherwise profile might count time spent in plan
- // queue as 'wait for plan' time.
- if (op->GetCurrentUnit() == EExecutionUnitKind::WaitForPlan) {
- auto status = RunExecutionUnit(op, txc, ctx);
- Y_ABORT_UNLESS(status == EExecutionStatus::Executed);
- }
+ while (it != PredictedPlan.end() && it->Step == step) {
+ PlanTxImpl(step, it->TxId, txc, ctx);
+ lastTxId = it->TxId;
+ PredictedPlan.erase(it++);
}
+ NIceDb::TNiceDb db(txc.DB);
+ SaveLastPlannedTx(db, TStepOrder(step, lastTxId));
+
AddCandidateUnit(EExecutionUnitKind::PlanQueue);
MaybeActivateWaitingSchemeOps(ctx);
ActivateWaitingTxOps(ctx);
@@ -904,6 +898,90 @@ bool TPipeline::PlanTxs(ui64 step,
return true;
}
+bool TPipeline::PlanPredictedTxs(ui64 step, TTransactionContext &txc, const TActorContext &ctx) {
+ if (step <= LastPlannedTx.Step) {
+ return false;
+ }
+
+ ui64 lastStep = 0;
+ ui64 lastTxId = 0;
+ size_t planned = 0;
+
+ auto it = PredictedPlan.begin();
+ while (it != PredictedPlan.end() && it->Step <= step) {
+ PlanTxImpl(it->Step, it->TxId, txc, ctx);
+ lastStep = it->Step;
+ lastTxId = it->TxId;
+ PredictedPlan.erase(it++);
+ ++planned;
+ }
+
+ if (planned == 0) {
+ return false;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ SaveLastPlannedTx(db, TStepOrder(lastStep, lastTxId));
+
+ AddCandidateUnit(EExecutionUnitKind::PlanQueue);
+ MaybeActivateWaitingSchemeOps(ctx);
+ ActivateWaitingTxOps(ctx);
+ return true;
+}
+
+void TPipeline::PlanTxImpl(ui64 step, ui64 txId, TTransactionContext &txc, const TActorContext &ctx)
+{
+ if (SchemaTx && SchemaTx->TxId == txId && SchemaTx->MinStep > step) {
+ TString explain = TStringBuilder() << "Scheme transaction has come too early"
+ << ", only after particular step this shema tx is allowed"
+ << ", txId: " << txId
+ << ", expected min step: " << SchemaTx->MinStep
+ << ", actual step: " << step;
+ Y_VERIFY_DEBUG_S(SchemaTx->MinStep <= step, explain);
+ LOG_ALERT_S(ctx, NKikimrServices::TX_DATASHARD, explain);
+ }
+
+ auto op = Self->TransQueue.FindTxInFly(txId);
+ if (!op) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Ignoring PlanStep " << step << " for unknown txId "
+ << txId << " at tablet " << Self->TabletID());
+ return;
+ }
+
+ if (op->GetStep() && op->GetStep() != step) {
+ LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
+ "Ignoring PlanStep " << step << " for txId " << txId
+ << " which already has PlanStep " << op->GetStep()
+ << " at tablet " << Self->TabletID());
+ return;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->TransQueue.PlanTx(op, step, db);
+
+ // Execute WaitForPlan unit here to correctly compute operation
+ // profile. Otherwise profile might count time spent in plan
+ // queue as 'wait for plan' time.
+ if (op->GetCurrentUnit() == EExecutionUnitKind::WaitForPlan) {
+ auto status = RunExecutionUnit(op, txc, ctx);
+ Y_ABORT_UNLESS(status == EExecutionStatus::Executed);
+ }
+}
+
+void TPipeline::AddPredictedPlan(ui64 step, ui64 txId, const TActorContext &ctx)
+{
+ if (step <= LastPlannedTx.Step) {
+ // Trying to add a predicted step to transaction that is in the past
+ // We cannot plan for the past, so we need to clean it up as soon as possible
+ Self->ExecuteCleanupVolatileTx(txId, ctx);
+ return;
+ }
+
+ PredictedPlan.emplace(step, txId);
+ Self->WaitPredictedPlanStep(step);
+}
+
void TPipeline::MarkOpAsUsingSnapshot(TOperation::TPtr op)
{
UnblockNormalDependencies(op);
@@ -1128,6 +1206,29 @@ ECleanupStatus TPipeline::CleanupOutdated(NIceDb::TNiceDb& db, const TActorConte
return status;
}
+bool TPipeline::CleanupVolatile(ui64 txId, const TActorContext& ctx,
+ std::vector<std::unique_ptr<IEventHandle>>& replies)
+{
+ if (Self->TransQueue.CleanupVolatile(txId, replies)) {
+ ForgetTx(txId);
+
+ Self->CheckDelayedProposeQueue(ctx);
+
+ // Outdated op removal might enable scheme op proposal.
+ MaybeActivateWaitingSchemeOps(ctx);
+
+ // Outdated op removal might enable another op execution.
+ // N.B.: actually this happens only for DROP which waits
+ // for TxInFly having only the DROP.
+ // N.B.: compatibility with older datashard versions.
+ AddCandidateUnit(EExecutionUnitKind::PlanQueue);
+ Self->PlanQueue.Progress(ctx);
+ return true;
+ }
+
+ return false;
+}
+
ui64 TPipeline::PlannedTxInFly() const
{
return Self->TransQueue.TxInFly();
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index ac0e2e0fb5e..85cf95c6b84 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -127,7 +127,13 @@ public:
void CompleteSchemaTx(NIceDb::TNiceDb& db, ui64 txId);
void MarkOpAsUsingSnapshot(TOperation::TPtr op);
+ bool HasPredictedPlan() const { return !PredictedPlan.empty(); }
+ ui64 NextPredictedPlanStep() const { return PredictedPlan.begin()->Step; }
+
bool PlanTxs(ui64 step, TVector<ui64> &txIds, TTransactionContext &txc, const TActorContext &ctx);
+ bool PlanPredictedTxs(ui64 step, TTransactionContext &txc, const TActorContext &ctx);
+ void PlanTxImpl(ui64 step, ui64 txId, TTransactionContext &txc, const TActorContext &ctx);
+ void AddPredictedPlan(ui64 step, ui64 txId, const TActorContext &ctx);
void PreserveSchema(NIceDb::TNiceDb& db, ui64 step);
TDuration CleanupTimeout() const;
ECleanupStatus Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx,
@@ -196,6 +202,8 @@ public:
std::vector<std::unique_ptr<IEventHandle>>& replies);
ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, const TActorContext& ctx, ui64 outdatedStep,
std::vector<std::unique_ptr<IEventHandle>>& replies);
+ bool CleanupVolatile(ui64 txId, const TActorContext& ctx,
+ std::vector<std::unique_ptr<IEventHandle>>& replies);
ui64 PlannedTxInFly() const;
const TSet<TStepOrder> &GetPlan() const;
bool HasProposeDelayers() const;
@@ -492,6 +500,7 @@ private:
// Slow operation profiles.
TList<TStoredExecutionProfile> SlowOpProfiles;
TMap<ui64, ui32> ActiveStreamingTxs;
+ TSet<TStepOrder> PredictedPlan;
typedef TList<TOperation::TPtr> TWaitingSchemeOpsOrder;
typedef THashMap<TOperation::TPtr, TWaitingSchemeOpsOrder::iterator> TWaitingSchemeOps;
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.cpp b/ydb/core/tx/datashard/datashard_trans_queue.cpp
index 6eb12e379ec..5a41f0c1bec 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.cpp
+++ b/ydb/core/tx/datashard/datashard_trans_queue.cpp
@@ -9,14 +9,14 @@ const TSet<TStepOrder> TTransQueue::EMPTY_PLAN;
void TTransQueue::AddTxInFly(TOperation::TPtr op) {
const ui64 txId = op->GetTxId();
- const ui64 maxStep = op->GetMaxStep();
Y_VERIFY_S(!TxsInFly.contains(txId), "Adding duplicate txId " << txId);
TxsInFly[txId] = op;
if (Y_LIKELY(!op->GetStep())) {
++PlanWaitingTxCount;
- }
- if (maxStep != Max<ui64>()) {
- DeadlineQueue.emplace(std::make_pair(maxStep, txId));
+ const ui64 maxStep = op->GetMaxStep();
+ if (maxStep != Max<ui64>()) {
+ DeadlineQueue.emplace(std::make_pair(maxStep, txId));
+ }
}
Self->SetCounter(COUNTER_TX_IN_FLY, TxsInFly.size());
}
@@ -28,6 +28,10 @@ void TTransQueue::RemoveTxInFly(ui64 txId, std::vector<std::unique_ptr<IEventHan
Self->GetCleanupReplies(it->second, *cleanupReplies);
}
if (!it->second->GetStep()) {
+ const ui64 maxStep = it->second->GetMaxStep();
+ if (maxStep != Max<ui64>()) {
+ DeadlineQueue.erase(std::make_pair(maxStep, txId));
+ }
--PlanWaitingTxCount;
}
TxsInFly.erase(it);
@@ -494,6 +498,21 @@ ECleanupStatus TTransQueue::CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedSt
return ECleanupStatus::Success;
}
+bool TTransQueue::CleanupVolatile(ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies) {
+ auto it = TxsInFly.find(txId);
+ if (it != TxsInFly.end() && it->second->HasVolatilePrepareFlag() && !it->second->GetStep()) {
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+ "Cleaning up volatile tx " << txId << " ahead of time");
+
+ RemoveTxInFly(txId, &replies);
+
+ Self->IncCounter(COUNTER_TX_PROGRESS_OUTDATED, 1);
+ return true;
+ }
+
+ return false;
+}
+
void TTransQueue::PlanTx(TOperation::TPtr op,
ui64 step,
NIceDb::TNiceDb &db)
diff --git a/ydb/core/tx/datashard/datashard_trans_queue.h b/ydb/core/tx/datashard/datashard_trans_queue.h
index a76014b030d..3295285ba48 100644
--- a/ydb/core/tx/datashard/datashard_trans_queue.h
+++ b/ydb/core/tx/datashard/datashard_trans_queue.h
@@ -88,6 +88,7 @@ private: // for pipeline only
bool CancelPropose(NIceDb::TNiceDb& db, ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies);
ECleanupStatus CleanupOutdated(NIceDb::TNiceDb& db, ui64 outdatedStep, ui32 batchSize,
TVector<ui64>& outdatedTxs, std::vector<std::unique_ptr<IEventHandle>>& replies);
+ bool CleanupVolatile(ui64 txId, std::vector<std::unique_ptr<IEventHandle>>& replies);
// Plan
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index ffc712ebae2..1f35f29a5a2 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -4090,7 +4090,8 @@ Y_UNIT_TEST(TestSnapshotReadAfterBrokenLock) {
auto result = KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
SELECT * FROM `/Root/table-1` WHERE key = 1
UNION ALL
- SELECT * FROM `/Root/table-2` WHERE key = 2)"));
+ SELECT * FROM `/Root/table-2` WHERE key = 2
+ ORDER BY key)"));
UNIT_ASSERT_VALUES_EQUAL(
result,
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
@@ -4169,7 +4170,8 @@ Y_UNIT_TEST(TestSnapshotReadAfterBrokenLockOutOfOrder) {
auto result = KqpSimpleBegin(runtime, sessionIdBlocker, txIdBlocker, Q_(R"(
SELECT * FROM `/Root/table-1`
UNION ALL
- SELECT * FROM `/Root/table-2`)"));
+ SELECT * FROM `/Root/table-2`
+ ORDER BY key)"));
UNIT_ASSERT_VALUES_EQUAL(
result,
"{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 4bd7ee2a070..cb176859757 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1853,7 +1853,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
// It would be as if shard missed them for some reason
for (auto& pr : acks) {
auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end());
- runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true);
+ runtime.Send(new IEventHandle(pr.first, recipient, ack), 0, true);
}
auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step);
runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true);
@@ -1896,8 +1896,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
WaitTxNotification(server, sender, txId);
auto dropLatency = runtime.GetCurrentTime() - dropStartTs;
Cerr << "... drop finished in " << dropLatency << Endl;
- // TODO: we need to use neighbor readset hints to cancel earlier
- // UNIT_ASSERT(dropLatency < TDuration::Seconds(5));
+ UNIT_ASSERT(dropLatency < TDuration::Seconds(5));
}
Y_UNIT_TEST(DistributedWriteLostPlanThenSplit) {
@@ -1947,7 +1946,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
// It would be as if shard missed them for some reason
for (auto& pr : acks) {
auto* ack = new TEvTxProcessing::TEvPlanStepAck(tabletId, step, pr.second.begin(), pr.second.end());
- runtime.Send(new IEventHandle(ev->Sender, recipient, ack), 0, true);
+ runtime.Send(new IEventHandle(pr.first, recipient, ack), 0, true);
}
auto* accept = new TEvTxProcessing::TEvPlanStepAccepted(tabletId, step);
runtime.Send(new IEventHandle(ev->Sender, recipient, accept), 0, true);
@@ -1990,8 +1989,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
WaitTxNotification(server, sender, txId);
auto splitLatency = runtime.GetCurrentTime() - splitStartTs;
Cerr << "... split finished in " << splitLatency << Endl;
- // TODO: we need to use neighbor readset hints to cancel earlier
- // UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
+ UNIT_ASSERT(splitLatency < TDuration::Seconds(5));
}
} // Y_UNIT_TEST_SUITE(DataShardVolatile)
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 4b11d83907f..40870d8c0b9 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -371,6 +371,8 @@ public:
ui64 GetStep() const { return Step; }
void SetStep(ui64 step) { Step = step; }
+ ui64 GetPredictedStep() const { return PredictedStep; }
+ void SetPredictedStep(ui64 step) { PredictedStep = step; }
TStepOrder GetStepOrder() const { return TStepOrder(GetStep(), GetTxId()); }
@@ -415,6 +417,7 @@ protected:
ui64 Flags;
ui64 GlobalTxId;
ui64 Step;
+ ui64 PredictedStep = 0;
TInstant ReceivedAt;
ui64 MinStep;