aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2023-09-27 16:55:40 +0300
committernadya73 <nadya73@yandex-team.com>2023-09-27 17:43:45 +0300
commit54ce98c42f381b12614dd327dc58a129a661c70b (patch)
treead18b9dfa5638be96373f2aa460508ddaafc9302
parentc437fa9f244a185589a18f4b97cb5ed8bf432876 (diff)
downloadydb-54ce98c42f381b12614dd327dc58a129a661c70b.tar.gz
[yt/cpp/mapreduce] YT-19790: Fix RangeIndex for table readers with retries
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.cpp5
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.h1
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp5
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.h1
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.cpp5
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.h1
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp5
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.h1
8 files changed, 20 insertions, 4 deletions
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
index 98274c7996..4dba2a1a86 100644
--- a/yt/cpp/mapreduce/io/lenval_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
@@ -159,7 +159,7 @@ ui32 TLenvalTableReader::GetTableIndex() const
ui32 TLenvalTableReader::GetRangeIndex() const
{
CheckValidity();
- return RangeIndex_.GetOrElse(0);
+ return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
}
ui64 TLenvalTableReader::GetRowIndex() const
@@ -186,6 +186,9 @@ bool TLenvalTableReader::IsRawReaderExhausted() const
bool TLenvalTableReader::PrepareRetry()
{
if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ if (RangeIndex_) {
+ RangeIndexShift_ += *RangeIndex_;
+ }
RowIndex_.Clear();
RangeIndex_.Clear();
return true;
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.h b/yt/cpp/mapreduce/io/lenval_table_reader.h
index 990fe0b756..cfece052c5 100644
--- a/yt/cpp/mapreduce/io/lenval_table_reader.h
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.h
@@ -52,6 +52,7 @@ protected:
ui32 TableIndex_ = 0;
TMaybe<ui64> RowIndex_;
TMaybe<ui32> RangeIndex_;
+ ui32 RangeIndexShift_ = 0;
TMaybe<ui64> TabletIndex_;
bool IsEndOfStream_ = false;
bool AtStart_ = true;
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
index d39e1398a5..1170cbcbab 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -294,7 +294,7 @@ ui32 TNodeTableReader::GetTableIndex() const
ui32 TNodeTableReader::GetRangeIndex() const
{
CheckValidity();
- return RangeIndex_.GetOrElse(0);
+ return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
}
ui64 TNodeTableReader::GetRowIndex() const
@@ -355,6 +355,9 @@ void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error
YT_LOG_ERROR("Read error: %v", error);
Exception_ = exception;
if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ if (RangeIndex_) {
+ RangeIndexShift_ += *RangeIndex_;
+ }
RowIndex_.Clear();
RangeIndex_.Clear();
PrepareParsing();
diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h
index 4fe839eeb6..38cb440632 100644
--- a/yt/cpp/mapreduce/io/node_table_reader.h
+++ b/yt/cpp/mapreduce/io/node_table_reader.h
@@ -71,6 +71,7 @@ private:
ui32 TableIndex_ = 0;
TMaybe<ui64> RowIndex_;
TMaybe<ui32> RangeIndex_;
+ ui32 RangeIndexShift_ = 0;
TMaybe<i64> TabletIndex_;
bool IsEndOfStream_ = false;
bool AtStart_ = true;
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
index 8da3b2da31..c0004564bf 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
@@ -42,6 +42,9 @@ bool TSkiffRowTableReader::Retry()
bool TSkiffRowTableReader::PrepareRetry()
{
if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ if (RangeIndex_) {
+ RangeIndexShift_ += *RangeIndex_;
+ }
RowIndex_.Clear();
RangeIndex_.Clear();
BufferedInput_ = TBufferedInput(&Input_);
@@ -188,7 +191,7 @@ ui32 TSkiffRowTableReader::GetTableIndex() const
ui32 TSkiffRowTableReader::GetRangeIndex() const
{
CheckValidity();
- return RangeIndex_.GetOrElse(0);
+ return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
}
ui64 TSkiffRowTableReader::GetRowIndex() const
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
index 368968266c..ed3c94c6c2 100644
--- a/yt/cpp/mapreduce/io/skiff_row_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
@@ -59,6 +59,7 @@ private:
TMaybe<ui64> RowIndex_;
TMaybe<ui32> RangeIndex_;
+ ui32 RangeIndexShift_ = 0;
ui32 TableIndex_ = 0;
};
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
index 51c20609f0..18a280911d 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
@@ -105,6 +105,9 @@ void TSkiffTableReader::Next()
}
BufferedInput_ = TBufferedInput(&Input_);
Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_));
+ if (RangeIndex_) {
+ RangeIndexShift_ += *RangeIndex_;
+ }
RangeIndex_.Clear();
RowIndex_.Clear();
}
@@ -120,7 +123,7 @@ ui32 TSkiffTableReader::GetTableIndex() const
ui32 TSkiffTableReader::GetRangeIndex() const
{
EnsureValidity();
- return RangeIndex_.GetOrElse(0);
+ return RangeIndex_.GetOrElse(0) + RangeIndexShift_;
}
ui64 TSkiffTableReader::GetRowIndex() const
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h
index 95ece5f9c7..c7614776f4 100644
--- a/yt/cpp/mapreduce/io/skiff_table_reader.h
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.h
@@ -55,6 +55,7 @@ private:
bool AfterKeySwitch_ = false;
bool Finished_ = false;
TMaybe<ui64> RangeIndex_;
+ ui64 RangeIndexShift_ = 0;
TMaybe<ui64> RowIndex_;
ui32 TableIndex_ = 0;
};