aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-08-14 21:58:50 +0300
committernadya73 <nadya73@yandex-team.com>2024-08-14 22:07:46 +0300
commit1ab0dd269e79cf843ea577eebcdec512376f5a7c (patch)
tree56d9f4884e652724e69b04852423b9d09213d4d1
parent0634a119529d4799e23b20e067daf2020d8fbc3c (diff)
downloadydb-1ab0dd269e79cf843ea577eebcdec512376f5a7c.tar.gz
[yt/yt/core] Add TBoundedAsyncStreamPipe
6aa88a1c73ee8fb7fa669994cefd9bc31065446b
-rw-r--r--yt/yt/core/concurrency/async_stream_pipe.cpp69
-rw-r--r--yt/yt/core/concurrency/async_stream_pipe.h26
-rw-r--r--yt/yt/core/concurrency/nonblocking_queue-inl.h35
-rw-r--r--yt/yt/core/concurrency/nonblocking_queue.h2
-rw-r--r--yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp185
5 files changed, 301 insertions, 16 deletions
diff --git a/yt/yt/core/concurrency/async_stream_pipe.cpp b/yt/yt/core/concurrency/async_stream_pipe.cpp
index 794786a9e8..5ef5cfccf5 100644
--- a/yt/yt/core/concurrency/async_stream_pipe.cpp
+++ b/yt/yt/core/concurrency/async_stream_pipe.cpp
@@ -52,4 +52,73 @@ TFuture<void> TAsyncStreamPipe::Abort(const TError& error)
////////////////////////////////////////////////////////////////////////////////
+TBoundedAsyncStreamPipe::TBoundedAsyncStreamPipe(i64 sizeLimit)
+ : Queue_(sizeLimit)
+{ }
+
+TFuture<void> TBoundedAsyncStreamPipe::Write(const TSharedRef& buffer)
+{
+ if (!buffer) {
+ // Empty buffer has special meaning in our queue, so we don't write it.
+ return VoidFuture;
+ }
+
+ if (Aborted_.load()) {
+ return MakeFuture(Error_);
+ }
+
+ auto result = Queue_.Enqueue(TSharedRef::MakeCopy<TAsyncStreamPipeTag>(buffer));
+
+ if (Aborted_.load()) {
+ Queue_.Drain(Error_);
+ return MakeFuture(Error_);
+ }
+
+ return result.Apply(BIND([this, this_ = MakeStrong(this)] () -> TFuture<void> {
+ if (Aborted_.load()) {
+ return MakeFuture(Error_);
+ }
+ return VoidFuture;
+ }));
+}
+
+TFuture<TSharedRef> TBoundedAsyncStreamPipe::Read()
+{
+ if (Aborted_.load()) {
+ return MakeFuture<TSharedRef>(Error_);
+ }
+
+ auto result = Queue_.Dequeue();
+
+ if (Aborted_.load()) {
+ Queue_.Drain(Error_);
+ return MakeFuture<TSharedRef>(Error_);
+ }
+
+ return result.ApplyUnique(BIND([this, this_ = MakeStrong(this)] (TSharedRef&& data) -> TFuture<TSharedRef> {
+ if (Aborted_.load()) {
+ return MakeFuture<TSharedRef>(Error_);
+ }
+ return MakeFuture(std::move(data));
+ }));
+}
+
+TFuture<void> TBoundedAsyncStreamPipe::Close()
+{
+ return Queue_.Enqueue(TSharedRef());
+}
+
+void TBoundedAsyncStreamPipe::Abort(const TError& error)
+{
+ if (Aborting_.exchange(true)) {
+ return;
+ }
+ Error_ = error;
+ Aborted_.store(true);
+
+ Queue_.Drain(Error_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/async_stream_pipe.h b/yt/yt/core/concurrency/async_stream_pipe.h
index 2dbb6a527b..897eb865ea 100644
--- a/yt/yt/core/concurrency/async_stream_pipe.h
+++ b/yt/yt/core/concurrency/async_stream_pipe.h
@@ -39,4 +39,30 @@ DEFINE_REFCOUNTED_TYPE(TAsyncStreamPipe)
////////////////////////////////////////////////////////////////////////////////
+class TBoundedAsyncStreamPipe
+ : public IAsyncZeroCopyInputStream
+ , public IAsyncOutputStream
+{
+public:
+ explicit TBoundedAsyncStreamPipe(i64 sizeLimit);
+
+ TFuture<TSharedRef> Read() override;
+
+ TFuture<void> Write(const TSharedRef& buffer) override;
+ TFuture<void> Close() override;
+
+ void Abort(const TError& error);
+
+private:
+ TBoundedNonblockingQueue<TSharedRef> Queue_;
+
+ std::atomic<bool> Aborting_ = false;
+ std::atomic<bool> Aborted_ = false;
+ TError Error_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TBoundedAsyncStreamPipe)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/nonblocking_queue-inl.h b/yt/yt/core/concurrency/nonblocking_queue-inl.h
index da360bff8c..e18d924b01 100644
--- a/yt/yt/core/concurrency/nonblocking_queue-inl.h
+++ b/yt/yt/core/concurrency/nonblocking_queue-inl.h
@@ -110,6 +110,41 @@ TFuture<T> TBoundedNonblockingQueue<T>::Dequeue()
}
}
+template <class T>
+void TBoundedNonblockingQueue<T>::Drain(const TError& error)
+{
+ auto guard = Guard(SpinLock_);
+
+ std::vector<TPromise<T>> consumers;
+ consumers.reserve(ConsumerQueue_.size());
+
+ std::vector<TPromise<void>> producers;
+ producers.reserve(ProducerQueue_.size());
+
+ while (!ConsumerQueue_.empty()) {
+ auto promise = ConsumerQueue_.front();
+ ConsumerQueue_.pop();
+ consumers.push_back(std::move(promise));
+ }
+
+ while (!ProducerQueue_.empty()) {
+ auto promise = ProducerQueue_.front();
+ ProducerQueue_.pop();
+ producers.push_back(std::move(promise));
+ }
+
+ guard.Release();
+
+ auto resultError = TError("Queue was drained with error") << error;
+
+ for (const auto& consumer : consumers) {
+ consumer.Set(resultError);
+ }
+ for (const auto& producer : producers) {
+ producer.Set(resultError);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NConcurrency
diff --git a/yt/yt/core/concurrency/nonblocking_queue.h b/yt/yt/core/concurrency/nonblocking_queue.h
index 675990a011..ecf2e4c738 100644
--- a/yt/yt/core/concurrency/nonblocking_queue.h
+++ b/yt/yt/core/concurrency/nonblocking_queue.h
@@ -48,6 +48,8 @@ public:
// Dequeued futures could be set in arbitrary order.
TFuture<T> Dequeue();
+ void Drain(const TError& error);
+
private:
const i64 SizeLimit_ = 0;
diff --git a/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp b/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp
index 55b329391e..18b88b4962 100644
--- a/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/async_stream_pipe_ut.cpp
@@ -20,14 +20,14 @@ TEST(TAsyncStreamPipeTest, Simple)
{
const auto readResult = pipe->Read();
- ASSERT_FALSE(readResult.IsSet());
+ EXPECT_FALSE(readResult.IsSet());
auto writeResult = pipe->Write(TSharedRef::FromString("FOO"));
- ASSERT_TRUE(readResult.IsSet());
- ASSERT_TRUE(readResult.Get().IsOK());
- ASSERT_EQ(GetString(readResult.Get().Value()), "FOO");
- ASSERT_TRUE(writeResult.IsSet());
- ASSERT_TRUE(writeResult.Get().IsOK());
+ EXPECT_TRUE(readResult.IsSet());
+ EXPECT_TRUE(readResult.Get().IsOK());
+ EXPECT_EQ(GetString(readResult.Get().Value()), "FOO");
+ EXPECT_TRUE(writeResult.IsSet());
+ EXPECT_TRUE(writeResult.Get().IsOK());
}
{
@@ -35,28 +35,181 @@ TEST(TAsyncStreamPipeTest, Simple)
EXPECT_FALSE(writeResult.IsSet());
const auto readResult = pipe->Read();
- ASSERT_TRUE(readResult.IsSet());
- ASSERT_TRUE(readResult.Get().IsOK());
- ASSERT_EQ(GetString(readResult.Get().Value()), "BAR_BAZ");
- ASSERT_TRUE(writeResult.IsSet());
- ASSERT_TRUE(writeResult.Get().IsOK());
+ EXPECT_TRUE(readResult.IsSet());
+ EXPECT_TRUE(readResult.Get().IsOK());
+ EXPECT_EQ(GetString(readResult.Get().Value()), "BAR_BAZ");
+ EXPECT_TRUE(writeResult.IsSet());
+ EXPECT_TRUE(writeResult.Get().IsOK());
}
{
const auto readResult = pipe->Read();
- ASSERT_FALSE(readResult.IsSet());
+ EXPECT_FALSE(readResult.IsSet());
const auto closed = pipe->Close();
- ASSERT_TRUE(closed.IsSet());
+ EXPECT_TRUE(closed.IsSet());
- ASSERT_TRUE(readResult.IsSet());
- ASSERT_TRUE(readResult.Get().IsOK());
- ASSERT_EQ(GetString(readResult.Get().Value()), "");
+ EXPECT_TRUE(readResult.IsSet());
+ EXPECT_TRUE(readResult.Get().IsOK());
+ EXPECT_EQ(GetString(readResult.Get().Value()), "");
}
}
////////////////////////////////////////////////////////////////////////////////
+TEST(TBoundedAsyncStreamPipeTest, Simple)
+{
+ auto pipe = New<TBoundedAsyncStreamPipe>(1);
+
+ {
+ const auto readResult = pipe->Read();
+ EXPECT_FALSE(readResult.IsSet());
+
+ auto writeResult = pipe->Write(TSharedRef::FromString("FOO"));
+ EXPECT_TRUE(readResult.IsSet());
+ EXPECT_TRUE(readResult.Get().IsOK());
+ EXPECT_EQ(GetString(readResult.Get().Value()), "FOO");
+
+ EXPECT_TRUE(writeResult.IsSet());
+ EXPECT_TRUE(writeResult.Get().IsOK());
+ }
+
+ {
+ auto writeResult = pipe->Write(TSharedRef::FromString("BAR"));
+ EXPECT_TRUE(writeResult.IsSet());
+ EXPECT_TRUE(writeResult.Get().IsOK());
+
+ const auto readResult = pipe->Read();
+ EXPECT_TRUE(readResult.IsSet());
+ EXPECT_TRUE(readResult.Get().IsOK());
+ EXPECT_EQ(GetString(readResult.Get().Value()), "BAR");
+ }
+
+ {
+ auto writeResult1 = pipe->Write(TSharedRef::FromString("BAZ_1"));
+ EXPECT_TRUE(writeResult1.IsSet());
+ EXPECT_TRUE(writeResult1.Get().IsOK());
+
+ auto writeResult2 = pipe->Write(TSharedRef::FromString("BAZ_2"));
+ EXPECT_FALSE(writeResult2.IsSet());
+
+ const auto readResult1 = pipe->Read();
+ EXPECT_TRUE(readResult1.IsSet());
+ EXPECT_TRUE(readResult1.Get().IsOK());
+ EXPECT_EQ(GetString(readResult1.Get().Value()), "BAZ_1");
+
+ EXPECT_TRUE(writeResult2.IsSet());
+ EXPECT_TRUE(writeResult2.Get().IsOK());
+
+ const auto readResult2 = pipe->Read();
+ EXPECT_TRUE(readResult2.IsSet());
+ EXPECT_TRUE(readResult2.Get().IsOK());
+ EXPECT_EQ(GetString(readResult2.Get().Value()), "BAZ_2");
+ }
+
+ {
+ const auto readResult1 = pipe->Read();
+ EXPECT_FALSE(readResult1.IsSet());
+
+ const auto readResult2 = pipe->Read();
+ EXPECT_FALSE(readResult2.IsSet());
+
+ auto writeResult1 = pipe->Write(TSharedRef::FromString("ABC_1"));
+ EXPECT_TRUE(writeResult1.IsSet());
+ EXPECT_TRUE(writeResult1.Get().IsOK());
+
+ auto writeResult2 = pipe->Write(TSharedRef::FromString("ABC_2"));
+ EXPECT_TRUE(writeResult2.IsSet());
+ EXPECT_TRUE(writeResult2.Get().IsOK());
+
+ EXPECT_TRUE(readResult1.IsSet());
+ EXPECT_TRUE(readResult1.Get().IsOK());
+ EXPECT_EQ(GetString(readResult1.Get().Value()), "ABC_1");
+
+ EXPECT_TRUE(readResult2.IsSet());
+ EXPECT_TRUE(readResult2.Get().IsOK());
+ EXPECT_EQ(GetString(readResult2.Get().Value()), "ABC_2");
+ }
+
+ {
+ const auto readResult = pipe->Read();
+ EXPECT_FALSE(readResult.IsSet());
+
+ const auto closed = pipe->Close();
+ EXPECT_TRUE(closed.IsSet());
+
+ EXPECT_TRUE(readResult.IsSet());
+ EXPECT_TRUE(readResult.Get().IsOK());
+ EXPECT_EQ(GetString(readResult.Get().Value()), "");
+ }
+}
+
+TEST(TBoundedAsyncStreamPipeTest, AbortWaitRead)
+{
+ auto pipe = New<TBoundedAsyncStreamPipe>(1);
+
+ const auto readResult1 = pipe->Read();
+ EXPECT_FALSE(readResult1.IsSet());
+
+ auto writeResult1 = pipe->Write(TSharedRef::FromString("FOO"));
+ EXPECT_TRUE(writeResult1.IsSet());
+ EXPECT_TRUE(writeResult1.Get().IsOK());
+
+ EXPECT_TRUE(readResult1.IsSet());
+ EXPECT_TRUE(readResult1.Get().IsOK());
+ EXPECT_EQ(GetString(readResult1.Get().Value()), "FOO");
+
+ const auto readResult2 = pipe->Read();
+ EXPECT_FALSE(readResult2.IsSet());
+
+ pipe->Abort(TError("fail"));
+
+ EXPECT_TRUE(readResult2.IsSet());
+ EXPECT_FALSE(readResult2.Get().IsOK());
+ EXPECT_THROW_WITH_SUBSTRING(readResult2.Get().ThrowOnError(), "was drained");
+}
+
+TEST(TBoundedAsyncStreamPipeTest, AbortWaitWrite)
+{
+ auto pipe = New<TBoundedAsyncStreamPipe>(1);
+
+ const auto readResult1 = pipe->Read();
+ EXPECT_FALSE(readResult1.IsSet());
+
+ auto writeResult1 = pipe->Write(TSharedRef::FromString("FOO"));
+ EXPECT_TRUE(writeResult1.IsSet());
+ EXPECT_TRUE(writeResult1.Get().IsOK());
+
+ EXPECT_TRUE(readResult1.IsSet());
+ EXPECT_TRUE(readResult1.Get().IsOK());
+ EXPECT_EQ(GetString(readResult1.Get().Value()), "FOO");
+
+ const auto writeResult2 = pipe->Write(TSharedRef::FromString("BAR"));
+ EXPECT_TRUE(writeResult2.IsSet());
+ EXPECT_TRUE(writeResult2.Get().IsOK());
+
+ const auto writeResult3 = pipe->Write(TSharedRef::FromString("BAZ"));
+ EXPECT_FALSE(writeResult3.IsSet());
+
+ pipe->Abort(TError("fail"));
+
+ EXPECT_TRUE(writeResult3.IsSet());
+ EXPECT_FALSE(writeResult3.Get().IsOK());
+ EXPECT_THROW_WITH_SUBSTRING(writeResult3.Get().ThrowOnError(), "was drained");
+
+ const auto writeResult4 = pipe->Write(TSharedRef::FromString("ABC"));
+ EXPECT_TRUE(writeResult4.IsSet());
+ EXPECT_FALSE(writeResult4.Get().IsOK());
+ EXPECT_THROW_WITH_SUBSTRING(writeResult4.Get().ThrowOnError(), "fail");
+
+ const auto readResult2 = pipe->Read();
+ EXPECT_TRUE(readResult2.IsSet());
+ EXPECT_FALSE(readResult2.Get().IsOK());
+ EXPECT_THROW_WITH_SUBSTRING(readResult2.Get().ThrowOnError(), "fail");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace
} // namespace NYT