diff options
author | abcdef <[email protected]> | 2023-02-06 11:02:43 +0300 |
---|---|---|
committer | abcdef <[email protected]> | 2023-02-06 11:02:43 +0300 |
commit | c22ac5f189df2b00d1f889210781b0b010d3e4b5 (patch) | |
tree | 971e49060750e21c86f2d45b5c52ac531475e55e | |
parent | f60b1f4471381f9dbf366d94de6e4c713c1a4772 (diff) |
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h | 5 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h | 2 |
2 files changed, 5 insertions, 2 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index 023cb000f87..a5a3c2075a9 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -374,6 +374,7 @@ protected: TWaiter PopWaiterImpl() { // Assumes that we're under lock. TWaiter waiter(Waiter.ExtractPromise(), this); + WaiterWillBeSignaled = true; return std::move(waiter); } @@ -384,8 +385,9 @@ protected: } void RenewWaiterImpl() { - if (Events.empty() && Waiter.GetFuture().HasValue()) { + if (Events.empty() && WaiterWillBeSignaled) { Waiter = TWaiter(NThreading::NewPromise<void>(), this); + WaiterWillBeSignaled = false; } } @@ -405,6 +407,7 @@ public: protected: const TSettings& Settings; TWaiter Waiter; + bool WaiterWillBeSignaled = false; std::queue<TEventInfo> Events; TCondVar CondVar; TMutex Mutex; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index dd5e8437121..fb9138654b5 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -717,7 +717,7 @@ struct TReadSessionEvent { //! Server request for creating partition stream. struct TCreatePartitionStreamEvent { - explicit TCreatePartitionStreamEvent(TPartitionStream::TPtr, ui64 committedOffset, ui64 endOffset); + TCreatePartitionStreamEvent(TPartitionStream::TPtr, ui64 committedOffset, ui64 endOffset); const TPartitionStream::TPtr& GetPartitionStream() const { return PartitionStream; |