summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/common/halting_stream.cpp
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/common/halting_stream.cpp
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/common/halting_stream.cpp')
-rw-r--r--yt/cpp/mapreduce/common/halting_stream.cpp67
1 files changed, 67 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/common/halting_stream.cpp b/yt/cpp/mapreduce/common/halting_stream.cpp
new file mode 100644
index 00000000000..ba14d9af316
--- /dev/null
+++ b/yt/cpp/mapreduce/common/halting_stream.cpp
@@ -0,0 +1,67 @@
+#include "halting_stream.h"
+
+#include <yt/yt/core/actions/bind.h>
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NDetail {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class THaltingAsyncStream
+ : public IAsyncInputStream
+{
+public:
+ THaltingAsyncStream(
+ IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt)
+ : Underlying_(std::move(underlying))
+ , BytesBeforeHalt_(bytesBeforeHalt)
+ { }
+
+private:
+ void OnRead(TPromise<size_t> promise, const TErrorOr<size_t>& result)
+ {
+ if (result.IsOK()) {
+ BytesRead_ += result.Value();
+ }
+ promise.TrySet(result);
+ }
+
+ TFuture<size_t> Read(const TSharedMutableRef& buffer) override
+ {
+ if (BytesRead_ >= BytesBeforeHalt_) {
+ HaltPromise_ = NewPromise<size_t>();
+ return HaltPromise_.ToFuture();
+ }
+
+ auto limit = std::min(buffer.Size(), static_cast<size_t>(BytesBeforeHalt_ - BytesRead_));
+ auto promise = NewPromise<size_t>();
+ auto future = promise.ToFuture();
+
+ Underlying_->Read(buffer.Slice(0, limit)).Subscribe(
+ BIND(&THaltingAsyncStream::OnRead, MakeStrong(this), std::move(promise)));
+
+ return future;
+ }
+
+private:
+ IAsyncInputStreamPtr Underlying_;
+ const i64 BytesBeforeHalt_;
+ i64 BytesRead_ = 0;
+ TPromise<size_t> HaltPromise_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IAsyncInputStreamPtr CreateHaltingAsyncStream(
+ IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt)
+{
+ return New<THaltingAsyncStream>(std::move(underlying), bytesBeforeHalt);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail