diff options
| author | nadya73 <[email protected]> | 2023-11-28 09:43:31 +0300 |
|---|---|---|
| committer | nadya73 <[email protected]> | 2023-11-28 10:30:42 +0300 |
| commit | cfcea9a51e10b12e6e4a28e908e479da31d15f3e (patch) | |
| tree | e8ac5fa092c3f0356d6808aa5b2f1967d6088d43 /yt/cpp/mapreduce/client/client_reader.cpp | |
| parent | 44d57691b82c69b65e3b570c8f52424a7c3e833a (diff) | |
[yt/cpp/mapreduce] YT-20588: Add more logs for readers and fix client_reader retries
Diffstat (limited to 'yt/cpp/mapreduce/client/client_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 606ac1c2974..b080da26048 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -145,6 +145,9 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u if (!CurrentRequestRetryPolicy_) { CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest(); } + + bool areRangesUpdated = false; + while (true) { CurrentRequestRetryPolicy_->NotifyNewAttempt(); @@ -170,7 +173,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding)); - if (rowIndex.Defined()) { + if (rowIndex.Defined() && !areRangesUpdated) { auto& ranges = Path_.MutableRanges(); if (ranges.Empty()) { ranges.ConstructInPlace(TVector{TReadRange()}); @@ -183,6 +186,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); } ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); + areRangesUpdated = true; } header.MergeParameters(FormIORequestParameters(Path_, Options_)); @@ -196,7 +200,7 @@ void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<u Input_ = Response_->GetResponseStream(); - YT_LOG_DEBUG("RSP %v - table stream", requestId); + YT_LOG_DEBUG("RSP %v - table stream (RequestId: %v, RangeIndex: %v, RowIndex: %v)", requestId, rangeIndex, rowIndex); return; } catch (const TErrorResponse& e) { |
