diff options
Diffstat (limited to 'yt/cpp/mapreduce/http/http_client.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/http/http_client.cpp | 36 |
1 files changed, 28 insertions, 8 deletions
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp index 8f0fb723d2d..454ba761241 100644 --- a/yt/cpp/mapreduce/http/http_client.cpp +++ b/yt/cpp/mapreduce/http/http_client.cpp @@ -5,7 +5,9 @@ #include "helpers.h" #include "http.h" +#include <yt/cpp/mapreduce/common/abortable_stream.h> #include <yt/cpp/mapreduce/common/expected_error_guard.h> +#include <yt/cpp/mapreduce/common/halting_stream.h> #include <yt/cpp/mapreduce/interface/config.h> @@ -126,9 +128,12 @@ public: return Request_->GetHttpCode(); } - IInputStream* GetResponseStream() override + IAbortableInputStream* GetResponseStream() override { - return Request_->GetResponseStream(); + if (!Stream_) { + Stream_ = NDetail::CreateAbortableInputStreamAdapterFallback(Request_->GetResponseStream()); + } + return Stream_.get(); } TString GetResponse() override @@ -143,6 +148,7 @@ public: private: std::unique_ptr<THttpRequest> Request_; + std::unique_ptr<IAbortableInputStream> Stream_; }; class TDefaultHttpRequest @@ -225,11 +231,15 @@ public: return static_cast<int>(Response_->GetStatusCode()); } - IInputStream* GetResponseStream() override + IAbortableInputStream* GetResponseStream() override { if (!Stream_) { + NConcurrency::IAsyncInputStreamPtr asyncStream = NConcurrency::CreateCopyingAdapter(Response_); + if (TConfig::Get()->UseHaltingResponse) { + asyncStream = NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit); + } auto stream = std::make_unique<TWrappedStream>( - NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor), + NDetail::CreateAbortableInputStreamAdapter(std::move(asyncStream)), Response_, Context_.RequestId); CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_); @@ -277,15 +287,25 @@ public: private: class TWrappedStream - : public IInputStream + : public IAbortableInputStream { public: - TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId) + TWrappedStream(std::unique_ptr<IAbortableInputStream> underlying, NHttp::IResponsePtr response, TString requestId) : Underlying_(std::move(underlying)) , Response_(std::move(response)) , RequestId_(std::move(requestId)) { } + void Abort() override + { + Underlying_->Abort(); + } + + bool IsAborted() const override + { + return Underlying_->IsAborted(); + } + protected: size_t DoRead(void* buf, size_t len) override { @@ -333,7 +353,7 @@ private: } private: - std::unique_ptr<IInputStream> Underlying_; + std::unique_ptr<IAbortableInputStream> Underlying_; NHttp::IResponsePtr Response_; TString RequestId_; }; @@ -341,7 +361,7 @@ private: private: TCoreRequestContext Context_; NHttp::IResponsePtr Response_; - std::unique_ptr<IInputStream> Stream_; + std::unique_ptr<IAbortableInputStream> Stream_; }; class TCoreHttpRequest |
