aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-09-28 16:23:20 +0300
committereivanov89 <eivanov89@ydb.tech>2022-09-28 16:23:20 +0300
commit4524f6bdbb266ac2004ba894f90bc0ffd4785e7f (patch)
tree9287ae5ac823d0d44980b5ed1503e063f5556ea6
parent22fab02402f08e4285ea61bfca7b4c7a8965de6a (diff)
downloadydb-4524f6bdbb266ac2004ba894f90bc0ffd4785e7f.tar.gz
ds iter can reply right in Execute()
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp11
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.h1
-rw-r--r--ydb/core/tx/datashard/datashard_read_operation.h1
-rw-r--r--ydb/core/tx/datashard/read_op_unit.cpp2
5 files changed, 15 insertions, 6 deletions
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 95d8cf4c458..15c2ff8c93e 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -927,7 +927,14 @@ public:
hadWrites |= res.HadWrites;
}
- return hadWrites ? EExecutionStatus::DelayCompleteNoMoreRestarts : EExecutionStatus::DelayComplete;
+ if (hadWrites)
+ return EExecutionStatus::DelayCompleteNoMoreRestarts;
+
+ if (Self->Pipeline.HasCommittingOpsBelow(state.ReadVersion))
+ return EExecutionStatus::DelayComplete;
+
+ Complete(ctx);
+ return EExecutionStatus::Executed;
}
void CheckRequestAndInit(TTransactionContext& txc, const TActorContext& ctx) override {
@@ -1129,7 +1136,7 @@ public:
}
}
- void SendResult(const TActorContext& ctx) override {
+ void SendResult(const TActorContext& ctx) {
if (ResultSent)
return;
ResultSent = true;
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index eead6bab8af..7a8d128190a 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1820,7 +1820,11 @@ bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const {
if(!op->Result() || op->Result()->GetStatus() != NKikimrTxDataShard::TEvProposeTransactionResult::COMPLETE)
return true;
- return CommittingOps.HasOpsBelow(op->GetMvccSnapshot());
+ return HasCommittingOpsBelow(op->GetMvccSnapshot());
+}
+
+bool TPipeline::HasCommittingOpsBelow(TRowVersion upperBound) const {
+ return CommittingOps.HasOpsBelow(upperBound);
}
bool TPipeline::PromoteCompleteEdgeUpTo(const TRowVersion& version, TTransactionContext& txc) {
diff --git a/ydb/core/tx/datashard/datashard_pipeline.h b/ydb/core/tx/datashard/datashard_pipeline.h
index ffd06be63c5..96bcbdfe52e 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.h
+++ b/ydb/core/tx/datashard/datashard_pipeline.h
@@ -349,6 +349,7 @@ public:
void AddCommittingOp(const TOperation::TPtr& op);
void RemoveCommittingOp(const TOperation::TPtr& op);
bool WaitCompletion(const TOperation::TPtr& op) const;
+ bool HasCommittingOpsBelow(TRowVersion upperBound) const;
/**
* Promotes the mvcc complete edge to the last distributed transaction less than version
diff --git a/ydb/core/tx/datashard/datashard_read_operation.h b/ydb/core/tx/datashard/datashard_read_operation.h
index 92fcc8889a6..5703fb6cd9b 100644
--- a/ydb/core/tx/datashard/datashard_read_operation.h
+++ b/ydb/core/tx/datashard/datashard_read_operation.h
@@ -11,7 +11,6 @@ public:
// our interface for TReadUnit
virtual EExecutionStatus Execute(TTransactionContext& txc, const TActorContext& ctx) = 0;
- virtual void SendResult(const TActorContext& ctx) = 0;
virtual void Complete(const TActorContext& ctx) = 0;
// our interface for TCheckReadUnit
diff --git a/ydb/core/tx/datashard/read_op_unit.cpp b/ydb/core/tx/datashard/read_op_unit.cpp
index 5736675acc9..24f752eccf5 100644
--- a/ydb/core/tx/datashard/read_op_unit.cpp
+++ b/ydb/core/tx/datashard/read_op_unit.cpp
@@ -25,8 +25,6 @@ public:
if (status == EExecutionStatus::Restart || status == EExecutionStatus::Continue)
return status;
- // TODO: check if we can send result right here to decrease latency
-
// note that op has set locks itself, no ApplyLocks() required
DataShard.SubscribeNewLocks(ctx);