aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-09-22 11:08:14 +0300
committersnaury <snaury@ydb.tech>2023-09-22 11:43:59 +0300
commit65e63b14e494370b04ebfd6b79b111e4b18796bf (patch)
tree8b231e0beed169cfa7effcd32ec36a00b0c1c4a0
parentc314faa9c4e35c951df74582296dcdd479ab233c (diff)
downloadydb-65e63b14e494370b04ebfd6b79b111e4b18796bf.tar.gz
Make it possible for erase cache to stitch erased ranges in read iterators KIKIMR-19276
-rw-r--r--ydb/core/tablet_flat/flat_iterator.h24
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp66
-rw-r--r--ydb/core/tx/datashard/read_iterator.h3
3 files changed, 58 insertions, 35 deletions
diff --git a/ydb/core/tablet_flat/flat_iterator.h b/ydb/core/tablet_flat/flat_iterator.h
index 15761132e41..3a262db8226 100644
--- a/ydb/core/tablet_flat/flat_iterator.h
+++ b/ydb/core/tablet_flat/flat_iterator.h
@@ -238,6 +238,7 @@ class TTableItBase : TNonCopyable {
// way out.
LastKey.assign(endKey.begin(), endKey.end());
LastKeyPage = {};
+ LastKeyState = ERowOp::Erase;
}
return SkipTo(endKey, !inclusive);
}
@@ -295,8 +296,10 @@ public:
} else if ((Ready = Apply()) != EReady::Data) {
} else if (mode != ENext::Data || State.GetRowState() != ERowOp::Erase) {
+ InitLastKey(State.GetRowState());
break;
} else {
+ InitLastKey(ERowOp::Erase);
++Stats.DeletedRowSkips; /* skip internal technical row states w/o data */
if (ErasedKeysCache && Stats.InvisibleRowSkips == SnapInvisibleRowSkips) {
// Try to cache erases that are at a head version
@@ -325,6 +328,10 @@ public:
return State;
}
+ ERowOp GetKeyState() const noexcept {
+ return LastKeyState;
+ }
+
bool IsUncommitted() const noexcept;
ui64 GetUncommittedTxId() const noexcept;
EReady SkipUncommitted() noexcept;
@@ -343,6 +350,7 @@ private:
TRowState State;
TVector<TCell> LastKey;
TSharedData LastKeyPage;
+ ERowOp LastKeyState = ERowOp::Absent;
// RowVersion of a persistent snapshot that we are reading
// By default iterator is initialized with the HEAD snapshot
@@ -413,6 +421,7 @@ private:
void ClearKey() {
LastKey.clear();
LastKeyPage = {};
+ LastKeyState = ERowOp::Absent;
}
// ITERATORS STORAGE
@@ -439,6 +448,7 @@ private:
EReady Snap(TRowVersion rowVersion) noexcept;
EReady DoSkipUncommitted() noexcept;
EReady Apply() noexcept;
+ void InitLastKey(ERowOp op) noexcept;
void AddReadyIterator(TArrayRef<const TCell> key, TIteratorId itId);
void AddNotReadyIterator(TIteratorId itId);
@@ -783,6 +793,8 @@ inline EReady TTableItBase<TIteratorOps>::Snap() noexcept
return EReady::Data;
case EReady::Gone:
+ InitLastKey(ERowOp::Absent);
+ ++Stats.DeletedRowSkips;
Stage = EStage::Turn;
return EReady::Data;
@@ -941,6 +953,15 @@ inline EReady TTableItBase<TIteratorOps>::Apply() noexcept
return EReady::Page;
}
+ Stage = EStage::Done;
+ return EReady::Data;
+}
+
+template<class TIteratorOps>
+inline void TTableItBase<TIteratorOps>::InitLastKey(ERowOp op) noexcept
+{
+ TArrayRef<const TCell> key = Iterators.back().Key;
+
LastKey.assign(key.begin(), key.end());
TIteratorId ai = Iterators.back().IteratorId;
@@ -963,8 +984,7 @@ inline EReady TTableItBase<TIteratorOps>::Apply() noexcept
}
}
- Stage = EStage::Done;
- return EReady::Data;
+ LastKeyState = op;
}
template<class TIteratorOps>
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index d252b0eb4be..1bce43b671a 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -314,6 +314,7 @@ class TReader {
ui32 FirstUnprocessedQuery; // must be unsigned
TString LastProcessedKey;
+ bool LastProcessedKeyErased = false;
ui64 RowsRead = 0;
ui64 RowsProcessed = 0;
@@ -371,17 +372,17 @@ public:
if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
if (!State.Reverse) {
keyFromCells = TSerializedCellVec(State.LastProcessedKey);
- fromInclusive = false;
+ fromInclusive = State.LastProcessedKeyErased;
keyToCells = range.To;
toInclusive = range.ToInclusive;
} else {
// reverse
keyFromCells = range.From;
- fromInclusive = true;
+ fromInclusive = range.FromInclusive;
keyToCells = TSerializedCellVec(State.LastProcessedKey);
- toInclusive = false;
+ toInclusive = State.LastProcessedKeyErased;
}
} else {
keyFromCells = range.From;
@@ -755,6 +756,7 @@ public:
state.TotalRows += RowsRead;
state.FirstUnprocessedQuery = FirstUnprocessedQuery;
state.LastProcessedKey = LastProcessedKey;
+ state.LastProcessedKeyErased = LastProcessedKeyErased;
if (sentResult) {
state.ConsumeSeqNo(RowsRead, BytesInResult);
}
@@ -865,53 +867,57 @@ private:
EReadStatus IterateRange(TIterator* iter, const TActorContext& ctx) {
Y_UNUSED(ctx);
- bool stoppedByLimit = false;
auto keyAccessSampler = Self->GetKeyAccessSampler();
bool advanced = false;
- while (iter->Next(CanResume() ? NTable::ENext::All : NTable::ENext::Data) == NTable::EReady::Data) {
+ while (iter->Next(NTable::ENext::Data) == NTable::EReady::Data) {
advanced = true;
DeletedRowSkips += iter->Stats.DeletedRowSkips;
InvisibleRowSkips += iter->Stats.InvisibleRowSkips;
TDbTupleRef rowKey = iter->GetKey();
- // Note: we sample and count deleted keys too
keyAccessSampler->AddSample(TableId, rowKey.Cells());
const ui64 processedRecords = 1 + ResetRowSkips(iter->Stats);
RowsSinceLastCheck += processedRecords;
RowsProcessed += processedRecords;
- // Note: we skip deleted rows
- if (iter->Row().GetRowState() != NTable::ERowOp::Erase) {
- TDbTupleRef rowValues = iter->GetValues();
+ TDbTupleRef rowValues = iter->GetValues();
- // note that if user requests key columns then they will be in
- // rowValues and we don't have to add rowKey columns
- BlockBuilder.AddRow(TDbTupleRef(), rowValues);
- ++RowsRead;
+ // note that if user requests key columns then they will be in
+ // rowValues and we don't have to add rowKey columns
+ BlockBuilder.AddRow(TDbTupleRef(), rowValues);
+ ++RowsRead;
- if (ReachedTotalRowsLimit()) {
- break;
- }
- } else {
- ++DeletedRowSkips;
+ if (ReachedTotalRowsLimit()) {
+ LastProcessedKey.clear();
+ return EReadStatus::Done;
}
if (ShouldStop()) {
- stoppedByLimit = true;
- break;
- }
- }
-
- if (auto key = iter->GetKey().Cells()) {
- LastProcessedKey = TSerializedCellVec::Serialize(key);
+ LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells());
+ LastProcessedKeyErased = false;
+ return EReadStatus::StoppedByLimit;
+ }
+ }
+
+ // Note: when stopping due to page faults after an erased row we will
+ // reposition on that same row so erase cache can extend that cached
+ // erased range. When we don't observe any user-visible rows before a
+ // page fault we want to make sure we observe multiple deleted rows,
+ // which must be at least 2 (because we may resume from a known deleted
+ // row). When there are not enough rows we would prefer restarting in
+ // the same transaction, instead of starting a new one, in which case
+ // we will not update stats and will not update RowsProcessed.
+ auto lastKey = iter->GetKey().Cells();
+ if (lastKey && (advanced || iter->Stats.DeletedRowSkips >= 4) && iter->Last() == NTable::EReady::Page) {
+ LastProcessedKey = TSerializedCellVec::Serialize(lastKey);
+ LastProcessedKeyErased = iter->GetKeyState() == NTable::ERowOp::Erase;
advanced = true;
+ } else {
+ LastProcessedKey.clear();
}
- if (stoppedByLimit)
- return EReadStatus::StoppedByLimit;
-
// last iteration to Page or Gone might also have deleted or invisible rows
if (advanced || iter->Last() != NTable::EReady::Page) {
DeletedRowSkips += iter->Stats.DeletedRowSkips;
@@ -927,10 +933,6 @@ private:
return EReadStatus::NeedData;
}
- // range fully read, no reason to keep LastProcessedKey
- if (iter->Last() == NTable::EReady::Gone)
- LastProcessedKey.clear();
-
return EReadStatus::Done;
}
diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h
index 38d0b082ea0..17e61f916c1 100644
--- a/ydb/core/tx/datashard/read_iterator.h
+++ b/ydb/core/tx/datashard/read_iterator.h
@@ -208,7 +208,8 @@ public:
ui64 SeqNo = 0;
ui64 LastAckSeqNo = 0;
ui32 FirstUnprocessedQuery = 0;
- TString LastProcessedKey = 0;
+ TString LastProcessedKey;
+ bool LastProcessedKeyErased = false;
// Orbit used for tracking progress
NLWTrace::TOrbit Orbit;