summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <[email protected]>2023-02-06 11:02:43 +0300
committerabcdef <[email protected]>2023-02-06 11:02:43 +0300
commitc22ac5f189df2b00d1f889210781b0b010d3e4b5 (patch)
tree971e49060750e21c86f2d45b5c52ac531475e55e
parentf60b1f4471381f9dbf366d94de6e4c713c1a4772 (diff)
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h2
2 files changed, 5 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
index 023cb000f87..a5a3c2075a9 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
@@ -374,6 +374,7 @@ protected:
TWaiter PopWaiterImpl() { // Assumes that we're under lock.
TWaiter waiter(Waiter.ExtractPromise(), this);
+ WaiterWillBeSignaled = true;
return std::move(waiter);
}
@@ -384,8 +385,9 @@ protected:
}
void RenewWaiterImpl() {
- if (Events.empty() && Waiter.GetFuture().HasValue()) {
+ if (Events.empty() && WaiterWillBeSignaled) {
Waiter = TWaiter(NThreading::NewPromise<void>(), this);
+ WaiterWillBeSignaled = false;
}
}
@@ -405,6 +407,7 @@ public:
protected:
const TSettings& Settings;
TWaiter Waiter;
+ bool WaiterWillBeSignaled = false;
std::queue<TEventInfo> Events;
TCondVar CondVar;
TMutex Mutex;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index dd5e8437121..fb9138654b5 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -717,7 +717,7 @@ struct TReadSessionEvent {
//! Server request for creating partition stream.
struct TCreatePartitionStreamEvent {
- explicit TCreatePartitionStreamEvent(TPartitionStream::TPtr, ui64 committedOffset, ui64 endOffset);
+ TCreatePartitionStreamEvent(TPartitionStream::TPtr, ui64 committedOffset, ui64 endOffset);
const TPartitionStream::TPtr& GetPartitionStream() const {
return PartitionStream;