From cfcea9a51e10b12e6e4a28e908e479da31d15f3e Mon Sep 17 00:00:00 2001 From: nadya73 Date: Tue, 28 Nov 2023 09:43:31 +0300 Subject: [yt/cpp/mapreduce] YT-20588: Add more logs for readers and fix client_reader retries --- yt/cpp/mapreduce/client/client_reader.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'yt/cpp/mapreduce/client/client_reader.cpp') diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index 606ac1c2974..b080da26048 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -145,6 +145,9 @@ void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybeCreatePolicyForGenericRequest(); } + + bool areRangesUpdated = false; + while (true) { CurrentRequestRetryPolicy_->NotifyNewAttempt(); @@ -170,7 +173,7 @@ void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybeAcceptEncoding)); - if (rowIndex.Defined()) { + if (rowIndex.Defined() && !areRangesUpdated) { auto& ranges = Path_.MutableRanges(); if (ranges.Empty()) { ranges.ConstructInPlace(TVector{TReadRange()}); @@ -183,6 +186,7 @@ void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybeerase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0)); } ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex)); + areRangesUpdated = true; } header.MergeParameters(FormIORequestParameters(Path_, Options_)); @@ -196,7 +200,7 @@ void TClientReader::CreateRequest(const TMaybe& rangeIndex, const TMaybeGetResponseStream(); - YT_LOG_DEBUG("RSP %v - table stream", requestId); + YT_LOG_DEBUG("RSP %v - table stream (RequestId: %v, RangeIndex: %v, RowIndex: %v)", requestId, rangeIndex, rowIndex); return; } catch (const TErrorResponse& e) { -- cgit v1.3