diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-04-18 12:01:09 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-04-18 12:11:25 +0300 |
commit | 044c22732c7e0fa5cbdd08543c7cd68a501170a6 (patch) | |
tree | 1d2f571d26bd697eb3e30102cd8d90ae5a28a201 | |
parent | 4d77c1fb4de7a121c720a5b403ba7aa516a32ce3 (diff) | |
download | ydb-044c22732c7e0fa5cbdd08543c7cd68a501170a6.tar.gz |
[yt/cpp/mapreduce] YT-21405: Don't ignore backoff and pass actual exception in Retry()
Don't ignore backoff and pass actual exception in Retry()
b821c02fd21c9f8115cd2a4896372a9fda69e5f6
-rw-r--r-- | yt/cpp/mapreduce/client/client_reader.cpp | 26 | ||||
-rw-r--r-- | yt/cpp/mapreduce/client/client_reader.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/interface/io.h | 3 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/counting_raw_reader.cpp | 7 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/counting_raw_reader.h | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/job_reader.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/job_reader.h | 9 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/lenval_table_reader.cpp | 12 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/lenval_table_reader.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.cpp | 2 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/proto_table_reader.cpp | 8 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_row_table_reader.cpp | 14 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_row_table_reader.h | 4 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_table_reader.cpp | 6 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/stream_table_reader.h | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/yamr_table_reader.cpp | 8 |
16 files changed, 77 insertions, 44 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp index b080da2604..e1051fda96 100644 --- a/yt/cpp/mapreduce/client/client_reader.cpp +++ b/yt/cpp/mapreduce/client/client_reader.cpp @@ -82,15 +82,31 @@ TClientReader::TClientReader( bool TClientReader::Retry( const TMaybe<ui32>& rangeIndex, - const TMaybe<ui64>& rowIndex) + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& error) { if (CurrentRequestRetryPolicy_) { - // TODO we should pass actual exception in Retry function - yexception genericError; - auto backoff = CurrentRequestRetryPolicy_->OnGenericError(genericError); - if (!backoff) { + TMaybe<TDuration> backoffDuration; + try { + std::rethrow_exception(error); + } catch (const TErrorResponse& ex) { + if (!IsRetriable(ex)) { + throw; + } + backoffDuration = CurrentRequestRetryPolicy_->OnRetriableError(ex); + } catch (const std::exception& ex) { + if (!IsRetriable(ex)) { + throw; + } + backoffDuration = CurrentRequestRetryPolicy_->OnGenericError(ex); + } catch (...) { + } + + if (!backoffDuration) { return false; } + + NDetail::TWaitProxy::Get()->Sleep(*backoffDuration); } try { diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h index 22f5a0ebb0..782edb77b7 100644 --- a/yt/cpp/mapreduce/client/client_reader.h +++ b/yt/cpp/mapreduce/client/client_reader.h @@ -31,7 +31,8 @@ public: bool Retry( const TMaybe<ui32>& rangeIndex, - const TMaybe<ui64>& rowIndex) override; + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& error) override; void ResetRetries() override; diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h index 8497c8aae2..4f5480d107 100644 --- a/yt/cpp/mapreduce/interface/io.h +++ b/yt/cpp/mapreduce/interface/io.h @@ -142,7 +142,8 @@ public: /// from the stream. virtual bool Retry( const TMaybe<ui32>& rangeIndex, - const TMaybe<ui64>& rowIndex) = 0; + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& error) = 0; /// Resets retry attempt count to the initial value (then `Retry()` can be called again). virtual void ResetRetries() = 0; diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp index 6a918bdddb..c6213e8665 100644 --- a/yt/cpp/mapreduce/io/counting_raw_reader.cpp +++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp @@ -5,9 +5,12 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////////// -bool TCountingRawTableReader::Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex) +bool TCountingRawTableReader::Retry( + const TMaybe<ui32>& rangeIndex, + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& error) { - return Reader_->Retry(rangeIndex, rowIndex); + return Reader_->Retry(rangeIndex, rowIndex, error); } void TCountingRawTableReader::ResetRetries() diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h index 3b6705c5e4..c3b197d584 100644 --- a/yt/cpp/mapreduce/io/counting_raw_reader.h +++ b/yt/cpp/mapreduce/io/counting_raw_reader.h @@ -13,7 +13,10 @@ public: : Reader_(std::move(reader)) { } - bool Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex) override; + bool Retry( + const TMaybe<ui32>& rangeIndex, + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& error) override; void ResetRetries() override; bool HasRangeIndices() const override; diff --git a/yt/cpp/mapreduce/io/job_reader.cpp b/yt/cpp/mapreduce/io/job_reader.cpp index 39056f00e2..1547309cf0 100644 --- a/yt/cpp/mapreduce/io/job_reader.cpp +++ b/yt/cpp/mapreduce/io/job_reader.cpp @@ -16,7 +16,10 @@ TJobReader::TJobReader(const TFile& file) , BufferedInput_(&FdInput_, BUFFER_SIZE) { } -bool TJobReader::Retry(const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/) +bool TJobReader::Retry( + const TMaybe<ui32>& /*rangeIndex*/, + const TMaybe<ui64>& /*rowIndex*/, + const std::exception_ptr& /*error*/) { return false; } diff --git a/yt/cpp/mapreduce/io/job_reader.h b/yt/cpp/mapreduce/io/job_reader.h index ce62ec180f..c5f6cdffc2 100644 --- a/yt/cpp/mapreduce/io/job_reader.h +++ b/yt/cpp/mapreduce/io/job_reader.h @@ -17,9 +17,12 @@ public: explicit TJobReader(int fd); explicit TJobReader(const TFile& file); - virtual bool Retry( const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/) override; - virtual void ResetRetries() override; - virtual bool HasRangeIndices() const override; + bool Retry( + const TMaybe<ui32>& rangeIndex, + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& error) override; + void ResetRetries() override; + bool HasRangeIndices() const override; protected: size_t DoRead(void* buf, size_t len) override; diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp index 4dba2a1a86..d7a1c9754e 100644 --- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp +++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp @@ -111,8 +111,8 @@ void TLenvalTableReader::Next() Length_ = static_cast<ui32>(value); RowTaken_ = false; AtStart_ = false; - } catch (const std::exception& e) { - if (!PrepareRetry()) { + } catch (const std::exception& ex) { + if (!PrepareRetry(std::make_exception_ptr(ex))) { throw; } continue; @@ -121,9 +121,9 @@ void TLenvalTableReader::Next() } } -bool TLenvalTableReader::Retry() +bool TLenvalTableReader::Retry(const std::exception_ptr& error) { - if (PrepareRetry()) { + if (PrepareRetry(error)) { RowTaken_ = true; Next(); return true; @@ -183,9 +183,9 @@ bool TLenvalTableReader::IsRawReaderExhausted() const return Finished_; } -bool TLenvalTableReader::PrepareRetry() +bool TLenvalTableReader::PrepareRetry(const std::exception_ptr& error) { - if (Input_.Retry(RangeIndex_, RowIndex_)) { + if (Input_.Retry(RangeIndex_, RowIndex_, error)) { if (RangeIndex_) { RangeIndexShift_ += *RangeIndex_; } diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.h b/yt/cpp/mapreduce/io/lenval_table_reader.h index cfece052c5..2baa6e45a0 100644 --- a/yt/cpp/mapreduce/io/lenval_table_reader.h +++ b/yt/cpp/mapreduce/io/lenval_table_reader.h @@ -27,7 +27,7 @@ protected: void CheckValidity() const; - bool Retry(); + bool Retry(const std::exception_ptr& error); template <class T> bool ReadInteger(T* result, bool acceptEndOfStream = false) @@ -60,7 +60,7 @@ protected: ui32 Length_ = 0; private: - bool PrepareRetry(); + bool PrepareRetry(const std::exception_ptr& error); }; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp index ca529e14db..bc3da75ee6 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.cpp +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -354,7 +354,7 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error { YT_LOG_ERROR("Read error (RangeIndex: %v, RowIndex: %v, Error: %v)", RangeIndex_, RowIndex_, error); Exception_ = exception; - if (Input_.Retry(RangeIndex_, RowIndex_)) { + if (Input_.Retry(RangeIndex_, RowIndex_, exception)) { if (RangeIndex_) { RangeIndexShift_ += *RangeIndex_; } diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp index c0bf90d253..6f79619c81 100644 --- a/yt/cpp/mapreduce/io/proto_table_reader.cpp +++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp @@ -238,8 +238,8 @@ void TLenvalProtoTableReader::ReadRow(Message* row) Input_.ResetRetries(); break; - } catch (const std::exception& ) { - if (!TLenvalTableReader::Retry()) { + } catch (const std::exception& ex) { + if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) { throw; } } @@ -300,8 +300,8 @@ void TLenvalProtoTableReader::SkipRow() ythrow yexception() << "Premature end of stream"; } break; - } catch (const std::exception& ) { - if (!TLenvalTableReader::Retry()) { + } catch (const std::exception& ex) { + if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) { throw; } } diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp index c0004564bf..3f7769a7e1 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp @@ -29,9 +29,9 @@ TSkiffRowTableReader::TSkiffRowTableReader( TSkiffRowTableReader::~TSkiffRowTableReader() { } -bool TSkiffRowTableReader::Retry() +bool TSkiffRowTableReader::Retry(const std::exception_ptr& error) { - if (PrepareRetry()) { + if (PrepareRetry(error)) { RowTaken_ = true; Next(); return true; @@ -39,9 +39,9 @@ bool TSkiffRowTableReader::Retry() return false; } -bool TSkiffRowTableReader::PrepareRetry() +bool TSkiffRowTableReader::PrepareRetry(const std::exception_ptr& error) { - if (Input_.Retry(RangeIndex_, RowIndex_)) { + if (Input_.Retry(RangeIndex_, RowIndex_, error)) { if (RangeIndex_) { RangeIndexShift_ += *RangeIndex_; } @@ -69,7 +69,7 @@ void TSkiffRowTableReader::ReadRow(const ISkiffRowParserPtr& parser) } catch (const std::exception& ex) { YT_LOG_ERROR("Read error during parsing: %v", ex.what()); - if (!Retry()) { + if (!Retry(std::make_exception_ptr(ex))) { throw; } } @@ -92,7 +92,7 @@ void TSkiffRowTableReader::SkipRow() } catch (const std::exception& ex) { YT_LOG_ERROR("Read error during skipping row: %v", ex.what()); - if (!Retry()) { + if (!Retry(std::make_exception_ptr(ex))) { throw; } } @@ -173,7 +173,7 @@ void TSkiffRowTableReader::Next() } catch (const std::exception& ex) { YT_LOG_ERROR("Read error: %v", ex.what()); - if (!PrepareRetry()) { + if (!PrepareRetry(std::make_exception_ptr(ex))) { throw; } } diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h index ed3c94c6c2..1f623570bb 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h @@ -39,10 +39,10 @@ public: bool IsRawReaderExhausted() const override; private: - bool Retry(); + bool Retry(const std::exception_ptr& error); void SkipRow(); void CheckValidity() const; - bool PrepareRetry(); + bool PrepareRetry(const std::exception_ptr& error); private: NDetail::TCountingRawTableReader Input_; diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp index 3f6440a8bb..446c529169 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp @@ -98,9 +98,9 @@ void TSkiffTableReader::Next() try { ReadRow(); break; - } catch (const std::exception& exception) { - YT_LOG_ERROR("Read error: %v", exception.what()); - if (!Input_.Retry(RangeIndex_, RowIndex_)) { + } catch (const std::exception& ex) { + YT_LOG_ERROR("Read error: %v", ex.what()); + if (!Input_.Retry(RangeIndex_, RowIndex_, std::make_exception_ptr(ex))) { throw; } BufferedInput_ = TBufferedInput(&Input_); diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h index d799c63cf4..a7694a0a70 100644 --- a/yt/cpp/mapreduce/io/stream_table_reader.h +++ b/yt/cpp/mapreduce/io/stream_table_reader.h @@ -15,7 +15,10 @@ public: : Stream_(stream) { } - bool Retry(const TMaybe<ui32>& /* rangeIndex */, const TMaybe<ui64>& /* rowIndex */) override + bool Retry( + const TMaybe<ui32>& /*rangeIndex*/, + const TMaybe<ui64>& /*rowIndex*/, + const std::exception_ptr& /*error*/) override { return false; } diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp index 6204738e10..55f270a6b6 100644 --- a/yt/cpp/mapreduce/io/yamr_table_reader.cpp +++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp @@ -111,8 +111,8 @@ void TYaMRTableReader::ReadRow() Input_.ResetRetries(); break; - } catch (const std::exception& ) { - if (!TLenvalTableReader::Retry()) { + } catch (const std::exception& ex) { + if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) { throw; } } @@ -132,8 +132,8 @@ void TYaMRTableReader::SkipRow() ReadInteger(&value); CheckedSkip(&Input_, value); break; - } catch (const std::exception& ) { - if (!TLenvalTableReader::Retry()) { + } catch (const std::exception& ex) { + if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) { throw; } } |