diff options
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 606ac1c2974..b080da26048 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -145,6 +145,9 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u if (!CurrentRequestRetryPolicy_) { CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest(); } + + bool areRangesUpdated = false; + while (true) { CurrentRequestRetryPolicy_->NotifyNewAttempt(); @@ -170,7 +173,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); - if (rowIndex.Defined()) { + if (rowIndex.Defined() && !areRangesUpdated) { auto& ranges = Path_.MutableRanges(); if (ranges.Empty()) { ranges.ConstructInPlace(TVector{TReadRange()}); @@ -183,6 +186,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); } ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); + areRangesUpdated = true; } header.MergeParameters(FormIORequestParameters(Path_, Options_)); @@ -196,7 +200,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u Input_ = Response_->GetResponseStream(); - YT_LOG_DEBUG("RSP %v - table stream", requestId); + YT_LOG_DEBUG("RSP %v - table stream (RequestId: %v, RangeIndex: %v, RowIndex: %v)", requestId, rangeIndex, rowIndex); return; } catch (const TErrorResponse& e) { |
