diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-09-28 16:23:20 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-09-28 16:23:20 +0300 |
commit | 4524f6bdbb266ac2004ba894f90bc0ffd4785e7f (patch) | |
tree | 9287ae5ac823d0d44980b5ed1503e063f5556ea6 | |
parent | 22fab02402f08e4285ea61bfca7b4c7a8965de6a (diff) | |
download | ydb-4524f6bdbb266ac2004ba894f90bc0ffd4785e7f.tar.gz |
ds iter can reply right in Execute()
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_read_operation.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_op_unit.cpp | 2 |
5 files changed, 15 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 95d8cf4c458..15c2ff8c93e 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -927,7 +927,14 @@ public: hadWrites |= res.HadWrites; } - return hadWrites ? EExecutionStatus::DelayCompleteNoMoreRestarts : EExecutionStatus::DelayComplete; + if (hadWrites) + return EExecutionStatus::DelayCompleteNoMoreRestarts; + + if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion)) + return EExecutionStatus::DelayComplete; + + Complete(ctx); + return EExecutionStatus::Executed; } void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override { @@ -1129,7 +1136,7 @@ public: } } - void SendResult(const TActorContext& ctx) override { + void SendResult(const TActorContext& ctx) { if (ResultSent) return; ResultSent = true; diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index eead6bab8af..7a8d128190a 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1820,7 +1820,11 @@ bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const { if(!op->Result() || op->Result()->GetStatus() != NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE) return true; - return CommittingOps.HasOpsBelow(op->GetMvccSnapshot()); + return HasCommittingOpsBelow(op->GetMvccSnapshot()); +} + +bool TPipeline::HasCommittingOpsBelow(TRowVersion upperBound) const { + return CommittingOps.HasOpsBelow(upperBound); } bool TPipeline::PromoteCompleteEdgeUpTo(const TRowVersion& version, TTransactionContext& txc) { diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h index ffd06be63c5..96bcbdfe52e 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.h +++ b/ydb/core/tx/datashard/datashard_pipeline.h @@ -349,6 +349,7 @@ public: void AddCommittingOp(const TOperation::TPtr& op); void RemoveCommittingOp(const TOperation::TPtr& op); bool WaitCompletion(const TOperation::TPtr& op) const; + bool HasCommittingOpsBelow(TRowVersion upperBound) const; /** * Promotes the mvcc complete edge to the last distributed transaction less than version diff --git a/ydb/core/tx/datashard/datashard_read_operation.h b/ydb/core/tx/datashard/datashard_read_operation.h index 92fcc8889a6..5703fb6cd9b 100644 --- a/ydb/core/tx/datashard/datashard_read_operation.h +++ b/ydb/core/tx/datashard/datashard_read_operation.h @@ -11,7 +11,6 @@ public: // our interface for TReadUnit virtual EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) = 0; - virtual void SendResult(const TActorContext& ctx) = 0; virtual void Complete(const TActorContext& ctx) = 0; // our interface for TCheckReadUnit diff --git a/ydb/core/tx/datashard/read_op_unit.cpp b/ydb/core/tx/datashard/read_op_unit.cpp index 5736675acc9..24f752eccf5 100644 --- a/ydb/core/tx/datashard/read_op_unit.cpp +++ b/ydb/core/tx/datashard/read_op_unit.cpp @@ -25,8 +25,6 @@ public: if (status == EExecutionStatus::Restart || status == EExecutionStatus::Continue) return status; - // TODO: check if we can send result right here to decrease latency - // note that op has set locks itself, no ApplyLocks() required DataShard.SubscribeNewLocks(ctx); |