diff options
author | snaury <snaury@ydb.tech> | 2023-09-22 11:08:14 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-09-22 11:43:59 +0300 |
commit | 65e63b14e494370b04ebfd6b79b111e4b18796bf (patch) | |
tree | 8b231e0beed169cfa7effcd32ec36a00b0c1c4a0 | |
parent | c314faa9c4e35c951df74582296dcdd479ab233c (diff) | |
download | ydb-65e63b14e494370b04ebfd6b79b111e4b18796bf.tar.gz |
Make it possible for erase cache to stitch erased ranges in read iterators KIKIMR-19276
-rw-r--r-- | ydb/core/tablet_flat/flat_iterator.h | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 66 | ||||
-rw-r--r-- | ydb/core/tx/datashard/read_iterator.h | 3 |
3 files changed, 58 insertions, 35 deletions
diff --git a/ydb/core/tablet_flat/flat_iterator.h b/ydb/core/tablet_flat/flat_iterator.h index 15761132e41..3a262db8226 100644 --- a/ydb/core/tablet_flat/flat_iterator.h +++ b/ydb/core/tablet_flat/flat_iterator.h @@ -238,6 +238,7 @@ class TTableItBase : TNonCopyable { // way out. LastKey.assign(endKey.begin(), endKey.end()); LastKeyPage = {}; + LastKeyState = ERowOp::Erase; } return SkipTo(endKey, !inclusive); } @@ -295,8 +296,10 @@ public: } else if ((Ready = Apply()) != EReady::Data) { } else if (mode != ENext::Data || State.GetRowState() != ERowOp::Erase) { + InitLastKey(State.GetRowState()); break; } else { + InitLastKey(ERowOp::Erase); ++Stats.DeletedRowSkips; /* skip internal technical row states w/o data */ if (ErasedKeysCache && Stats.InvisibleRowSkips == SnapInvisibleRowSkips) { // Try to cache erases that are at a head version @@ -325,6 +328,10 @@ public: return State; } + ERowOp GetKeyState() const noexcept { + return LastKeyState; + } + bool IsUncommitted() const noexcept; ui64 GetUncommittedTxId() const noexcept; EReady SkipUncommitted() noexcept; @@ -343,6 +350,7 @@ private: TRowState State; TVector<TCell> LastKey; TSharedData LastKeyPage; + ERowOp LastKeyState = ERowOp::Absent; // RowVersion of a persistent snapshot that we are reading // By default iterator is initialized with the HEAD snapshot @@ -413,6 +421,7 @@ private: void ClearKey() { LastKey.clear(); LastKeyPage = {}; + LastKeyState = ERowOp::Absent; } // ITERATORS STORAGE @@ -439,6 +448,7 @@ private: EReady Snap(TRowVersion rowVersion) noexcept; EReady DoSkipUncommitted() noexcept; EReady Apply() noexcept; + void InitLastKey(ERowOp op) noexcept; void AddReadyIterator(TArrayRef<const TCell> key, TIteratorId itId); void AddNotReadyIterator(TIteratorId itId); @@ -783,6 +793,8 @@ inline EReady TTableItBase<TIteratorOps>::Snap() noexcept return EReady::Data; case EReady::Gone: + InitLastKey(ERowOp::Absent); + ++Stats.DeletedRowSkips; Stage = EStage::Turn; return EReady::Data; @@ -941,6 +953,15 @@ inline EReady TTableItBase<TIteratorOps>::Apply() noexcept return EReady::Page; } + Stage = EStage::Done; + return EReady::Data; +} + +template<class TIteratorOps> +inline void TTableItBase<TIteratorOps>::InitLastKey(ERowOp op) noexcept +{ + TArrayRef<const TCell> key = Iterators.back().Key; + LastKey.assign(key.begin(), key.end()); TIteratorId ai = Iterators.back().IteratorId; @@ -963,8 +984,7 @@ inline EReady TTableItBase<TIteratorOps>::Apply() noexcept } } - Stage = EStage::Done; - return EReady::Data; + LastKeyState = op; } template<class TIteratorOps> diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index d252b0eb4be..1bce43b671a 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -314,6 +314,7 @@ class TReader { ui32 FirstUnprocessedQuery; // must be unsigned TString LastProcessedKey; + bool LastProcessedKeyErased = false; ui64 RowsRead = 0; ui64 RowsProcessed = 0; @@ -371,17 +372,17 @@ public: if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) { if (!State.Reverse) { keyFromCells = TSerializedCellVec(State.LastProcessedKey); - fromInclusive = false; + fromInclusive = State.LastProcessedKeyErased; keyToCells = range.To; toInclusive = range.ToInclusive; } else { // reverse keyFromCells = range.From; - fromInclusive = true; + fromInclusive = range.FromInclusive; keyToCells = TSerializedCellVec(State.LastProcessedKey); - toInclusive = false; + toInclusive = State.LastProcessedKeyErased; } } else { keyFromCells = range.From; @@ -755,6 +756,7 @@ public: state.TotalRows += RowsRead; state.FirstUnprocessedQuery = FirstUnprocessedQuery; state.LastProcessedKey = LastProcessedKey; + state.LastProcessedKeyErased = LastProcessedKeyErased; if (sentResult) { state.ConsumeSeqNo(RowsRead, BytesInResult); } @@ -865,53 +867,57 @@ private: EReadStatus IterateRange(TIterator* iter, const TActorContext& ctx) { Y_UNUSED(ctx); - bool stoppedByLimit = false; auto keyAccessSampler = Self->GetKeyAccessSampler(); bool advanced = false; - while (iter->Next(CanResume() ? NTable::ENext::All : NTable::ENext::Data) == NTable::EReady::Data) { + while (iter->Next(NTable::ENext::Data) == NTable::EReady::Data) { advanced = true; DeletedRowSkips += iter->Stats.DeletedRowSkips; InvisibleRowSkips += iter->Stats.InvisibleRowSkips; TDbTupleRef rowKey = iter->GetKey(); - // Note: we sample and count deleted keys too keyAccessSampler->AddSample(TableId, rowKey.Cells()); const ui64 processedRecords = 1 + ResetRowSkips(iter->Stats); RowsSinceLastCheck += processedRecords; RowsProcessed += processedRecords; - // Note: we skip deleted rows - if (iter->Row().GetRowState() != NTable::ERowOp::Erase) { - TDbTupleRef rowValues = iter->GetValues(); + TDbTupleRef rowValues = iter->GetValues(); - // note that if user requests key columns then they will be in - // rowValues and we don't have to add rowKey columns - BlockBuilder.AddRow(TDbTupleRef(), rowValues); - ++RowsRead; + // note that if user requests key columns then they will be in + // rowValues and we don't have to add rowKey columns + BlockBuilder.AddRow(TDbTupleRef(), rowValues); + ++RowsRead; - if (ReachedTotalRowsLimit()) { - break; - } - } else { - ++DeletedRowSkips; + if (ReachedTotalRowsLimit()) { + LastProcessedKey.clear(); + return EReadStatus::Done; } if (ShouldStop()) { - stoppedByLimit = true; - break; - } - } - - if (auto key = iter->GetKey().Cells()) { - LastProcessedKey = TSerializedCellVec::Serialize(key); + LastProcessedKey = TSerializedCellVec::Serialize(rowKey.Cells()); + LastProcessedKeyErased = false; + return EReadStatus::StoppedByLimit; + } + } + + // Note: when stopping due to page faults after an erased row we will + // reposition on that same row so erase cache can extend that cached + // erased range. When we don't observe any user-visible rows before a + // page fault we want to make sure we observe multiple deleted rows, + // which must be at least 2 (because we may resume from a known deleted + // row). When there are not enough rows we would prefer restarting in + // the same transaction, instead of starting a new one, in which case + // we will not update stats and will not update RowsProcessed. + auto lastKey = iter->GetKey().Cells(); + if (lastKey && (advanced || iter->Stats.DeletedRowSkips >= 4) && iter->Last() == NTable::EReady::Page) { + LastProcessedKey = TSerializedCellVec::Serialize(lastKey); + LastProcessedKeyErased = iter->GetKeyState() == NTable::ERowOp::Erase; advanced = true; + } else { + LastProcessedKey.clear(); } - if (stoppedByLimit) - return EReadStatus::StoppedByLimit; - // last iteration to Page or Gone might also have deleted or invisible rows if (advanced || iter->Last() != NTable::EReady::Page) { DeletedRowSkips += iter->Stats.DeletedRowSkips; @@ -927,10 +933,6 @@ private: return EReadStatus::NeedData; } - // range fully read, no reason to keep LastProcessedKey - if (iter->Last() == NTable::EReady::Gone) - LastProcessedKey.clear(); - return EReadStatus::Done; } diff --git a/ydb/core/tx/datashard/read_iterator.h b/ydb/core/tx/datashard/read_iterator.h index 38d0b082ea0..17e61f916c1 100644 --- a/ydb/core/tx/datashard/read_iterator.h +++ b/ydb/core/tx/datashard/read_iterator.h @@ -208,7 +208,8 @@ public: ui64 SeqNo = 0; ui64 LastAckSeqNo = 0; ui32 FirstUnprocessedQuery = 0; - TString LastProcessedKey = 0; + TString LastProcessedKey; + bool LastProcessedKeyErased = false; // Orbit used for tracking progress NLWTrace::TOrbit Orbit; |