diff options
| author | Alexey Borzenkov <[email protected]> | 2022-05-23 18:47:00 +0300 |
|---|---|---|
| committer | Alexey Borzenkov <[email protected]> | 2022-05-23 18:47:00 +0300 |
| commit | eb3bd69695ba3886b67fa5a62fb3b42eba78dfd0 (patch) | |
| tree | 393bdc0ea819168b0e25bb052158af3023c750d3 | |
| parent | 6cf6c53ec85d3bf00e65d2f5f9cedc7ba419fddf (diff) | |
Add ITransactionObserver for observing skipped transactions during iteration, KIKIMR-14732
ref:bc1c5e00557c227d88d9971ce247fa076cf41934
| -rw-r--r-- | ydb/core/engine/minikql/minikql_engine_host.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 74 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 18 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_iterator.h | 23 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_mem_iter.h | 16 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_part_iface.h | 1 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_part_iter_multi.h | 29 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_table.cpp | 69 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_table.h | 27 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_table_committed.h | 60 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/flat_table_stats.h | 8 | ||||
| -rw-r--r-- | ydb/core/tablet_flat/test/libs/table/wrap_part.h | 3 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 2 |
14 files changed, 213 insertions, 121 deletions
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp index 36c35f32cc4..7c0ad77aa07 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.cpp +++ b/ydb/core/engine/minikql/minikql_engine_host.cpp @@ -299,7 +299,7 @@ NUdf::TUnboxedValue TEngineHost::SelectRow(const TTableId& tableId, const TArray ui64 flags = Settings.DisableByKeyFilter ? (ui64)NTable::NoByKey : 0; const auto ready = Db.Select(localTid, key, tags, dbRow, stats, flags, GetReadVersion(tableId)); - Counters.InvisibleRowSkips += stats.Invisible; + Counters.InvisibleRowSkips += stats.InvisibleRowSkips; if (NTable::EReady::Page == ready) { if (auto collector = GetChangeCollector(tableId)) { diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 90e3f079381..45cca5a0771 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -74,13 +74,14 @@ TAutoPtr<TTableIt> TDatabase::Iterate(ui32 table, TRawVals key, TTagsRef tags, E TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef tags, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); IteratedTables.insert(table); - auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot, visible); + auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot, visible, observer); // N.B. ESeek::Exact produces iterators with limit=1 @@ -89,7 +90,8 @@ TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef ta TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); @@ -97,7 +99,7 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T ESeek seek = !range.MinKey || range.MinInclusive ? ESeek::Lower : ESeek::Upper; - auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot, visible); + auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot, visible, observer); if (range.MaxKey) { TCelled maxKey(range.MaxKey, *iter->Scheme->Keys, false); @@ -114,7 +116,8 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); @@ -122,7 +125,7 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR ESeek seek = !range.MaxKey || range.MaxInclusive ? ESeek::Lower : ESeek::Upper; - auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot, visible); + auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot, visible, observer); if (range.MinKey) { TCelled minKey(range.MinKey, *iter->Scheme->Keys, false); @@ -140,51 +143,50 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR template<> TAutoPtr<TTableIt> TDatabase::IterateRangeGeneric<TTableIt>(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { - return IterateRange(table, range, tags, snapshot, visible); + return IterateRange(table, range, tags, snapshot, visible, observer); } template<> TAutoPtr<TTableReverseIt> TDatabase::IterateRangeGeneric<TTableReverseIt>(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { - return IterateRangeReverse(table, range, tags, snapshot, visible); + return IterateRangeReverse(table, range, tags, snapshot, visible, observer); } EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, ui64 flg, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { - TempIterators.clear(); - Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); - auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible); - Change->Stats.SelectSieved += res.Sieved; - Change->Stats.SelectWeeded += res.Weeded; - Change->Stats.SelectNoKey += res.NoKey; - Change->Stats.SelectInvisible += res.Invisible; - return res.Ready; + TSelectStats stats; + return Select(table, key, tags, row, stats, flg, snapshot, visible, observer); } EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, TSelectStats& stats, ui64 flg, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { TempIterators.clear(); Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); - auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible); - Change->Stats.SelectSieved += res.Sieved; - Change->Stats.SelectWeeded += res.Weeded; - Change->Stats.SelectNoKey += res.NoKey; - Change->Stats.SelectInvisible += res.Invisible; - stats.Sieved += res.Sieved; - stats.Weeded += res.Weeded; - stats.NoKey += res.NoKey; - stats.Invisible += res.Invisible; + auto prevSieved = stats.Sieved; + auto prevWeeded = stats.Weeded; + auto prevNoKey = stats.NoKey; + auto prevInvisible = stats.InvisibleRowSkips; + + auto ready = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, stats, visible, observer); + Change->Stats.SelectSieved += stats.Sieved - prevSieved; + Change->Stats.SelectWeeded += stats.Weeded - prevWeeded; + Change->Stats.SelectNoKey += stats.NoKey - prevNoKey; + Change->Stats.SelectInvisible += stats.InvisibleRowSkips - prevInvisible; - return res.Ready; + return ready; } void TDatabase::CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TRawVals maxKey, @@ -192,7 +194,8 @@ void TDatabase::CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TR EDirection direction, TRowVersion snapshot) { Y_VERIFY(!NoMoreReadsFlag, "Trying to do precharge after reads prohibited, table %u", table); - Require(table)->Precharge(minKey, maxKey, tags, &env, flg, items, bytes, direction, snapshot); + TSelectStats stats; + Require(table)->Precharge(minKey, maxKey, tags, &env, flg, items, bytes, direction, snapshot, stats); } bool TDatabase::Precharge(ui32 table, TRawVals minKey, TRawVals maxKey, @@ -200,10 +203,11 @@ bool TDatabase::Precharge(ui32 table, TRawVals minKey, TRawVals maxKey, EDirection direction, TRowVersion snapshot) { Y_VERIFY(!NoMoreReadsFlag, "Trying to do precharge after reads prohibited, table %u", table); - auto res = Require(table)->Precharge(minKey, maxKey, tags, Env, flg, items, bytes, direction, snapshot); - Change->Stats.ChargeSieved += res.Sieved; - Change->Stats.ChargeWeeded += res.Weeded; - return res.Ready == EReady::Data; + TSelectStats stats; + auto ready = Require(table)->Precharge(minKey, maxKey, tags, Env, flg, items, bytes, direction, snapshot, stats); + Change->Stats.ChargeSieved += stats.Sieved; + Change->Stats.ChargeWeeded += stats.Weeded; + return ready == EReady::Data; } void TDatabase::Update(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const TUpdateOp> ops, TRowVersion rowVersion) diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 8fa72b959c2..ee73c8300a0 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -70,27 +70,33 @@ public: TAutoPtr<TTableIt> Iterate(ui32 table, TRawVals key, TTagsRef tags, ELookup) const noexcept; TAutoPtr<TTableIt> IterateExact(ui32 table, TRawVals key, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max(), - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; TAutoPtr<TTableIt> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max(), - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; TAutoPtr<TTableReverseIt> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max(), - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; template<class TIteratorType> TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max(), - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; // NOTE: the row refeneces data in some internal buffers that get invalidated on the next Select() or Commit() call EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(), - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, TSelectStats& stats, ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(), - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; bool Precharge(ui32 table, TRawVals minKey, TRawVals maxKey, TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit, diff --git a/ydb/core/tablet_flat/flat_iterator.h b/ydb/core/tablet_flat/flat_iterator.h index e041b397a49..4a4e44917cd 100644 --- a/ydb/core/tablet_flat/flat_iterator.h +++ b/ydb/core/tablet_flat/flat_iterator.h @@ -22,11 +22,6 @@ enum class ENext { Uncommitted, }; -struct TIteratorStats { - ui64 DeletedRowSkips = 0; - ui64 InvisibleRowSkips = 0; -}; - template<class TIteratorOps> class TTableItBase : TNonCopyable { enum class EType : ui8 { @@ -238,7 +233,8 @@ public: TTableItBase( const TRowScheme* scheme, TTagsRef tags, ui64 lim = Max<ui64>(), TRowVersion snapshot = TRowVersion::Max(), - NTable::ITransactionMapPtr committedTransactions = nullptr); + NTable::ITransactionMapPtr committedTransactions = nullptr, + NTable::ITransactionObserverPtr transactionObserver = nullptr); ~TTableItBase(); @@ -342,6 +338,9 @@ private: // A map of currently committed transactions to corresponding row versions const NTable::ITransactionMapPtr CommittedTransactions; + // A transaction observer for detecting skips + const NTable::ITransactionObserverPtr TransactionObserver; + EStage Stage = EStage::Seek; EReady Ready = EReady::Gone; THolderVector<TMemIt> MemIters; @@ -458,13 +457,15 @@ template<class TIteratorOps> inline TTableItBase<TIteratorOps>::TTableItBase( const TRowScheme* scheme, TTagsRef tags, ui64 limit, TRowVersion snapshot, - NTable::ITransactionMapPtr committedTransactions) + NTable::ITransactionMapPtr committedTransactions, + NTable::ITransactionObserverPtr transactionObserver) : Scheme(scheme) , Remap(*Scheme, tags) , Limit(limit) , State(Remap.Size()) , SnapshotVersion(snapshot) , CommittedTransactions(std::move(committedTransactions)) + , TransactionObserver(std::move(transactionObserver)) , Comparator(Scheme->Keys->Types) , Active(Iterators.end()) , Inactive(Iterators.end()) @@ -781,16 +782,14 @@ inline EReady TTableItBase<TIteratorOps>::Snap(TRowVersion rowVersion) noexcept TIteratorId ai = i->IteratorId; switch (ai.Type) { case EType::Mem: { - auto ready = MemIters[ai.Index]->SkipToRowVersion(rowVersion, CommittedTransactions); - Stats.InvisibleRowSkips += std::exchange(MemIters[ai.Index]->InvisibleRowSkips, 0); + auto ready = MemIters[ai.Index]->SkipToRowVersion(rowVersion, Stats, CommittedTransactions, TransactionObserver); if (ready) { return EReady::Data; } break; } case EType::Run: { - auto ready = RunIters[ai.Index]->SkipToRowVersion(rowVersion, CommittedTransactions); - Stats.InvisibleRowSkips += std::exchange(RunIters[ai.Index]->InvisibleRowSkips, 0); + auto ready = RunIters[ai.Index]->SkipToRowVersion(rowVersion, Stats, CommittedTransactions, TransactionObserver); if (ready == EReady::Data) { return EReady::Data; } else if (ready != EReady::Gone) { @@ -823,6 +822,7 @@ inline EReady TTableItBase<TIteratorOps>::DoSkipUncommitted() noexcept case EType::Mem: { auto& it = *MemIters[ai.Index]; Y_VERIFY_DEBUG(it.IsDelta() && !CommittedTransactions.Find(it.GetDeltaTxId())); + TransactionObserver.OnSkipUncommitted(it.GetDeltaTxId()); if (it.SkipDelta()) { return EReady::Data; } @@ -831,6 +831,7 @@ inline EReady TTableItBase<TIteratorOps>::DoSkipUncommitted() noexcept case EType::Run: { auto& it = *RunIters[ai.Index]; Y_VERIFY_DEBUG(it.IsDelta() && !CommittedTransactions.Find(it.GetDeltaTxId())); + TransactionObserver.OnSkipUncommitted(it.GetDeltaTxId()); auto ready = it.SkipDelta(); if (ready != EReady::Gone) { return ready; diff --git a/ydb/core/tablet_flat/flat_mem_iter.h b/ydb/core/tablet_flat/flat_mem_iter.h index 462416e7430..a7059f68135 100644 --- a/ydb/core/tablet_flat/flat_mem_iter.h +++ b/ydb/core/tablet_flat/flat_mem_iter.h @@ -198,6 +198,8 @@ namespace NTable { ApplyColumn(row, up); } } + } else { + // FIXME: add uncommitted tx to stats? } if (!isDelta) { break; @@ -225,7 +227,9 @@ namespace NTable { * Returns false if there is no such version, e.g. current key did not * exist or didn't have any known updates at this rowVersion. */ - bool SkipToRowVersion(TRowVersion rowVersion, NTable::ITransactionMapSimplePtr committedTransactions) noexcept + bool SkipToRowVersion(TRowVersion rowVersion, TIteratorStats& stats, + NTable::ITransactionMapSimplePtr committedTransactions, + NTable::ITransactionObserverSimplePtr transactionObserver) noexcept { Y_VERIFY_DEBUG(IsValid(), "Attempt to access an invalid row"); @@ -234,6 +238,7 @@ namespace NTable { // Skip uncommitted deltas while (chain->RowVersion.Step == Max<ui64>() && !committedTransactions.Find(chain->RowVersion.TxId)) { + transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId); if (!(chain = chain->Next)) { CurrentVersion = nullptr; return false; @@ -254,7 +259,7 @@ namespace NTable { } } - InvisibleRowSkips++; + stats.InvisibleRowSkips++; while ((chain = chain->Next)) { if (chain->RowVersion.Step != Max<ui64>()) { @@ -263,7 +268,7 @@ namespace NTable { return true; } - InvisibleRowSkips++; + stats.InvisibleRowSkips++; } else { auto* commitVersion = committedTransactions.Find(chain->RowVersion.TxId); if (commitVersion && *commitVersion <= rowVersion) { @@ -272,7 +277,9 @@ namespace NTable { } if (commitVersion) { // Only committed deltas increment InvisibleRowSkips - InvisibleRowSkips++; + stats.InvisibleRowSkips++; + } else { + transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId); } } } @@ -352,7 +359,6 @@ namespace NTable { const TIntrusiveConstPtr<TKeyCellDefaults> KeyCellDefaults; const TRemap* Remap = nullptr; IPages * const Env = nullptr; - ui64 InvisibleRowSkips = 0; private: NMem::TTreeIterator RowIt; diff --git a/ydb/core/tablet_flat/flat_part_iface.h b/ydb/core/tablet_flat/flat_part_iface.h index 9be72fc735a..03882305a50 100644 --- a/ydb/core/tablet_flat/flat_part_iface.h +++ b/ydb/core/tablet_flat/flat_part_iface.h @@ -2,6 +2,7 @@ #include "flat_page_iface.h" #include "flat_sausage_solid.h" +#include "flat_table_stats.h" #include "flat_row_eggs.h" #include "util_basics.h" diff --git a/ydb/core/tablet_flat/flat_part_iter_multi.h b/ydb/core/tablet_flat/flat_part_iter_multi.h index 4ca3aef2734..607b94aea1a 100644 --- a/ydb/core/tablet_flat/flat_part_iter_multi.h +++ b/ydb/core/tablet_flat/flat_part_iter_multi.h @@ -818,8 +818,9 @@ namespace NTable { } } - EReady SkipToRowVersion(TRowVersion rowVersion, - NTable::ITransactionMapSimplePtr committedTransactions) noexcept + EReady SkipToRowVersion(TRowVersion rowVersion, TIteratorStats& stats, + NTable::ITransactionMapSimplePtr committedTransactions, + NTable::ITransactionObserverSimplePtr transactionObserver) noexcept { Y_VERIFY_DEBUG(Main.IsValid(), "Attempt to use an invalid iterator"); @@ -832,7 +833,7 @@ namespace NTable { if (rowVersion < Part->MinRowVersion) { // Don't bother seeking below the known first row version - InvisibleRowSkips++; + stats.InvisibleRowSkips++; return EReady::Gone; } } @@ -852,7 +853,10 @@ namespace NTable { } if (commitVersion) { // Skipping a newer committed delta - InvisibleRowSkips++; + stats.InvisibleRowSkips++; + } else { + // Skipping an uncommitted delta + transactionObserver.OnSkipUncommitted(txId); } data = Main.GetRecord()->GetAltRecord(++SkipMainDeltas); if (!data) { @@ -869,7 +873,7 @@ namespace NTable { return EReady::Data; } SkipEraseVersion = true; - InvisibleRowSkips++; + stats.InvisibleRowSkips++; } TRowVersion current = data->IsVersioned() ? data->GetMinVersion(info) : Part->MinRowVersion; @@ -878,7 +882,7 @@ namespace NTable { return EReady::Data; } - InvisibleRowSkips++; + stats.InvisibleRowSkips++; if (!data->HasHistory()) { // There is no history, reset @@ -1039,6 +1043,8 @@ namespace NTable { if (row.IsFinalized()) { return; } + } else { + // FIXME: add uncommitted tx to stats? } // Skip deltas until the row is finalized data = Main.GetRecord()->GetAltRecord(++index); @@ -1168,7 +1174,6 @@ namespace NTable { public: const TPart* const Part; IPages* const Env; - ui64 InvisibleRowSkips = 0; private: const TPinout Pinout; @@ -1493,13 +1498,12 @@ namespace NTable { return CurrentIt->GetRowVersion(); } - EReady SkipToRowVersion( - TRowVersion rowVersion, - NTable::ITransactionMapSimplePtr committedTransactions) noexcept + EReady SkipToRowVersion(TRowVersion rowVersion, TIteratorStats& stats, + NTable::ITransactionMapSimplePtr committedTransactions, + NTable::ITransactionObserverSimplePtr transactionObserver) noexcept { Y_VERIFY_DEBUG(CurrentIt); - auto ready = CurrentIt->SkipToRowVersion(rowVersion, committedTransactions); - InvisibleRowSkips += std::exchange(CurrentIt->InvisibleRowSkips, 0); + auto ready = CurrentIt->SkipToRowVersion(rowVersion, stats, committedTransactions, transactionObserver); return ready; } @@ -1584,7 +1588,6 @@ namespace NTable { TTagsRef const Tags; TIntrusiveConstPtr<TKeyCellDefaults> const KeyCellDefaults; IPages* const Env; - ui64 InvisibleRowSkips = 0; private: TRun::const_iterator Current; diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index 78c8a4b5d0e..d5ce3ee8810 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -561,13 +561,13 @@ void TTable::AddSafe(TPartView partView) } } -TTable::TReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef tags, - IPages* env, ui64 flg, - ui64 items, ui64 bytes, - EDirection direction, - TRowVersion snapshot) const +EReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef tags, + IPages* env, ui64 flg, + ui64 items, ui64 bytes, + EDirection direction, + TRowVersion snapshot, + TSelectStats& stats) const { - TReady res; bool ready = true; bool includeHistory = !snapshot.IsMax(); @@ -595,9 +595,9 @@ TTable::TReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef ta ready &= TCharge(env, *pos->Part, tags, includeHistory) .Do(key, key, row1, row2, *Scheme->Keys, items, bytes) .Ready; - ++res.Sieved; + ++stats.Sieved; } else { - ++res.Weeded; + ++stats.Weeded; } } } @@ -617,8 +617,7 @@ TTable::TReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef ta } } - res.Ready = ready ? EReady::Data : EReady::Page; - return res; + return ready ? EReady::Data : EReady::Page; } void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> apart, TRowVersion rowVersion) @@ -683,7 +682,8 @@ TMemTable& TTable::MemTable() TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts"); @@ -691,7 +691,8 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>(); TAutoPtr<TTableIt> dbIter(new TTableIt(Scheme.Get(), tags, limit, snapshot, - TMergedTransactionMap::Create(visible, CommittedTransactions))); + TMergedTransactionMap::Create(visible, CommittedTransactions), + observer)); if (Mutable) { dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward)); @@ -724,7 +725,8 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, TRowVersion snapshot, - const ITransactionMapPtr& visible) const noexcept + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts"); @@ -732,7 +734,8 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>(); TAutoPtr<TTableReverseIt> dbIter(new TTableReverseIt(Scheme.Get(), tags, limit, snapshot, - TMergedTransactionMap::Create(visible, CommittedTransactions))); + TMergedTransactionMap::Create(visible, CommittedTransactions), + observer)); if (Mutable) { dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse)); @@ -763,10 +766,12 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I return dbIter; } -TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row, - ui64 flg, TRowVersion snapshot, - TDeque<TPartSimpleIt>& tempIterators, - const ITransactionMapPtr& visible) const noexcept +EReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row, + ui64 flg, TRowVersion snapshot, + TDeque<TPartSimpleIt>& tempIterators, + TSelectStats& stats, + const ITransactionMapPtr& visible, + const ITransactionObserverPtr& observer) const noexcept { Y_VERIFY(ColdParts.empty(), "Cannot select with cold parts"); Y_VERIFY(key_.size() == Scheme->Keys->Types.size()); @@ -780,8 +785,6 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta for (auto &pin: remap.KeyPins()) row.Set(pin.Pos, { ECellOp::Set, ELargeObj::Inline }, key[pin.Key]); - TReady result; - const NBloom::TPrefix prefix(key); TEpoch lastEpoch = TEpoch::Max(); @@ -789,16 +792,17 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta bool snapshotFound = (snapshot == TRowVersion::Max()); auto committed = TMergedTransactionMap::Create(visible, CommittedTransactions); + const auto prevInvisibleRowSkips = stats.InvisibleRowSkips; + // Mutable has the newest data if (Mutable) { lastEpoch = Mutable->Epoch; if (auto it = TMemIt::Make(*Mutable, Mutable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) { - if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) { + if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, stats, committed, observer))) { // N.B. stop looking for snapshot after the first hit snapshotFound = true; it->Apply(row, committed); } - result.Invisible += it->InvisibleRowSkips; } } @@ -808,12 +812,11 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta Y_VERIFY(lastEpoch > memTable->Epoch, "Ordering of epochs is incorrect"); lastEpoch = memTable->Epoch; if (auto it = TMemIt::Make(*memTable, memTable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) { - if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) { + if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, stats, committed, observer))) { // N.B. stop looking for snapshot after the first hit snapshotFound = true; it->Apply(row, committed); } - result.Invisible += it->InvisibleRowSkips; } } @@ -827,7 +830,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta if ((flg & EHint::NoByKey) || part->MightHaveKey(prefix.Get(part->Scheme->Groups[0].KeyTypes.size()))) { - ++result.Sieved; + ++stats.Sieved; TPartSimpleIt& it = tempIterators.emplace_back(part, tags, Scheme->Keys, env); it.SetBounds(pos->Slice); auto res = it.Seek(key, ESeek::Exact); @@ -835,8 +838,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta Y_VERIFY(lastEpoch > part->Epoch, "Ordering of epochs is incorrect"); lastEpoch = part->Epoch; if (!snapshotFound) { - res = it.SkipToRowVersion(snapshot, committed); - result.Invisible += std::exchange(it.InvisibleRowSkips, 0); + res = it.SkipToRowVersion(snapshot, stats, committed, observer); if (res == EReady::Data) { // N.B. stop looking for snapshot after the first hit snapshotFound = true; @@ -850,26 +852,25 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta break; } } else { - ++result.NoKey; + ++stats.NoKey; } } } else { - ++result.Weeded; + ++stats.Weeded; } } } } - Y_VERIFY_DEBUG(result.Invisible == 0 || !snapshot.IsMax()); + Y_VERIFY_DEBUG(!snapshot.IsMax() || (stats.InvisibleRowSkips - prevInvisibleRowSkips) == 0); if (!ready || row.Need()) { - result.Ready = EReady::Page; + return EReady::Page; } else if (row == ERowOp::Erase || row == ERowOp::Absent) { - result.Ready = EReady::Gone; + return EReady::Gone; } else { - result.Ready = EReady::Data; + return EReady::Data; } - return result; } void TTable::DebugDump(IOutputStream& str, IPages* env, const NScheme::TTypeRegistry& reg) const diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 1b8af67fe74..90dab02babc 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -58,7 +58,8 @@ public: ui64 Weeded = 0; ui64 Sieved = 0; ui64 NoKey = 0; /* Examined TPart without the key */ - ui64 Invisible = 0; /* Skipped invisible versions */ + + TIteratorStats Stats; }; explicit TTable(TEpoch); @@ -128,18 +129,22 @@ public: TAutoPtr<TTableIt> Iterate(TRawVals key, TTagsRef tags, IPages* env, ESeek, TRowVersion snapshot, - const ITransactionMapPtr& visible = nullptr) const noexcept; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; TAutoPtr<TTableReverseIt> IterateReverse(TRawVals key, TTagsRef tags, IPages* env, ESeek, TRowVersion snapshot, - const ITransactionMapPtr& visible = nullptr) const noexcept; - TReady Select(TRawVals key, TTagsRef tags, IPages* env, TRowState& row, - ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators, - const ITransactionMapPtr& visible = nullptr) const noexcept; - - TReady Precharge(TRawVals minKey, TRawVals maxKey, TTagsRef tags, - IPages* env, ui64 flg, - ui64 itemsLimit, ui64 bytesLimit, - EDirection direction, TRowVersion snapshot) const; + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; + EReady Select(TRawVals key, TTagsRef tags, IPages* env, TRowState& row, + ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators, + TSelectStats& stats, + const ITransactionMapPtr& visible = nullptr, + const ITransactionObserverPtr& observer = nullptr) const noexcept; + + EReady Precharge(TRawVals minKey, TRawVals maxKey, TTagsRef tags, + IPages* env, ui64 flg, + ui64 itemsLimit, ui64 bytesLimit, + EDirection direction, TRowVersion snapshot, TSelectStats& stats) const; void Update(ERowOp, TRawVals key, TOpsRef, TArrayRef<TMemGlob> apart, TRowVersion rowVersion); diff --git a/ydb/core/tablet_flat/flat_table_committed.h b/ydb/core/tablet_flat/flat_table_committed.h index 299842e63aa..7be6f0115e9 100644 --- a/ydb/core/tablet_flat/flat_table_committed.h +++ b/ydb/core/tablet_flat/flat_table_committed.h @@ -315,5 +315,65 @@ namespace NTable { TIntrusivePtr<TState> State_; }; + /** + * An interface for an optional observer of events during iteration + */ + class ITransactionObserver : public TThrRefBase { + public: + /** + * Called when iterator skips over an uncommitted delta + * + * This may be used for detecting possible conflicts with transactions. + */ + virtual void OnSkipUncommitted(ui64 txId) = 0; + }; + + /** + * Makes it easier to call observer methods even when pointer is nullptr + */ + class ITransactionObserverPtr : public TIntrusivePtr<ITransactionObserver> { + public: + using TIntrusivePtr::TIntrusivePtr; + + void OnSkipUncommitted(ui64 txId) const { + if (ITransactionObserver* p = Get()) { + p->OnSkipUncommitted(txId); + } + } + }; + + /** + * A simple pointer wrapper that may be useful as a function argument on a hot path + * + * Unlike a smart pointer it has no destructor and doesn't perform any Ref/UnRef + */ + class ITransactionObserverSimplePtr { + public: + ITransactionObserverSimplePtr(const ITransactionObserverPtr& ptr) + : Ptr(ptr.Get()) + { } + + ITransactionObserverSimplePtr(ITransactionObserver* ptr) + : Ptr(ptr) + { } + + ITransactionObserverSimplePtr(std::nullptr_t) + : Ptr(nullptr) + { } + + explicit operator bool() const { + return bool(Ptr); + } + + void OnSkipUncommitted(ui64 txId) const { + if (Ptr) { + Ptr->OnSkipUncommitted(txId); + } + } + + private: + ITransactionObserver* Ptr; + }; + } // namespace NTable } // namespace NKikimr diff --git a/ydb/core/tablet_flat/flat_table_stats.h b/ydb/core/tablet_flat/flat_table_stats.h index 5b600dcf13a..4b632558b6f 100644 --- a/ydb/core/tablet_flat/flat_table_stats.h +++ b/ydb/core/tablet_flat/flat_table_stats.h @@ -31,11 +31,15 @@ namespace NTable { TPartStats& operator-=(const TPartStats& rhs); }; - struct TSelectStats { + struct TIteratorStats { + ui64 DeletedRowSkips = 0; + ui64 InvisibleRowSkips = 0; + }; + + struct TSelectStats : TIteratorStats { ui64 Sieved = 0; ui64 Weeded = 0; ui64 NoKey = 0; - ui64 Invisible = 0; }; struct TCompactionStats { diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_part.h b/ydb/core/tablet_flat/test/libs/table/wrap_part.h index a1e9c9bb677..35ba8af4b05 100644 --- a/ydb/core/tablet_flat/test/libs/table/wrap_part.h +++ b/ydb/core/tablet_flat/test/libs/table/wrap_part.h @@ -86,7 +86,8 @@ namespace NTest { EReady SkipToRowVersion(TRowVersion rowVersion) noexcept { - Ready = Iter->SkipToRowVersion(rowVersion, /* committed */ nullptr); + TIteratorStats stats; + Ready = Iter->SkipToRowVersion(rowVersion, stats, /* committed */ nullptr, /* observer */ nullptr); if (Ready == EReady::Data) Ready = RollUp(); diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index bfc05ece6e9..79f5d2c0baa 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -315,7 +315,7 @@ public: rowState.Init(State.Columns.size()); NTable::TSelectStats stats; auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion); - RowsSinceLastCheck += 1 + stats.Invisible; + RowsSinceLastCheck += 1 + stats.InvisibleRowSkips; if (ready == NTable::EReady::Page) { return EReadStatus::NeedData; } diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index c7bb8a9152c..979553b1401 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -405,7 +405,7 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion()); kqpStats.NSelectRow = 1; - kqpStats.InvisibleRowSkips = stats.Invisible; + kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips; switch (ready) { case EReady::Page: |
