diff options
author | nadya73 <nadya73@yandex-team.com> | 2023-09-27 16:55:40 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2023-09-27 17:43:45 +0300 |
commit | 54ce98c42f381b12614dd327dc58a129a661c70b (patch) | |
tree | ad18b9dfa5638be96373f2aa460508ddaafc9302 /yt/cpp | |
parent | c437fa9f244a185589a18f4b97cb5ed8bf432876 (diff) | |
download | ydb-54ce98c42f381b12614dd327dc58a129a661c70b.tar.gz |
[yt/cpp/mapreduce] YT-19790: Fix RangeIndex for table readers with retries
Diffstat (limited to 'yt/cpp')
-rw-r--r-- | yt/cpp/mapreduce/io/lenval_table_reader.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/lenval_table_reader.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_row_table_reader.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_row_table_reader.h | 1 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_table_reader.cpp | 5 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_table_reader.h | 1 |
8 files changed, 20 insertions, 4 deletions
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp index 98274c7996..4dba2a1a86 100644 --- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp +++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp @@ -159,7 +159,7 @@ ui32 TLenvalTableReader::GetTableIndex() const ui32 TLenvalTableReader::GetRangeIndex() const { CheckValidity(); - return RangeIndex_.GetOrElse(0); + return RangeIndex_.GetOrElse(0) + RangeIndexShift_; } ui64 TLenvalTableReader::GetRowIndex() const @@ -186,6 +186,9 @@ bool TLenvalTableReader::IsRawReaderExhausted() const bool TLenvalTableReader::PrepareRetry() { if (Input_.Retry(RangeIndex_, RowIndex_)) { + if (RangeIndex_) { + RangeIndexShift_ += *RangeIndex_; + } RowIndex_.Clear(); RangeIndex_.Clear(); return true; diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.h b/yt/cpp/mapreduce/io/lenval_table_reader.h index 990fe0b756..cfece052c5 100644 --- a/yt/cpp/mapreduce/io/lenval_table_reader.h +++ b/yt/cpp/mapreduce/io/lenval_table_reader.h @@ -52,6 +52,7 @@ protected: ui32 TableIndex_ = 0; TMaybe<ui64> RowIndex_; TMaybe<ui32> RangeIndex_; + ui32 RangeIndexShift_ = 0; TMaybe<ui64> TabletIndex_; bool IsEndOfStream_ = false; bool AtStart_ = true; diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp index d39e1398a5..1170cbcbab 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.cpp +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -294,7 +294,7 @@ ui32 TNodeTableReader::GetTableIndex() const ui32 TNodeTableReader::GetRangeIndex() const { CheckValidity(); - return RangeIndex_.GetOrElse(0); + return RangeIndex_.GetOrElse(0) + RangeIndexShift_; } ui64 TNodeTableReader::GetRowIndex() const @@ -355,6 +355,9 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error YT_LOG_ERROR("Read error: %v", error); Exception_ = exception; if (Input_.Retry(RangeIndex_, RowIndex_)) { + if (RangeIndex_) { + RangeIndexShift_ += *RangeIndex_; + } RowIndex_.Clear(); RangeIndex_.Clear(); PrepareParsing(); diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h index 4fe839eeb6..38cb440632 100644 --- a/yt/cpp/mapreduce/io/node_table_reader.h +++ b/yt/cpp/mapreduce/io/node_table_reader.h @@ -71,6 +71,7 @@ private: ui32 TableIndex_ = 0; TMaybe<ui64> RowIndex_; TMaybe<ui32> RangeIndex_; + ui32 RangeIndexShift_ = 0; TMaybe<i64> TabletIndex_; bool IsEndOfStream_ = false; bool AtStart_ = true; diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp index 8da3b2da31..c0004564bf 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp @@ -42,6 +42,9 @@ bool TSkiffRowTableReader::Retry() bool TSkiffRowTableReader::PrepareRetry() { if (Input_.Retry(RangeIndex_, RowIndex_)) { + if (RangeIndex_) { + RangeIndexShift_ += *RangeIndex_; + } RowIndex_.Clear(); RangeIndex_.Clear(); BufferedInput_ = TBufferedInput(&Input_); @@ -188,7 +191,7 @@ ui32 TSkiffRowTableReader::GetTableIndex() const ui32 TSkiffRowTableReader::GetRangeIndex() const { CheckValidity(); - return RangeIndex_.GetOrElse(0); + return RangeIndex_.GetOrElse(0) + RangeIndexShift_; } ui64 TSkiffRowTableReader::GetRowIndex() const diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h index 368968266c..ed3c94c6c2 100644 --- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h @@ -59,6 +59,7 @@ private: TMaybe<ui64> RowIndex_; TMaybe<ui32> RangeIndex_; + ui32 RangeIndexShift_ = 0; ui32 TableIndex_ = 0; }; diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp index 51c20609f0..18a280911d 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp +++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp @@ -105,6 +105,9 @@ void TSkiffTableReader::Next() } BufferedInput_ = TBufferedInput(&Input_); Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_)); + if (RangeIndex_) { + RangeIndexShift_ += *RangeIndex_; + } RangeIndex_.Clear(); RowIndex_.Clear(); } @@ -120,7 +123,7 @@ ui32 TSkiffTableReader::GetTableIndex() const ui32 TSkiffTableReader::GetRangeIndex() const { EnsureValidity(); - return RangeIndex_.GetOrElse(0); + return RangeIndex_.GetOrElse(0) + RangeIndexShift_; } ui64 TSkiffTableReader::GetRowIndex() const diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h index 95ece5f9c7..c7614776f4 100644 --- a/yt/cpp/mapreduce/io/skiff_table_reader.h +++ b/yt/cpp/mapreduce/io/skiff_table_reader.h @@ -55,6 +55,7 @@ private: bool AfterKeySwitch_ = false; bool Finished_ = false; TMaybe<ui64> RangeIndex_; + ui64 RangeIndexShift_ = 0; TMaybe<ui64> RowIndex_; ui32 TableIndex_ = 0; }; |