diff options
| author | hiddenpath <[email protected]> | 2024-12-22 16:37:15 +0300 |
|---|---|---|
| committer | hiddenpath <[email protected]> | 2024-12-22 16:58:29 +0300 |
| commit | f9e36e78609de5b801f829466f06ccc9a87dc00b (patch) | |
| tree | f0d954fa30bf5352c4ce825c3293032d72a51189 /yt/cpp/mapreduce/client/client_reader.cpp | |
| parent | 75a274b6f3b196529306be849688c15b6537d1d2 (diff) | |
YT-23616: Move ReadTable to THttpRawClient
commit_hash:c145049aef2f4ccaff537670be830d81ebc6f8d6
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 105 |
1 files changed, 17 insertions, 88 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index b3127168771..e7538a22da0 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -166,99 +166,28 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest(); } - bool areRangesUpdated = false; + auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - while (true) { - CurrentRequestRetryPolicy_->NotifyNewAttempt(); - - THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion)); - if (Context_.ServiceTicketAuth) { - header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket()); + if (rowIndex.Defined()) { + auto& ranges = Path_.MutableRanges(); + if (ranges.Empty()) { + ranges.ConstructInPlace(TVector{TReadRange()}); } else { - header.SetToken(Context_.Token); - } - - if (Context_.ImpersonationUser) { - header.SetImpersonationUser(*Context_.ImpersonationUser); - } - - auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_); - header.AddTransactionId(transactionId); - - const auto& controlAttributes = Options_.ControlAttributes_; - header.AddParameter("control_attributes", TNode() - ("enable_row_index", controlAttributes.EnableRowIndex_) - ("enable_range_index", controlAttributes.EnableRangeIndex_)); - header.SetOutputFormat(Format_); - - header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); - - if (rowIndex.Defined() && !areRangesUpdated) { - auto& ranges = Path_.MutableRanges(); - if (ranges.Empty()) { - ranges.ConstructInPlace(TVector{TReadRange()}); - } else { - if (rangeIndex.GetOrElse(0) >= ranges->size()) { - ythrow yexception() - << "range index " << rangeIndex.GetOrElse(0) - << " is out of range, input range count is " << ranges->size(); - } - ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); - } - ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); - areRangesUpdated = true; - } - - header.MergeParameters(FormIORequestParameters(Path_, Options_)); - - auto requestId = CreateGuidAsString(); - - try { - const auto proxyName = GetProxyForHeavyRequest(Context_); - UpdateHeaderForProxyIfNeed(proxyName, Context_, header); - Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header); - - Input_ = Response_->GetResponseStream(); - - YT_LOG_DEBUG( - "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)", - requestId, - rangeIndex, - rowIndex); - - return; - } catch (const TErrorResponse& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - if (!IsRetriable(e)) { - throw; + if (rangeIndex.GetOrElse(0) >= ranges->size()) { + ythrow yexception() + << "range index " << rangeIndex.GetOrElse(0) + << " is out of range, input range count is " << ranges->size(); } - auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); - } catch (const std::exception& e) { - LogRequestError( - requestId, - header, - e.what(), - CurrentRequestRetryPolicy_->GetAttemptDescription()); - - Response_.reset(); - Input_ = nullptr; - - auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e); - if (!backoff) { - throw; - } - NDetail::TWaitProxy::Get()->Sleep(*backoff); + ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); } + ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); } + + Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>( + CurrentRequestRetryPolicy_, + [this, &transactionId] (TMutationId /*mutationId*/) { + return RawClient_->ReadTable(transactionId, Path_, Format_, Options_); + }); } //////////////////////////////////////////////////////////////////////////////// |
