aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-08-14 10:29:52 +0300
committernadya73 <nadya73@yandex-team.com>2024-08-14 10:41:01 +0300
commitc396916a700a53920a2e76247779688320e270e3 (patch)
tree52a6357eda43f475e1f80088c49e86936767cf2f
parentd27afd6ad62a512f5f1973b11908d9626a11f317 (diff)
downloadydb-c396916a700a53920a2e76247779688320e270e3.tar.gz
[yt/yt/core] YT-22539: Add TBoundedNonblockingQueue
9faaa9fb1ce67dff6871693b7dda3024a9ff2ce9
-rw-r--r--yt/yt/core/concurrency/nonblocking_queue-inl.h73
-rw-r--r--yt/yt/core/concurrency/nonblocking_queue.h29
-rw-r--r--yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp151
3 files changed, 248 insertions, 5 deletions
diff --git a/yt/yt/core/concurrency/nonblocking_queue-inl.h b/yt/yt/core/concurrency/nonblocking_queue-inl.h
index d35401753a..da360bff8c 100644
--- a/yt/yt/core/concurrency/nonblocking_queue-inl.h
+++ b/yt/yt/core/concurrency/nonblocking_queue-inl.h
@@ -14,7 +14,7 @@ void TNonblockingQueue<T>::Enqueue(TFuture<T> asyncValue)
{
auto guard = Guard(SpinLock_);
if (PromiseQueue_.empty()) {
- ValueQueue_.push(std::move(asyncValue));
+ AsyncValueQueue_.push(std::move(asyncValue));
} else {
auto promise = PromiseQueue_.front();
PromiseQueue_.pop();
@@ -34,13 +34,78 @@ template <class T>
TFuture<T> TNonblockingQueue<T>::Dequeue()
{
auto guard = Guard(SpinLock_);
- if (ValueQueue_.empty()) {
+ if (AsyncValueQueue_.empty()) {
auto promise = NewPromise<T>();
PromiseQueue_.push(promise);
return promise.ToFuture();
} else {
- auto future = std::move(ValueQueue_.front());
- ValueQueue_.pop();
+ auto future = std::move(AsyncValueQueue_.front());
+ AsyncValueQueue_.pop();
+ return future;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+TBoundedNonblockingQueue<T>::TBoundedNonblockingQueue(i64 sizeLimit)
+ : SizeLimit_(sizeLimit)
+{ }
+
+template <class T>
+TFuture<void> TBoundedNonblockingQueue<T>::Enqueue(TFuture<T> asyncValue)
+{
+ auto guard = Guard(SpinLock_);
+ if (ConsumerQueue_.empty()) {
+ AsyncValueQueue_.push(std::move(asyncValue));
+
+ if (std::ssize(AsyncValueQueue_) <= SizeLimit_) {
+ return VoidFuture;
+ }
+
+ auto promise = NewPromise<void>();
+ ProducerQueue_.push(promise);
+ return promise.ToFuture();
+ } else {
+ auto promise = ConsumerQueue_.front();
+ ConsumerQueue_.pop();
+
+ guard.Release();
+
+ promise.SetFrom(std::move(asyncValue));
+
+ return VoidFuture;
+ }
+}
+
+template <class T>
+template <class TArg>
+TFuture<void> TBoundedNonblockingQueue<T>::Enqueue(TArg&& value)
+{
+ return Enqueue(MakeFuture<T>(std::forward<TArg>(value)));
+}
+
+template <class T>
+TFuture<T> TBoundedNonblockingQueue<T>::Dequeue()
+{
+ auto guard = Guard(SpinLock_);
+ if (AsyncValueQueue_.empty()) {
+ auto promise = NewPromise<T>();
+ ConsumerQueue_.push(promise);
+ return promise.ToFuture();
+ } else {
+ auto future = std::move(AsyncValueQueue_.front());
+ AsyncValueQueue_.pop();
+
+ if (!ProducerQueue_.empty()) {
+ auto promise = ProducerQueue_.front();
+ ProducerQueue_.pop();
+
+ guard.Release();
+
+ promise.Set();
+ }
+
return future;
}
}
diff --git a/yt/yt/core/concurrency/nonblocking_queue.h b/yt/yt/core/concurrency/nonblocking_queue.h
index 64d676d652..675990a011 100644
--- a/yt/yt/core/concurrency/nonblocking_queue.h
+++ b/yt/yt/core/concurrency/nonblocking_queue.h
@@ -26,13 +26,40 @@ public:
private:
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
- TRingQueue<TFuture<T>> ValueQueue_;
+ TRingQueue<TFuture<T>> AsyncValueQueue_;
TRingQueue<TPromise<T>> PromiseQueue_;
};
////////////////////////////////////////////////////////////////////////////////
+template <class T>
+class TBoundedNonblockingQueue
+{
+public:
+ explicit TBoundedNonblockingQueue(i64 sizeLimit);
+
+ TFuture<void> Enqueue(TFuture<T> asyncValue);
+
+ // This template is required to enable perfect forwarding.
+ template <class TArg>
+ TFuture<void> Enqueue(TArg&& value);
+
+ // Dequeued futures could be set in arbitrary order.
+ TFuture<T> Dequeue();
+
+private:
+ const i64 SizeLimit_ = 0;
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
+
+ TRingQueue<TFuture<T>> AsyncValueQueue_;
+ TRingQueue<TPromise<T>> ConsumerQueue_;
+ TRingQueue<TPromise<void>> ProducerQueue_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NConcurrency
#define NONBLOCKING_QUEUE_INL_H_
diff --git a/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp b/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp
index 20ca635dcf..b723421857 100644
--- a/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/nonblocking_queue_ut.cpp
@@ -97,6 +97,157 @@ TEST(TNonblockingQueueTest, EnqueueFirstAsync)
////////////////////////////////////////////////////////////////////////////////
+TEST(TBoundedNonblockingQueueTest, DequeueFirst)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+ auto resultDequeue1 = queue.Dequeue();
+ auto resultDequeue2 = queue.Dequeue();
+
+ EXPECT_FALSE(resultDequeue1.IsSet());
+ EXPECT_FALSE(resultDequeue2.IsSet());
+
+ auto resultEnqueue1 = queue.Enqueue(1);
+ EXPECT_TRUE(resultEnqueue1.IsSet());
+
+ EXPECT_TRUE(resultDequeue1.IsSet());
+ EXPECT_EQ(1, resultDequeue1.Get().Value());
+
+ auto resultEnqueue2 = queue.Enqueue(2);
+ EXPECT_TRUE(resultEnqueue2.IsSet());
+
+ EXPECT_TRUE(resultDequeue2.IsSet());
+ EXPECT_EQ(2, resultDequeue2.Get().Value());
+}
+
+TEST(TBoundedNonblockingQueueTest, EnqueueFirst)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+ auto resultEnqueue1 = queue.Enqueue(1);
+ EXPECT_TRUE(resultEnqueue1.IsSet());
+
+ auto resultEnqueue2 = queue.Enqueue(2);
+ EXPECT_FALSE(resultEnqueue2.IsSet());
+
+ auto resultDequeue1 = queue.Dequeue();
+ EXPECT_TRUE(resultDequeue1.IsSet());
+ EXPECT_EQ(1, resultDequeue1.Get().Value());
+
+ EXPECT_TRUE(resultEnqueue2.IsSet());
+
+ auto resultDequeue2 = queue.Dequeue();
+ EXPECT_TRUE(resultDequeue2.IsSet());
+ EXPECT_EQ(2, resultDequeue2.Get().Value());
+}
+
+TEST(TBoundedNonblockingQueueTest, MixedEnqueueFirst)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+ auto resultEnqueue1 = queue.Enqueue(1);
+ EXPECT_TRUE(resultEnqueue1.IsSet());
+
+ auto resultDequeue1 = queue.Dequeue();
+ EXPECT_TRUE(resultDequeue1.IsSet());
+ EXPECT_EQ(1, resultDequeue1.Get().Value());
+
+ auto resultDequeue2 = queue.Dequeue();
+ EXPECT_FALSE(resultDequeue2.IsSet());
+
+ auto resultEnqueue2 = queue.Enqueue(2);
+ EXPECT_TRUE(resultEnqueue2.IsSet());
+
+ EXPECT_TRUE(resultDequeue2.IsSet());
+ EXPECT_EQ(2, resultDequeue2.Get().Value());
+}
+
+TEST(TBoundedNonblockingQueueTest, MixedDequeueFirst)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+
+ auto resultDequeue1 = queue.Dequeue();
+ EXPECT_FALSE(resultDequeue1.IsSet());
+
+ auto resultEnqueue1 = queue.Enqueue(1);
+ EXPECT_TRUE(resultEnqueue1.IsSet());
+ EXPECT_TRUE(resultDequeue1.IsSet());
+ EXPECT_EQ(1, resultDequeue1.Get().Value());
+
+ auto resultEnqueue2 = queue.Enqueue(2);
+ EXPECT_TRUE(resultEnqueue2.IsSet());
+
+ auto resultDequeue2 = queue.Dequeue();
+ EXPECT_TRUE(resultDequeue2.IsSet());
+ EXPECT_EQ(2, resultDequeue2.Get().Value());
+}
+
+TEST(TBoundedNonblockingQueueTest, DequeueFirstAsync)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+ auto resultDequeue = queue.Dequeue();
+ EXPECT_FALSE(resultDequeue.IsSet());
+
+ auto promise = NewPromise<int>();
+ auto resultEnqueue = queue.Enqueue(promise.ToFuture());
+ EXPECT_FALSE(resultDequeue.IsSet());
+ EXPECT_TRUE(resultEnqueue.IsSet());
+
+ promise.Set(1);
+ EXPECT_TRUE(resultDequeue.IsSet());
+ EXPECT_EQ(resultDequeue.Get().Value(), 1);
+}
+
+TEST(TBoundedNonblockingQueueTest, EnqueueFirstAsync)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+ auto promise1 = NewPromise<int>();
+ auto promise2 = NewPromise<int>();
+ auto resultEnqueue1 = queue.Enqueue(promise1.ToFuture());
+ EXPECT_TRUE(resultEnqueue1.IsSet());
+ auto resultEnqueue2 = queue.Enqueue(promise2.ToFuture());
+ EXPECT_FALSE(resultEnqueue2.IsSet());
+
+ auto resultDequeue1 = queue.Dequeue();
+ EXPECT_FALSE(resultDequeue1.IsSet());
+ EXPECT_TRUE(resultEnqueue2.IsSet());
+
+ promise1.Set(1);
+ EXPECT_TRUE(resultDequeue1.IsSet());
+ EXPECT_EQ(resultDequeue1.Get().Value(), 1);
+
+ promise2.Set(2);
+ auto resultDequeue2 = queue.Dequeue();
+ EXPECT_TRUE(resultDequeue2.IsSet());
+ EXPECT_EQ(resultDequeue2.Get().Value(), 2);
+}
+
+TEST(TBoundedNonblockingQueueTest, EnqueueFirstAsync2)
+{
+ TBoundedNonblockingQueue<int> queue(1);
+ auto promise1 = NewPromise<int>();
+ auto promise2 = NewPromise<int>();
+ auto resultEnqueue1 = queue.Enqueue(promise1.ToFuture());
+ EXPECT_TRUE(resultEnqueue1.IsSet());
+ auto resultEnqueue2 = queue.Enqueue(promise2.ToFuture());
+ EXPECT_FALSE(resultEnqueue2.IsSet());
+
+ auto resultDequeue1 = queue.Dequeue();
+ EXPECT_FALSE(resultDequeue1.IsSet());
+
+ EXPECT_TRUE(resultEnqueue2.IsSet());
+
+ auto resultDequeue2 = queue.Dequeue();
+ EXPECT_FALSE(resultDequeue2.IsSet());
+
+ promise1.Set(1);
+ EXPECT_TRUE(resultDequeue1.IsSet());
+ EXPECT_EQ(resultDequeue1.Get().Value(), 1);
+
+ promise2.Set(2);
+ EXPECT_TRUE(resultDequeue2.IsSet());
+ EXPECT_EQ(resultDequeue2.Get().Value(), 2);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace
} // namespace NYT::NConcurrency