From a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf Mon Sep 17 00:00:00 2001 From: maybenotilya Date: Fri, 17 Apr 2026 11:57:44 +0300 Subject: YT-26179: Add Abort for readers * Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b --- yt/cpp/mapreduce/client/client_reader.cpp | 17 ++- yt/cpp/mapreduce/client/client_reader.h | 7 +- yt/cpp/mapreduce/client/file_reader.cpp | 14 ++- yt/cpp/mapreduce/client/file_reader.h | 14 ++- yt/cpp/mapreduce/client/partition_reader.cpp | 16 ++- yt/cpp/mapreduce/common/abortable_stream.cpp | 124 +++++++++++++++++++++ yt/cpp/mapreduce/common/abortable_stream.h | 21 ++++ yt/cpp/mapreduce/common/halting_stream.cpp | 67 +++++++++++ yt/cpp/mapreduce/common/halting_stream.h | 15 +++ yt/cpp/mapreduce/common/retry_lib.cpp | 2 + yt/cpp/mapreduce/common/ya.make | 5 +- yt/cpp/mapreduce/http/abortable_http_response.cpp | 27 +++-- yt/cpp/mapreduce/http/abortable_http_response.h | 18 +-- yt/cpp/mapreduce/http/http_client.cpp | 36 ++++-- yt/cpp/mapreduce/http/http_client.h | 15 ++- yt/cpp/mapreduce/http_client/raw_client.cpp | 8 +- yt/cpp/mapreduce/http_client/raw_client.h | 8 +- yt/cpp/mapreduce/interface/abortable_stream.cpp | 34 ++++++ yt/cpp/mapreduce/interface/abortable_stream.h | 33 ++++++ yt/cpp/mapreduce/interface/config.cpp | 4 + yt/cpp/mapreduce/interface/config.h | 12 ++ yt/cpp/mapreduce/interface/errors.h | 7 ++ yt/cpp/mapreduce/interface/io-inl.h | 12 ++ yt/cpp/mapreduce/interface/io.cpp | 10 ++ yt/cpp/mapreduce/interface/io.h | 11 +- yt/cpp/mapreduce/interface/raw_client.h | 10 +- yt/cpp/mapreduce/interface/ya.make | 1 + yt/cpp/mapreduce/io/counting_raw_reader.cpp | 10 ++ yt/cpp/mapreduce/io/counting_raw_reader.h | 3 + yt/cpp/mapreduce/io/lenval_table_reader.cpp | 4 + yt/cpp/mapreduce/io/node_table_reader.cpp | 14 +++ yt/cpp/mapreduce/io/node_table_reader.h | 4 +- yt/cpp/mapreduce/io/proto_table_reader.cpp | 20 ++++ yt/cpp/mapreduce/io/proto_table_reader.h | 4 + yt/cpp/mapreduce/io/skiff_row_table_reader.cpp | 14 +++ yt/cpp/mapreduce/io/skiff_row_table_reader.h | 2 + yt/cpp/mapreduce/io/skiff_table_reader.cpp | 14 +++ yt/cpp/mapreduce/io/skiff_table_reader.h | 3 + yt/cpp/mapreduce/io/stream_raw_reader.cpp | 10 ++ yt/cpp/mapreduce/io/stream_table_reader.h | 17 ++- yt/cpp/mapreduce/io/yamr_table_reader.cpp | 10 ++ yt/cpp/mapreduce/io/yamr_table_reader.h | 2 + yt/cpp/mapreduce/rpc_client/raw_client.cpp | 65 +++++++---- yt/cpp/mapreduce/rpc_client/raw_client.h | 8 +- yt/cpp/mapreduce/rpc_client/ya.make | 1 + .../tests/yt_unittest_lib/yt_unittest_lib.h | 5 + 46 files changed, 674 insertions(+), 84 deletions(-) create mode 100644 yt/cpp/mapreduce/common/abortable_stream.cpp create mode 100644 yt/cpp/mapreduce/common/abortable_stream.h create mode 100644 yt/cpp/mapreduce/common/halting_stream.cpp create mode 100644 yt/cpp/mapreduce/common/halting_stream.h create mode 100644 yt/cpp/mapreduce/interface/abortable_stream.cpp create mode 100644 yt/cpp/mapreduce/interface/abortable_stream.h (limited to 'yt/cpp') diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 1eade18397a..0bbc6fa1322 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -84,6 +84,11 @@ bool TClientReader::Retry( const TMaybe& rowIndex, const std::exception_ptr& error) { + // We always stop retries if reader is aborted + if (IAbortableInputStream::IsAbortedError(error)) { + std::rethrow_exception(error); + } + if (CurrentRequestRetryPolicy_) { TMaybe backoffDuration; try { @@ -124,6 +129,16 @@ void TClientReader::ResetRetries() CurrentRequestRetryPolicy_ = nullptr; } +void TClientReader::Abort() +{ + Input_->Abort(); +} + +bool TClientReader::IsAborted() const +{ + return Input_->IsAborted(); +} + size_t TClientReader::DoRead(void* buf, size_t len) { return Input_->Read(buf, len); @@ -178,7 +193,7 @@ void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybebegin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } - Input_ = NDetail::RequestWithRetry>( + Input_ = NDetail::RequestWithRetry>( CurrentRequestRetryPolicy_, [this, &transactionId] (TMutationId /*mutationId*/) { return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 9f3160c9c7d..19a9f1ad173 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -2,6 +2,8 @@ #include +#include + #include #include #include @@ -37,6 +39,9 @@ public: bool HasRangeIndices() const override { return true; } + void Abort() override; + bool IsAborted() const override; + protected: size_t DoRead(void* buf, size_t len) override; @@ -53,7 +58,7 @@ private: std::unique_ptr ReadTransaction_; - std::unique_ptr Input_; + std::unique_ptr Input_; IRequestRetryPolicyPtr CurrentRequestRetryPolicy_; diff --git a/yt/cpp/mapreduce/client/file_reader.cpp b/yt/cpp/mapreduce/client/file_reader.cpp index 6163dd55331..ffdd2c9ac30 100644 --- a/yt/cpp/mapreduce/client/file_reader.cpp +++ b/yt/cpp/mapreduce/client/file_reader.cpp @@ -50,6 +50,16 @@ TStreamReaderBase::TStreamReaderBase( TStartTransactionOptions())) { } +void TStreamReaderBase::Abort() +{ + Input_->Abort(); +} + +bool TStreamReaderBase::IsAborted() const +{ + return Input_->IsAborted(); +} + TStreamReaderBase::~TStreamReaderBase() = default; TYPath TStreamReaderBase::Snapshot(const TYPath& path) @@ -98,7 +108,7 @@ TFileReader::TFileReader( Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_); } -std::unique_ptr TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const ui64 currentOffset = StartOffset_ + readBytes; @@ -130,7 +140,7 @@ TBlobTableReader::TBlobTableReader( Path_ = TStreamReaderBase::Snapshot(path); } -std::unique_ptr TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) +std::unique_ptr TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes) { const i64 currentOffset = StartOffset_ + readBytes; const i64 startPartIndex = currentOffset / Options_.PartSize_; diff --git a/yt/cpp/mapreduce/client/file_reader.h b/yt/cpp/mapreduce/client/file_reader.h index d3efe90f26a..6c7bcc8935e 100644 --- a/yt/cpp/mapreduce/client/file_reader.h +++ b/yt/cpp/mapreduce/client/file_reader.h @@ -2,13 +2,12 @@ #include +#include #include #include #include -class IInputStream; - namespace NYT { class TPingableTransaction; @@ -27,6 +26,9 @@ public: const TClientContext& context, const TTransactionId& transactionId); + void Abort() override; + bool IsAborted() const override; + ~TStreamReaderBase(); protected: @@ -37,13 +39,13 @@ protected: private: size_t DoRead(void* buf, size_t len) override; - virtual std::unique_ptr Request(const TTransactionId& transactionId, ui64 readBytes) = 0; + virtual std::unique_ptr Request(const TTransactionId& transactionId, ui64 readBytes) = 0; private: const IClientRetryPolicyPtr ClientRetryPolicy_; TFileReaderOptions FileReaderOptions_; - std::unique_ptr Input_; + std::unique_ptr Input_; std::unique_ptr ReadTransaction_; @@ -66,7 +68,7 @@ public: const TFileReaderOptions& options = {}); private: - std::unique_ptr Request(const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr Request(const TTransactionId& transactionId, ui64 readBytes) override; private: const ui64 StartOffset_; @@ -93,7 +95,7 @@ public: const TBlobTableReaderOptions& options = {}); private: - std::unique_ptr Request(const TTransactionId& transactionId, ui64 readBytes) override; + std::unique_ptr Request(const TTransactionId& transactionId, ui64 readBytes) override; private: const ui64 StartOffset_; diff --git a/yt/cpp/mapreduce/client/partition_reader.cpp b/yt/cpp/mapreduce/client/partition_reader.cpp index a2ca7eb9758..d849040d854 100644 --- a/yt/cpp/mapreduce/client/partition_reader.cpp +++ b/yt/cpp/mapreduce/client/partition_reader.cpp @@ -12,7 +12,7 @@ class TPartitionTableReader : public TRawTableReader { public: - TPartitionTableReader(std::unique_ptr input) + TPartitionTableReader(std::unique_ptr input) : Input_(std::move(input)) { } @@ -32,6 +32,16 @@ public: return false; } + void Abort() override + { + Input_->Abort(); + } + + bool IsAborted() const override + { + return Input_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -39,7 +49,7 @@ protected: } private: - std::unique_ptr Input_; + std::unique_ptr Input_; }; //////////////////////////////////////////////////////////////////////////////// @@ -52,7 +62,7 @@ TRawTableReaderPtr CreateTablePartitionReader( const TTablePartitionReaderOptions& options) { - auto stream = NDetail::RequestWithRetry>( + auto stream = NDetail::RequestWithRetry>( retryPolicy, [&] (TMutationId /*mutationId*/) { return rawClient->ReadTablePartition(cookie, format, options); diff --git a/yt/cpp/mapreduce/common/abortable_stream.cpp b/yt/cpp/mapreduce/common/abortable_stream.cpp new file mode 100644 index 00000000000..6336d792f14 --- /dev/null +++ b/yt/cpp/mapreduce/common/abortable_stream.cpp @@ -0,0 +1,124 @@ +#include "abortable_stream.h" + +#include +#include + +#include +#include + +#include +#include + +#include + +namespace NYT::NDetail { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortableInputStreamAdapter + : public IAbortableInputStream +{ +public: + explicit TAbortableInputStreamAdapter(IAsyncInputStreamPtr underlyingStream) + : UnderlyingStream_(std::move(underlyingStream)) + { } + + void Abort() override + { + auto guard = Guard(Lock_); + IsAborted_ = true; + CurrentFuture_.Cancel(TError("Stream was aborted")); + } + + bool IsAborted() const override + { + auto guard = Guard(Lock_); + return IsAborted_; + } + +private: + const IAsyncInputStreamPtr UnderlyingStream_; + + TAdaptiveLock Lock_; + bool IsAborted_ = false; + TFuture CurrentFuture_; + + size_t DoRead(void* buffer, size_t length) override + { + if (length == 0) { + return 0; + } + + struct TAbortableInputStreamBufferTag { }; + auto readBuffer = TSharedMutableRef::Allocate(length); + + auto future = UnderlyingStream_->Read(readBuffer); + { + auto guard = Guard(Lock_); + CurrentFuture_ = future; + if (IsAborted_) { + future.Cancel(TError("Stream was aborted")); + } + } + + auto result = WaitFor(future); + + { + auto guard = Guard(Lock_); + if (IsAborted_) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } + } + + auto bytesRead = result.ValueOrThrow(); + + memcpy(buffer, readBuffer.Begin(), bytesRead); + + return bytesRead; + } +}; + +std::unique_ptr CreateAbortableInputStreamAdapter( + IAsyncInputStreamPtr underlyingStream) +{ + YT_VERIFY(underlyingStream); + return std::make_unique( + std::move(underlyingStream)); +} + +//////////////////////////////////////////////////////////////////////////////// + +class TAbortableStreamFallback + : public IAbortableInputStream +{ +public: + explicit TAbortableStreamFallback(IInputStream* underlyingStream) + : UnderlyingStream_(underlyingStream) + { } + + void Abort() override + { + YT_LOG_WARNING("Abort for this stream type is not supported"); + } + +private: + IInputStream* const UnderlyingStream_; + + size_t DoRead(void* buffer, size_t length) override + { + return UnderlyingStream_->Read(buffer, length); + } +}; + +std::unique_ptr CreateAbortableInputStreamAdapterFallback( + IInputStream* underlyingStream) +{ + YT_VERIFY(underlyingStream); + return std::make_unique(underlyingStream); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/abortable_stream.h b/yt/cpp/mapreduce/common/abortable_stream.h new file mode 100644 index 00000000000..8b58e6582fc --- /dev/null +++ b/yt/cpp/mapreduce/common/abortable_stream.h @@ -0,0 +1,21 @@ +#pragma once + +#include + +#include + +#include + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +std::unique_ptr CreateAbortableInputStreamAdapter( + NConcurrency::IAsyncInputStreamPtr underlyingStream); + +std::unique_ptr CreateAbortableInputStreamAdapterFallback( + IInputStream* underlyingStream); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/halting_stream.cpp b/yt/cpp/mapreduce/common/halting_stream.cpp new file mode 100644 index 00000000000..ba14d9af316 --- /dev/null +++ b/yt/cpp/mapreduce/common/halting_stream.cpp @@ -0,0 +1,67 @@ +#include "halting_stream.h" + +#include +#include + +namespace NYT::NDetail { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class THaltingAsyncStream + : public IAsyncInputStream +{ +public: + THaltingAsyncStream( + IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt) + : Underlying_(std::move(underlying)) + , BytesBeforeHalt_(bytesBeforeHalt) + { } + +private: + void OnRead(TPromise promise, const TErrorOr& result) + { + if (result.IsOK()) { + BytesRead_ += result.Value(); + } + promise.TrySet(result); + } + + TFuture Read(const TSharedMutableRef& buffer) override + { + if (BytesRead_ >= BytesBeforeHalt_) { + HaltPromise_ = NewPromise(); + return HaltPromise_.ToFuture(); + } + + auto limit = std::min(buffer.Size(), static_cast(BytesBeforeHalt_ - BytesRead_)); + auto promise = NewPromise(); + auto future = promise.ToFuture(); + + Underlying_->Read(buffer.Slice(0, limit)).Subscribe( + BIND(&THaltingAsyncStream::OnRead, MakeStrong(this), std::move(promise))); + + return future; + } + +private: + IAsyncInputStreamPtr Underlying_; + const i64 BytesBeforeHalt_; + i64 BytesRead_ = 0; + TPromise HaltPromise_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IAsyncInputStreamPtr CreateHaltingAsyncStream( + IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt) +{ + return New(std::move(underlying), bytesBeforeHalt); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/halting_stream.h b/yt/cpp/mapreduce/common/halting_stream.h new file mode 100644 index 00000000000..10b8f75e1f7 --- /dev/null +++ b/yt/cpp/mapreduce/common/halting_stream.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +NConcurrency::IAsyncInputStreamPtr CreateHaltingAsyncStream( + NConcurrency::IAsyncInputStreamPtr underlying, + i64 bytesBeforeHalt); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/common/retry_lib.cpp b/yt/cpp/mapreduce/common/retry_lib.cpp index abcb9e87b1a..9e63455242a 100644 --- a/yt/cpp/mapreduce/common/retry_lib.cpp +++ b/yt/cpp/mapreduce/common/retry_lib.cpp @@ -257,6 +257,8 @@ bool IsRetriable(const std::exception& ex) { if (dynamic_cast(&ex)) { return false; + } else if (dynamic_cast(&ex)) { + return false; } return true; } diff --git a/yt/cpp/mapreduce/common/ya.make b/yt/cpp/mapreduce/common/ya.make index 0b427d5ce40..6500da9e8e0 100644 --- a/yt/cpp/mapreduce/common/ya.make +++ b/yt/cpp/mapreduce/common/ya.make @@ -3,8 +3,10 @@ LIBRARY() INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( - expected_error_guard.cpp + abortable_stream.cpp debug_metrics.cpp + expected_error_guard.cpp + halting_stream.cpp helpers.cpp retry_lib.cpp wait_proxy.cpp @@ -19,6 +21,7 @@ PEERDIR( library/cpp/yson/node yt/cpp/mapreduce/interface yt/cpp/mapreduce/interface/logging + yt/yt/core ) END() diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp index 7c337651f0a..a75d9e558ed 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.cpp +++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp @@ -52,8 +52,8 @@ public: auto g = Guard(Lock_); int result = 0; for (auto& response : ResponseList_) { - if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) { - response.Abort(); + if (!response.IsResponseAborted() && response.GetUrl().find(urlPattern) != TString::npos) { + response.AbortResponse(); ++result; } } @@ -112,7 +112,7 @@ TAbortableHttpResponseBase::~TAbortableHttpResponseBase() TAbortableHttpResponseRegistry::Get().Remove(this); } -void TAbortableHttpResponseBase::Abort() +void TAbortableHttpResponseBase::AbortResponse() { Aborted_ = true; } @@ -121,7 +121,7 @@ void TAbortableHttpResponseBase::SetLengthLimit(size_t limit) { LengthLimit_ = limit; if (LengthLimit_ == 0) { - Abort(); + AbortResponse(); } } @@ -130,7 +130,7 @@ const TString& TAbortableHttpResponseBase::GetUrl() const return Url_; } -bool TAbortableHttpResponseBase::IsAborted() const +bool TAbortableHttpResponseBase::IsResponseAborted() const { return Aborted_; } @@ -155,7 +155,7 @@ size_t TAbortableHttpResponse::DoRead(void* buf, size_t len) auto read = THttpResponse::DoRead(buf, len); LengthLimit_ -= read; if (LengthLimit_ == 0) { - Abort(); + AbortResponse(); } return read; } @@ -188,7 +188,7 @@ TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage( } TAbortableCoreHttpResponse::TAbortableCoreHttpResponse( - std::unique_ptr stream, + std::unique_ptr stream, const TString& url) : TAbortableHttpResponseBase(url) , Stream_(std::move(stream)) @@ -204,7 +204,7 @@ size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len) auto read = Stream_->Read(buf, len); LengthLimit_ -= read; if (LengthLimit_ == 0) { - Abort(); + AbortResponse(); } return read; @@ -218,6 +218,17 @@ size_t TAbortableCoreHttpResponse::DoSkip(size_t len) return Stream_->Skip(len); } +// IAbortableInputStream +void TAbortableCoreHttpResponse::Abort() +{ + Stream_->Abort(); +} + +bool TAbortableCoreHttpResponse::IsAborted() const +{ + return Stream_->IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h index e9b1483bf70..db8f97d50ba 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.h +++ b/yt/cpp/mapreduce/http/abortable_http_response.h @@ -35,9 +35,9 @@ class IAbortableHttpResponse : public TIntrusiveListItem { public: - virtual void Abort() = 0; + virtual void AbortResponse() = 0; virtual const TString& GetUrl() const = 0; - virtual bool IsAborted() const = 0; + virtual bool IsResponseAborted() const = 0; virtual void SetLengthLimit(size_t limit) = 0; virtual ~IAbortableHttpResponse() = default; @@ -50,9 +50,9 @@ public: TAbortableHttpResponseBase(const TString& url); ~TAbortableHttpResponseBase(); - void Abort() override; + void AbortResponse() override; const TString& GetUrl() const override; - bool IsAborted() const override; + bool IsResponseAborted() const override; void SetLengthLimit(size_t limit) override; protected: @@ -65,20 +65,24 @@ protected: /// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors. class TAbortableCoreHttpResponse - : public IInputStream + : public IAbortableInputStream , public TAbortableHttpResponseBase { public: TAbortableCoreHttpResponse( - std::unique_ptr stream, + std::unique_ptr stream, const TString& url); + // IAbortableInputStream + void Abort() override; + bool IsAborted() const override; + private: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; private: - std::unique_ptr Stream_; + std::unique_ptr Stream_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 8f0fb723d2d..454ba761241 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -5,7 +5,9 @@ #include "helpers.h" #include "http.h" +#include #include +#include #include @@ -126,9 +128,12 @@ public: return Request_->GetHttpCode(); } - IInputStream* GetResponseStream() override + IAbortableInputStream* GetResponseStream() override { - return Request_->GetResponseStream(); + if (!Stream_) { + Stream_ = NDetail::CreateAbortableInputStreamAdapterFallback(Request_->GetResponseStream()); + } + return Stream_.get(); } TString GetResponse() override @@ -143,6 +148,7 @@ public: private: std::unique_ptr Request_; + std::unique_ptr Stream_; }; class TDefaultHttpRequest @@ -225,11 +231,15 @@ public: return static_cast(Response_->GetStatusCode()); } - IInputStream* GetResponseStream() override + IAbortableInputStream* GetResponseStream() override { if (!Stream_) { + NConcurrency::IAsyncInputStreamPtr asyncStream = NConcurrency::CreateCopyingAdapter(Response_); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } auto stream = std::make_unique( - NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor), + NDetail::CreateAbortableInputStreamAdapter(std::move(asyncStream)), Response_, Context_.RequestId); CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_); @@ -277,15 +287,25 @@ public: private: class TWrappedStream - : public IInputStream + : public IAbortableInputStream { public: - TWrappedStream(std::unique_ptr underlying, NHttp::IResponsePtr response, TString requestId) + TWrappedStream(std::unique_ptr underlying, NHttp::IResponsePtr response, TString requestId) : Underlying_(std::move(underlying)) , Response_(std::move(response)) , RequestId_(std::move(requestId)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -333,7 +353,7 @@ private: } private: - std::unique_ptr Underlying_; + std::unique_ptr Underlying_; NHttp::IResponsePtr Response_; TString RequestId_; }; @@ -341,7 +361,7 @@ private: private: TCoreRequestContext Context_; NHttp::IResponsePtr Response_; - std::unique_ptr Stream_; + std::unique_ptr Stream_; }; class TCoreHttpRequest diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h index d66e3e7c675..5ae57a200a7 100644 --- a/yt/cpp/mapreduce/http/http_client.h +++ b/yt/cpp/mapreduce/http/http_client.h @@ -2,6 +2,7 @@ #include "fwd.h" +#include #include #include @@ -28,7 +29,7 @@ public: virtual ~IHttpResponse() = default; virtual int GetStatusCode() = 0; - virtual IInputStream* GetResponseStream() = 0; + virtual IAbortableInputStream* GetResponseStream() = 0; virtual TString GetResponse() = 0; virtual TString GetRequestId() const = 0; }; @@ -74,6 +75,16 @@ public: Underlying_ = Response_->GetResponseStream(); } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + private: size_t DoRead(void *buf, size_t len) override { @@ -87,7 +98,7 @@ private: private: IHttpResponsePtr Response_; - IInputStream* Underlying_; + IAbortableInputStream* Underlying_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http_client/raw_client.cpp b/yt/cpp/mapreduce/http_client/raw_client.cpp index 4ccafb1a987..557e4afdbd4 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.cpp +++ b/yt/cpp/mapreduce/http_client/raw_client.cpp @@ -632,7 +632,7 @@ IFileReaderPtr THttpRawClient::GetJobTrace( return MakeIntrusive(std::move(responseInfo)); } -std::unique_ptr THttpRawClient::ReadFile( +std::unique_ptr THttpRawClient::ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options) @@ -827,7 +827,7 @@ std::unique_ptr THttpRawClient::WriteTable( return NRawClient::WriteTable(Context_, transactionId, path, format, options); } -std::unique_ptr THttpRawClient::ReadTable( +std::unique_ptr THttpRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, @@ -854,7 +854,7 @@ std::unique_ptr THttpRawClient::WriteFile( return NRawClient::WriteFile(Context_, transactionId, path, options); } -std::unique_ptr THttpRawClient::ReadTablePartition( +std::unique_ptr THttpRawClient::ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options) @@ -872,7 +872,7 @@ std::unique_ptr THttpRawClient::ReadTablePartition( return std::make_unique(std::move(responseInfo)); } -std::unique_ptr THttpRawClient::ReadBlobTable( +std::unique_ptr THttpRawClient::ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, diff --git a/yt/cpp/mapreduce/http_client/raw_client.h b/yt/cpp/mapreduce/http_client/raw_client.h index f1e9c780789..d2a503605ff 100644 --- a/yt/cpp/mapreduce/http_client/raw_client.h +++ b/yt/cpp/mapreduce/http_client/raw_client.h @@ -210,7 +210,7 @@ public: // Files - std::unique_ptr ReadFile( + std::unique_ptr ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options = {}) override; @@ -290,18 +290,18 @@ public: const TMaybe& format, const TTableWriterOptions& options = {}) override; - std::unique_ptr ReadTable( + std::unique_ptr ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options = {}) override; - std::unique_ptr ReadTablePartition( + std::unique_ptr ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options = {}) override; - std::unique_ptr ReadBlobTable( + std::unique_ptr ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, diff --git a/yt/cpp/mapreduce/interface/abortable_stream.cpp b/yt/cpp/mapreduce/interface/abortable_stream.cpp new file mode 100644 index 00000000000..fc3b8d01f5b --- /dev/null +++ b/yt/cpp/mapreduce/interface/abortable_stream.cpp @@ -0,0 +1,34 @@ +#include "abortable_stream.h" + +#include + +#include + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +void IAbortableInputStream::Abort() +{ + Y_ABORT("Unimplemented"); +} + +bool IAbortableInputStream::IsAborted() const +{ + return false; +} + +bool IAbortableInputStream::IsAbortedError(const std::exception_ptr& error) +{ + try { + std::rethrow_exception(error); + } catch (const TInputStreamAbortedError& ex) { + return true; + } catch (...) { + return false; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/abortable_stream.h b/yt/cpp/mapreduce/interface/abortable_stream.h new file mode 100644 index 00000000000..40316e1b790 --- /dev/null +++ b/yt/cpp/mapreduce/interface/abortable_stream.h @@ -0,0 +1,33 @@ +#pragma once + +#include + +#include + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +/// @brief Input stream that supports aborting the read operation. +/// +/// Extends @ref IInputStream with the ability to immediately cancel current or future reads. +class IAbortableInputStream + : public IInputStream +{ +public: + ~IAbortableInputStream() override = default; + + /// @brief Immediately abort the stream, cancelling current and future reads. + /// + /// Some clients have already implemented this interface, so using pure virtual method leads to build errors. + virtual void Abort(); + + /// @brief Check whether the stream has been aborted. + virtual bool IsAborted() const; + + static bool IsAbortedError(const std::exception_ptr& error); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/interface/config.cpp b/yt/cpp/mapreduce/interface/config.cpp index 9379ab57473..b4edcee4370 100644 --- a/yt/cpp/mapreduce/interface/config.cpp +++ b/yt/cpp/mapreduce/interface/config.cpp @@ -457,6 +457,8 @@ void Serialize(const TConfig& config, NYson::IYsonConsumer* consumer) .Item("api_file_path_options").Value(config.ApiFilePathOptions) .Item("use_abortable_response").Value(config.UseAbortableResponse) .Item("enable_debug_metrics").Value(config.EnableDebugMetrics) + .Item("use_halting_response").Value(config.UseHaltingResponse) + .Item("halting_response_bytes_limit").Value(config.HaltingResponseBytesLimit) .Item("enable_local_mode_optimization").Value(config.EnableLocalModeOptimization) .Item("write_stderr_successful_jobs").Value(config.WriteStderrSuccessfulJobs) .Item("trace_http_requests_mode").Value(::ToString(config.TraceHttpRequestsMode)) @@ -533,6 +535,8 @@ void Deserialize(TConfig& config, const TNode& node) DESERIALIZE_ITEM("api_file_path_options", config.ApiFilePathOptions); DESERIALIZE_ITEM("use_abortable_response", config.UseAbortableResponse); DESERIALIZE_ITEM("enable_debug_metrics", config.EnableDebugMetrics); + DESERIALIZE_ITEM("use_halting_response", config.UseHaltingResponse); + DESERIALIZE_ITEM("halting_response_bytes_limit", config.HaltingResponseBytesLimit); DESERIALIZE_ITEM("enable_local_mode_optimization", config.EnableLocalModeOptimization); DESERIALIZE_ITEM("write_stderr_successful_jobs", config.WriteStderrSuccessfulJobs); DESERIALIZE_ITEM("trace_http_requests_mode", config.TraceHttpRequestsMode); diff --git a/yt/cpp/mapreduce/interface/config.h b/yt/cpp/mapreduce/interface/config.h index 7bf3112c620..b0d77548956 100644 --- a/yt/cpp/mapreduce/interface/config.h +++ b/yt/cpp/mapreduce/interface/config.h @@ -239,6 +239,18 @@ struct TConfig bool UseAbortableResponse = false; bool EnableDebugMetrics = false; + /// @brief Simulate a response that halts (stops sending data) mid-stream. + /// + /// Testing options, should never be used in user programs. + /// When enabled, the HTTP response will be truncated after @ref HaltingResponseBytesLimit bytes. + bool UseHaltingResponse = false; + + /// @brief Maximum number of bytes sent before the response is halted. + /// + /// Testing options, should never be used in user programs. + /// Only meaningful when @ref UseHaltingResponse is true. + i64 HaltingResponseBytesLimit = 64 * 1024; + // // There is optimization used with local YT that enables to skip binary upload and use real binary path. // When EnableLocalModeOptimization is set to false this optimization is completely disabled. diff --git a/yt/cpp/mapreduce/interface/errors.h b/yt/cpp/mapreduce/interface/errors.h index 6f78ffe14f3..cb8a7b5af90 100644 --- a/yt/cpp/mapreduce/interface/errors.h +++ b/yt/cpp/mapreduce/interface/errors.h @@ -293,6 +293,13 @@ private: TVector FailedJobInfo_; }; +/// +/// @brief Error that is thrown when trying to read from aborted reader. +/// +class TInputStreamAbortedError + : public yexception +{ }; + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/interface/io-inl.h b/yt/cpp/mapreduce/interface/io-inl.h index 77078bb5eb5..ac1464f5b98 100644 --- a/yt/cpp/mapreduce/interface/io-inl.h +++ b/yt/cpp/mapreduce/interface/io-inl.h @@ -123,6 +123,8 @@ struct IReaderImplBase virtual i64 GetTabletIndex() const; virtual bool IsEndOfStream() const; virtual bool IsRawReaderExhausted() const; + virtual void Abort(); + virtual bool IsAborted() const; }; struct INodeReaderImpl @@ -223,6 +225,16 @@ public: return Reader_->GetTabletIndex(); } + void Abort() + { + Reader_->Abort(); + } + + bool IsAborted() const + { + return Reader_->IsAborted(); + } + protected: template const auto& DoGetRowCached(TCacher cacher, TCacheGetter cacheGetter) const diff --git a/yt/cpp/mapreduce/interface/io.cpp b/yt/cpp/mapreduce/interface/io.cpp index bbd417893c4..d3b4621c235 100644 --- a/yt/cpp/mapreduce/interface/io.cpp +++ b/yt/cpp/mapreduce/interface/io.cpp @@ -28,6 +28,16 @@ bool IReaderImplBase::IsRawReaderExhausted() const Y_ABORT("Unimplemented"); } +void IReaderImplBase::Abort() +{ + Y_ABORT("Unimplemented"); +} + +bool IReaderImplBase::IsAborted() const +{ + return false; +} + //////////////////////////////////////////////////////////////////////////////// namespace NDetail { diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h index f8faf09663a..c80da6b041c 100644 --- a/yt/cpp/mapreduce/interface/io.h +++ b/yt/cpp/mapreduce/interface/io.h @@ -8,6 +8,7 @@ #include "fwd.h" +#include "abortable_stream.h" #include "client_method_options.h" #include "common.h" #include "distributed_session.h" @@ -107,7 +108,7 @@ class TIOException /// Interface representing YT file reader. class IFileReader : public TThrRefBase - , public IInputStream + , public IAbortableInputStream { }; /// Interface representing YT file writer. @@ -135,7 +136,7 @@ public: /// Low-level interface to read YT table with retries. class TRawTableReader : public TThrRefBase - , public IInputStream + , public IAbortableInputStream { public: /// @brief Retry table read starting from the specified `rangeIndex` and `rowIndex`. @@ -279,6 +280,12 @@ public: /// Returns `true` if job raw input stream was closed and `false` otherwise. bool IsRawReaderExhausted() const; + + /// @brief Abort the reader, interrupting any in-progress or future reads. + void Abort(); + + /// @brief Check whether the reader is aborted. + bool IsAborted() const; }; /// @brief Iterator for use in range-based-for. diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h index e318c022ce5..9347a95e6e6 100644 --- a/yt/cpp/mapreduce/interface/raw_client.h +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -4,6 +4,8 @@ #include "client_method_options.h" #include "operation.h" +#include + #include namespace NYT { @@ -213,7 +215,7 @@ public: // Files - virtual std::unique_ptr ReadFile( + virtual std::unique_ptr ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options = {}) = 0; @@ -299,18 +301,18 @@ public: const TMaybe& format, const TTableWriterOptions& options = {}) = 0; - virtual std::unique_ptr ReadTable( + virtual std::unique_ptr ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options = {}) = 0; - virtual std::unique_ptr ReadTablePartition( + virtual std::unique_ptr ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options = {}) = 0; - virtual std::unique_ptr ReadBlobTable( + virtual std::unique_ptr ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make index caa68b20f69..5e56861fa86 100644 --- a/yt/cpp/mapreduce/interface/ya.make +++ b/yt/cpp/mapreduce/interface/ya.make @@ -3,6 +3,7 @@ LIBRARY() INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( + abortable_stream.cpp batch_request.cpp client.cpp client_method_options.cpp diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp index c6213e86651..05f3b1cca6b 100644 --- a/yt/cpp/mapreduce/io/counting_raw_reader.cpp +++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp @@ -28,6 +28,16 @@ size_t TCountingRawTableReader::GetReadByteCount() const return ReadByteCount_; } +void TCountingRawTableReader::Abort() +{ + Reader_->Abort(); +} + +bool TCountingRawTableReader::IsAborted() const +{ + return Reader_->IsAborted(); +} + size_t TCountingRawTableReader::DoRead(void* buf, size_t len) { auto readLen = Reader_->Read(buf, len); diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h index c3b197d5844..fe78144d76a 100644 --- a/yt/cpp/mapreduce/io/counting_raw_reader.h +++ b/yt/cpp/mapreduce/io/counting_raw_reader.h @@ -22,6 +22,9 @@ public: size_t GetReadByteCount() const; + void Abort() override; + bool IsAborted() const override; + protected: size_t DoRead(void* buf, size_t len) override; diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp index d7a1c9754ee..676400b1df3 100644 --- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp +++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp @@ -2,6 +2,7 @@ #include +#include #include #include @@ -30,6 +31,9 @@ TLenvalTableReader::~TLenvalTableReader() void TLenvalTableReader::CheckValidity() const { + if (Input_.IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } if (!IsValid()) { ythrow yexception() << "Iterator is not valid"; } diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp index 558c42b30ee..01eaf5d9462 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.cpp +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -341,6 +342,16 @@ bool TNodeTableReader::IsRawReaderExhausted() const return Finished_; } +void TNodeTableReader::Abort() +{ + Input_.Abort(); +} + +bool TNodeTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// void TNodeTableReader::PrepareParsing() @@ -368,6 +379,9 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error void TNodeTableReader::CheckValidity() const { + if (IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } if (!Valid_) { ythrow yexception() << "Iterator is not valid"; } diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h index c8e319ce4f6..92af5b111b2 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.h +++ b/yt/cpp/mapreduce/io/node_table_reader.h @@ -11,8 +11,6 @@ #include #include -#include - namespace NYT { class TRawTableReader; @@ -54,6 +52,8 @@ public: TMaybe GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: void NextImpl(); diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp index 6f79619c810..76e0243b395 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.cpp +++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp @@ -206,6 +206,16 @@ bool TProtoTableReader::IsRawReaderExhausted() const return NodeReader_->IsRawReaderExhausted(); } +void TProtoTableReader::Abort() +{ + NodeReader_->Abort(); +} + +bool TProtoTableReader::IsAborted() const +{ + return NodeReader_->IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// TLenvalProtoTableReader::TLenvalProtoTableReader( @@ -291,6 +301,16 @@ bool TLenvalProtoTableReader::IsRawReaderExhausted() const return TLenvalTableReader::IsRawReaderExhausted(); } +void TLenvalProtoTableReader::Abort() +{ + Input_.Abort(); +} + +bool TLenvalProtoTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + void TLenvalProtoTableReader::SkipRow() { while (true) { diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h index bfe4ac56474..fdb1f2fd5f5 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.h +++ b/yt/cpp/mapreduce/io/proto_table_reader.h @@ -31,6 +31,8 @@ public: TMaybe GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: std::unique_ptr NodeReader_; @@ -64,6 +66,8 @@ public: TMaybe GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; protected: void SkipRow() override; diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp index f77f2d71307..6de1d222d53 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp @@ -1,5 +1,6 @@ #include "skiff_row_table_reader.h" +#include #include #include @@ -100,6 +101,9 @@ void TSkiffRowTableReader::SkipRow() } void TSkiffRowTableReader::CheckValidity() const { + if (IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } if (!IsValid()) { ythrow yexception() << "Iterator is not valid"; } @@ -230,6 +234,16 @@ bool TSkiffRowTableReader::IsRawReaderExhausted() const { return Finished_; } +void TSkiffRowTableReader::Abort() +{ + Input_.Abort(); +} + +bool TSkiffRowTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h index 1f623570bb9..c8086b48b83 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h @@ -37,6 +37,8 @@ public: TMaybe GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: bool Retry(const std::exception_ptr& error); diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp index c96adade41e..94662741b45 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp @@ -1,5 +1,6 @@ #include "skiff_table_reader.h" +#include #include #include @@ -155,6 +156,16 @@ bool TSkiffTableReader::IsRawReaderExhausted() const return Finished_; } +void TSkiffTableReader::Abort() +{ + Input_.Abort(); +} + +bool TSkiffTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// TVector TSkiffTableReader::CreateSkiffTableSchemas( @@ -288,6 +299,9 @@ void TSkiffTableReader::ReadRow() void TSkiffTableReader::EnsureValidity() const { + if (IsAborted()) { + ythrow TInputStreamAbortedError() << "Stream was aborted"; + } Y_ENSURE(Valid_, "Iterator is not valid"); } diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h index c7614776f41..ee01332beaf 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_table_reader.h @@ -35,6 +35,9 @@ public: TMaybe GetReadByteCount() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; + private: struct TSkiffTableSchema; diff --git a/yt/cpp/mapreduce/io/stream_raw_reader.cpp b/yt/cpp/mapreduce/io/stream_raw_reader.cpp index ec19b67d0b0..f69eb9f034f 100644 --- a/yt/cpp/mapreduce/io/stream_raw_reader.cpp +++ b/yt/cpp/mapreduce/io/stream_raw_reader.cpp @@ -5,6 +5,10 @@ #include "skiff_table_reader.h" #include "yamr_table_reader.h" +#include + +#include + #include #include @@ -33,6 +37,12 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// +TInputStreamProxy::TInputStreamProxy(IInputStream* stream) + : Stream_(CreateAbortableInputStreamAdapter(NConcurrency::CreateAsyncAdapter(stream))) +{ } + +//////////////////////////////////////////////////////////////////////////////// + ::TIntrusivePtr CreateProtoReader( IInputStream* stream, const TTableReaderOptions& /* options */, diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h index a7694a0a709..826d0fc269a 100644 --- a/yt/cpp/mapreduce/io/stream_table_reader.h +++ b/yt/cpp/mapreduce/io/stream_table_reader.h @@ -1,5 +1,6 @@ #pragma once +#include #include namespace NYT { @@ -11,9 +12,7 @@ class TInputStreamProxy : public TRawTableReader { public: - TInputStreamProxy(IInputStream* stream) - : Stream_(stream) - { } + explicit TInputStreamProxy(IInputStream* stream); bool Retry( const TMaybe& /*rangeIndex*/, @@ -31,6 +30,16 @@ public: return false; } + void Abort() override + { + Stream_->Abort(); + } + + bool IsAborted() const override + { + return Stream_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -38,7 +47,7 @@ protected: } private: - IInputStream* Stream_; + std::unique_ptr Stream_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp index 3ae6b8a8aeb..e9ebab520cc 100644 --- a/yt/cpp/mapreduce/io/yamr_table_reader.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp @@ -78,6 +78,16 @@ bool TYaMRTableReader::IsRawReaderExhausted() const return TLenvalTableReader::IsRawReaderExhausted(); } +void TYaMRTableReader::Abort() +{ + Input_.Abort(); +} + +bool TYaMRTableReader::IsAborted() const +{ + return Input_.IsAborted(); +} + void TYaMRTableReader::ReadField(TString* result, i32 length) { result->resize(length); diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.h b/yt/cpp/mapreduce/io/yamr_table_reader.h index 39fdecfa71a..a2a6b97ebcb 100644 --- a/yt/cpp/mapreduce/io/yamr_table_reader.h +++ b/yt/cpp/mapreduce/io/yamr_table_reader.h @@ -30,6 +30,8 @@ public: TMaybe GetReadByteCount() const override; bool IsEndOfStream() const override; bool IsRawReaderExhausted() const override; + void Abort() override; + bool IsAborted() const override; private: void ReadField(TString* result, i32 length); diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.cpp b/yt/cpp/mapreduce/rpc_client/raw_client.cpp index 530fd58cb02..f5643b1f6a5 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.cpp +++ b/yt/cpp/mapreduce/rpc_client/raw_client.cpp @@ -5,8 +5,12 @@ #include "rpc_parameters_serialization.h" #include "wrap_rpc_error.h" +#include +#include #include +#include +#include #include #include @@ -205,15 +209,25 @@ void WriteInChunks(const void* buf, size_t len, const TWriteFn& writeFn) //////////////////////////////////////////////////////////////////////////////// class TSyncRpcInputStream - : public IInputStream + : public IAbortableInputStream { public: - explicit TSyncRpcInputStream(std::unique_ptr stream) + explicit TSyncRpcInputStream(std::unique_ptr stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + private: - const std::unique_ptr Underlying_; + const std::unique_ptr Underlying_; size_t DoRead(void* buf, size_t len) override { @@ -861,10 +875,15 @@ class TRpcResponseStream : public IFileReader { public: - TRpcResponseStream(std::unique_ptr stream) + TRpcResponseStream(std::unique_ptr stream) : Underlying_(std::move(stream)) { } + void Abort() override + { + Underlying_->Abort(); + } + private: size_t DoRead(void *buf, size_t len) override { @@ -877,11 +896,11 @@ private: } private: - std::unique_ptr Underlying_; + std::unique_ptr Underlying_; }; class TFixedStringStream - : public IInputStream + : public IAbortableInputStream { public: TFixedStringStream(TSharedRef data) @@ -915,7 +934,7 @@ IFileReaderPtr TRpcRawClient::GetJobInput( { auto future = Clients_.Heavy->GetJobInput(NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - auto stream = std::make_unique(CreateSyncAdapter(CreateCopyingAdapter(result))); + auto stream = std::make_unique(CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result))); return MakeIntrusive(std::move(stream)); } @@ -928,7 +947,7 @@ IFileReaderPtr TRpcRawClient::GetJobFailContext( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr stream(new TFixedStringStream(std::move(result))); + std::unique_ptr stream(new TFixedStringStream(std::move(result))); return MakeIntrusive(std::move(stream)); } @@ -941,7 +960,7 @@ IFileReaderPtr TRpcRawClient::GetJobStderr( NScheduler::TOperationId(YtGuidFromUtilGuid(operationId)), NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId))); auto result = WaitAndProcess(future); - std::unique_ptr stream(new TFixedStringStream(std::move(result.Data))); + std::unique_ptr stream(new TFixedStringStream(std::move(result.Data))); return MakeIntrusive(std::move(stream)); } @@ -955,19 +974,19 @@ IFileReaderPtr TRpcRawClient::GetJobTrace( NJobTrackerClient::TJobId(YtGuidFromUtilGuid(jobId)), SerializeOptionsForGetJobTrace(options)); auto result = WaitAndProcess(future); - auto stream = CreateSyncAdapter(CreateCopyingAdapter(result)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(result)); return MakeIntrusive(std::move(stream)); } -std::unique_ptr TRpcRawClient::ReadFile( +std::unique_ptr TRpcRawClient::ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options) { auto future = Clients_.Heavy->CreateFileReader(path.Path_, SerializeOptionsForReadFile(transactionId, options)); auto reader = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(reader)); - return std::make_unique(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(reader)); + return std::make_unique(std::move(stream)); } class TRpcWriteFileRequestStream @@ -1245,7 +1264,7 @@ std::unique_ptr TRpcRawClient::WriteTable( return std::make_unique(std::move(rowStream)); } -std::unique_ptr TRpcRawClient::ReadTable( +std::unique_ptr TRpcRawClient::ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, @@ -1259,11 +1278,15 @@ std::unique_ptr TRpcRawClient::ReadTable( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); - return std::make_unique(std::move(syncAdapter)); + auto asyncStream = CreateCopyingAdapter(std::move(formatStream)); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NYT::NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } + auto stream = CreateAbortableInputStreamAdapter(std::move(asyncStream)); + return std::make_unique(std::move(stream)); } -std::unique_ptr TRpcRawClient::ReadTablePartition( +std::unique_ptr TRpcRawClient::ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options) @@ -1276,11 +1299,11 @@ std::unique_ptr TRpcRawClient::ReadTablePartition( auto formatStream = WaitAndProcess(future); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(std::move(formatStream))); + auto syncAdapter = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(std::move(formatStream))); return std::make_unique(std::move(syncAdapter)); } -std::unique_ptr TRpcRawClient::ReadBlobTable( +std::unique_ptr TRpcRawClient::ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, @@ -1331,8 +1354,8 @@ std::unique_ptr TRpcRawClient::ReadBlobTable( options.StartPartIndex_, options.Offset_, options.PartSize_); - auto syncAdapter = CreateSyncAdapter(CreateCopyingAdapter(blobReader)); - return std::make_unique(std::move(syncAdapter)); + auto stream = CreateAbortableInputStreamAdapter(CreateCopyingAdapter(blobReader)); + return std::make_unique(std::move(stream)); } void TRpcRawClient::AlterTableReplica( diff --git a/yt/cpp/mapreduce/rpc_client/raw_client.h b/yt/cpp/mapreduce/rpc_client/raw_client.h index 07fbb19758c..0eea19df078 100644 --- a/yt/cpp/mapreduce/rpc_client/raw_client.h +++ b/yt/cpp/mapreduce/rpc_client/raw_client.h @@ -211,7 +211,7 @@ public: // Files - std::unique_ptr ReadFile( + std::unique_ptr ReadFile( const TTransactionId& transactionId, const TRichYPath& path, const TFileReaderOptions& options = {}) override; @@ -291,18 +291,18 @@ public: const TMaybe& format, const TTableWriterOptions& options = {}) override; - std::unique_ptr ReadTable( + std::unique_ptr ReadTable( const TTransactionId& transactionId, const TRichYPath& path, const TFormat& format, const TTableReaderOptions& options = {}) override; - std::unique_ptr ReadTablePartition( + std::unique_ptr ReadTablePartition( const TString& cookie, const TFormat& format, const TTablePartitionReaderOptions& options = {}) override; - std::unique_ptr ReadBlobTable( + std::unique_ptr ReadBlobTable( const TTransactionId& transactionId, const TRichYPath& path, const TKey& key, diff --git a/yt/cpp/mapreduce/rpc_client/ya.make b/yt/cpp/mapreduce/rpc_client/ya.make index 115dc550ff7..37fd90543a8 100644 --- a/yt/cpp/mapreduce/rpc_client/ya.make +++ b/yt/cpp/mapreduce/rpc_client/ya.make @@ -12,6 +12,7 @@ SRCS( PEERDIR( library/cpp/yson/node + yt/cpp/mapreduce/common yt/cpp/mapreduce/interface yt/yt/client ) diff --git a/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h b/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h index 46215ed5d0a..58263637652 100644 --- a/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h +++ b/yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h @@ -45,6 +45,8 @@ void WriteProtoTable(const IClientBasePtr& client, const TString& tablePath, con bool UseRpcClient(); +bool UseDefaultHttpClient(); + //////////////////////////////////////////////////////////////////////////////// // TODO: should be removed, usages should be replaced with TConfigSaverGuard @@ -218,6 +220,9 @@ void Out(IOutputStream& out, const NYT::NTesting: #define SKIP_IF_RPC() \ SKIP_TEST_IF(UseRpcClient(), "Unsupported test for RPC Client") +#define SKIP_IF_DEFAULT_HTTP() \ + SKIP_TEST_IF(UseDefaultHttpClient(), "Unsupported test for default (sync) HTTP Client") + #define YT_UNITTEST_LIB_H_ #include "yt_unittest_lib-inl.h" #undef YT_UNITTEST_LIB_H_ -- cgit v1.3