aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-04-18 12:01:09 +0300
committernadya73 <nadya73@yandex-team.com>2024-04-18 12:11:25 +0300
commit044c22732c7e0fa5cbdd08543c7cd68a501170a6 (patch)
tree1d2f571d26bd697eb3e30102cd8d90ae5a28a201
parent4d77c1fb4de7a121c720a5b403ba7aa516a32ce3 (diff)
downloadydb-044c22732c7e0fa5cbdd08543c7cd68a501170a6.tar.gz
[yt/cpp/mapreduce] YT-21405: Don't ignore backoff and pass actual exception in Retry()
Don't ignore backoff and pass actual exception in Retry() b821c02fd21c9f8115cd2a4896372a9fda69e5f6
-rw-r--r--yt/cpp/mapreduce/client/client_reader.cpp26
-rw-r--r--yt/cpp/mapreduce/client/client_reader.h3
-rw-r--r--yt/cpp/mapreduce/interface/io.h3
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.cpp7
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.h5
-rw-r--r--yt/cpp/mapreduce/io/job_reader.cpp5
-rw-r--r--yt/cpp/mapreduce/io/job_reader.h9
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.cpp12
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp2
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.cpp8
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.cpp14
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.h4
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp6
-rw-r--r--yt/cpp/mapreduce/io/stream_table_reader.h5
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.cpp8
16 files changed, 77 insertions, 44 deletions
diff --git a/yt/cpp/mapreduce/client/client_reader.cpp b/yt/cpp/mapreduce/client/client_reader.cpp
index b080da2604..e1051fda96 100644
--- a/yt/cpp/mapreduce/client/client_reader.cpp
+++ b/yt/cpp/mapreduce/client/client_reader.cpp
@@ -82,15 +82,31 @@ TClientReader::TClientReader(
bool TClientReader::Retry(
const TMaybe<ui32>& rangeIndex,
- const TMaybe<ui64>& rowIndex)
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& error)
{
if (CurrentRequestRetryPolicy_) {
- // TODO we should pass actual exception in Retry function
- yexception genericError;
- auto backoff = CurrentRequestRetryPolicy_->OnGenericError(genericError);
- if (!backoff) {
+ TMaybe<TDuration> backoffDuration;
+ try {
+ std::rethrow_exception(error);
+ } catch (const TErrorResponse& ex) {
+ if (!IsRetriable(ex)) {
+ throw;
+ }
+ backoffDuration = CurrentRequestRetryPolicy_->OnRetriableError(ex);
+ } catch (const std::exception& ex) {
+ if (!IsRetriable(ex)) {
+ throw;
+ }
+ backoffDuration = CurrentRequestRetryPolicy_->OnGenericError(ex);
+ } catch (...) {
+ }
+
+ if (!backoffDuration) {
return false;
}
+
+ NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
}
try {
diff --git a/yt/cpp/mapreduce/client/client_reader.h b/yt/cpp/mapreduce/client/client_reader.h
index 22f5a0ebb0..782edb77b7 100644
--- a/yt/cpp/mapreduce/client/client_reader.h
+++ b/yt/cpp/mapreduce/client/client_reader.h
@@ -31,7 +31,8 @@ public:
bool Retry(
const TMaybe<ui32>& rangeIndex,
- const TMaybe<ui64>& rowIndex) override;
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& error) override;
void ResetRetries() override;
diff --git a/yt/cpp/mapreduce/interface/io.h b/yt/cpp/mapreduce/interface/io.h
index 8497c8aae2..4f5480d107 100644
--- a/yt/cpp/mapreduce/interface/io.h
+++ b/yt/cpp/mapreduce/interface/io.h
@@ -142,7 +142,8 @@ public:
/// from the stream.
virtual bool Retry(
const TMaybe<ui32>& rangeIndex,
- const TMaybe<ui64>& rowIndex) = 0;
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& error) = 0;
/// Resets retry attempt count to the initial value (then `Retry()` can be called again).
virtual void ResetRetries() = 0;
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
index 6a918bdddb..c6213e8665 100644
--- a/yt/cpp/mapreduce/io/counting_raw_reader.cpp
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
@@ -5,9 +5,12 @@ namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
-bool TCountingRawTableReader::Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex)
+bool TCountingRawTableReader::Retry(
+ const TMaybe<ui32>& rangeIndex,
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& error)
{
- return Reader_->Retry(rangeIndex, rowIndex);
+ return Reader_->Retry(rangeIndex, rowIndex, error);
}
void TCountingRawTableReader::ResetRetries()
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h
index 3b6705c5e4..c3b197d584 100644
--- a/yt/cpp/mapreduce/io/counting_raw_reader.h
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.h
@@ -13,7 +13,10 @@ public:
: Reader_(std::move(reader))
{ }
- bool Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex) override;
+ bool Retry(
+ const TMaybe<ui32>& rangeIndex,
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& error) override;
void ResetRetries() override;
bool HasRangeIndices() const override;
diff --git a/yt/cpp/mapreduce/io/job_reader.cpp b/yt/cpp/mapreduce/io/job_reader.cpp
index 39056f00e2..1547309cf0 100644
--- a/yt/cpp/mapreduce/io/job_reader.cpp
+++ b/yt/cpp/mapreduce/io/job_reader.cpp
@@ -16,7 +16,10 @@ TJobReader::TJobReader(const TFile& file)
, BufferedInput_(&FdInput_, BUFFER_SIZE)
{ }
-bool TJobReader::Retry(const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/)
+bool TJobReader::Retry(
+ const TMaybe<ui32>& /*rangeIndex*/,
+ const TMaybe<ui64>& /*rowIndex*/,
+ const std::exception_ptr& /*error*/)
{
return false;
}
diff --git a/yt/cpp/mapreduce/io/job_reader.h b/yt/cpp/mapreduce/io/job_reader.h
index ce62ec180f..c5f6cdffc2 100644
--- a/yt/cpp/mapreduce/io/job_reader.h
+++ b/yt/cpp/mapreduce/io/job_reader.h
@@ -17,9 +17,12 @@ public:
explicit TJobReader(int fd);
explicit TJobReader(const TFile& file);
- virtual bool Retry( const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/) override;
- virtual void ResetRetries() override;
- virtual bool HasRangeIndices() const override;
+ bool Retry(
+ const TMaybe<ui32>& rangeIndex,
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& error) override;
+ void ResetRetries() override;
+ bool HasRangeIndices() const override;
protected:
size_t DoRead(void* buf, size_t len) override;
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
index 4dba2a1a86..d7a1c9754e 100644
--- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
@@ -111,8 +111,8 @@ void TLenvalTableReader::Next()
Length_ = static_cast<ui32>(value);
RowTaken_ = false;
AtStart_ = false;
- } catch (const std::exception& e) {
- if (!PrepareRetry()) {
+ } catch (const std::exception& ex) {
+ if (!PrepareRetry(std::make_exception_ptr(ex))) {
throw;
}
continue;
@@ -121,9 +121,9 @@ void TLenvalTableReader::Next()
}
}
-bool TLenvalTableReader::Retry()
+bool TLenvalTableReader::Retry(const std::exception_ptr& error)
{
- if (PrepareRetry()) {
+ if (PrepareRetry(error)) {
RowTaken_ = true;
Next();
return true;
@@ -183,9 +183,9 @@ bool TLenvalTableReader::IsRawReaderExhausted() const
return Finished_;
}
-bool TLenvalTableReader::PrepareRetry()
+bool TLenvalTableReader::PrepareRetry(const std::exception_ptr& error)
{
- if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ if (Input_.Retry(RangeIndex_, RowIndex_, error)) {
if (RangeIndex_) {
RangeIndexShift_ += *RangeIndex_;
}
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.h b/yt/cpp/mapreduce/io/lenval_table_reader.h
index cfece052c5..2baa6e45a0 100644
--- a/yt/cpp/mapreduce/io/lenval_table_reader.h
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.h
@@ -27,7 +27,7 @@ protected:
void CheckValidity() const;
- bool Retry();
+ bool Retry(const std::exception_ptr& error);
template <class T>
bool ReadInteger(T* result, bool acceptEndOfStream = false)
@@ -60,7 +60,7 @@ protected:
ui32 Length_ = 0;
private:
- bool PrepareRetry();
+ bool PrepareRetry(const std::exception_ptr& error);
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
index ca529e14db..bc3da75ee6 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -354,7 +354,7 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error
{
YT_LOG_ERROR("Read error (RangeIndex: %v, RowIndex: %v, Error: %v)", RangeIndex_, RowIndex_, error);
Exception_ = exception;
- if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ if (Input_.Retry(RangeIndex_, RowIndex_, exception)) {
if (RangeIndex_) {
RangeIndexShift_ += *RangeIndex_;
}
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp
index c0bf90d253..6f79619c81 100644
--- a/yt/cpp/mapreduce/io/proto_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp
@@ -238,8 +238,8 @@ void TLenvalProtoTableReader::ReadRow(Message* row)
Input_.ResetRetries();
break;
- } catch (const std::exception& ) {
- if (!TLenvalTableReader::Retry()) {
+ } catch (const std::exception& ex) {
+ if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) {
throw;
}
}
@@ -300,8 +300,8 @@ void TLenvalProtoTableReader::SkipRow()
ythrow yexception() << "Premature end of stream";
}
break;
- } catch (const std::exception& ) {
- if (!TLenvalTableReader::Retry()) {
+ } catch (const std::exception& ex) {
+ if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) {
throw;
}
}
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
index c0004564bf..3f7769a7e1 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
@@ -29,9 +29,9 @@ TSkiffRowTableReader::TSkiffRowTableReader(
TSkiffRowTableReader::~TSkiffRowTableReader()
{ }
-bool TSkiffRowTableReader::Retry()
+bool TSkiffRowTableReader::Retry(const std::exception_ptr& error)
{
- if (PrepareRetry()) {
+ if (PrepareRetry(error)) {
RowTaken_ = true;
Next();
return true;
@@ -39,9 +39,9 @@ bool TSkiffRowTableReader::Retry()
return false;
}
-bool TSkiffRowTableReader::PrepareRetry()
+bool TSkiffRowTableReader::PrepareRetry(const std::exception_ptr& error)
{
- if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ if (Input_.Retry(RangeIndex_, RowIndex_, error)) {
if (RangeIndex_) {
RangeIndexShift_ += *RangeIndex_;
}
@@ -69,7 +69,7 @@ void TSkiffRowTableReader::ReadRow(const ISkiffRowParserPtr& parser)
} catch (const std::exception& ex) {
YT_LOG_ERROR("Read error during parsing: %v", ex.what());
- if (!Retry()) {
+ if (!Retry(std::make_exception_ptr(ex))) {
throw;
}
}
@@ -92,7 +92,7 @@ void TSkiffRowTableReader::SkipRow()
} catch (const std::exception& ex) {
YT_LOG_ERROR("Read error during skipping row: %v", ex.what());
- if (!Retry()) {
+ if (!Retry(std::make_exception_ptr(ex))) {
throw;
}
}
@@ -173,7 +173,7 @@ void TSkiffRowTableReader::Next()
} catch (const std::exception& ex) {
YT_LOG_ERROR("Read error: %v", ex.what());
- if (!PrepareRetry()) {
+ if (!PrepareRetry(std::make_exception_ptr(ex))) {
throw;
}
}
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
index ed3c94c6c2..1f623570bb 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
@@ -39,10 +39,10 @@ public:
bool IsRawReaderExhausted() const override;
private:
- bool Retry();
+ bool Retry(const std::exception_ptr& error);
void SkipRow();
void CheckValidity() const;
- bool PrepareRetry();
+ bool PrepareRetry(const std::exception_ptr& error);
private:
NDetail::TCountingRawTableReader Input_;
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
index 3f6440a8bb..446c529169 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
@@ -98,9 +98,9 @@ void TSkiffTableReader::Next()
try {
ReadRow();
break;
- } catch (const std::exception& exception) {
- YT_LOG_ERROR("Read error: %v", exception.what());
- if (!Input_.Retry(RangeIndex_, RowIndex_)) {
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("Read error: %v", ex.what());
+ if (!Input_.Retry(RangeIndex_, RowIndex_, std::make_exception_ptr(ex))) {
throw;
}
BufferedInput_ = TBufferedInput(&Input_);
diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h
index d799c63cf4..a7694a0a70 100644
--- a/yt/cpp/mapreduce/io/stream_table_reader.h
+++ b/yt/cpp/mapreduce/io/stream_table_reader.h
@@ -15,7 +15,10 @@ public:
: Stream_(stream)
{ }
- bool Retry(const TMaybe<ui32>& /* rangeIndex */, const TMaybe<ui64>& /* rowIndex */) override
+ bool Retry(
+ const TMaybe<ui32>& /*rangeIndex*/,
+ const TMaybe<ui64>& /*rowIndex*/,
+ const std::exception_ptr& /*error*/) override
{
return false;
}
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
index 6204738e10..55f270a6b6 100644
--- a/yt/cpp/mapreduce/io/yamr_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
@@ -111,8 +111,8 @@ void TYaMRTableReader::ReadRow()
Input_.ResetRetries();
break;
- } catch (const std::exception& ) {
- if (!TLenvalTableReader::Retry()) {
+ } catch (const std::exception& ex) {
+ if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) {
throw;
}
}
@@ -132,8 +132,8 @@ void TYaMRTableReader::SkipRow()
ReadInteger(&value);
CheckedSkip(&Input_, value);
break;
- } catch (const std::exception& ) {
- if (!TLenvalTableReader::Retry()) {
+ } catch (const std::exception& ex) {
+ if (!TLenvalTableReader::Retry(std::make_exception_ptr(ex))) {
throw;
}
}