diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-08-14 10:29:52 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-08-14 10:41:01 +0300 |
commit | c396916a700a53920a2e76247779688320e270e3 (patch) | |
tree | 52a6357eda43f475e1f80088c49e86936767cf2f | |
parent | d27afd6ad62a512f5f1973b11908d9626a11f317 (diff) | |
download | ydb-c396916a700a53920a2e76247779688320e270e3.tar.gz |
[yt/yt/core] YT-22539: Add TBoundedNonblockingQueue
9faaa9fb1ce67dff6871693b7dda3024a9ff2ce9
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_queue-inl.h | 73 | ||||
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_queue.h | 29 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp | 151 |
3 files changed, 248 insertions, 5 deletions
diff --git a/yt/yt/core/concurrency/nonblocking_queue-inl.h b/yt/yt/core/concurrency/nonblocking_queue-inl.h index d35401753a..da360bff8c 100644 --- a/yt/yt/core/concurrency/nonblocking_queue-inl.h +++ b/yt/yt/core/concurrency/nonblocking_queue-inl.h @@ -14,7 +14,7 @@ void TNonblockingQueue<T>::Enqueue(TFuture<T> asyncValue) { auto guard = Guard(SpinLock_); if (PromiseQueue_.empty()) { - ValueQueue_.push(std::move(asyncValue)); + AsyncValueQueue_.push(std::move(asyncValue)); } else { auto promise = PromiseQueue_.front(); PromiseQueue_.pop(); @@ -34,13 +34,78 @@ template <class T> TFuture<T> TNonblockingQueue<T>::Dequeue() { auto guard = Guard(SpinLock_); - if (ValueQueue_.empty()) { + if (AsyncValueQueue_.empty()) { auto promise = NewPromise<T>(); PromiseQueue_.push(promise); return promise.ToFuture(); } else { - auto future = std::move(ValueQueue_.front()); - ValueQueue_.pop(); + auto future = std::move(AsyncValueQueue_.front()); + AsyncValueQueue_.pop(); + return future; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +TBoundedNonblockingQueue<T>::TBoundedNonblockingQueue(i64 sizeLimit) + : SizeLimit_(sizeLimit) +{ } + +template <class T> +TFuture<void> TBoundedNonblockingQueue<T>::Enqueue(TFuture<T> asyncValue) +{ + auto guard = Guard(SpinLock_); + if (ConsumerQueue_.empty()) { + AsyncValueQueue_.push(std::move(asyncValue)); + + if (std::ssize(AsyncValueQueue_) <= SizeLimit_) { + return VoidFuture; + } + + auto promise = NewPromise<void>(); + ProducerQueue_.push(promise); + return promise.ToFuture(); + } else { + auto promise = ConsumerQueue_.front(); + ConsumerQueue_.pop(); + + guard.Release(); + + promise.SetFrom(std::move(asyncValue)); + + return VoidFuture; + } +} + +template <class T> +template <class TArg> +TFuture<void> TBoundedNonblockingQueue<T>::Enqueue(TArg&& value) +{ + return Enqueue(MakeFuture<T>(std::forward<TArg>(value))); +} + +template <class T> +TFuture<T> TBoundedNonblockingQueue<T>::Dequeue() +{ + auto guard = Guard(SpinLock_); + if (AsyncValueQueue_.empty()) { + auto promise = NewPromise<T>(); + ConsumerQueue_.push(promise); + return promise.ToFuture(); + } else { + auto future = std::move(AsyncValueQueue_.front()); + AsyncValueQueue_.pop(); + + if (!ProducerQueue_.empty()) { + auto promise = ProducerQueue_.front(); + ProducerQueue_.pop(); + + guard.Release(); + + promise.Set(); + } + return future; } } diff --git a/yt/yt/core/concurrency/nonblocking_queue.h b/yt/yt/core/concurrency/nonblocking_queue.h index 64d676d652..675990a011 100644 --- a/yt/yt/core/concurrency/nonblocking_queue.h +++ b/yt/yt/core/concurrency/nonblocking_queue.h @@ -26,13 +26,40 @@ public: private: YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - TRingQueue<TFuture<T>> ValueQueue_; + TRingQueue<TFuture<T>> AsyncValueQueue_; TRingQueue<TPromise<T>> PromiseQueue_; }; //////////////////////////////////////////////////////////////////////////////// +template <class T> +class TBoundedNonblockingQueue +{ +public: + explicit TBoundedNonblockingQueue(i64 sizeLimit); + + TFuture<void> Enqueue(TFuture<T> asyncValue); + + // This template is required to enable perfect forwarding. + template <class TArg> + TFuture<void> Enqueue(TArg&& value); + + // Dequeued futures could be set in arbitrary order. + TFuture<T> Dequeue(); + +private: + const i64 SizeLimit_ = 0; + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); + + TRingQueue<TFuture<T>> AsyncValueQueue_; + TRingQueue<TPromise<T>> ConsumerQueue_; + TRingQueue<TPromise<void>> ProducerQueue_; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NConcurrency #define NONBLOCKING_QUEUE_INL_H_ diff --git a/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp b/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp index 20ca635dcf..b723421857 100644 --- a/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp +++ b/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp @@ -97,6 +97,157 @@ TEST(TNonblockingQueueTest, EnqueueFirstAsync) //////////////////////////////////////////////////////////////////////////////// +TEST(TBoundedNonblockingQueueTest, DequeueFirst) +{ + TBoundedNonblockingQueue<int> queue(1); + auto resultDequeue1 = queue.Dequeue(); + auto resultDequeue2 = queue.Dequeue(); + + EXPECT_FALSE(resultDequeue1.IsSet()); + EXPECT_FALSE(resultDequeue2.IsSet()); + + auto resultEnqueue1 = queue.Enqueue(1); + EXPECT_TRUE(resultEnqueue1.IsSet()); + + EXPECT_TRUE(resultDequeue1.IsSet()); + EXPECT_EQ(1, resultDequeue1.Get().Value()); + + auto resultEnqueue2 = queue.Enqueue(2); + EXPECT_TRUE(resultEnqueue2.IsSet()); + + EXPECT_TRUE(resultDequeue2.IsSet()); + EXPECT_EQ(2, resultDequeue2.Get().Value()); +} + +TEST(TBoundedNonblockingQueueTest, EnqueueFirst) +{ + TBoundedNonblockingQueue<int> queue(1); + auto resultEnqueue1 = queue.Enqueue(1); + EXPECT_TRUE(resultEnqueue1.IsSet()); + + auto resultEnqueue2 = queue.Enqueue(2); + EXPECT_FALSE(resultEnqueue2.IsSet()); + + auto resultDequeue1 = queue.Dequeue(); + EXPECT_TRUE(resultDequeue1.IsSet()); + EXPECT_EQ(1, resultDequeue1.Get().Value()); + + EXPECT_TRUE(resultEnqueue2.IsSet()); + + auto resultDequeue2 = queue.Dequeue(); + EXPECT_TRUE(resultDequeue2.IsSet()); + EXPECT_EQ(2, resultDequeue2.Get().Value()); +} + +TEST(TBoundedNonblockingQueueTest, MixedEnqueueFirst) +{ + TBoundedNonblockingQueue<int> queue(1); + auto resultEnqueue1 = queue.Enqueue(1); + EXPECT_TRUE(resultEnqueue1.IsSet()); + + auto resultDequeue1 = queue.Dequeue(); + EXPECT_TRUE(resultDequeue1.IsSet()); + EXPECT_EQ(1, resultDequeue1.Get().Value()); + + auto resultDequeue2 = queue.Dequeue(); + EXPECT_FALSE(resultDequeue2.IsSet()); + + auto resultEnqueue2 = queue.Enqueue(2); + EXPECT_TRUE(resultEnqueue2.IsSet()); + + EXPECT_TRUE(resultDequeue2.IsSet()); + EXPECT_EQ(2, resultDequeue2.Get().Value()); +} + +TEST(TBoundedNonblockingQueueTest, MixedDequeueFirst) +{ + TBoundedNonblockingQueue<int> queue(1); + + auto resultDequeue1 = queue.Dequeue(); + EXPECT_FALSE(resultDequeue1.IsSet()); + + auto resultEnqueue1 = queue.Enqueue(1); + EXPECT_TRUE(resultEnqueue1.IsSet()); + EXPECT_TRUE(resultDequeue1.IsSet()); + EXPECT_EQ(1, resultDequeue1.Get().Value()); + + auto resultEnqueue2 = queue.Enqueue(2); + EXPECT_TRUE(resultEnqueue2.IsSet()); + + auto resultDequeue2 = queue.Dequeue(); + EXPECT_TRUE(resultDequeue2.IsSet()); + EXPECT_EQ(2, resultDequeue2.Get().Value()); +} + +TEST(TBoundedNonblockingQueueTest, DequeueFirstAsync) +{ + TBoundedNonblockingQueue<int> queue(1); + auto resultDequeue = queue.Dequeue(); + EXPECT_FALSE(resultDequeue.IsSet()); + + auto promise = NewPromise<int>(); + auto resultEnqueue = queue.Enqueue(promise.ToFuture()); + EXPECT_FALSE(resultDequeue.IsSet()); + EXPECT_TRUE(resultEnqueue.IsSet()); + + promise.Set(1); + EXPECT_TRUE(resultDequeue.IsSet()); + EXPECT_EQ(resultDequeue.Get().Value(), 1); +} + +TEST(TBoundedNonblockingQueueTest, EnqueueFirstAsync) +{ + TBoundedNonblockingQueue<int> queue(1); + auto promise1 = NewPromise<int>(); + auto promise2 = NewPromise<int>(); + auto resultEnqueue1 = queue.Enqueue(promise1.ToFuture()); + EXPECT_TRUE(resultEnqueue1.IsSet()); + auto resultEnqueue2 = queue.Enqueue(promise2.ToFuture()); + EXPECT_FALSE(resultEnqueue2.IsSet()); + + auto resultDequeue1 = queue.Dequeue(); + EXPECT_FALSE(resultDequeue1.IsSet()); + EXPECT_TRUE(resultEnqueue2.IsSet()); + + promise1.Set(1); + EXPECT_TRUE(resultDequeue1.IsSet()); + EXPECT_EQ(resultDequeue1.Get().Value(), 1); + + promise2.Set(2); + auto resultDequeue2 = queue.Dequeue(); + EXPECT_TRUE(resultDequeue2.IsSet()); + EXPECT_EQ(resultDequeue2.Get().Value(), 2); +} + +TEST(TBoundedNonblockingQueueTest, EnqueueFirstAsync2) +{ + TBoundedNonblockingQueue<int> queue(1); + auto promise1 = NewPromise<int>(); + auto promise2 = NewPromise<int>(); + auto resultEnqueue1 = queue.Enqueue(promise1.ToFuture()); + EXPECT_TRUE(resultEnqueue1.IsSet()); + auto resultEnqueue2 = queue.Enqueue(promise2.ToFuture()); + EXPECT_FALSE(resultEnqueue2.IsSet()); + + auto resultDequeue1 = queue.Dequeue(); + EXPECT_FALSE(resultDequeue1.IsSet()); + + EXPECT_TRUE(resultEnqueue2.IsSet()); + + auto resultDequeue2 = queue.Dequeue(); + EXPECT_FALSE(resultDequeue2.IsSet()); + + promise1.Set(1); + EXPECT_TRUE(resultDequeue1.IsSet()); + EXPECT_EQ(resultDequeue1.Get().Value(), 1); + + promise2.Set(2); + EXPECT_TRUE(resultDequeue2.IsSet()); + EXPECT_EQ(resultDequeue2.Get().Value(), 2); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace } // namespace NYT::NConcurrency |