diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/common | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/common')
| -rw-r--r-- | yt/cpp/mapreduce/common/abortable_stream.cpp | 124 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/abortable_stream.h | 21 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/halting_stream.cpp | 67 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/halting_stream.h | 15 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/retry_lib.cpp | 2 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/common/ya.make | 5 |
6 files changed, 233 insertions, 1 deletions
diff --git a/yt/cpp/mapreduce/common/abortable_stream.cpp b/yt/cpp/mapreduce/common/abortable_stream.cpp new file mode 100644 index 00000000000..6336d792f14 --- /dev/null +++ b/yt/cpp/mapreduce/common/abortable_stream.cpp @@ -0,0 +1,124 @@ +#include "abortable_stream.h" + +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/yt/core/concurrency/async_stream.h> +#include <yt/yt/core/concurrency/scheduler_api.h> + +#include <library/cpp/yt/logging/logger.h> +#include <library/cpp/yt/memory/ref.h> + +#include <util/system/spinlock.h> + +namespace NYT::NDetail { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortableInputStreamAdapter + : public IAbortableInputStream +{ +public: + explicit TAbortableInputStreamAdapter(IAsyncInputStreamPtr underlyingStream) + : UnderlyingStream_(std::move(underlyingStream)) + { } + + void Abort() override + { + auto guard = Guard(Lock_); + IsAborted_ = true; + CurrentFuture_.Cancel(TError("Stream was aborted")); + } + + bool IsAborted() const override + { + auto guard = Guard(Lock_); + return IsAborted_; + } + +private: + const IAsyncInputStreamPtr UnderlyingStream_; + + TAdaptiveLock Lock_; + bool IsAborted_ = false; + TFuture<size_t> CurrentFuture_; + + size_t DoRead(void* buffer, size_t length) override + { + if (length == 0) { + return 0; + } + + struct TAbortableInputStreamBufferTag { }; + auto readBuffer = TSharedMutableRef::Allocate<TAbortableInputStreamBufferTag>(length); + + auto future = UnderlyingStream_->Read(readBuffer); + { + auto guard = Guard(Lock_); + CurrentFuture_ = future; + if (IsAborted_) { + future.Cancel(TError("Stream was aborted")); + } + } + + auto result = WaitFor(future); + + { + auto guard = Guard(Lock_); + if (IsAborted_) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } + } + + auto bytesRead = result.ValueOrThrow(); + + memcpy(buffer, readBuffer.Begin(), bytesRead); + + return bytesRead; + } +}; + +std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapter( + IAsyncInputStreamPtr underlyingStream) +{ + YT_VERIFY(underlyingStream); + return std::make_unique<TAbortableInputStreamAdapter>( + std::move(underlyingStream)); +} + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortableStreamFallback + : public IAbortableInputStream +{ +public: + explicit TAbortableStreamFallback(IInputStream* underlyingStream) + : UnderlyingStream_(underlyingStream) + { } + + void Abort() override + { + YT_LOG_WARNING("Abort for this stream type is not supported"); + } + +private: + IInputStream* const UnderlyingStream_; + + size_t DoRead(void* buffer, size_t length) override + { + return UnderlyingStream_->Read(buffer, length); + } +}; + +std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapterFallback( + IInputStream* underlyingStream) +{ + YT_VERIFY(underlyingStream); + return std::make_unique<TAbortableStreamFallback>(underlyingStream); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/abortable_stream.h b/yt/cpp/mapreduce/common/abortable_stream.h new file mode 100644 index 00000000000..8b58e6582fc --- /dev/null +++ b/yt/cpp/mapreduce/common/abortable_stream.h @@ -0,0 +1,21 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/abortable_stream.h> + +#include <yt/yt/core/concurrency/async_stream.h> + +#include <memory> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapter( + NConcurrency::IAsyncInputStreamPtr underlyingStream); + +std::unique_ptr<IAbortableInputStream> CreateAbortableInputStreamAdapterFallback( + IInputStream* underlyingStream); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/halting_stream.cpp b/yt/cpp/mapreduce/common/halting_stream.cpp new file mode 100644 index 00000000000..ba14d9af316 --- /dev/null +++ b/yt/cpp/mapreduce/common/halting_stream.cpp @@ -0,0 +1,67 @@ +#include "halting_stream.h" + +#include <yt/yt/core/actions/bind.h> +#include <yt/yt/core/concurrency/async_stream.h> + +namespace NYT::NDetail { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class THaltingAsyncStream + : public IAsyncInputStream +{ +public: + THaltingAsyncStream( + IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt) + : Underlying_(std::move(underlying)) + , BytesBeforeHalt_(bytesBeforeHalt) + { } + +private: + void OnRead(TPromise<size_t> promise, const TErrorOr<size_t>& result) + { + if (result.IsOK()) { + BytesRead_ += result.Value(); + } + promise.TrySet(result); + } + + TFuture<size_t> Read(const TSharedMutableRef& buffer) override + { + if (BytesRead_ >= BytesBeforeHalt_) { + HaltPromise_ = NewPromise<size_t>(); + return HaltPromise_.ToFuture(); + } + + auto limit = std::min(buffer.Size(), static_cast<size_t>(BytesBeforeHalt_ - BytesRead_)); + auto promise = NewPromise<size_t>(); + auto future = promise.ToFuture(); + + Underlying_->Read(buffer.Slice(0, limit)).Subscribe( + BIND(&THaltingAsyncStream::OnRead, MakeStrong(this), std::move(promise))); + + return future; + } + +private: + IAsyncInputStreamPtr Underlying_; + const i64 BytesBeforeHalt_; + i64 BytesRead_ = 0; + TPromise<size_t> HaltPromise_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IAsyncInputStreamPtr CreateHaltingAsyncStream( + IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt) +{ + return New<THaltingAsyncStream>(std::move(underlying), bytesBeforeHalt); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/halting_stream.h b/yt/cpp/mapreduce/common/halting_stream.h new file mode 100644 index 00000000000..10b8f75e1f7 --- /dev/null +++ b/yt/cpp/mapreduce/common/halting_stream.h @@ -0,0 +1,15 @@ +#pragma once + +#include <yt/yt/core/concurrency/async_stream.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +NConcurrency::IAsyncInputStreamPtr CreateHaltingAsyncStream( + NConcurrency::IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp index abcb9e87b1a..9e63455242a 100644 --- a/yt/cpp/mapreduce/common/retry_lib.cpp +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -257,6 +257,8 @@ bool IsRetriable(const std::exception& ex) { if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) { return false; + } else if (dynamic_cast<const TInputStreamAbortedError*>(&ex)) { + return false; } return true; } diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make index 0b427d5ce40..6500da9e8e0 100644 --- a/yt/cpp/mapreduce/common/ya.make +++ b/yt/cpp/mapreduce/common/ya.make @@ -3,8 +3,10 @@ LIBRARY() INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( - expected_error_guard.cpp + abortable_stream.cpp debug_metrics.cpp + expected_error_guard.cpp + halting_stream.cpp helpers.cpp retry_lib.cpp wait_proxy.cpp @@ -19,6 +21,7 @@ PEERDIR( library/cpp/yson/node yt/cpp/mapreduce/interface yt/cpp/mapreduce/interface/logging + yt/yt/core ) END() |
