summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/client_reader.cpp
diff options
context:
space:
mode:
authorhiddenpath <[email protected]>2024-12-22 16:37:15 +0300
committerhiddenpath <[email protected]>2024-12-22 16:58:29 +0300
commitf9e36e78609de5b801f829466f06ccc9a87dc00b (patch)
treef0d954fa30bf5352c4ce825c3293032d72a51189 /yt/cpp/mapreduce/client/client_reader.cpp
parent75a274b6f3b196529306be849688c15b6537d1d2 (diff)
YT-23616: Move ReadTable to THttpRawClient
commit_hash:c145049aef2f4ccaff537670be830d81ebc6f8d6
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp105
1 files changed, 17 insertions, 88 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index b3127168771..e7538a22da0 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -166,99 +166,28 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u
CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest();
}
- bool areRangesUpdated = false;
+ auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
- while (true) {
- CurrentRequestRetryPolicy_->NotifyNewAttempt();
-
- THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
- if (Context_.ServiceTicketAuth) {
- header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket());
+ if (rowIndex.Defined()) {
+ auto& ranges = Path_.MutableRanges();
+ if (ranges.Empty()) {
+ ranges.ConstructInPlace(TVector{TReadRange()});
} else {
- header.SetToken(Context_.Token);
- }
-
- if (Context_.ImpersonationUser) {
- header.SetImpersonationUser(*Context_.ImpersonationUser);
- }
-
- auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
- header.AddTransactionId(transactionId);
-
- const auto& controlAttributes = Options_.ControlAttributes_;
- header.AddParameter("control_attributes", TNode()
- ("enable_row_index", controlAttributes.EnableRowIndex_)
- ("enable_range_index", controlAttributes.EnableRangeIndex_));
- header.SetOutputFormat(Format_);
-
- header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
-
- if (rowIndex.Defined() && !areRangesUpdated) {
- auto& ranges = Path_.MutableRanges();
- if (ranges.Empty()) {
- ranges.ConstructInPlace(TVector{TReadRange()});
- } else {
- if (rangeIndex.GetOrElse(0) >= ranges->size()) {
- ythrow yexception()
- << "range index " << rangeIndex.GetOrElse(0)
- << " is out of range, input range count is " << ranges->size();
- }
- ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
- }
- ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
- areRangesUpdated = true;
- }
-
- header.MergeParameters(FormIORequestParameters(Path_, Options_));
-
- auto requestId = CreateGuidAsString();
-
- try {
- const auto proxyName = GetProxyForHeavyRequest(Context_);
- UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
- Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
-
- Input_ = Response_->GetResponseStream();
-
- YT_LOG_DEBUG(
- "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)",
- requestId,
- rangeIndex,
- rowIndex);
-
- return;
- } catch (const TErrorResponse& e) {
- LogRequestError(
- requestId,
- header,
- e.what(),
- CurrentRequestRetryPolicy_->GetAttemptDescription());
-
- if (!IsRetriable(e)) {
- throw;
+ if (rangeIndex.GetOrElse(0) >= ranges->size()) {
+ ythrow yexception()
+ << "range index " << rangeIndex.GetOrElse(0)
+ << " is out of range, input range count is " << ranges->size();
}
- auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e);
- if (!backoff) {
- throw;
- }
- NDetail::TWaitProxy::Get()->Sleep(*backoff);
- } catch (const std::exception& e) {
- LogRequestError(
- requestId,
- header,
- e.what(),
- CurrentRequestRetryPolicy_->GetAttemptDescription());
-
- Response_.reset();
- Input_ = nullptr;
-
- auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e);
- if (!backoff) {
- throw;
- }
- NDetail::TWaitProxy::Get()->Sleep(*backoff);
+ ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
}
+ ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
}
+
+ Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
+ CurrentRequestRetryPolicy_,
+ [this, &transactionId] (TMutationId /*mutationId*/) {
+ return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
+ });
}
////////////////////////////////////////////////////////////////////////////////