diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-08-14 21:58:50 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-08-14 22:07:46 +0300 |
commit | 1ab0dd269e79cf843ea577eebcdec512376f5a7c (patch) | |
tree | 56d9f4884e652724e69b04852423b9d09213d4d1 | |
parent | 0634a119529d4799e23b20e067daf2020d8fbc3c (diff) | |
download | ydb-1ab0dd269e79cf843ea577eebcdec512376f5a7c.tar.gz |
[yt/yt/core] Add TBoundedAsyncStreamPipe
6aa88a1c73ee8fb7fa669994cefd9bc31065446b
-rw-r--r-- | yt/yt/core/concurrency/async_stream_pipe.cpp | 69 | ||||
-rw-r--r-- | yt/yt/core/concurrency/async_stream_pipe.h | 26 | ||||
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_queue-inl.h | 35 | ||||
-rw-r--r-- | yt/yt/core/concurrency/nonblocking_queue.h | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp | 185 |
5 files changed, 301 insertions, 16 deletions
diff --git a/yt/yt/core/concurrency/async_stream_pipe.cpp b/yt/yt/core/concurrency/async_stream_pipe.cpp index 794786a9e8..5ef5cfccf5 100644 --- a/yt/yt/core/concurrency/async_stream_pipe.cpp +++ b/yt/yt/core/concurrency/async_stream_pipe.cpp @@ -52,4 +52,73 @@ TFuture<void> TAsyncStreamPipe::Abort(const TError& error) //////////////////////////////////////////////////////////////////////////////// +TBoundedAsyncStreamPipe::TBoundedAsyncStreamPipe(i64 sizeLimit) + : Queue_(sizeLimit) +{ } + +TFuture<void> TBoundedAsyncStreamPipe::Write(const TSharedRef& buffer) +{ + if (!buffer) { + // Empty buffer has special meaning in our queue, so we don't write it. + return VoidFuture; + } + + if (Aborted_.load()) { + return MakeFuture(Error_); + } + + auto result = Queue_.Enqueue(TSharedRef::MakeCopy<TAsyncStreamPipeTag>(buffer)); + + if (Aborted_.load()) { + Queue_.Drain(Error_); + return MakeFuture(Error_); + } + + return result.Apply(BIND([this, this_ = MakeStrong(this)] () -> TFuture<void> { + if (Aborted_.load()) { + return MakeFuture(Error_); + } + return VoidFuture; + })); +} + +TFuture<TSharedRef> TBoundedAsyncStreamPipe::Read() +{ + if (Aborted_.load()) { + return MakeFuture<TSharedRef>(Error_); + } + + auto result = Queue_.Dequeue(); + + if (Aborted_.load()) { + Queue_.Drain(Error_); + return MakeFuture<TSharedRef>(Error_); + } + + return result.ApplyUnique(BIND([this, this_ = MakeStrong(this)] (TSharedRef&& data) -> TFuture<TSharedRef> { + if (Aborted_.load()) { + return MakeFuture<TSharedRef>(Error_); + } + return MakeFuture(std::move(data)); + })); +} + +TFuture<void> TBoundedAsyncStreamPipe::Close() +{ + return Queue_.Enqueue(TSharedRef()); +} + +void TBoundedAsyncStreamPipe::Abort(const TError& error) +{ + if (Aborting_.exchange(true)) { + return; + } + Error_ = error; + Aborted_.store(true); + + Queue_.Drain(Error_); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/async_stream_pipe.h b/yt/yt/core/concurrency/async_stream_pipe.h index 2dbb6a527b..897eb865ea 100644 --- a/yt/yt/core/concurrency/async_stream_pipe.h +++ b/yt/yt/core/concurrency/async_stream_pipe.h @@ -39,4 +39,30 @@ DEFINE_REFCOUNTED_TYPE(TAsyncStreamPipe) //////////////////////////////////////////////////////////////////////////////// +class TBoundedAsyncStreamPipe + : public IAsyncZeroCopyInputStream + , public IAsyncOutputStream +{ +public: + explicit TBoundedAsyncStreamPipe(i64 sizeLimit); + + TFuture<TSharedRef> Read() override; + + TFuture<void> Write(const TSharedRef& buffer) override; + TFuture<void> Close() override; + + void Abort(const TError& error); + +private: + TBoundedNonblockingQueue<TSharedRef> Queue_; + + std::atomic<bool> Aborting_ = false; + std::atomic<bool> Aborted_ = false; + TError Error_; +}; + +DEFINE_REFCOUNTED_TYPE(TBoundedAsyncStreamPipe) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/nonblocking_queue-inl.h b/yt/yt/core/concurrency/nonblocking_queue-inl.h index da360bff8c..e18d924b01 100644 --- a/yt/yt/core/concurrency/nonblocking_queue-inl.h +++ b/yt/yt/core/concurrency/nonblocking_queue-inl.h @@ -110,6 +110,41 @@ TFuture<T> TBoundedNonblockingQueue<T>::Dequeue() } } +template <class T> +void TBoundedNonblockingQueue<T>::Drain(const TError& error) +{ + auto guard = Guard(SpinLock_); + + std::vector<TPromise<T>> consumers; + consumers.reserve(ConsumerQueue_.size()); + + std::vector<TPromise<void>> producers; + producers.reserve(ProducerQueue_.size()); + + while (!ConsumerQueue_.empty()) { + auto promise = ConsumerQueue_.front(); + ConsumerQueue_.pop(); + consumers.push_back(std::move(promise)); + } + + while (!ProducerQueue_.empty()) { + auto promise = ProducerQueue_.front(); + ProducerQueue_.pop(); + producers.push_back(std::move(promise)); + } + + guard.Release(); + + auto resultError = TError("Queue was drained with error") << error; + + for (const auto& consumer : consumers) { + consumer.Set(resultError); + } + for (const auto& producer : producers) { + producer.Set(resultError); + } +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NConcurrency diff --git a/yt/yt/core/concurrency/nonblocking_queue.h b/yt/yt/core/concurrency/nonblocking_queue.h index 675990a011..ecf2e4c738 100644 --- a/yt/yt/core/concurrency/nonblocking_queue.h +++ b/yt/yt/core/concurrency/nonblocking_queue.h @@ -48,6 +48,8 @@ public: // Dequeued futures could be set in arbitrary order. TFuture<T> Dequeue(); + void Drain(const TError& error); + private: const i64 SizeLimit_ = 0; diff --git a/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp b/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp index 55b329391e..18b88b4962 100644 --- a/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp +++ b/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp @@ -20,14 +20,14 @@ TEST(TAsyncStreamPipeTest, Simple) { const auto readResult = pipe->Read(); - ASSERT_FALSE(readResult.IsSet()); + EXPECT_FALSE(readResult.IsSet()); auto writeResult = pipe->Write(TSharedRef::FromString("FOO")); - ASSERT_TRUE(readResult.IsSet()); - ASSERT_TRUE(readResult.Get().IsOK()); - ASSERT_EQ(GetString(readResult.Get().Value()), "FOO"); - ASSERT_TRUE(writeResult.IsSet()); - ASSERT_TRUE(writeResult.Get().IsOK()); + EXPECT_TRUE(readResult.IsSet()); + EXPECT_TRUE(readResult.Get().IsOK()); + EXPECT_EQ(GetString(readResult.Get().Value()), "FOO"); + EXPECT_TRUE(writeResult.IsSet()); + EXPECT_TRUE(writeResult.Get().IsOK()); } { @@ -35,28 +35,181 @@ TEST(TAsyncStreamPipeTest, Simple) EXPECT_FALSE(writeResult.IsSet()); const auto readResult = pipe->Read(); - ASSERT_TRUE(readResult.IsSet()); - ASSERT_TRUE(readResult.Get().IsOK()); - ASSERT_EQ(GetString(readResult.Get().Value()), "BAR_BAZ"); - ASSERT_TRUE(writeResult.IsSet()); - ASSERT_TRUE(writeResult.Get().IsOK()); + EXPECT_TRUE(readResult.IsSet()); + EXPECT_TRUE(readResult.Get().IsOK()); + EXPECT_EQ(GetString(readResult.Get().Value()), "BAR_BAZ"); + EXPECT_TRUE(writeResult.IsSet()); + EXPECT_TRUE(writeResult.Get().IsOK()); } { const auto readResult = pipe->Read(); - ASSERT_FALSE(readResult.IsSet()); + EXPECT_FALSE(readResult.IsSet()); const auto closed = pipe->Close(); - ASSERT_TRUE(closed.IsSet()); + EXPECT_TRUE(closed.IsSet()); - ASSERT_TRUE(readResult.IsSet()); - ASSERT_TRUE(readResult.Get().IsOK()); - ASSERT_EQ(GetString(readResult.Get().Value()), ""); + EXPECT_TRUE(readResult.IsSet()); + EXPECT_TRUE(readResult.Get().IsOK()); + EXPECT_EQ(GetString(readResult.Get().Value()), ""); } } //////////////////////////////////////////////////////////////////////////////// +TEST(TBoundedAsyncStreamPipeTest, Simple) +{ + auto pipe = New<TBoundedAsyncStreamPipe>(1); + + { + const auto readResult = pipe->Read(); + EXPECT_FALSE(readResult.IsSet()); + + auto writeResult = pipe->Write(TSharedRef::FromString("FOO")); + EXPECT_TRUE(readResult.IsSet()); + EXPECT_TRUE(readResult.Get().IsOK()); + EXPECT_EQ(GetString(readResult.Get().Value()), "FOO"); + + EXPECT_TRUE(writeResult.IsSet()); + EXPECT_TRUE(writeResult.Get().IsOK()); + } + + { + auto writeResult = pipe->Write(TSharedRef::FromString("BAR")); + EXPECT_TRUE(writeResult.IsSet()); + EXPECT_TRUE(writeResult.Get().IsOK()); + + const auto readResult = pipe->Read(); + EXPECT_TRUE(readResult.IsSet()); + EXPECT_TRUE(readResult.Get().IsOK()); + EXPECT_EQ(GetString(readResult.Get().Value()), "BAR"); + } + + { + auto writeResult1 = pipe->Write(TSharedRef::FromString("BAZ_1")); + EXPECT_TRUE(writeResult1.IsSet()); + EXPECT_TRUE(writeResult1.Get().IsOK()); + + auto writeResult2 = pipe->Write(TSharedRef::FromString("BAZ_2")); + EXPECT_FALSE(writeResult2.IsSet()); + + const auto readResult1 = pipe->Read(); + EXPECT_TRUE(readResult1.IsSet()); + EXPECT_TRUE(readResult1.Get().IsOK()); + EXPECT_EQ(GetString(readResult1.Get().Value()), "BAZ_1"); + + EXPECT_TRUE(writeResult2.IsSet()); + EXPECT_TRUE(writeResult2.Get().IsOK()); + + const auto readResult2 = pipe->Read(); + EXPECT_TRUE(readResult2.IsSet()); + EXPECT_TRUE(readResult2.Get().IsOK()); + EXPECT_EQ(GetString(readResult2.Get().Value()), "BAZ_2"); + } + + { + const auto readResult1 = pipe->Read(); + EXPECT_FALSE(readResult1.IsSet()); + + const auto readResult2 = pipe->Read(); + EXPECT_FALSE(readResult2.IsSet()); + + auto writeResult1 = pipe->Write(TSharedRef::FromString("ABC_1")); + EXPECT_TRUE(writeResult1.IsSet()); + EXPECT_TRUE(writeResult1.Get().IsOK()); + + auto writeResult2 = pipe->Write(TSharedRef::FromString("ABC_2")); + EXPECT_TRUE(writeResult2.IsSet()); + EXPECT_TRUE(writeResult2.Get().IsOK()); + + EXPECT_TRUE(readResult1.IsSet()); + EXPECT_TRUE(readResult1.Get().IsOK()); + EXPECT_EQ(GetString(readResult1.Get().Value()), "ABC_1"); + + EXPECT_TRUE(readResult2.IsSet()); + EXPECT_TRUE(readResult2.Get().IsOK()); + EXPECT_EQ(GetString(readResult2.Get().Value()), "ABC_2"); + } + + { + const auto readResult = pipe->Read(); + EXPECT_FALSE(readResult.IsSet()); + + const auto closed = pipe->Close(); + EXPECT_TRUE(closed.IsSet()); + + EXPECT_TRUE(readResult.IsSet()); + EXPECT_TRUE(readResult.Get().IsOK()); + EXPECT_EQ(GetString(readResult.Get().Value()), ""); + } +} + +TEST(TBoundedAsyncStreamPipeTest, AbortWaitRead) +{ + auto pipe = New<TBoundedAsyncStreamPipe>(1); + + const auto readResult1 = pipe->Read(); + EXPECT_FALSE(readResult1.IsSet()); + + auto writeResult1 = pipe->Write(TSharedRef::FromString("FOO")); + EXPECT_TRUE(writeResult1.IsSet()); + EXPECT_TRUE(writeResult1.Get().IsOK()); + + EXPECT_TRUE(readResult1.IsSet()); + EXPECT_TRUE(readResult1.Get().IsOK()); + EXPECT_EQ(GetString(readResult1.Get().Value()), "FOO"); + + const auto readResult2 = pipe->Read(); + EXPECT_FALSE(readResult2.IsSet()); + + pipe->Abort(TError("fail")); + + EXPECT_TRUE(readResult2.IsSet()); + EXPECT_FALSE(readResult2.Get().IsOK()); + EXPECT_THROW_WITH_SUBSTRING(readResult2.Get().ThrowOnError(), "was drained"); +} + +TEST(TBoundedAsyncStreamPipeTest, AbortWaitWrite) +{ + auto pipe = New<TBoundedAsyncStreamPipe>(1); + + const auto readResult1 = pipe->Read(); + EXPECT_FALSE(readResult1.IsSet()); + + auto writeResult1 = pipe->Write(TSharedRef::FromString("FOO")); + EXPECT_TRUE(writeResult1.IsSet()); + EXPECT_TRUE(writeResult1.Get().IsOK()); + + EXPECT_TRUE(readResult1.IsSet()); + EXPECT_TRUE(readResult1.Get().IsOK()); + EXPECT_EQ(GetString(readResult1.Get().Value()), "FOO"); + + const auto writeResult2 = pipe->Write(TSharedRef::FromString("BAR")); + EXPECT_TRUE(writeResult2.IsSet()); + EXPECT_TRUE(writeResult2.Get().IsOK()); + + const auto writeResult3 = pipe->Write(TSharedRef::FromString("BAZ")); + EXPECT_FALSE(writeResult3.IsSet()); + + pipe->Abort(TError("fail")); + + EXPECT_TRUE(writeResult3.IsSet()); + EXPECT_FALSE(writeResult3.Get().IsOK()); + EXPECT_THROW_WITH_SUBSTRING(writeResult3.Get().ThrowOnError(), "was drained"); + + const auto writeResult4 = pipe->Write(TSharedRef::FromString("ABC")); + EXPECT_TRUE(writeResult4.IsSet()); + EXPECT_FALSE(writeResult4.Get().IsOK()); + EXPECT_THROW_WITH_SUBSTRING(writeResult4.Get().ThrowOnError(), "fail"); + + const auto readResult2 = pipe->Read(); + EXPECT_TRUE(readResult2.IsSet()); + EXPECT_FALSE(readResult2.Get().IsOK()); + EXPECT_THROW_WITH_SUBSTRING(readResult2.Get().ThrowOnError(), "fail"); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace } // namespace NYT |