summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/common
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/common
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/common')
-rw-r--r--yt/cpp/mapreduce/common/abortable_stream.cpp124
-rw-r--r--yt/cpp/mapreduce/common/abortable_stream.h21
-rw-r--r--yt/cpp/mapreduce/common/halting_stream.cpp67
-rw-r--r--yt/cpp/mapreduce/common/halting_stream.h15
-rw-r--r--yt/cpp/mapreduce/common/retry_lib.cpp2
-rw-r--r--yt/cpp/mapreduce/common/ya.make5
6 files changed, 233 insertions, 1 deletions
diff --git a/yt/cpp/mapreduce/common/abortable_stream.cpp b/yt/cpp/mapreduce/common/abortable_stream.cpp
new file mode 100644
index 00000000000..6336d792f14
--- /dev/null
+++ b/yt/cpp/mapreduce/common/abortable_stream.cpp
@@ -0,0 +1,124 @@
+#include "abortable_stream.h"
+
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/yt/core/concurrency/async_stream.h>
+#include <yt/yt/core/concurrency/scheduler_api.h>
+
+#include <library/cpp/yt/logging/logger.h>
+#include <library/cpp/yt/memory/ref.h>
+
+#include <util/system/spinlock.h>
+
+namespace NYT::NDetail {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAbortableInputStreamAdapter
+ : public IAbortableInputStream
+{
+public:
+ explicit TAbortableInputStreamAdapter(IAsyncInputStreamPtr underlyingStream)
+ : UnderlyingStream_(std::move(underlyingStream))
+ { }
+
+ void Abort() override
+ {
+ auto guard = Guard(Lock_);
+ IsAborted_ = true;
+ CurrentFuture_.Cancel(TError("Stream was aborted"));
+ }
+
+ bool IsAborted() const override
+ {
+ auto guard = Guard(Lock_);
+ return IsAborted_;
+ }
+
+private:
+ const IAsyncInputStreamPtr UnderlyingStream_;
+
+ TAdaptiveLock Lock_;
+ bool IsAborted_ = false;
+ TFuture<size_t> CurrentFuture_;
+
+ size_t DoRead(void* buffer, size_t length) override
+ {
+ if (length == 0) {
+ return 0;
+ }
+
+ struct TAbortableInputStreamBufferTag { };
+ auto readBuffer = TSharedMutableRef::Allocate<TAbortableInputStreamBufferTag>(length);
+
+ auto future = UnderlyingStream_->Read(readBuffer);
+ {
+ auto guard = Guard(Lock_);
+ CurrentFuture_ = future;
+ if (IsAborted_) {
+ future.Cancel(TError("Stream was aborted"));
+ }
+ }
+
+ auto result = WaitFor(future);
+
+ {
+ auto guard = Guard(Lock_);
+ if (IsAborted_) {
+ ythrow TInputStreamAbortedError() << "Stream was aborted";
+ }
+ }
+
+ auto bytesRead = result.ValueOrThrow();
+
+ memcpy(buffer, readBuffer.Begin(), bytesRead);
+
+ return bytesRead;
+ }
+};
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapter(
+ IAsyncInputStreamPtr underlyingStream)
+{
+ YT_VERIFY(underlyingStream);
+ return std::make_unique<TAbortableInputStreamAdapter>(
+ std::move(underlyingStream));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAbortableStreamFallback
+ : public IAbortableInputStream
+{
+public:
+ explicit TAbortableStreamFallback(IInputStream* underlyingStream)
+ : UnderlyingStream_(underlyingStream)
+ { }
+
+ void Abort() override
+ {
+ YT_LOG_WARNING("Abort for this stream type is not supported");
+ }
+
+private:
+ IInputStream* const UnderlyingStream_;
+
+ size_t DoRead(void* buffer, size_t length) override
+ {
+ return UnderlyingStream_->Read(buffer, length);
+ }
+};
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapterFallback(
+ IInputStream* underlyingStream)
+{
+ YT_VERIFY(underlyingStream);
+ return std::make_unique<TAbortableStreamFallback>(underlyingStream);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/abortable_stream.h b/yt/cpp/mapreduce/common/abortable_stream.h
new file mode 100644
index 00000000000..8b58e6582fc
--- /dev/null
+++ b/yt/cpp/mapreduce/common/abortable_stream.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
+
+#include <yt/yt/core/concurrency/async_stream.h>
+
+#include <memory>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapter(
+ NConcurrency::IAsyncInputStreamPtr underlyingStream);
+
+std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapterFallback(
+ IInputStream* underlyingStream);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/halting_stream.cpp b/yt/cpp/mapreduce/common/halting_stream.cpp
new file mode 100644
index 00000000000..ba14d9af316
--- /dev/null
+++ b/yt/cpp/mapreduce/common/halting_stream.cpp
@@ -0,0 +1,67 @@
+#include "halting_stream.h"
+
+#include <yt/yt/core/actions/bind.h>
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NDetail {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class THaltingAsyncStream
+ : public IAsyncInputStream
+{
+public:
+ THaltingAsyncStream(
+ IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt)
+ : Underlying_(std::move(underlying))
+ , BytesBeforeHalt_(bytesBeforeHalt)
+ { }
+
+private:
+ void OnRead(TPromise<size_t> promise, const TErrorOr<size_t>& result)
+ {
+ if (result.IsOK()) {
+ BytesRead_ += result.Value();
+ }
+ promise.TrySet(result);
+ }
+
+ TFuture<size_t> Read(const TSharedMutableRef& buffer) override
+ {
+ if (BytesRead_ >= BytesBeforeHalt_) {
+ HaltPromise_ = NewPromise<size_t>();
+ return HaltPromise_.ToFuture();
+ }
+
+ auto limit = std::min(buffer.Size(), static_cast<size_t>(BytesBeforeHalt_ - BytesRead_));
+ auto promise = NewPromise<size_t>();
+ auto future = promise.ToFuture();
+
+ Underlying_->Read(buffer.Slice(0, limit)).Subscribe(
+ BIND(&THaltingAsyncStream::OnRead, MakeStrong(this), std::move(promise)));
+
+ return future;
+ }
+
+private:
+ IAsyncInputStreamPtr Underlying_;
+ const i64 BytesBeforeHalt_;
+ i64 BytesRead_ = 0;
+ TPromise<size_t> HaltPromise_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IAsyncInputStreamPtr CreateHaltingAsyncStream(
+ IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt)
+{
+ return New<THaltingAsyncStream>(std::move(underlying), bytesBeforeHalt);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/halting_stream.h b/yt/cpp/mapreduce/common/halting_stream.h
new file mode 100644
index 00000000000..10b8f75e1f7
--- /dev/null
+++ b/yt/cpp/mapreduce/common/halting_stream.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+NConcurrency::IAsyncInputStreamPtr CreateHaltingAsyncStream(
+ NConcurrency::IAsyncInputStreamPtr underlying,
+ i64 bytesBeforeHalt);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp
index abcb9e87b1a..9e63455242a 100644
--- a/yt/cpp/mapreduce/common/retry_lib.cpp
+++ b/yt/cpp/mapreduce/common/retry_lib.cpp
@@ -257,6 +257,8 @@ bool IsRetriable(const std::exception& ex)
{
if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) {
return false;
+ } else if (dynamic_cast<const TInputStreamAbortedError*>(&ex)) {
+ return false;
}
return true;
}
diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make
index 0b427d5ce40..6500da9e8e0 100644
--- a/yt/cpp/mapreduce/common/ya.make
+++ b/yt/cpp/mapreduce/common/ya.make
@@ -3,8 +3,10 @@ LIBRARY()
INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
- expected_error_guard.cpp
+ abortable_stream.cpp
debug_metrics.cpp
+ expected_error_guard.cpp
+ halting_stream.cpp
helpers.cpp
retry_lib.cpp
wait_proxy.cpp
@@ -19,6 +21,7 @@ PEERDIR(
library/cpp/yson/node
yt/cpp/mapreduce/interface
yt/cpp/mapreduce/interface/logging
+ yt/yt/core
)
END()