From a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf Mon Sep 17 00:00:00 2001 From: maybenotilya Date: Fri, 17 Apr 2026 11:57:44 +0300 Subject: YT-26179: Add Abort for readers * Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b --- yt/cpp/mapreduce/common/halting_stream.cpp | 67 ++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 yt/cpp/mapreduce/common/halting_stream.cpp (limited to 'yt/cpp/mapreduce/common/halting_stream.cpp') diff --git a/yt/cpp/mapreduce/common/halting_stream.cpp b/yt/cpp/mapreduce/common/halting_stream.cpp new file mode 100644 index 00000000000..ba14d9af316 --- /dev/null +++ b/yt/cpp/mapreduce/common/halting_stream.cpp @@ -0,0 +1,67 @@ +#include "halting_stream.h" + +#include +#include + +namespace NYT::NDetail { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class THaltingAsyncStream + : public IAsyncInputStream +{ +public: + THaltingAsyncStream( + IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt) + : Underlying_(std::move(underlying)) + , BytesBeforeHalt_(bytesBeforeHalt) + { } + +private: + void OnRead(TPromise promise, const TErrorOr& result) + { + if (result.IsOK()) { + BytesRead_ += result.Value(); + } + promise.TrySet(result); + } + + TFuture Read(const TSharedMutableRef& buffer) override + { + if (BytesRead_ >= BytesBeforeHalt_) { + HaltPromise_ = NewPromise(); + return HaltPromise_.ToFuture(); + } + + auto limit = std::min(buffer.Size(), static_cast(BytesBeforeHalt_ - BytesRead_)); + auto promise = NewPromise(); + auto future = promise.ToFuture(); + + Underlying_->Read(buffer.Slice(0, limit)).Subscribe( + BIND(&THaltingAsyncStream::OnRead, MakeStrong(this), std::move(promise))); + + return future; + } + +private: + IAsyncInputStreamPtr Underlying_; + const i64 BytesBeforeHalt_; + i64 BytesRead_ = 0; + TPromise HaltPromise_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IAsyncInputStreamPtr CreateHaltingAsyncStream( + IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt) +{ + return New(std::move(underlying), bytesBeforeHalt); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail -- cgit v1.3