aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-03-21 12:10:54 +0300
committersnaury <snaury@ydb.tech>2023-03-21 12:10:54 +0300
commit7b4ada9fb080cc29c83adf84f4730e5876b20a0f (patch)
tree8af3146cd6b4b8a6c95a32ee9db2745377a48b20
parent721d4056db719f24bd568a7bb7b3573cb495d610 (diff)
downloadydb-7b4ada9fb080cc29c83adf84f4730e5876b20a0f.tar.gz
Don't keep operations inprogress longer than necessary
-rw-r--r--ydb/core/tx/datashard/datashard__op_rows.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard__progress_tx.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard__propose_tx_base.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp27
-rw-r--r--ydb/core/tx/datashard/datashard_txs.h2
5 files changed, 57 insertions, 28 deletions
diff --git a/ydb/core/tx/datashard/datashard__op_rows.cpp b/ydb/core/tx/datashard/datashard__op_rows.cpp
index 7f19015743a..e1475e38561 100644
--- a/ydb/core/tx/datashard/datashard__op_rows.cpp
+++ b/ydb/core/tx/datashard/datashard__op_rows.cpp
@@ -89,13 +89,13 @@ public:
if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) {
Self->Pipeline.AddCandidateOp(Op);
+
+ if (Self->Pipeline.CanRunAnotherOp()) {
+ Self->PlanQueue.Progress(ctx);
+ }
}
}
}
-
- if (Self->Pipeline.CanRunAnotherOp()) {
- Self->PlanQueue.Progress(ctx);
- }
}
}; // TTxDirectBase
diff --git a/ydb/core/tx/datashard/datashard__progress_tx.cpp b/ydb/core/tx/datashard/datashard__progress_tx.cpp
index 3f1fab6b46e..79a28f59f29 100644
--- a/ydb/core/tx/datashard/datashard__progress_tx.cpp
+++ b/ydb/core/tx/datashard/datashard__progress_tx.cpp
@@ -77,12 +77,16 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
Self->ExecuteProgressTx(ActiveOp, ctx);
Rescheduled = true;
}
+ ActiveOp->DecrementInProgress();
break;
case EExecutionStatus::Executed:
case EExecutionStatus::Continue:
+ ActiveOp->DecrementInProgress();
+ break;
+
case EExecutionStatus::WaitComplete:
- // No special handling
+ WaitComplete = true;
break;
default:
@@ -90,12 +94,11 @@ bool TDataShard::TTxProgressTransaction::Execute(TTransactionContext &txc, const
<< *ActiveOp << " " << ActiveOp->GetKind() << " at " << Self->TabletID());
}
- if (!CompleteList.empty()) {
+ if (WaitComplete || !CompleteList.empty()) {
// Keep operation active until we run the complete list
CommitStart = AppData()->TimeProvider->Now();
} else {
// Release operation as it's no longer needed
- ActiveOp->DecrementInProgress();
ActiveOp = nullptr;
}
@@ -122,13 +125,18 @@ void TDataShard::TTxProgressTransaction::Complete(const TActorContext &ctx) {
Self->Pipeline.RunCompleteList(ActiveOp, CompleteList, ctx);
}
- ActiveOp->DecrementInProgress();
- if (!ActiveOp->IsInProgress() && !ActiveOp->IsExecutionPlanFinished())
- Self->Pipeline.AddCandidateOp(ActiveOp);
+ if (WaitComplete) {
+ ActiveOp->DecrementInProgress();
- if (Self->Pipeline.CanRunAnotherOp())
- Self->PlanQueue.Progress(ctx);
+ if (!ActiveOp->IsInProgress() && !ActiveOp->IsExecutionPlanFinished()) {
+ Self->Pipeline.AddCandidateOp(ActiveOp);
+
+ if (Self->Pipeline.CanRunAnotherOp()) {
+ Self->PlanQueue.Progress(ctx);
+ }
+ }
+ }
}
Self->CheckSplitCanStart(ctx);
diff --git a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
index fea2907cfd4..650556bdd21 100644
--- a/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
+++ b/ydb/core/tx/datashard/datashard__propose_tx_base.cpp
@@ -124,12 +124,16 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
Self->ExecuteProgressTx(Op, ctx);
Rescheduled = true;
}
+ Op->DecrementInProgress();
break;
case EExecutionStatus::Executed:
case EExecutionStatus::Continue:
+ Op->DecrementInProgress();
+ break;
+
case EExecutionStatus::WaitComplete:
- // No special handling
+ WaitComplete = true;
break;
default:
@@ -137,12 +141,11 @@ bool TDataShard::TTxProposeTransactionBase::Execute(NTabletFlatExecutor::TTransa
<< *Op << " " << Op->GetKind() << " at " << Self->TabletID());
}
- if (!CompleteList.empty()) {
+ if (WaitComplete || !CompleteList.empty()) {
// Keep operation active until we run the complete list
CommitStart = AppData()->TimeProvider->Now();
} else {
// Release operation as it's no longer needed
- Op->DecrementInProgress();
Op = nullptr;
}
@@ -184,13 +187,18 @@ void TDataShard::TTxProposeTransactionBase::Complete(const TActorContext &ctx) {
Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
}
- Op->DecrementInProgress();
- if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished())
- Self->Pipeline.AddCandidateOp(Op);
+ if (WaitComplete) {
+ Op->DecrementInProgress();
- if (Self->Pipeline.CanRunAnotherOp())
- Self->PlanQueue.Progress(ctx);
+ if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) {
+ Self->Pipeline.AddCandidateOp(Op);
+
+ if (Self->Pipeline.CanRunAnotherOp()) {
+ Self->PlanQueue.Progress(ctx);
+ }
+ }
+ }
}
Self->CheckSplitCanStart(ctx);
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index d641c4086cf..ddfde1e0bcf 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1675,6 +1675,7 @@ class TDataShard::TTxReadViaPipeline : public NTabletFlatExecutor::TTransactionB
TOperation::TPtr Op;
TVector<EExecutionUnitKind> CompleteList;
+ bool WaitComplete = false;
public:
TTxReadViaPipeline(TDataShard* ds, TEvDataShard::TEvRead::TPtr ev)
@@ -1748,12 +1749,16 @@ public:
Op->IncrementInProgress();
Self->ExecuteProgressTx(Op, ctx);
}
+ Op->DecrementInProgress();
break;
case EExecutionStatus::Executed:
case EExecutionStatus::Continue:
+ Op->DecrementInProgress();
+ break;
+
case EExecutionStatus::WaitComplete:
- // No special handling
+ WaitComplete = true;
break;
default:
@@ -1761,8 +1766,10 @@ public:
<< *Op << " " << Op->GetKind() << " at " << Self->TabletID());
}
- if (CompleteList.empty()) {
- Op->DecrementInProgress();
+ if (WaitComplete || !CompleteList.empty()) {
+ // Keep operation active until we run the complete list
+ } else {
+ // Release operation as it's no longer needed
Op = nullptr;
}
@@ -1790,13 +1797,17 @@ public:
Self->Pipeline.RunCompleteList(Op, CompleteList, ctx);
}
- Op->DecrementInProgress();
+ if (WaitComplete) {
+ Op->DecrementInProgress();
- if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished())
- Self->Pipeline.AddCandidateOp(Op);
+ if (!Op->IsInProgress() && !Op->IsExecutionPlanFinished()) {
+ Self->Pipeline.AddCandidateOp(Op);
- if (Self->Pipeline.CanRunAnotherOp())
- Self->PlanQueue.Progress(ctx);
+ if (Self->Pipeline.CanRunAnotherOp()) {
+ Self->PlanQueue.Progress(ctx);
+ }
+ }
+ }
}
};
diff --git a/ydb/core/tx/datashard/datashard_txs.h b/ydb/core/tx/datashard/datashard_txs.h
index 8b56677de97..ae6219ca2ad 100644
--- a/ydb/core/tx/datashard/datashard_txs.h
+++ b/ydb/core/tx/datashard/datashard_txs.h
@@ -78,6 +78,7 @@ private:
TVector<EExecutionUnitKind> CompleteList;
TInstant CommitStart;
bool Rescheduled = false;
+ bool WaitComplete = false;
};
class TDataShard::TTxProposeTransactionBase : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
@@ -108,6 +109,7 @@ protected:
TInstant CommitStart;
bool Acked;
bool Rescheduled = false;
+ bool WaitComplete = false;
NWilson::TSpan ProposeTransactionSpan;
};