diff options
author | Alexey Borzenkov <snaury@gmail.com> | 2022-05-04 20:30:56 +0300 |
---|---|---|
committer | Alexey Borzenkov <snaury@gmail.com> | 2022-05-04 20:30:56 +0300 |
commit | 704f0b833b7e8ea1b65e8613e1fb114cf13c1b1a (patch) | |
tree | 802798bb4d86a58d0363953dd68a069c7122d135 | |
parent | d863245c76f2cd52a027484d16edfc084d01d57b (diff) | |
download | ydb-704f0b833b7e8ea1b65e8613e1fb114cf13c1b1a.tar.gz |
Support per-query transaction maps in executor, KIKIMR-14732
ref:ff3e036a88690aaa8ddea4bd8dc665b7edeb78a4
-rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 42 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 22 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_misc.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_executor_ut.cpp | 99 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_iterator.h | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_mem_iter.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_mem_warm.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_part_iter_multi.h | 8 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.h | 13 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table_committed.h | 180 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table_subset.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/misc.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/test_cooker.h | 3 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/wrap_part.h | 4 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/libs/table/wrap_warm.h | 2 | ||||
-rw-r--r-- | ydb/core/tablet_flat/test/tool/perf/do_mem.h | 2 |
17 files changed, 361 insertions, 70 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 647e79645fc..71aaedc29dc 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -72,20 +72,24 @@ TAutoPtr<TTableIt> TDatabase::Iterate(ui32 table, TRawVals key, TTagsRef tags, E return Require(table)->Iterate(key, tags, Env, seekBy(key, mode), TRowVersion::Max()); } -TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef tags, TRowVersion snapshot) const noexcept +TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef tags, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); IteratedTables.insert(table); - auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot); + auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot, visible); // N.B. ESeek::Exact produces iterators with limit=1 return iter; } -TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept +TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); @@ -93,7 +97,7 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T ESeek seek = !range.MinKey || range.MinInclusive ? ESeek::Lower : ESeek::Upper; - auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot); + auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot, visible); if (range.MaxKey) { TCelled maxKey(range.MaxKey, *iter->Scheme->Keys, false); @@ -108,7 +112,9 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T return iter; } -TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept +TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); @@ -116,7 +122,7 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR ESeek seek = !range.MaxKey || range.MaxInclusive ? ESeek::Lower : ESeek::Upper; - auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot); + auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot, visible); if (range.MinKey) { TCelled minKey(range.MinKey, *iter->Scheme->Keys, false); @@ -132,22 +138,28 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR } template<> -TAutoPtr<TTableIt> TDatabase::IterateRangeGeneric<TTableIt>(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept +TAutoPtr<TTableIt> TDatabase::IterateRangeGeneric<TTableIt>(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { - return IterateRange(table, range, tags, snapshot); + return IterateRange(table, range, tags, snapshot, visible); } template<> -TAutoPtr<TTableReverseIt> TDatabase::IterateRangeGeneric<TTableReverseIt>(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept +TAutoPtr<TTableReverseIt> TDatabase::IterateRangeGeneric<TTableReverseIt>(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { - return IterateRangeReverse(table, range, tags, snapshot); + return IterateRangeReverse(table, range, tags, snapshot, visible); } -EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, ui64 flg, TRowVersion snapshot) const noexcept +EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, ui64 flg, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { TempIterators.clear(); Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); - auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators); + auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible); Change->Stats.SelectSieved += res.Sieved; Change->Stats.SelectWeeded += res.Weeded; Change->Stats.SelectNoKey += res.NoKey; @@ -155,11 +167,13 @@ EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row return res.Ready; } -EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, TSelectStats& stats, ui64 flg, TRowVersion snapshot) const noexcept +EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, TSelectStats& stats, ui64 flg, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { TempIterators.clear(); Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table); - auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators); + auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible); Change->Stats.SelectSieved += res.Sieved; Change->Stats.SelectWeeded += res.Weeded; Change->Stats.SelectNoKey += res.NoKey; diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 98baa8f4383..f29740158c6 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -68,19 +68,29 @@ public: /*_ Call Next() before accessing each row including the 1st row. */ TAutoPtr<TTableIt> Iterate(ui32 table, TRawVals key, TTagsRef tags, ELookup) const noexcept; - TAutoPtr<TTableIt> IterateExact(ui32 table, TRawVals key, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept; - TAutoPtr<TTableIt> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept; - TAutoPtr<TTableReverseIt> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept; + TAutoPtr<TTableIt> IterateExact(ui32 table, TRawVals key, TTagsRef tags, + TRowVersion snapshot = TRowVersion::Max(), + const ITransactionMapPtr& visible = nullptr) const noexcept; + TAutoPtr<TTableIt> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot = TRowVersion::Max(), + const ITransactionMapPtr& visible = nullptr) const noexcept; + TAutoPtr<TTableReverseIt> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot = TRowVersion::Max(), + const ITransactionMapPtr& visible = nullptr) const noexcept; template<class TIteratorType> - TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept; + TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags, + TRowVersion snapshot = TRowVersion::Max(), + const ITransactionMapPtr& visible = nullptr) const noexcept; // NOTE: the row refeneces data in some internal buffers that get invalidated on the next Select() or Commit() call EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, - ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max()) const noexcept; + ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(), + const ITransactionMapPtr& visible = nullptr) const noexcept; EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, TSelectStats& stats, - ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max()) const noexcept; + ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(), + const ITransactionMapPtr& visible = nullptr) const noexcept; bool Precharge(ui32 table, TRawVals minKey, TRawVals maxKey, TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit, diff --git a/ydb/core/tablet_flat/flat_executor_misc.h b/ydb/core/tablet_flat/flat_executor_misc.h index a1382963bfc..4f925840781 100644 --- a/ydb/core/tablet_flat/flat_executor_misc.h +++ b/ydb/core/tablet_flat/flat_executor_misc.h @@ -35,7 +35,7 @@ namespace NTabletFlatExecutor { NTable::TRowVersionRanges::TSnapshot RemovedRowVersions; // Non-empty when compaction also needs to write a tx status table part - NTable::TTransactionMap<TRowVersion> CommittedTransactions; + NTable::TTransactionMap CommittedTransactions; NTable::TTransactionSet RemovedTransactions; // The above may contain extra keys, these allow them to be narrowed TVector<TIntrusiveConstPtr<NTable::TMemTable>> Frozen; diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index 419167facf2..d3f008825f0 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -4091,6 +4091,58 @@ Y_UNIT_TEST_SUITE(TFlatTableLongTx) { } }; + struct TTxCheckRowsReadTx : public ITransaction { + TString& Data; + const ui64 TxId; + + TTxCheckRowsReadTx(TString& data, ui64 txId) + : Data(data) + , TxId(txId) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + TStringBuilder builder; + + TVector<NTable::TTag> tags; + tags.push_back(KeyColumnId); + tags.push_back(ValueColumnId); + tags.push_back(Value2ColumnId); + + NTable::EReady ready; + auto it = txc.DB.IterateRange(TableId, { }, tags, + TRowVersion::Max(), + MakeIntrusive<NTable::TSingleTransactionMap>(TxId, TRowVersion::Min())); + + while ((ready = it->Next(NTable::ENext::All)) != NTable::EReady::Gone) { + if (ready == NTable::EReady::Page) { + return false; + } + + const auto& row = it->Row(); + + TString key; + DbgPrintValue(key, row.Get(0), NScheme::TUint64::TypeId); + + TString value; + DbgPrintValue(value, row.Get(1), NScheme::TString::TypeId); + + TString value2; + DbgPrintValue(value2, row.Get(2), NScheme::TString::TypeId); + + builder << "Key " << key << " = " << row.GetRowState() + << " value = " << NTable::ECellOp(row.GetCellOp(1)) << " " << value + << " value2 = " << NTable::ECellOp(row.GetCellOp(2)) << " " << value2 << Endl; + } + + Data = builder; + return true; + } + + void Complete(const TActorContext& ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } + }; + struct TTxCheckRowsUncommitted : public ITransaction { TString& Data; @@ -4620,6 +4672,53 @@ Y_UNIT_TEST_SUITE(TFlatTableLongTx) { } } + Y_UNIT_TEST(MemTableLongTxRead) { + TMyEnvBase env; + + //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG); + //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); + + env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp)); + + env.SendSync(new NFake::TEvExecute{ new TTxInitSchema }); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(1, "foo") }); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<Value2ColumnId>(2, "bar") }); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(3, "abc", 123) }); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<Value2ColumnId>(1, "def", 123) }); + env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<Value2ColumnId>(2, "ghi", 123) }); + + // We should see our own uncommitted changes + { + TString data; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRowsReadTx(data, 123) }); + UNIT_ASSERT_VALUES_EQUAL(data, + "Key 1 = Upsert value = Set foo value2 = Set def\n" + "Key 2 = Upsert value = Empty NULL value2 = Set ghi\n" + "Key 3 = Upsert value = Set abc value2 = Empty NULL\n"); + } + + // Others shouldn't see these changes yet + { + TString data; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }); + UNIT_ASSERT_VALUES_EQUAL(data, + "Key 1 = Upsert value = Set foo value2 = Empty NULL\n" + "Key 2 = Upsert value = Empty NULL value2 = Set bar\n"); + } + + env.SendSync(new NFake::TEvExecute{ new TTxCommitLongTx(123) }); + + // Once committed everyone will see these changes + { + TString data; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }); + UNIT_ASSERT_VALUES_EQUAL(data, + "Key 1 = Upsert value = Set foo value2 = Set def\n" + "Key 2 = Upsert value = Empty NULL value2 = Set ghi\n" + "Key 3 = Upsert value = Set abc value2 = Empty NULL\n"); + } + } + } // Y_UNIT_TEST_SUITE(TFlatTableLongTx) Y_UNIT_TEST_SUITE(TFlatTableLongTxAndBlobs) { diff --git a/ydb/core/tablet_flat/flat_iterator.h b/ydb/core/tablet_flat/flat_iterator.h index edc4bc2eded..e041b397a49 100644 --- a/ydb/core/tablet_flat/flat_iterator.h +++ b/ydb/core/tablet_flat/flat_iterator.h @@ -238,7 +238,7 @@ public: TTableItBase( const TRowScheme* scheme, TTagsRef tags, ui64 lim = Max<ui64>(), TRowVersion snapshot = TRowVersion::Max(), - const NTable::TTransactionMap<TRowVersion>& committedTransactions = {}); + NTable::ITransactionMapPtr committedTransactions = nullptr); ~TTableItBase(); @@ -340,7 +340,7 @@ private: const TRowVersion SnapshotVersion; // A map of currently committed transactions to corresponding row versions - const NTable::TTransactionMap<TRowVersion> CommittedTransactions; + const NTable::ITransactionMapPtr CommittedTransactions; EStage Stage = EStage::Seek; EReady Ready = EReady::Gone; @@ -458,13 +458,13 @@ template<class TIteratorOps> inline TTableItBase<TIteratorOps>::TTableItBase( const TRowScheme* scheme, TTagsRef tags, ui64 limit, TRowVersion snapshot, - const NTable::TTransactionMap<TRowVersion>& committedTransactions) + NTable::ITransactionMapPtr committedTransactions) : Scheme(scheme) , Remap(*Scheme, tags) , Limit(limit) , State(Remap.Size()) , SnapshotVersion(snapshot) - , CommittedTransactions(committedTransactions) + , CommittedTransactions(std::move(committedTransactions)) , Comparator(Scheme->Keys->Types) , Active(Iterators.end()) , Inactive(Iterators.end()) diff --git a/ydb/core/tablet_flat/flat_mem_iter.h b/ydb/core/tablet_flat/flat_mem_iter.h index 8eb790fcbdc..462416e7430 100644 --- a/ydb/core/tablet_flat/flat_mem_iter.h +++ b/ydb/core/tablet_flat/flat_mem_iter.h @@ -183,7 +183,7 @@ namespace NTable { return bool(CurrentVersion); } - void Apply(TRowState& row, const NTable::TTransactionMap<TRowVersion>& committedTransactions) const noexcept + void Apply(TRowState& row, NTable::ITransactionMapSimplePtr committedTransactions) const noexcept { Y_VERIFY(row.Size() == Remap->Size(), "row state doesn't match the remap index"); @@ -225,7 +225,7 @@ namespace NTable { * Returns false if there is no such version, e.g. current key did not * exist or didn't have any known updates at this rowVersion. */ - bool SkipToRowVersion(TRowVersion rowVersion, const NTable::TTransactionMap<TRowVersion>& committedTransactions) noexcept + bool SkipToRowVersion(TRowVersion rowVersion, NTable::ITransactionMapSimplePtr committedTransactions) noexcept { Y_VERIFY_DEBUG(IsValid(), "Attempt to access an invalid row"); diff --git a/ydb/core/tablet_flat/flat_mem_warm.h b/ydb/core/tablet_flat/flat_mem_warm.h index 83b744b113f..b3e5ef95796 100644 --- a/ydb/core/tablet_flat/flat_mem_warm.h +++ b/ydb/core/tablet_flat/flat_mem_warm.h @@ -132,7 +132,7 @@ namespace NMem { {} void Update(ERowOp rop, TRawVals key_, TOpsRef ops, TArrayRef<TMemGlob> pages, TRowVersion rowVersion, - const NTable::TTransactionMap<TRowVersion>& committed) + NTable::ITransactionMapSimplePtr committed) { Y_VERIFY_DEBUG( rop == ERowOp::Upsert || rop == ERowOp::Erase || rop == ERowOp::Reset, diff --git a/ydb/core/tablet_flat/flat_part_iter_multi.h b/ydb/core/tablet_flat/flat_part_iter_multi.h index 0fb46c3040a..4ca3aef2734 100644 --- a/ydb/core/tablet_flat/flat_part_iter_multi.h +++ b/ydb/core/tablet_flat/flat_part_iter_multi.h @@ -819,7 +819,7 @@ namespace NTable { } EReady SkipToRowVersion(TRowVersion rowVersion, - const NTable::TTransactionMap<TRowVersion>& committedTransactions) noexcept + NTable::ITransactionMapSimplePtr committedTransactions) noexcept { Y_VERIFY_DEBUG(Main.IsValid(), "Attempt to use an invalid iterator"); @@ -1013,7 +1013,7 @@ namespace NTable { } void Apply(TRowState& row, - const NTable::TTransactionMap<TRowVersion>& committedTransactions) const noexcept + NTable::ITransactionMapSimplePtr committedTransactions) const noexcept { Y_VERIFY_DEBUG(IsValid(), "Attempt to apply an invalid row"); @@ -1481,7 +1481,7 @@ namespace NTable { } void Apply(TRowState& row, - const NTable::TTransactionMap<TRowVersion>& committedTransactions) const noexcept + NTable::ITransactionMapSimplePtr committedTransactions) const noexcept { Y_VERIFY_DEBUG(CurrentIt); CurrentIt->Apply(row, committedTransactions); @@ -1495,7 +1495,7 @@ namespace NTable { EReady SkipToRowVersion( TRowVersion rowVersion, - const NTable::TTransactionMap<TRowVersion>& committedTransactions) noexcept + NTable::ITransactionMapSimplePtr committedTransactions) noexcept { Y_VERIFY_DEBUG(CurrentIt); auto ready = CurrentIt->SkipToRowVersion(rowVersion, committedTransactions); diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index 11b3b90199b..78c8a4b5d0e 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -444,6 +444,8 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept auto res = TxStatus.emplace(txStatus->Label, txStatus); Y_VERIFY(res.second, "Unexpected failure to add a new TTxStatusPart"); + + ErasedKeysCache.Reset(); } void TTable::ProcessCheckTransactions() noexcept @@ -679,14 +681,17 @@ TMemTable& TTable::MemTable() *(Mutable ? Mutable : (Mutable = new TMemTable(Scheme, Epoch, Annexed))); } -TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, TRowVersion snapshot) const noexcept +TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts"); const TCelled key(key_, *Scheme->Keys, false); const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>(); - TAutoPtr<TTableIt> dbIter(new TTableIt(Scheme.Get(), tags, limit, snapshot, CommittedTransactions)); + TAutoPtr<TTableIt> dbIter(new TTableIt(Scheme.Get(), tags, limit, snapshot, + TMergedTransactionMap::Create(visible, CommittedTransactions))); if (Mutable) { dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward)); @@ -707,7 +712,7 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES } } - if (EraseCacheEnabled) { + if (EraseCacheEnabled && !visible) { if (!ErasedKeysCache) { ErasedKeysCache = new TKeyRangeCache(*Scheme->Keys, EraseCacheConfig); } @@ -717,14 +722,17 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES return dbIter; } -TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, TRowVersion snapshot) const noexcept +TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, + TRowVersion snapshot, + const ITransactionMapPtr& visible) const noexcept { Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts"); const TCelled key(key_, *Scheme->Keys, false); const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>(); - TAutoPtr<TTableReverseIt> dbIter(new TTableReverseIt(Scheme.Get(), tags, limit, snapshot, CommittedTransactions)); + TAutoPtr<TTableReverseIt> dbIter(new TTableReverseIt(Scheme.Get(), tags, limit, snapshot, + TMergedTransactionMap::Create(visible, CommittedTransactions))); if (Mutable) { dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse)); @@ -745,7 +753,7 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I } } - if (EraseCacheEnabled) { + if (EraseCacheEnabled && !visible) { if (!ErasedKeysCache) { ErasedKeysCache = new TKeyRangeCache(*Scheme->Keys, EraseCacheConfig); } @@ -757,7 +765,8 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row, ui64 flg, TRowVersion snapshot, - TDeque<TPartSimpleIt>& tempIterators) const noexcept + TDeque<TPartSimpleIt>& tempIterators, + const ITransactionMapPtr& visible) const noexcept { Y_VERIFY(ColdParts.empty(), "Cannot select with cold parts"); Y_VERIFY(key_.size() == Scheme->Keys->Types.size()); @@ -778,15 +787,16 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta TEpoch lastEpoch = TEpoch::Max(); bool snapshotFound = (snapshot == TRowVersion::Max()); + auto committed = TMergedTransactionMap::Create(visible, CommittedTransactions); // Mutable has the newest data if (Mutable) { lastEpoch = Mutable->Epoch; if (auto it = TMemIt::Make(*Mutable, Mutable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) { - if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, CommittedTransactions))) { + if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) { // N.B. stop looking for snapshot after the first hit snapshotFound = true; - it->Apply(row, CommittedTransactions); + it->Apply(row, committed); } result.Invisible += it->InvisibleRowSkips; } @@ -798,10 +808,10 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta Y_VERIFY(lastEpoch > memTable->Epoch, "Ordering of epochs is incorrect"); lastEpoch = memTable->Epoch; if (auto it = TMemIt::Make(*memTable, memTable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) { - if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, CommittedTransactions))) { + if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) { // N.B. stop looking for snapshot after the first hit snapshotFound = true; - it->Apply(row, CommittedTransactions); + it->Apply(row, committed); } result.Invisible += it->InvisibleRowSkips; } @@ -825,7 +835,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta Y_VERIFY(lastEpoch > part->Epoch, "Ordering of epochs is incorrect"); lastEpoch = part->Epoch; if (!snapshotFound) { - res = it.SkipToRowVersion(snapshot, CommittedTransactions); + res = it.SkipToRowVersion(snapshot, committed); result.Invisible += std::exchange(it.InvisibleRowSkips, 0); if (res == EReady::Data) { // N.B. stop looking for snapshot after the first hit @@ -835,7 +845,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta } if (ready = ready && bool(res)) { if (res == EReady::Data) { - it.Apply(row, CommittedTransactions); + it.Apply(row, committed); if (row.IsFinalized()) { break; } diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index a1fdeb97093..1b8af67fe74 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -126,10 +126,15 @@ public: TVector<TIntrusiveConstPtr<TMemTable>> GetMemTables() const noexcept; - TAutoPtr<TTableIt> Iterate(TRawVals key, TTagsRef tags, IPages* env, ESeek, TRowVersion snapshot) const noexcept; - TAutoPtr<TTableReverseIt> IterateReverse(TRawVals key, TTagsRef tags, IPages* env, ESeek, TRowVersion snapshot) const noexcept; + TAutoPtr<TTableIt> Iterate(TRawVals key, TTagsRef tags, IPages* env, ESeek, + TRowVersion snapshot, + const ITransactionMapPtr& visible = nullptr) const noexcept; + TAutoPtr<TTableReverseIt> IterateReverse(TRawVals key, TTagsRef tags, IPages* env, ESeek, + TRowVersion snapshot, + const ITransactionMapPtr& visible = nullptr) const noexcept; TReady Select(TRawVals key, TTagsRef tags, IPages* env, TRowState& row, - ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators) const noexcept; + ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators, + const ITransactionMapPtr& visible = nullptr) const noexcept; TReady Precharge(TRawVals minKey, TRawVals maxKey, TTagsRef tags, IPages* env, ui64 flg, @@ -301,7 +306,7 @@ private: THashSet<ui64> CheckTransactions; THashMap<ui64, TOpenTransaction> OpenTransactions; - TTransactionMap<TRowVersion> CommittedTransactions; + TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; }; diff --git a/ydb/core/tablet_flat/flat_table_committed.h b/ydb/core/tablet_flat/flat_table_committed.h index 222b140cc8d..299842e63aa 100644 --- a/ydb/core/tablet_flat/flat_table_committed.h +++ b/ydb/core/tablet_flat/flat_table_committed.h @@ -1,6 +1,7 @@ #pragma once #include "defs.h" +#include <ydb/core/base/row_version.h> #include <util/generic/hash.h> #include <util/generic/ptr.h> @@ -11,14 +12,147 @@ namespace NKikimr { namespace NTable { /** - * A simple copy-on-write data structure for a TxId -> TValue map + * An interface that maps TxId to commit RowVersion + */ + class ITransactionMap : public TThrRefBase { + public: + /** + * Returns a pointer to a stored row version for a given txId, or nullptr when it's missing + */ + virtual const TRowVersion* Find(ui64 txId) const = 0; + }; + + /** + * Smart pointer that can safely be used to call methods even when it's nullptr + */ + class ITransactionMapPtr : public TIntrusivePtr<ITransactionMap> { + public: + using TIntrusivePtr::TIntrusivePtr; + + const TRowVersion* Find(ui64 txId) const { + if (ITransactionMap* p = Get()) { + return p->Find(txId); + } + return nullptr; + } + }; + + /** + * A simple pointer wrapper that may be useful as a function argument on a hot path + * + * Unlike a smart pointer it has no destructor and doesn't perform any Ref/UnRef + */ + class ITransactionMapSimplePtr { + public: + ITransactionMapSimplePtr(const ITransactionMapPtr& ptr) + : Ptr(ptr.Get()) + { } + + ITransactionMapSimplePtr(ITransactionMap* ptr) + : Ptr(ptr) + { } + + ITransactionMapSimplePtr(std::nullptr_t) + : Ptr(nullptr) + { } + + explicit operator bool() const { + return bool(Ptr); + } + + const TRowVersion* Find(ui64 txId) const { + if (Ptr) { + return Ptr->Find(txId); + } + return nullptr; + } + + private: + ITransactionMap* Ptr; + }; + + /** + * A special implementation of transaction map with a single transaction + */ + class TSingleTransactionMap final : public ITransactionMap { + public: + TSingleTransactionMap(ui64 txId, TRowVersion value) + : TxId(txId) + , Value(std::move(value)) + { } + + const TRowVersion* Find(ui64 txId) const override { + if (TxId == txId) { + return &Value; + } + return nullptr; + } + + static ITransactionMapPtr Create(ui64 txId, TRowVersion value) { + return new TSingleTransactionMap(txId, value); + } + + private: + ui64 TxId; + TRowVersion Value; + }; + + /** + * A special implementation of transaction map that merges two non-empty maps + */ + class TMergedTransactionMap final : public ITransactionMap { + private: + TMergedTransactionMap( + ITransactionMapPtr first, + ITransactionMapPtr second) + : First(std::move(first)) + , Second(std::move(second)) + { } + + public: + const TRowVersion* Find(ui64 txId) const override { + if (const TRowVersion* value = First.Find(txId)) { + return value; + } + return Second.Find(txId); + } + + static ITransactionMapPtr Create( + const ITransactionMapPtr& first, + const ITransactionMapPtr& second) + { + if (first) { + if (second) { + return new TMergedTransactionMap(first, second); + } + return first; + } else { + return second; + } + } + + private: + ITransactionMapPtr First; + ITransactionMapPtr Second; + }; + + /** + * A simple copy-on-write data structure for a TxId -> RowVersion map + * + * Pretends to be an instance of ITransactionMapPtr */ - template<class TValue> class TTransactionMap { private: - using TTxMap = std::unordered_map<ui64, TValue>; + using TTxMap = std::unordered_map<ui64, TRowVersion>; - struct TState : public TThrRefBase, public TTxMap { + struct TState final : public ITransactionMap, public TTxMap { + const TRowVersion* Find(ui64 txId) const override { + auto it = this->find(txId); + if (it != this->end()) { + return &it->second; + } + return nullptr; + } }; public: @@ -31,16 +165,9 @@ namespace NTable { return State_ && !State_->empty(); } - bool Contains(ui64 txId) const { - return State_ && State_->contains(txId); - } - - const TValue* Find(ui64 txId) const { + const TRowVersion* Find(ui64 txId) const { if (State_) { - auto it = State_->find(txId); - if (it != State_->end()) { - return &it->second; - } + return State_->Find(txId); } return nullptr; } @@ -49,7 +176,7 @@ namespace NTable { State_.Reset(); } - void Add(ui64 txId, const TValue& value) { + void Add(ui64 txId, TRowVersion value) { Unshare()[txId] = value; } @@ -59,6 +186,31 @@ namespace NTable { } } + operator ITransactionMapPtr() const { + if (State_ && !State_->empty()) { + return State_; + } + return nullptr; + } + + operator ITransactionMapSimplePtr() const { + return static_cast<ITransactionMap*>(State_.Get()); + } + + ITransactionMap* Get() const { + return State_.Get(); + } + + ITransactionMap& operator*() const { + Y_VERIFY(State_); + return *State_; + } + + ITransactionMap* operator->() const { + Y_VERIFY(State_); + return State_.Get(); + } + public: const_iterator begin() const { if (State_) { diff --git a/ydb/core/tablet_flat/flat_table_subset.h b/ydb/core/tablet_flat/flat_table_subset.h index bffbf5a1271..0c7355abfa6 100644 --- a/ydb/core/tablet_flat/flat_table_subset.h +++ b/ydb/core/tablet_flat/flat_table_subset.h @@ -116,7 +116,7 @@ namespace NTable { TVector<TMemTableSnapshot> Frozen; TVector<TPartView> Flatten; TVector<TIntrusiveConstPtr<TColdPart>> ColdParts; - TTransactionMap<TRowVersion> CommittedTransactions; + TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; TVector<TIntrusiveConstPtr<TTxStatusPart>> TxStatus; }; diff --git a/ydb/core/tablet_flat/test/libs/table/misc.cpp b/ydb/core/tablet_flat/test/libs/table/misc.cpp index 1a2a678fccf..9e2ac54f789 100644 --- a/ydb/core/tablet_flat/test/libs/table/misc.cpp +++ b/ydb/core/tablet_flat/test/libs/table/misc.cpp @@ -30,7 +30,7 @@ TString PrintRowImpl(const TRemap& remap, const TIterator& it) state.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]); } - it.Apply(state, /* committed */ {}); + it.Apply(state, /* committed */ nullptr); { TStringStream ss; diff --git a/ydb/core/tablet_flat/test/libs/table/test_cooker.h b/ydb/core/tablet_flat/test/libs/table/test_cooker.h index d6a6426a97d..0844c0b6ba1 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_cooker.h +++ b/ydb/core/tablet_flat/test/libs/table/test_cooker.h @@ -46,7 +46,8 @@ namespace NTest { { auto pair = Tool.Split(tagged, true, rop != ERowOp::Erase); - return Table->Update(rop, pair.Key, pair.Ops, { }, /* TODO: rowVersion */ TRowVersion::Min(), /* committed */ {}), *this; + return Table->Update(rop, pair.Key, pair.Ops, { }, /* TODO: rowVersion */ TRowVersion::Min(), + /* committed */ nullptr), *this; } private: diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_part.h b/ydb/core/tablet_flat/test/libs/table/wrap_part.h index d7538b947ee..a1e9c9bb677 100644 --- a/ydb/core/tablet_flat/test/libs/table/wrap_part.h +++ b/ydb/core/tablet_flat/test/libs/table/wrap_part.h @@ -86,7 +86,7 @@ namespace NTest { EReady SkipToRowVersion(TRowVersion rowVersion) noexcept { - Ready = Iter->SkipToRowVersion(rowVersion, /* committed */ {}); + Ready = Iter->SkipToRowVersion(rowVersion, /* committed */ nullptr); if (Ready == EReady::Data) Ready = RollUp(); @@ -144,7 +144,7 @@ namespace NTest { for (auto &pin: Remap_.KeyPins()) State.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]); - Iter->Apply(State, /* committed */ {}); + Iter->Apply(State, /* committed */ nullptr); return (NoBlobs = State.Need() > 0) ? EReady::Page : EReady::Data; } diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_warm.h b/ydb/core/tablet_flat/test/libs/table/wrap_warm.h index 6d747ecf7dd..4587dd42d41 100644 --- a/ydb/core/tablet_flat/test/libs/table/wrap_warm.h +++ b/ydb/core/tablet_flat/test/libs/table/wrap_warm.h @@ -82,7 +82,7 @@ namespace NTest { for (auto &pin: Remap_.KeyPins()) State.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]); - Iter->Apply(State, /* committed */ {}); + Iter->Apply(State, /* committed */ nullptr); } return Iter->IsValid() ? EReady::Data : EReady::Gone; diff --git a/ydb/core/tablet_flat/test/tool/perf/do_mem.h b/ydb/core/tablet_flat/test/tool/perf/do_mem.h index 384290bd8d5..36e9b6b91c3 100644 --- a/ydb/core/tablet_flat/test/tool/perf/do_mem.h +++ b/ydb/core/tablet_flat/test/tool/perf/do_mem.h @@ -64,7 +64,7 @@ namespace NPerf { for (auto &pin: Remap.KeyPins()) State.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]); - Iter->Apply(State, /* committed */ {}); + Iter->Apply(State, /* committed */ nullptr); aggr(State); } |