summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/http
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/http')
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.cpp27
-rw-r--r--yt/cpp/mapreduce/http/abortable_http_response.h18
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp36
-rw-r--r--yt/cpp/mapreduce/http/http_client.h15
4 files changed, 71 insertions, 25 deletions
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.cpp b/yt/cpp/mapreduce/http/abortable_http_response.cpp
index 7c337651f0a..a75d9e558ed 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.cpp
+++ b/yt/cpp/mapreduce/http/abortable_http_response.cpp
@@ -52,8 +52,8 @@ public:
auto g = Guard(Lock_);
int result = 0;
for (auto& response : ResponseList_) {
- if (!response.IsAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
- response.Abort();
+ if (!response.IsResponseAborted() && response.GetUrl().find(urlPattern) != TString::npos) {
+ response.AbortResponse();
++result;
}
}
@@ -112,7 +112,7 @@ TAbortableHttpResponseBase::~TAbortableHttpResponseBase()
TAbortableHttpResponseRegistry::Get().Remove(this);
}
-void TAbortableHttpResponseBase::Abort()
+void TAbortableHttpResponseBase::AbortResponse()
{
Aborted_ = true;
}
@@ -121,7 +121,7 @@ void TAbortableHttpResponseBase::SetLengthLimit(size_t limit)
{
LengthLimit_ = limit;
if (LengthLimit_ == 0) {
- Abort();
+ AbortResponse();
}
}
@@ -130,7 +130,7 @@ const TString& TAbortableHttpResponseBase::GetUrl() const
return Url_;
}
-bool TAbortableHttpResponseBase::IsAborted() const
+bool TAbortableHttpResponseBase::IsResponseAborted() const
{
return Aborted_;
}
@@ -155,7 +155,7 @@ size_t TAbortableHttpResponse::DoRead(void* buf, size_t len)
auto read = THttpResponse::DoRead(buf, len);
LengthLimit_ -= read;
if (LengthLimit_ == 0) {
- Abort();
+ AbortResponse();
}
return read;
}
@@ -188,7 +188,7 @@ TAbortableHttpResponse::TOutage TAbortableHttpResponse::StartOutage(
}
TAbortableCoreHttpResponse::TAbortableCoreHttpResponse(
- std::unique_ptr<IInputStream> stream,
+ std::unique_ptr<IAbortableInputStream> stream,
const TString& url)
: TAbortableHttpResponseBase(url)
, Stream_(std::move(stream))
@@ -204,7 +204,7 @@ size_t TAbortableCoreHttpResponse::DoRead(void* buf, size_t len)
auto read = Stream_->Read(buf, len);
LengthLimit_ -= read;
if (LengthLimit_ == 0) {
- Abort();
+ AbortResponse();
}
return read;
@@ -218,6 +218,17 @@ size_t TAbortableCoreHttpResponse::DoSkip(size_t len)
return Stream_->Skip(len);
}
+// IAbortableInputStream
+void TAbortableCoreHttpResponse::Abort()
+{
+ Stream_->Abort();
+}
+
+bool TAbortableCoreHttpResponse::IsAborted() const
+{
+ return Stream_->IsAborted();
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
diff --git a/yt/cpp/mapreduce/http/abortable_http_response.h b/yt/cpp/mapreduce/http/abortable_http_response.h
index e9b1483bf70..db8f97d50ba 100644
--- a/yt/cpp/mapreduce/http/abortable_http_response.h
+++ b/yt/cpp/mapreduce/http/abortable_http_response.h
@@ -35,9 +35,9 @@ class IAbortableHttpResponse
: public TIntrusiveListItem<IAbortableHttpResponse>
{
public:
- virtual void Abort() = 0;
+ virtual void AbortResponse() = 0;
virtual const TString& GetUrl() const = 0;
- virtual bool IsAborted() const = 0;
+ virtual bool IsResponseAborted() const = 0;
virtual void SetLengthLimit(size_t limit) = 0;
virtual ~IAbortableHttpResponse() = default;
@@ -50,9 +50,9 @@ public:
TAbortableHttpResponseBase(const TString& url);
~TAbortableHttpResponseBase();
- void Abort() override;
+ void AbortResponse() override;
const TString& GetUrl() const override;
- bool IsAborted() const override;
+ bool IsResponseAborted() const override;
void SetLengthLimit(size_t limit) override;
protected:
@@ -65,20 +65,24 @@ protected:
/// @brief Stream wrapper for @ref NYT::NHttpClient::TCoreHttpResponse with possibility to emulate errors.
class TAbortableCoreHttpResponse
- : public IInputStream
+ : public IAbortableInputStream
, public TAbortableHttpResponseBase
{
public:
TAbortableCoreHttpResponse(
- std::unique_ptr<IInputStream> stream,
+ std::unique_ptr<IAbortableInputStream> stream,
const TString& url);
+ // IAbortableInputStream
+ void Abort() override;
+ bool IsAborted() const override;
+
private:
size_t DoRead(void* buf, size_t len) override;
size_t DoSkip(size_t len) override;
private:
- std::unique_ptr<IInputStream> Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
index 8f0fb723d2d..454ba761241 100644
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -5,7 +5,9 @@
#include "helpers.h"
#include "http.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
#include <yt/cpp/mapreduce/common/expected_error_guard.h>
+#include <yt/cpp/mapreduce/common/halting_stream.h>
#include <yt/cpp/mapreduce/interface/config.h>
@@ -126,9 +128,12 @@ public:
return Request_->GetHttpCode();
}
- IInputStream* GetResponseStream() override
+ IAbortableInputStream* GetResponseStream() override
{
- return Request_->GetResponseStream();
+ if (!Stream_) {
+ Stream_ = NDetail::CreateAbortableInputStreamAdapterFallback(Request_->GetResponseStream());
+ }
+ return Stream_.get();
}
TString GetResponse() override
@@ -143,6 +148,7 @@ public:
private:
std::unique_ptr<THttpRequest> Request_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
class TDefaultHttpRequest
@@ -225,11 +231,15 @@ public:
return static_cast<int>(Response_->GetStatusCode());
}
- IInputStream* GetResponseStream() override
+ IAbortableInputStream* GetResponseStream() override
{
if (!Stream_) {
+ NConcurrency::IAsyncInputStreamPtr asyncStream = NConcurrency::CreateCopyingAdapter(Response_);
+ if (TConfig::Get()->UseHaltingResponse) {
+ asyncStream = NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit);
+ }
auto stream = std::make_unique<TWrappedStream>(
- NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor),
+ NDetail::CreateAbortableInputStreamAdapter(std::move(asyncStream)),
Response_,
Context_.RequestId);
CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_);
@@ -277,15 +287,25 @@ public:
private:
class TWrappedStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
- TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
+ TWrappedStream(std::unique_ptr<IAbortableInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
: Underlying_(std::move(underlying))
, Response_(std::move(response))
, RequestId_(std::move(requestId))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -333,7 +353,7 @@ private:
}
private:
- std::unique_ptr<IInputStream> Underlying_;
+ std::unique_ptr<IAbortableInputStream> Underlying_;
NHttp::IResponsePtr Response_;
TString RequestId_;
};
@@ -341,7 +361,7 @@ private:
private:
TCoreRequestContext Context_;
NHttp::IResponsePtr Response_;
- std::unique_ptr<IInputStream> Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
class TCoreHttpRequest
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index d66e3e7c675..5ae57a200a7 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -2,6 +2,7 @@
#include "fwd.h"
+#include <yt/cpp/mapreduce/interface/abortable_stream.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <util/datetime/base.h>
@@ -28,7 +29,7 @@ public:
virtual ~IHttpResponse() = default;
virtual int GetStatusCode() = 0;
- virtual IInputStream* GetResponseStream() = 0;
+ virtual IAbortableInputStream* GetResponseStream() = 0;
virtual TString GetResponse() = 0;
virtual TString GetRequestId() const = 0;
};
@@ -74,6 +75,16 @@ public:
Underlying_ = Response_->GetResponseStream();
}
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
private:
size_t DoRead(void *buf, size_t len) override
{
@@ -87,7 +98,7 @@ private:
private:
IHttpResponsePtr Response_;
- IInputStream* Underlying_;
+ IAbortableInputStream* Underlying_;
};
////////////////////////////////////////////////////////////////////////////////