diff options
| author | maybenotilya <[email protected]> | 2026-04-17 11:57:44 +0300 |
|---|---|---|
| committer | maybenotilya <[email protected]> | 2026-04-17 12:34:55 +0300 |
| commit | a6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch) | |
| tree | 087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/http | |
| parent | 7bcfb91453e538211ed44e97c57d1104f543f920 (diff) | |
YT-26179: Add Abort for readers
* Changelog entry
Type: feature
Component: cpp-sdk
Add Abort for table readers
commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/http')
| -rw-r--r-- | yt/cpp/mapreduce/http/abortable_http_response.cpp | 27 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http/abortable_http_response.h | 18 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http/http_client.cpp | 36 | ||||
| -rw-r--r-- | yt/cpp/mapreduce/http/http_client.h | 15 |
4 files changed, 71 insertions, 25 deletions
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp index 7c337651f0a..a75d9e558ed 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.cpp +++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp @@ -52,8 +52,8 @@ public: auto g = Guard(Lock_); int result = 0; for (auto& response : ResponseList_) { - if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) { - response.Abort(); + if (!response.IsResponseAborted() && response.GetUrl().find(urlPattern) != TString::npos) { + response.AbortResponse(); ++result; } } @@ -112,7 +112,7 @@ TAbortableHttpResponseBase::~TAbortableHttpResponseBase() TAbortableHttpResponseRegistry::Get().Remove(this); } -void TAbortableHttpResponseBase::Abort() +void TAbortableHttpResponseBase::AbortResponse() { Aborted_ = true; } @@ -121,7 +121,7 @@ void TAbortableHttpResponseBase::SetLengthLimit(size_t limit) { LengthLimit_ = limit; if (LengthLimit_ == 0) { - Abort(); + AbortResponse(); } } @@ -130,7 +130,7 @@ const TString& TAbortableHttpResponseBase::GetUrl() const return Url_; } -bool TAbortableHttpResponseBase::IsAborted() const +bool TAbortableHttpResponseBase::IsResponseAborted() const { return Aborted_; } @@ -155,7 +155,7 @@ size_t TAbortableHttpResponse::DoRead(void* buf, size_t len) auto read = THttpResponse::DoRead(buf, len); LengthLimit_ -= read; if (LengthLimit_ == 0) { - Abort(); + AbortResponse(); } return read; } @@ -188,7 +188,7 @@ TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage( } TAbortableCoreHttpResponse::TAbortableCoreHttpResponse( - std::unique_ptr<IInputStream> stream, + std::unique_ptr<IAbortableInputStream> stream, const TString& url) : TAbortableHttpResponseBase(url) , Stream_(std::move(stream)) @@ -204,7 +204,7 @@ size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len) auto read = Stream_->Read(buf, len); LengthLimit_ -= read; if (LengthLimit_ == 0) { - Abort(); + AbortResponse(); } return read; @@ -218,6 +218,17 @@ size_t TAbortableCoreHttpResponse::DoSkip(size_t len) return Stream_->Skip(len); } +// IAbortableInputStream +void TAbortableCoreHttpResponse::Abort() +{ + Stream_->Abort(); +} + +bool TAbortableCoreHttpResponse::IsAborted() const +{ + return Stream_->IsAborted(); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace NYT diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h index e9b1483bf70..db8f97d50ba 100644 --- a/yt/cpp/mapreduce/http/abortable_http_response.h +++ b/yt/cpp/mapreduce/http/abortable_http_response.h @@ -35,9 +35,9 @@ class IAbortableHttpResponse : public TIntrusiveListItem<IAbortableHttpResponse> { public: - virtual void Abort() = 0; + virtual void AbortResponse() = 0; virtual const TString& GetUrl() const = 0; - virtual bool IsAborted() const = 0; + virtual bool IsResponseAborted() const = 0; virtual void SetLengthLimit(size_t limit) = 0; virtual ~IAbortableHttpResponse() = default; @@ -50,9 +50,9 @@ public: TAbortableHttpResponseBase(const TString& url); ~TAbortableHttpResponseBase(); - void Abort() override; + void AbortResponse() override; const TString& GetUrl() const override; - bool IsAborted() const override; + bool IsResponseAborted() const override; void SetLengthLimit(size_t limit) override; protected: @@ -65,20 +65,24 @@ protected: /// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors. class TAbortableCoreHttpResponse - : public IInputStream + : public IAbortableInputStream , public TAbortableHttpResponseBase { public: TAbortableCoreHttpResponse( - std::unique_ptr<IInputStream> stream, + std::unique_ptr<IAbortableInputStream> stream, const TString& url); + // IAbortableInputStream + void Abort() override; + bool IsAborted() const override; + private: size_t DoRead(void* buf, size_t len) override; size_t DoSkip(size_t len) override; private: - std::unique_ptr<IInputStream> Stream_; + std::unique_ptr<IAbortableInputStream> Stream_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 8f0fb723d2d..454ba761241 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -5,7 +5,9 @@ #include "helpers.h" #include "http.h" +#include <yt/cpp/mapreduce/common/abortable_stream.h> #include <yt/cpp/mapreduce/common/expected_error_guard.h> +#include <yt/cpp/mapreduce/common/halting_stream.h> #include <yt/cpp/mapreduce/interface/config.h> @@ -126,9 +128,12 @@ public: return Request_->GetHttpCode(); } - IInputStream* GetResponseStream() override + IAbortableInputStream* GetResponseStream() override { - return Request_->GetResponseStream(); + if (!Stream_) { + Stream_ = NDetail::CreateAbortableInputStreamAdapterFallback(Request_->GetResponseStream()); + } + return Stream_.get(); } TString GetResponse() override @@ -143,6 +148,7 @@ public: private: std::unique_ptr<THttpRequest> Request_; + std::unique_ptr<IAbortableInputStream> Stream_; }; class TDefaultHttpRequest @@ -225,11 +231,15 @@ public: return static_cast<int>(Response_->GetStatusCode()); } - IInputStream* GetResponseStream() override + IAbortableInputStream* GetResponseStream() override { if (!Stream_) { + NConcurrency::IAsyncInputStreamPtr asyncStream = NConcurrency::CreateCopyingAdapter(Response_); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } auto stream = std::make_unique<TWrappedStream>( - NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor), + NDetail::CreateAbortableInputStreamAdapter(std::move(asyncStream)), Response_, Context_.RequestId); CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_); @@ -277,15 +287,25 @@ public: private: class TWrappedStream - : public IInputStream + : public IAbortableInputStream { public: - TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId) + TWrappedStream(std::unique_ptr<IAbortableInputStream> underlying, NHttp::IResponsePtr response, TString requestId) : Underlying_(std::move(underlying)) , Response_(std::move(response)) , RequestId_(std::move(requestId)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -333,7 +353,7 @@ private: } private: - std::unique_ptr<IInputStream> Underlying_; + std::unique_ptr<IAbortableInputStream> Underlying_; NHttp::IResponsePtr Response_; TString RequestId_; }; @@ -341,7 +361,7 @@ private: private: TCoreRequestContext Context_; NHttp::IResponsePtr Response_; - std::unique_ptr<IInputStream> Stream_; + std::unique_ptr<IAbortableInputStream> Stream_; }; class TCoreHttpRequest diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h index d66e3e7c675..5ae57a200a7 100644 --- a/yt/cpp/mapreduce/http/http_client.h +++ b/yt/cpp/mapreduce/http/http_client.h @@ -2,6 +2,7 @@ #include "fwd.h" +#include <yt/cpp/mapreduce/interface/abortable_stream.h> #include <yt/cpp/mapreduce/interface/io.h> #include <util/datetime/base.h> @@ -28,7 +29,7 @@ public: virtual ~IHttpResponse() = default; virtual int GetStatusCode() = 0; - virtual IInputStream* GetResponseStream() = 0; + virtual IAbortableInputStream* GetResponseStream() = 0; virtual TString GetResponse() = 0; virtual TString GetRequestId() const = 0; }; @@ -74,6 +75,16 @@ public: Underlying_ = Response_->GetResponseStream(); } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + private: size_t DoRead(void *buf, size_t len) override { @@ -87,7 +98,7 @@ private: private: IHttpResponsePtr Response_; - IInputStream* Underlying_; + IAbortableInputStream* Underlying_; }; //////////////////////////////////////////////////////////////////////////////// |
