aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authorhiddenpath <hiddenpath@yandex-team.com>2024-12-28 16:16:43 +0300
committerhiddenpath <hiddenpath@yandex-team.com>2024-12-28 16:44:44 +0300
commit8ff3898891499e702afdf66722fa8a5cfe4c2336 (patch)
treebe8207b7fbc0686bf1421da283469f70efbe8ad6 /yt/cpp
parentb36c4920ee2e83bfbd2d2761ed0e574125c73426 (diff)
downloadydb-8ff3898891499e702afdf66722fa8a5cfe4c2336.tar.gz
Remove TResponseReader implementation
commit_hash:0b103a32d38299cd8ae3f308687280983046d02a
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/http/http_client.h4
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp61
2 files changed, 18 insertions, 47 deletions
diff --git a/yt/cpp/mapreduce/http/http_client.h b/yt/cpp/mapreduce/http/http_client.h
index a01b619fab..d66e3e7c67 100644
--- a/yt/cpp/mapreduce/http/http_client.h
+++ b/yt/cpp/mapreduce/http/http_client.h
@@ -2,7 +2,7 @@
#include "fwd.h"
-#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/io.h>
#include <util/datetime/base.h>
@@ -65,7 +65,7 @@ public:
////////////////////////////////////////////////////////////////////////////////
class THttpResponseStream
- : public IInputStream
+ : public IFileReader
{
public:
THttpResponseStream(IHttpResponsePtr response)
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
index 4f2b91fe34..359c73da3e 100644
--- a/yt/cpp/mapreduce/raw_client/raw_client.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -465,54 +465,17 @@ TListJobsResult THttpRawClient::ListJobs(
return result;
}
-class TResponseReader
- : public IFileReader
-{
-public:
- TResponseReader(const TClientContext& context, THttpHeader header)
- {
- if (context.ServiceTicketAuth) {
- header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
- } else {
- header.SetToken(context.Token);
- }
-
- if (context.ImpersonationUser) {
- header.SetImpersonationUser(*context.ImpersonationUser);
- }
-
- auto hostName = GetProxyForHeavyRequest(context);
- auto requestId = CreateGuidAsString();
-
- UpdateHeaderForProxyIfNeed(hostName, context, header);
-
- Response_ = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
- ResponseStream_ = Response_->GetResponseStream();
- }
-
-private:
- size_t DoRead(void* buf, size_t len) override
- {
- return ResponseStream_->Read(buf, len);
- }
-
- size_t DoSkip(size_t len) override
- {
- return ResponseStream_->Skip(len);
- }
-
-private:
- NHttpClient::IHttpResponsePtr Response_;
- IInputStream* ResponseStream_;
-};
-
IFileReaderPtr THttpRawClient::GetJobInput(
const TJobId& jobId,
const TGetJobInputOptions& /*options*/)
{
+ TMutationId mutationId;
THttpHeader header("GET", "get_job_input");
header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(Context_, std::move(header));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
IFileReaderPtr THttpRawClient::GetJobFailContext(
@@ -520,10 +483,14 @@ IFileReaderPtr THttpRawClient::GetJobFailContext(
const TJobId& jobId,
const TGetJobFailContextOptions& /*options*/)
{
+ TMutationId mutationId;
THttpHeader header("GET", "get_job_fail_context");
header.AddOperationId(operationId);
header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(Context_, std::move(header));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
TString THttpRawClient::GetJobStderrWithRetries(
@@ -537,7 +504,7 @@ TString THttpRawClient::GetJobStderrWithRetries(
header.AddParameter("job_id", GetGuidAsString(jobId));
TRequestConfig config;
config.IsHeavy = true;
- auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, {}, config);
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
return responseInfo->GetResponse();
}
@@ -546,10 +513,14 @@ IFileReaderPtr THttpRawClient::GetJobStderr(
const TJobId& jobId,
const TGetJobStderrOptions& /*options*/)
{
+ TMutationId mutationId;
THttpHeader header("GET", "get_job_stderr");
header.AddOperationId(operationId);
header.AddParameter("job_id", GetGuidAsString(jobId));
- return new TResponseReader(Context_, std::move(header));
+ TRequestConfig config;
+ config.IsHeavy = true;
+ auto responseInfo = RequestWithoutRetry(Context_, mutationId, header, /*body*/ {}, config);
+ return MakeIntrusive<NHttpClient::THttpResponseStream>(std::move(responseInfo));
}
TJobTraceEvent ParseJobTraceEvent(const TNode& node)