diff options
author | snaury <snaury@ydb.tech> | 2023-03-21 12:10:54 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-03-21 12:10:54 +0300 |
commit | 7b4ada9fb080cc29c83adf84f4730e5876b20a0f (patch) | |
tree | 8af3146cd6b4b8a6c95a32ee9db2745377a48b20 | |
parent | 721d4056db719f24bd568a7bb7b3573cb495d610 (diff) | |
download | ydb-7b4ada9fb080cc29c83adf84f4730e5876b20a0f.tar.gz |
Don't keep operations inprogress longer than necessary
-rw-r--r-- | ydb/core/tx/datashard/datashard__op_rows.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__progress_tx.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__propose_tx_base.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 27 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_txs.h | 2 |
5 files changed, 57 insertions, 28 deletions
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp index 7f19015743a..e1475e38561 100644 --- a/ydb/core/tx/datashard/datashard__op_rows.cpp +++ b/ydb/core/tx/datashard/datashard__op_rows.cpp @@ -89,13 +89,13 @@ public: if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) { Self->Pipeline.AddCandidateOp(Op); + + if (Self->Pipeline.CanRunAnotherOp()) { + Self->PlanQueue.Progress(ctx); + } } } } - - if (Self->Pipeline.CanRunAnotherOp()) { - Self->PlanQueue.Progress(ctx); - } } }; // TTxDirectBase diff --git a/ydb/core/tx/datashard/datashard__progress_tx.cpp b/ydb/core/tx/datashard/datashard__progress_tx.cpp index 3f1fab6b46e..79a28f59f29 100644 --- a/ydb/core/tx/datashard/datashard__progress_tx.cpp +++ b/ydb/core/tx/datashard/datashard__progress_tx.cpp @@ -77,12 +77,16 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const Self->ExecuteProgressTx(ActiveOp, ctx); Rescheduled = true; } + ActiveOp->DecrementInProgress(); break; case EExecutionStatus::Executed: case EExecutionStatus::Continue: + ActiveOp->DecrementInProgress(); + break; + case EExecutionStatus::WaitComplete: - // No special handling + WaitComplete = true; break; default: @@ -90,12 +94,11 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const << *ActiveOp << " " << ActiveOp->GetKind() << " at " << Self->TabletID()); } - if (!CompleteList.empty()) { + if (WaitComplete || !CompleteList.empty()) { // Keep operation active until we run the complete list CommitStart = AppData()->TimeProvider->Now(); } else { // Release operation as it's no longer needed - ActiveOp->DecrementInProgress(); ActiveOp = nullptr; } @@ -122,13 +125,18 @@ void TDataShard::TTxProgressTransaction::Complete(const TActorContext &ctx) { Self->Pipeline.RunCompleteList(ActiveOp, CompleteList, ctx); } - ActiveOp->DecrementInProgress(); - if (!ActiveOp->IsInProgress() && !ActiveOp->IsExecutionPlanFinished()) - Self->Pipeline.AddCandidateOp(ActiveOp); + if (WaitComplete) { + ActiveOp->DecrementInProgress(); - if (Self->Pipeline.CanRunAnotherOp()) - Self->PlanQueue.Progress(ctx); + if (!ActiveOp->IsInProgress() && !ActiveOp->IsExecutionPlanFinished()) { + Self->Pipeline.AddCandidateOp(ActiveOp); + + if (Self->Pipeline.CanRunAnotherOp()) { + Self->PlanQueue.Progress(ctx); + } + } + } } Self->CheckSplitCanStart(ctx); diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp index fea2907cfd4..650556bdd21 100644 --- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp +++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp @@ -124,12 +124,16 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa Self->ExecuteProgressTx(Op, ctx); Rescheduled = true; } + Op->DecrementInProgress(); break; case EExecutionStatus::Executed: case EExecutionStatus::Continue: + Op->DecrementInProgress(); + break; + case EExecutionStatus::WaitComplete: - // No special handling + WaitComplete = true; break; default: @@ -137,12 +141,11 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa << *Op << " " << Op->GetKind() << " at " << Self->TabletID()); } - if (!CompleteList.empty()) { + if (WaitComplete || !CompleteList.empty()) { // Keep operation active until we run the complete list CommitStart = AppData()->TimeProvider->Now(); } else { // Release operation as it's no longer needed - Op->DecrementInProgress(); Op = nullptr; } @@ -184,13 +187,18 @@ void TDataShard::TTxProposeTransactionBase::Complete(const TActorContext &ctx) { Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); } - Op->DecrementInProgress(); - if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) - Self->Pipeline.AddCandidateOp(Op); + if (WaitComplete) { + Op->DecrementInProgress(); - if (Self->Pipeline.CanRunAnotherOp()) - Self->PlanQueue.Progress(ctx); + if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) { + Self->Pipeline.AddCandidateOp(Op); + + if (Self->Pipeline.CanRunAnotherOp()) { + Self->PlanQueue.Progress(ctx); + } + } + } } Self->CheckSplitCanStart(ctx); diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index d641c4086cf..ddfde1e0bcf 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1675,6 +1675,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB TOperation::TPtr Op; TVector<EExecutionUnitKind> CompleteList; + bool WaitComplete = false; public: TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev) @@ -1748,12 +1749,16 @@ public: Op->IncrementInProgress(); Self->ExecuteProgressTx(Op, ctx); } + Op->DecrementInProgress(); break; case EExecutionStatus::Executed: case EExecutionStatus::Continue: + Op->DecrementInProgress(); + break; + case EExecutionStatus::WaitComplete: - // No special handling + WaitComplete = true; break; default: @@ -1761,8 +1766,10 @@ public: << *Op << " " << Op->GetKind() << " at " << Self->TabletID()); } - if (CompleteList.empty()) { - Op->DecrementInProgress(); + if (WaitComplete || !CompleteList.empty()) { + // Keep operation active until we run the complete list + } else { + // Release operation as it's no longer needed Op = nullptr; } @@ -1790,13 +1797,17 @@ public: Self->Pipeline.RunCompleteList(Op, CompleteList, ctx); } - Op->DecrementInProgress(); + if (WaitComplete) { + Op->DecrementInProgress(); - if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) - Self->Pipeline.AddCandidateOp(Op); + if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) { + Self->Pipeline.AddCandidateOp(Op); - if (Self->Pipeline.CanRunAnotherOp()) - Self->PlanQueue.Progress(ctx); + if (Self->Pipeline.CanRunAnotherOp()) { + Self->PlanQueue.Progress(ctx); + } + } + } } }; diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h index 8b56677de97..ae6219ca2ad 100644 --- a/ydb/core/tx/datashard/datashard_txs.h +++ b/ydb/core/tx/datashard/datashard_txs.h @@ -78,6 +78,7 @@ private: TVector<EExecutionUnitKind> CompleteList; TInstant CommitStart; bool Rescheduled = false; + bool WaitComplete = false; }; class TDataShard::TTxProposeTransactionBase : public NTabletFlatExecutor::TTransactionBase<TDataShard> { @@ -108,6 +109,7 @@ protected: TInstant CommitStart; bool Acked; bool Rescheduled = false; + bool WaitComplete = false; NWilson::TSpan ProposeTransactionSpan; }; |