summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/http/http_client.cpp
diff options
context:
space:
mode:
authormaybenotilya <[email protected]>2026-04-17 11:57:44 +0300
committermaybenotilya <[email protected]>2026-04-17 12:34:55 +0300
commita6423bf330c10089d7a0e2aaa8d971d0dcbb3fcf (patch)
tree087131eb0146a82cd9c72d1b0a290d8d081c021e /yt/cpp/mapreduce/http/http_client.cpp
parent7bcfb91453e538211ed44e97c57d1104f543f920 (diff)
YT-26179: Add Abort for readers
* Changelog entry Type: feature Component: cpp-sdk Add Abort for table readers commit_hash:be5026883c90d18333f20468946f1e5a5b71839b
Diffstat (limited to 'yt/cpp/mapreduce/http/http_client.cpp')
-rw-r--r--yt/cpp/mapreduce/http/http_client.cpp36
1 files changed, 28 insertions, 8 deletions
diff --git a/yt/cpp/mapreduce/http/http_client.cpp b/yt/cpp/mapreduce/http/http_client.cpp
index 8f0fb723d2d..454ba761241 100644
--- a/yt/cpp/mapreduce/http/http_client.cpp
+++ b/yt/cpp/mapreduce/http/http_client.cpp
@@ -5,7 +5,9 @@
#include "helpers.h"
#include "http.h"
+#include <yt/cpp/mapreduce/common/abortable_stream.h>
#include <yt/cpp/mapreduce/common/expected_error_guard.h>
+#include <yt/cpp/mapreduce/common/halting_stream.h>
#include <yt/cpp/mapreduce/interface/config.h>
@@ -126,9 +128,12 @@ public:
return Request_->GetHttpCode();
}
- IInputStream* GetResponseStream() override
+ IAbortableInputStream* GetResponseStream() override
{
- return Request_->GetResponseStream();
+ if (!Stream_) {
+ Stream_ = NDetail::CreateAbortableInputStreamAdapterFallback(Request_->GetResponseStream());
+ }
+ return Stream_.get();
}
TString GetResponse() override
@@ -143,6 +148,7 @@ public:
private:
std::unique_ptr<THttpRequest> Request_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
class TDefaultHttpRequest
@@ -225,11 +231,15 @@ public:
return static_cast<int>(Response_->GetStatusCode());
}
- IInputStream* GetResponseStream() override
+ IAbortableInputStream* GetResponseStream() override
{
if (!Stream_) {
+ NConcurrency::IAsyncInputStreamPtr asyncStream = NConcurrency::CreateCopyingAdapter(Response_);
+ if (TConfig::Get()->UseHaltingResponse) {
+ asyncStream = NDetail::CreateHaltingAsyncStream(std::move(asyncStream), TConfig::Get()->HaltingResponseBytesLimit);
+ }
auto stream = std::make_unique<TWrappedStream>(
- NConcurrency::CreateSyncAdapter(NConcurrency::CreateCopyingAdapter(Response_), NConcurrency::EWaitForStrategy::WaitFor),
+ NDetail::CreateAbortableInputStreamAdapter(std::move(asyncStream)),
Response_,
Context_.RequestId);
CheckErrorResponse(Context_.HostName, Context_.RequestId, Response_);
@@ -277,15 +287,25 @@ public:
private:
class TWrappedStream
- : public IInputStream
+ : public IAbortableInputStream
{
public:
- TWrappedStream(std::unique_ptr<IInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
+ TWrappedStream(std::unique_ptr<IAbortableInputStream> underlying, NHttp::IResponsePtr response, TString requestId)
: Underlying_(std::move(underlying))
, Response_(std::move(response))
, RequestId_(std::move(requestId))
{ }
+ void Abort() override
+ {
+ Underlying_->Abort();
+ }
+
+ bool IsAborted() const override
+ {
+ return Underlying_->IsAborted();
+ }
+
protected:
size_t DoRead(void* buf, size_t len) override
{
@@ -333,7 +353,7 @@ private:
}
private:
- std::unique_ptr<IInputStream> Underlying_;
+ std::unique_ptr<IAbortableInputStream> Underlying_;
NHttp::IResponsePtr Response_;
TString RequestId_;
};
@@ -341,7 +361,7 @@ private:
private:
TCoreRequestContext Context_;
NHttp::IResponsePtr Response_;
- std::unique_ptr<IInputStream> Stream_;
+ std::unique_ptr<IAbortableInputStream> Stream_;
};
class TCoreHttpRequest