summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Borzenkov <[email protected]>2022-05-23 18:47:00 +0300
committerAlexey Borzenkov <[email protected]>2022-05-23 18:47:00 +0300
commiteb3bd69695ba3886b67fa5a62fb3b42eba78dfd0 (patch)
tree393bdc0ea819168b0e25bb052158af3023c750d3
parent6cf6c53ec85d3bf00e65d2f5f9cedc7ba419fddf (diff)
Add ITransactionObserver for observing skipped transactions during iteration, KIKIMR-14732
ref:bc1c5e00557c227d88d9971ce247fa076cf41934
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.cpp2
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp74
-rw-r--r--ydb/core/tablet_flat/flat_database.h18
-rw-r--r--ydb/core/tablet_flat/flat_iterator.h23
-rw-r--r--ydb/core/tablet_flat/flat_mem_iter.h16
-rw-r--r--ydb/core/tablet_flat/flat_part_iface.h1
-rw-r--r--ydb/core/tablet_flat/flat_part_iter_multi.h29
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp69
-rw-r--r--ydb/core/tablet_flat/flat_table.h27
-rw-r--r--ydb/core/tablet_flat/flat_table_committed.h60
-rw-r--r--ydb/core/tablet_flat/flat_table_stats.h8
-rw-r--r--ydb/core/tablet_flat/test/libs/table/wrap_part.h3
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp2
14 files changed, 213 insertions, 121 deletions
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp
index 36c35f32cc4..7c0ad77aa07 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.cpp
+++ b/ydb/core/engine/minikql/minikql_engine_host.cpp
@@ -299,7 +299,7 @@ NUdf::TUnboxedValue TEngineHost::SelectRow(const TTableId& tableId, const TArray
ui64 flags = Settings.DisableByKeyFilter ? (ui64)NTable::NoByKey : 0;
const auto ready = Db.Select(localTid, key, tags, dbRow, stats, flags, GetReadVersion(tableId));
- Counters.InvisibleRowSkips += stats.Invisible;
+ Counters.InvisibleRowSkips += stats.InvisibleRowSkips;
if (NTable::EReady::Page == ready) {
if (auto collector = GetChangeCollector(tableId)) {
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 90e3f079381..45cca5a0771 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -74,13 +74,14 @@ TAutoPtr<TTableIt> TDatabase::Iterate(ui32 table, TRawVals key, TTagsRef tags, E
TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef tags,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
IteratedTables.insert(table);
- auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot, visible);
+ auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot, visible, observer);
// N.B. ESeek::Exact produces iterators with limit=1
@@ -89,7 +90,8 @@ TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef ta
TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
@@ -97,7 +99,7 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T
ESeek seek = !range.MinKey || range.MinInclusive ? ESeek::Lower : ESeek::Upper;
- auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot, visible);
+ auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot, visible, observer);
if (range.MaxKey) {
TCelled maxKey(range.MaxKey, *iter->Scheme->Keys, false);
@@ -114,7 +116,8 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T
TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
@@ -122,7 +125,7 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR
ESeek seek = !range.MaxKey || range.MaxInclusive ? ESeek::Lower : ESeek::Upper;
- auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot, visible);
+ auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot, visible, observer);
if (range.MinKey) {
TCelled minKey(range.MinKey, *iter->Scheme->Keys, false);
@@ -140,51 +143,50 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR
template<>
TAutoPtr<TTableIt> TDatabase::IterateRangeGeneric<TTableIt>(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
- return IterateRange(table, range, tags, snapshot, visible);
+ return IterateRange(table, range, tags, snapshot, visible, observer);
}
template<>
TAutoPtr<TTableReverseIt> TDatabase::IterateRangeGeneric<TTableReverseIt>(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
- return IterateRangeReverse(table, range, tags, snapshot, visible);
+ return IterateRangeReverse(table, range, tags, snapshot, visible, observer);
}
EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, ui64 flg,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
- TempIterators.clear();
- Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
- auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible);
- Change->Stats.SelectSieved += res.Sieved;
- Change->Stats.SelectWeeded += res.Weeded;
- Change->Stats.SelectNoKey += res.NoKey;
- Change->Stats.SelectInvisible += res.Invisible;
- return res.Ready;
+ TSelectStats stats;
+ return Select(table, key, tags, row, stats, flg, snapshot, visible, observer);
}
EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, TSelectStats& stats, ui64 flg,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
TempIterators.clear();
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
- auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible);
- Change->Stats.SelectSieved += res.Sieved;
- Change->Stats.SelectWeeded += res.Weeded;
- Change->Stats.SelectNoKey += res.NoKey;
- Change->Stats.SelectInvisible += res.Invisible;
- stats.Sieved += res.Sieved;
- stats.Weeded += res.Weeded;
- stats.NoKey += res.NoKey;
- stats.Invisible += res.Invisible;
+ auto prevSieved = stats.Sieved;
+ auto prevWeeded = stats.Weeded;
+ auto prevNoKey = stats.NoKey;
+ auto prevInvisible = stats.InvisibleRowSkips;
+
+ auto ready = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, stats, visible, observer);
+ Change->Stats.SelectSieved += stats.Sieved - prevSieved;
+ Change->Stats.SelectWeeded += stats.Weeded - prevWeeded;
+ Change->Stats.SelectNoKey += stats.NoKey - prevNoKey;
+ Change->Stats.SelectInvisible += stats.InvisibleRowSkips - prevInvisible;
- return res.Ready;
+ return ready;
}
void TDatabase::CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TRawVals maxKey,
@@ -192,7 +194,8 @@ void TDatabase::CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TR
EDirection direction, TRowVersion snapshot)
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to do precharge after reads prohibited, table %u", table);
- Require(table)->Precharge(minKey, maxKey, tags, &env, flg, items, bytes, direction, snapshot);
+ TSelectStats stats;
+ Require(table)->Precharge(minKey, maxKey, tags, &env, flg, items, bytes, direction, snapshot, stats);
}
bool TDatabase::Precharge(ui32 table, TRawVals minKey, TRawVals maxKey,
@@ -200,10 +203,11 @@ bool TDatabase::Precharge(ui32 table, TRawVals minKey, TRawVals maxKey,
EDirection direction, TRowVersion snapshot)
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to do precharge after reads prohibited, table %u", table);
- auto res = Require(table)->Precharge(minKey, maxKey, tags, Env, flg, items, bytes, direction, snapshot);
- Change->Stats.ChargeSieved += res.Sieved;
- Change->Stats.ChargeWeeded += res.Weeded;
- return res.Ready == EReady::Data;
+ TSelectStats stats;
+ auto ready = Require(table)->Precharge(minKey, maxKey, tags, Env, flg, items, bytes, direction, snapshot, stats);
+ Change->Stats.ChargeSieved += stats.Sieved;
+ Change->Stats.ChargeWeeded += stats.Weeded;
+ return ready == EReady::Data;
}
void TDatabase::Update(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const TUpdateOp> ops, TRowVersion rowVersion)
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 8fa72b959c2..ee73c8300a0 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -70,27 +70,33 @@ public:
TAutoPtr<TTableIt> Iterate(ui32 table, TRawVals key, TTagsRef tags, ELookup) const noexcept;
TAutoPtr<TTableIt> IterateExact(ui32 table, TRawVals key, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
TAutoPtr<TTableIt> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
TAutoPtr<TTableReverseIt> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
template<class TIteratorType>
TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
// NOTE: the row refeneces data in some internal buffers that get invalidated on the next Select() or Commit() call
EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row,
ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(),
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, TSelectStats& stats,
ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(),
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
bool Precharge(ui32 table, TRawVals minKey, TRawVals maxKey,
TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit,
diff --git a/ydb/core/tablet_flat/flat_iterator.h b/ydb/core/tablet_flat/flat_iterator.h
index e041b397a49..4a4e44917cd 100644
--- a/ydb/core/tablet_flat/flat_iterator.h
+++ b/ydb/core/tablet_flat/flat_iterator.h
@@ -22,11 +22,6 @@ enum class ENext {
Uncommitted,
};
-struct TIteratorStats {
- ui64 DeletedRowSkips = 0;
- ui64 InvisibleRowSkips = 0;
-};
-
template<class TIteratorOps>
class TTableItBase : TNonCopyable {
enum class EType : ui8 {
@@ -238,7 +233,8 @@ public:
TTableItBase(
const TRowScheme* scheme, TTagsRef tags, ui64 lim = Max<ui64>(),
TRowVersion snapshot = TRowVersion::Max(),
- NTable::ITransactionMapPtr committedTransactions = nullptr);
+ NTable::ITransactionMapPtr committedTransactions = nullptr,
+ NTable::ITransactionObserverPtr transactionObserver = nullptr);
~TTableItBase();
@@ -342,6 +338,9 @@ private:
// A map of currently committed transactions to corresponding row versions
const NTable::ITransactionMapPtr CommittedTransactions;
+ // A transaction observer for detecting skips
+ const NTable::ITransactionObserverPtr TransactionObserver;
+
EStage Stage = EStage::Seek;
EReady Ready = EReady::Gone;
THolderVector<TMemIt> MemIters;
@@ -458,13 +457,15 @@ template<class TIteratorOps>
inline TTableItBase<TIteratorOps>::TTableItBase(
const TRowScheme* scheme, TTagsRef tags, ui64 limit,
TRowVersion snapshot,
- NTable::ITransactionMapPtr committedTransactions)
+ NTable::ITransactionMapPtr committedTransactions,
+ NTable::ITransactionObserverPtr transactionObserver)
: Scheme(scheme)
, Remap(*Scheme, tags)
, Limit(limit)
, State(Remap.Size())
, SnapshotVersion(snapshot)
, CommittedTransactions(std::move(committedTransactions))
+ , TransactionObserver(std::move(transactionObserver))
, Comparator(Scheme->Keys->Types)
, Active(Iterators.end())
, Inactive(Iterators.end())
@@ -781,16 +782,14 @@ inline EReady TTableItBase<TIteratorOps>::Snap(TRowVersion rowVersion) noexcept
TIteratorId ai = i->IteratorId;
switch (ai.Type) {
case EType::Mem: {
- auto ready = MemIters[ai.Index]->SkipToRowVersion(rowVersion, CommittedTransactions);
- Stats.InvisibleRowSkips += std::exchange(MemIters[ai.Index]->InvisibleRowSkips, 0);
+ auto ready = MemIters[ai.Index]->SkipToRowVersion(rowVersion, Stats, CommittedTransactions, TransactionObserver);
if (ready) {
return EReady::Data;
}
break;
}
case EType::Run: {
- auto ready = RunIters[ai.Index]->SkipToRowVersion(rowVersion, CommittedTransactions);
- Stats.InvisibleRowSkips += std::exchange(RunIters[ai.Index]->InvisibleRowSkips, 0);
+ auto ready = RunIters[ai.Index]->SkipToRowVersion(rowVersion, Stats, CommittedTransactions, TransactionObserver);
if (ready == EReady::Data) {
return EReady::Data;
} else if (ready != EReady::Gone) {
@@ -823,6 +822,7 @@ inline EReady TTableItBase<TIteratorOps>::DoSkipUncommitted() noexcept
case EType::Mem: {
auto& it = *MemIters[ai.Index];
Y_VERIFY_DEBUG(it.IsDelta() && !CommittedTransactions.Find(it.GetDeltaTxId()));
+ TransactionObserver.OnSkipUncommitted(it.GetDeltaTxId());
if (it.SkipDelta()) {
return EReady::Data;
}
@@ -831,6 +831,7 @@ inline EReady TTableItBase<TIteratorOps>::DoSkipUncommitted() noexcept
case EType::Run: {
auto& it = *RunIters[ai.Index];
Y_VERIFY_DEBUG(it.IsDelta() && !CommittedTransactions.Find(it.GetDeltaTxId()));
+ TransactionObserver.OnSkipUncommitted(it.GetDeltaTxId());
auto ready = it.SkipDelta();
if (ready != EReady::Gone) {
return ready;
diff --git a/ydb/core/tablet_flat/flat_mem_iter.h b/ydb/core/tablet_flat/flat_mem_iter.h
index 462416e7430..a7059f68135 100644
--- a/ydb/core/tablet_flat/flat_mem_iter.h
+++ b/ydb/core/tablet_flat/flat_mem_iter.h
@@ -198,6 +198,8 @@ namespace NTable {
ApplyColumn(row, up);
}
}
+ } else {
+ // FIXME: add uncommitted tx to stats?
}
if (!isDelta) {
break;
@@ -225,7 +227,9 @@ namespace NTable {
* Returns false if there is no such version, e.g. current key did not
* exist or didn't have any known updates at this rowVersion.
*/
- bool SkipToRowVersion(TRowVersion rowVersion, NTable::ITransactionMapSimplePtr committedTransactions) noexcept
+ bool SkipToRowVersion(TRowVersion rowVersion, TIteratorStats& stats,
+ NTable::ITransactionMapSimplePtr committedTransactions,
+ NTable::ITransactionObserverSimplePtr transactionObserver) noexcept
{
Y_VERIFY_DEBUG(IsValid(), "Attempt to access an invalid row");
@@ -234,6 +238,7 @@ namespace NTable {
// Skip uncommitted deltas
while (chain->RowVersion.Step == Max<ui64>() && !committedTransactions.Find(chain->RowVersion.TxId)) {
+ transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId);
if (!(chain = chain->Next)) {
CurrentVersion = nullptr;
return false;
@@ -254,7 +259,7 @@ namespace NTable {
}
}
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
while ((chain = chain->Next)) {
if (chain->RowVersion.Step != Max<ui64>()) {
@@ -263,7 +268,7 @@ namespace NTable {
return true;
}
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
} else {
auto* commitVersion = committedTransactions.Find(chain->RowVersion.TxId);
if (commitVersion && *commitVersion <= rowVersion) {
@@ -272,7 +277,9 @@ namespace NTable {
}
if (commitVersion) {
// Only committed deltas increment InvisibleRowSkips
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
+ } else {
+ transactionObserver.OnSkipUncommitted(chain->RowVersion.TxId);
}
}
}
@@ -352,7 +359,6 @@ namespace NTable {
const TIntrusiveConstPtr<TKeyCellDefaults> KeyCellDefaults;
const TRemap* Remap = nullptr;
IPages * const Env = nullptr;
- ui64 InvisibleRowSkips = 0;
private:
NMem::TTreeIterator RowIt;
diff --git a/ydb/core/tablet_flat/flat_part_iface.h b/ydb/core/tablet_flat/flat_part_iface.h
index 9be72fc735a..03882305a50 100644
--- a/ydb/core/tablet_flat/flat_part_iface.h
+++ b/ydb/core/tablet_flat/flat_part_iface.h
@@ -2,6 +2,7 @@
#include "flat_page_iface.h"
#include "flat_sausage_solid.h"
+#include "flat_table_stats.h"
#include "flat_row_eggs.h"
#include "util_basics.h"
diff --git a/ydb/core/tablet_flat/flat_part_iter_multi.h b/ydb/core/tablet_flat/flat_part_iter_multi.h
index 4ca3aef2734..607b94aea1a 100644
--- a/ydb/core/tablet_flat/flat_part_iter_multi.h
+++ b/ydb/core/tablet_flat/flat_part_iter_multi.h
@@ -818,8 +818,9 @@ namespace NTable {
}
}
- EReady SkipToRowVersion(TRowVersion rowVersion,
- NTable::ITransactionMapSimplePtr committedTransactions) noexcept
+ EReady SkipToRowVersion(TRowVersion rowVersion, TIteratorStats& stats,
+ NTable::ITransactionMapSimplePtr committedTransactions,
+ NTable::ITransactionObserverSimplePtr transactionObserver) noexcept
{
Y_VERIFY_DEBUG(Main.IsValid(), "Attempt to use an invalid iterator");
@@ -832,7 +833,7 @@ namespace NTable {
if (rowVersion < Part->MinRowVersion) {
// Don't bother seeking below the known first row version
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
return EReady::Gone;
}
}
@@ -852,7 +853,10 @@ namespace NTable {
}
if (commitVersion) {
// Skipping a newer committed delta
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
+ } else {
+ // Skipping an uncommitted delta
+ transactionObserver.OnSkipUncommitted(txId);
}
data = Main.GetRecord()->GetAltRecord(++SkipMainDeltas);
if (!data) {
@@ -869,7 +873,7 @@ namespace NTable {
return EReady::Data;
}
SkipEraseVersion = true;
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
}
TRowVersion current = data->IsVersioned() ? data->GetMinVersion(info) : Part->MinRowVersion;
@@ -878,7 +882,7 @@ namespace NTable {
return EReady::Data;
}
- InvisibleRowSkips++;
+ stats.InvisibleRowSkips++;
if (!data->HasHistory()) {
// There is no history, reset
@@ -1039,6 +1043,8 @@ namespace NTable {
if (row.IsFinalized()) {
return;
}
+ } else {
+ // FIXME: add uncommitted tx to stats?
}
// Skip deltas until the row is finalized
data = Main.GetRecord()->GetAltRecord(++index);
@@ -1168,7 +1174,6 @@ namespace NTable {
public:
const TPart* const Part;
IPages* const Env;
- ui64 InvisibleRowSkips = 0;
private:
const TPinout Pinout;
@@ -1493,13 +1498,12 @@ namespace NTable {
return CurrentIt->GetRowVersion();
}
- EReady SkipToRowVersion(
- TRowVersion rowVersion,
- NTable::ITransactionMapSimplePtr committedTransactions) noexcept
+ EReady SkipToRowVersion(TRowVersion rowVersion, TIteratorStats& stats,
+ NTable::ITransactionMapSimplePtr committedTransactions,
+ NTable::ITransactionObserverSimplePtr transactionObserver) noexcept
{
Y_VERIFY_DEBUG(CurrentIt);
- auto ready = CurrentIt->SkipToRowVersion(rowVersion, committedTransactions);
- InvisibleRowSkips += std::exchange(CurrentIt->InvisibleRowSkips, 0);
+ auto ready = CurrentIt->SkipToRowVersion(rowVersion, stats, committedTransactions, transactionObserver);
return ready;
}
@@ -1584,7 +1588,6 @@ namespace NTable {
TTagsRef const Tags;
TIntrusiveConstPtr<TKeyCellDefaults> const KeyCellDefaults;
IPages* const Env;
- ui64 InvisibleRowSkips = 0;
private:
TRun::const_iterator Current;
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index 78c8a4b5d0e..d5ce3ee8810 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -561,13 +561,13 @@ void TTable::AddSafe(TPartView partView)
}
}
-TTable::TReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef tags,
- IPages* env, ui64 flg,
- ui64 items, ui64 bytes,
- EDirection direction,
- TRowVersion snapshot) const
+EReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef tags,
+ IPages* env, ui64 flg,
+ ui64 items, ui64 bytes,
+ EDirection direction,
+ TRowVersion snapshot,
+ TSelectStats& stats) const
{
- TReady res;
bool ready = true;
bool includeHistory = !snapshot.IsMax();
@@ -595,9 +595,9 @@ TTable::TReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef ta
ready &= TCharge(env, *pos->Part, tags, includeHistory)
.Do(key, key, row1, row2, *Scheme->Keys, items, bytes)
.Ready;
- ++res.Sieved;
+ ++stats.Sieved;
} else {
- ++res.Weeded;
+ ++stats.Weeded;
}
}
}
@@ -617,8 +617,7 @@ TTable::TReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef ta
}
}
- res.Ready = ready ? EReady::Data : EReady::Page;
- return res;
+ return ready ? EReady::Data : EReady::Page;
}
void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> apart, TRowVersion rowVersion)
@@ -683,7 +682,8 @@ TMemTable& TTable::MemTable()
TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts");
@@ -691,7 +691,8 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES
const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>();
TAutoPtr<TTableIt> dbIter(new TTableIt(Scheme.Get(), tags, limit, snapshot,
- TMergedTransactionMap::Create(visible, CommittedTransactions)));
+ TMergedTransactionMap::Create(visible, CommittedTransactions),
+ observer));
if (Mutable) {
dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward));
@@ -724,7 +725,8 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES
TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek,
TRowVersion snapshot,
- const ITransactionMapPtr& visible) const noexcept
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts");
@@ -732,7 +734,8 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I
const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>();
TAutoPtr<TTableReverseIt> dbIter(new TTableReverseIt(Scheme.Get(), tags, limit, snapshot,
- TMergedTransactionMap::Create(visible, CommittedTransactions)));
+ TMergedTransactionMap::Create(visible, CommittedTransactions),
+ observer));
if (Mutable) {
dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse));
@@ -763,10 +766,12 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I
return dbIter;
}
-TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row,
- ui64 flg, TRowVersion snapshot,
- TDeque<TPartSimpleIt>& tempIterators,
- const ITransactionMapPtr& visible) const noexcept
+EReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row,
+ ui64 flg, TRowVersion snapshot,
+ TDeque<TPartSimpleIt>& tempIterators,
+ TSelectStats& stats,
+ const ITransactionMapPtr& visible,
+ const ITransactionObserverPtr& observer) const noexcept
{
Y_VERIFY(ColdParts.empty(), "Cannot select with cold parts");
Y_VERIFY(key_.size() == Scheme->Keys->Types.size());
@@ -780,8 +785,6 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
for (auto &pin: remap.KeyPins())
row.Set(pin.Pos, { ECellOp::Set, ELargeObj::Inline }, key[pin.Key]);
- TReady result;
-
const NBloom::TPrefix prefix(key);
TEpoch lastEpoch = TEpoch::Max();
@@ -789,16 +792,17 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
bool snapshotFound = (snapshot == TRowVersion::Max());
auto committed = TMergedTransactionMap::Create(visible, CommittedTransactions);
+ const auto prevInvisibleRowSkips = stats.InvisibleRowSkips;
+
// Mutable has the newest data
if (Mutable) {
lastEpoch = Mutable->Epoch;
if (auto it = TMemIt::Make(*Mutable, Mutable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) {
- if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) {
+ if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, stats, committed, observer))) {
// N.B. stop looking for snapshot after the first hit
snapshotFound = true;
it->Apply(row, committed);
}
- result.Invisible += it->InvisibleRowSkips;
}
}
@@ -808,12 +812,11 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
Y_VERIFY(lastEpoch > memTable->Epoch, "Ordering of epochs is incorrect");
lastEpoch = memTable->Epoch;
if (auto it = TMemIt::Make(*memTable, memTable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) {
- if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) {
+ if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, stats, committed, observer))) {
// N.B. stop looking for snapshot after the first hit
snapshotFound = true;
it->Apply(row, committed);
}
- result.Invisible += it->InvisibleRowSkips;
}
}
@@ -827,7 +830,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
if ((flg & EHint::NoByKey) ||
part->MightHaveKey(prefix.Get(part->Scheme->Groups[0].KeyTypes.size())))
{
- ++result.Sieved;
+ ++stats.Sieved;
TPartSimpleIt& it = tempIterators.emplace_back(part, tags, Scheme->Keys, env);
it.SetBounds(pos->Slice);
auto res = it.Seek(key, ESeek::Exact);
@@ -835,8 +838,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
Y_VERIFY(lastEpoch > part->Epoch, "Ordering of epochs is incorrect");
lastEpoch = part->Epoch;
if (!snapshotFound) {
- res = it.SkipToRowVersion(snapshot, committed);
- result.Invisible += std::exchange(it.InvisibleRowSkips, 0);
+ res = it.SkipToRowVersion(snapshot, stats, committed, observer);
if (res == EReady::Data) {
// N.B. stop looking for snapshot after the first hit
snapshotFound = true;
@@ -850,26 +852,25 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
break;
}
} else {
- ++result.NoKey;
+ ++stats.NoKey;
}
}
} else {
- ++result.Weeded;
+ ++stats.Weeded;
}
}
}
}
- Y_VERIFY_DEBUG(result.Invisible == 0 || !snapshot.IsMax());
+ Y_VERIFY_DEBUG(!snapshot.IsMax() || (stats.InvisibleRowSkips - prevInvisibleRowSkips) == 0);
if (!ready || row.Need()) {
- result.Ready = EReady::Page;
+ return EReady::Page;
} else if (row == ERowOp::Erase || row == ERowOp::Absent) {
- result.Ready = EReady::Gone;
+ return EReady::Gone;
} else {
- result.Ready = EReady::Data;
+ return EReady::Data;
}
- return result;
}
void TTable::DebugDump(IOutputStream& str, IPages* env, const NScheme::TTypeRegistry& reg) const
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index 1b8af67fe74..90dab02babc 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -58,7 +58,8 @@ public:
ui64 Weeded = 0;
ui64 Sieved = 0;
ui64 NoKey = 0; /* Examined TPart without the key */
- ui64 Invisible = 0; /* Skipped invisible versions */
+
+ TIteratorStats Stats;
};
explicit TTable(TEpoch);
@@ -128,18 +129,22 @@ public:
TAutoPtr<TTableIt> Iterate(TRawVals key, TTagsRef tags, IPages* env, ESeek,
TRowVersion snapshot,
- const ITransactionMapPtr& visible = nullptr) const noexcept;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
TAutoPtr<TTableReverseIt> IterateReverse(TRawVals key, TTagsRef tags, IPages* env, ESeek,
TRowVersion snapshot,
- const ITransactionMapPtr& visible = nullptr) const noexcept;
- TReady Select(TRawVals key, TTagsRef tags, IPages* env, TRowState& row,
- ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators,
- const ITransactionMapPtr& visible = nullptr) const noexcept;
-
- TReady Precharge(TRawVals minKey, TRawVals maxKey, TTagsRef tags,
- IPages* env, ui64 flg,
- ui64 itemsLimit, ui64 bytesLimit,
- EDirection direction, TRowVersion snapshot) const;
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
+ EReady Select(TRawVals key, TTagsRef tags, IPages* env, TRowState& row,
+ ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators,
+ TSelectStats& stats,
+ const ITransactionMapPtr& visible = nullptr,
+ const ITransactionObserverPtr& observer = nullptr) const noexcept;
+
+ EReady Precharge(TRawVals minKey, TRawVals maxKey, TTagsRef tags,
+ IPages* env, ui64 flg,
+ ui64 itemsLimit, ui64 bytesLimit,
+ EDirection direction, TRowVersion snapshot, TSelectStats& stats) const;
void Update(ERowOp, TRawVals key, TOpsRef, TArrayRef<TMemGlob> apart, TRowVersion rowVersion);
diff --git a/ydb/core/tablet_flat/flat_table_committed.h b/ydb/core/tablet_flat/flat_table_committed.h
index 299842e63aa..7be6f0115e9 100644
--- a/ydb/core/tablet_flat/flat_table_committed.h
+++ b/ydb/core/tablet_flat/flat_table_committed.h
@@ -315,5 +315,65 @@ namespace NTable {
TIntrusivePtr<TState> State_;
};
+ /**
+ * An interface for an optional observer of events during iteration
+ */
+ class ITransactionObserver : public TThrRefBase {
+ public:
+ /**
+ * Called when iterator skips over an uncommitted delta
+ *
+ * This may be used for detecting possible conflicts with transactions.
+ */
+ virtual void OnSkipUncommitted(ui64 txId) = 0;
+ };
+
+ /**
+ * Makes it easier to call observer methods even when pointer is nullptr
+ */
+ class ITransactionObserverPtr : public TIntrusivePtr<ITransactionObserver> {
+ public:
+ using TIntrusivePtr::TIntrusivePtr;
+
+ void OnSkipUncommitted(ui64 txId) const {
+ if (ITransactionObserver* p = Get()) {
+ p->OnSkipUncommitted(txId);
+ }
+ }
+ };
+
+ /**
+ * A simple pointer wrapper that may be useful as a function argument on a hot path
+ *
+ * Unlike a smart pointer it has no destructor and doesn't perform any Ref/UnRef
+ */
+ class ITransactionObserverSimplePtr {
+ public:
+ ITransactionObserverSimplePtr(const ITransactionObserverPtr& ptr)
+ : Ptr(ptr.Get())
+ { }
+
+ ITransactionObserverSimplePtr(ITransactionObserver* ptr)
+ : Ptr(ptr)
+ { }
+
+ ITransactionObserverSimplePtr(std::nullptr_t)
+ : Ptr(nullptr)
+ { }
+
+ explicit operator bool() const {
+ return bool(Ptr);
+ }
+
+ void OnSkipUncommitted(ui64 txId) const {
+ if (Ptr) {
+ Ptr->OnSkipUncommitted(txId);
+ }
+ }
+
+ private:
+ ITransactionObserver* Ptr;
+ };
+
} // namespace NTable
} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/flat_table_stats.h b/ydb/core/tablet_flat/flat_table_stats.h
index 5b600dcf13a..4b632558b6f 100644
--- a/ydb/core/tablet_flat/flat_table_stats.h
+++ b/ydb/core/tablet_flat/flat_table_stats.h
@@ -31,11 +31,15 @@ namespace NTable {
TPartStats& operator-=(const TPartStats& rhs);
};
- struct TSelectStats {
+ struct TIteratorStats {
+ ui64 DeletedRowSkips = 0;
+ ui64 InvisibleRowSkips = 0;
+ };
+
+ struct TSelectStats : TIteratorStats {
ui64 Sieved = 0;
ui64 Weeded = 0;
ui64 NoKey = 0;
- ui64 Invisible = 0;
};
struct TCompactionStats {
diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_part.h b/ydb/core/tablet_flat/test/libs/table/wrap_part.h
index a1e9c9bb677..35ba8af4b05 100644
--- a/ydb/core/tablet_flat/test/libs/table/wrap_part.h
+++ b/ydb/core/tablet_flat/test/libs/table/wrap_part.h
@@ -86,7 +86,8 @@ namespace NTest {
EReady SkipToRowVersion(TRowVersion rowVersion) noexcept
{
- Ready = Iter->SkipToRowVersion(rowVersion, /* committed */ nullptr);
+ TIteratorStats stats;
+ Ready = Iter->SkipToRowVersion(rowVersion, stats, /* committed */ nullptr, /* observer */ nullptr);
if (Ready == EReady::Data)
Ready = RollUp();
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index bfc05ece6e9..79f5d2c0baa 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -315,7 +315,7 @@ public:
rowState.Init(State.Columns.size());
NTable::TSelectStats stats;
auto ready = txc.DB.Select(TableInfo.LocalTid, key, State.Columns, rowState, stats, 0, State.ReadVersion);
- RowsSinceLastCheck += 1 + stats.Invisible;
+ RowsSinceLastCheck += 1 + stats.InvisibleRowSkips;
if (ready == NTable::EReady::Page) {
return EReadStatus::NeedData;
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index c7bb8a9152c..979553b1401 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -405,7 +405,7 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con
auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion());
kqpStats.NSelectRow = 1;
- kqpStats.InvisibleRowSkips = stats.Invisible;
+ kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips;
switch (ready) {
case EReady::Page: