about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Alexey Borzenkov <snaury@gmail.com> 2022-05-04 20:30:56 +0300
committer Alexey Borzenkov <snaury@gmail.com> 2022-05-04 20:30:56 +0300
commit 704f0b833b7e8ea1b65e8613e1fb114cf13c1b1a (patch)
tree 802798bb4d86a58d0363953dd68a069c7122d135
parent d863245c76f2cd52a027484d16edfc084d01d57b (diff)
download ydb-704f0b833b7e8ea1b65e8613e1fb114cf13c1b1a.tar.gz
Support per-query transaction maps in executor, KIKIMR-14732
ref:ff3e036a88690aaa8ddea4bd8dc665b7edeb78a4
-rw-r--r-- ydb/core/tablet_flat/flat_database.cpp 42
-rw-r--r-- ydb/core/tablet_flat/flat_database.h 22
-rw-r--r-- ydb/core/tablet_flat/flat_executor_misc.h 2
-rw-r--r-- ydb/core/tablet_flat/flat_executor_ut.cpp 99
-rw-r--r-- ydb/core/tablet_flat/flat_iterator.h 8
-rw-r--r-- ydb/core/tablet_flat/flat_mem_iter.h 4
-rw-r--r-- ydb/core/tablet_flat/flat_mem_warm.h 2
-rw-r--r-- ydb/core/tablet_flat/flat_part_iter_multi.h 8
-rw-r--r-- ydb/core/tablet_flat/flat_table.cpp 36
-rw-r--r-- ydb/core/tablet_flat/flat_table.h 13
-rw-r--r-- ydb/core/tablet_flat/flat_table_committed.h 180
-rw-r--r-- ydb/core/tablet_flat/flat_table_subset.h 2
-rw-r--r-- ydb/core/tablet_flat/test/libs/table/misc.cpp 2
-rw-r--r-- ydb/core/tablet_flat/test/libs/table/test_cooker.h 3
-rw-r--r-- ydb/core/tablet_flat/test/libs/table/wrap_part.h 4
-rw-r--r-- ydb/core/tablet_flat/test/libs/table/wrap_warm.h 2
-rw-r--r-- ydb/core/tablet_flat/test/tool/perf/do_mem.h 2
17 files changed, 361 insertions, 70 deletions
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 647e79645fc..71aaedc29dc 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -72,20 +72,24 @@ TAutoPtr<TTableIt> TDatabase::Iterate(ui32 table, TRawVals key, TTagsRef tags, E
return Require(table)->Iterate(key, tags, Env, seekBy(key, mode), TRowVersion::Max());
}
-TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef tags, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableIt> TDatabase::IterateExact(ui32 table, TRawVals key, TTagsRef tags,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
IteratedTables.insert(table);
- auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot);
+ auto iter = Require(table)->Iterate(key, tags, Env, ESeek::Exact, snapshot, visible);
// N.B. ESeek::Exact produces iterators with limit=1
return iter;
}
-TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
@@ -93,7 +97,7 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T
ESeek seek = !range.MinKey || range.MinInclusive ? ESeek::Lower : ESeek::Upper;
- auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot);
+ auto iter = Require(table)->Iterate(range.MinKey, tags, Env, seek, snapshot, visible);
if (range.MaxKey) {
TCelled maxKey(range.MaxKey, *iter->Scheme->Keys, false);
@@ -108,7 +112,9 @@ TAutoPtr<TTableIt> TDatabase::IterateRange(ui32 table, const TKeyRange& range, T
return iter;
}
-TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
@@ -116,7 +122,7 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR
ESeek seek = !range.MaxKey || range.MaxInclusive ? ESeek::Lower : ESeek::Upper;
- auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot);
+ auto iter = Require(table)->IterateReverse(range.MaxKey, tags, Env, seek, snapshot, visible);
if (range.MinKey) {
TCelled minKey(range.MinKey, *iter->Scheme->Keys, false);
@@ -132,22 +138,28 @@ TAutoPtr<TTableReverseIt> TDatabase::IterateRangeReverse(ui32 table, const TKeyR
}
template<>
-TAutoPtr<TTableIt> TDatabase::IterateRangeGeneric<TTableIt>(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableIt> TDatabase::IterateRangeGeneric<TTableIt>(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
- return IterateRange(table, range, tags, snapshot);
+ return IterateRange(table, range, tags, snapshot, visible);
}
template<>
-TAutoPtr<TTableReverseIt> TDatabase::IterateRangeGeneric<TTableReverseIt>(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableReverseIt> TDatabase::IterateRangeGeneric<TTableReverseIt>(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
- return IterateRangeReverse(table, range, tags, snapshot);
+ return IterateRangeReverse(table, range, tags, snapshot, visible);
}
-EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, ui64 flg, TRowVersion snapshot) const noexcept
+EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, ui64 flg,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
TempIterators.clear();
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
- auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators);
+ auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible);
Change->Stats.SelectSieved += res.Sieved;
Change->Stats.SelectWeeded += res.Weeded;
Change->Stats.SelectNoKey += res.NoKey;
@@ -155,11 +167,13 @@ EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row
return res.Ready;
}
-EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, TSelectStats& stats, ui64 flg, TRowVersion snapshot) const noexcept
+EReady TDatabase::Select(ui32 table, TRawVals key, TTagsRef tags, TRowState &row, TSelectStats& stats, ui64 flg,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
TempIterators.clear();
Y_VERIFY(!NoMoreReadsFlag, "Trying to read after reads prohibited, table %u", table);
- auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators);
+ auto res = Require(table)->Select(key, tags, Env, row, flg, snapshot, TempIterators, visible);
Change->Stats.SelectSieved += res.Sieved;
Change->Stats.SelectWeeded += res.Weeded;
Change->Stats.SelectNoKey += res.NoKey;
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 98baa8f4383..f29740158c6 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -68,19 +68,29 @@ public:
/*_ Call Next() before accessing each row including the 1st row. */
TAutoPtr<TTableIt> Iterate(ui32 table, TRawVals key, TTagsRef tags, ELookup) const noexcept;
- TAutoPtr<TTableIt> IterateExact(ui32 table, TRawVals key, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept;
- TAutoPtr<TTableIt> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept;
- TAutoPtr<TTableReverseIt> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept;
+ TAutoPtr<TTableIt> IterateExact(ui32 table, TRawVals key, TTagsRef tags,
+ TRowVersion snapshot = TRowVersion::Max(),
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
+ TAutoPtr<TTableIt> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot = TRowVersion::Max(),
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
+ TAutoPtr<TTableReverseIt> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot = TRowVersion::Max(),
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
template<class TIteratorType>
- TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags, TRowVersion snapshot = TRowVersion::Max()) const noexcept;
+ TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags,
+ TRowVersion snapshot = TRowVersion::Max(),
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
// NOTE: the row references data in some internal buffers that get invalidated on the next Select() or Commit() call
EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row,
- ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max()) const noexcept;
+ ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(),
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, TSelectStats& stats,
- ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max()) const noexcept;
+ ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(),
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
bool Precharge(ui32 table, TRawVals minKey, TRawVals maxKey,
TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit,
diff --git a/ydb/core/tablet_flat/flat_executor_misc.h b/ydb/core/tablet_flat/flat_executor_misc.h
index a1382963bfc..4f925840781 100644
--- a/ydb/core/tablet_flat/flat_executor_misc.h
+++ b/ydb/core/tablet_flat/flat_executor_misc.h
@@ -35,7 +35,7 @@ namespace NTabletFlatExecutor {
NTable::TRowVersionRanges::TSnapshot RemovedRowVersions;
// Non-empty when compaction also needs to write a tx status table part
- NTable::TTransactionMap<TRowVersion> CommittedTransactions;
+ NTable::TTransactionMap CommittedTransactions;
NTable::TTransactionSet RemovedTransactions;
// The above may contain extra keys, these allow them to be narrowed
TVector<TIntrusiveConstPtr<NTable::TMemTable>> Frozen;
diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp
index 419167facf2..d3f008825f0 100644
--- a/ydb/core/tablet_flat/flat_executor_ut.cpp
+++ b/ydb/core/tablet_flat/flat_executor_ut.cpp
@@ -4091,6 +4091,58 @@ Y_UNIT_TEST_SUITE(TFlatTableLongTx) {
}
};
+ struct TTxCheckRowsReadTx : public ITransaction { // test transaction: dumps table rows as seen with TxId's changes visible
+ TString& Data; // output: receives the formatted row dump once the scan finishes
+ const ui64 TxId; // transaction id whose changes are made visible to this read
+
+ TTxCheckRowsReadTx(TString& data, ui64 txId)
+ : Data(data)
+ , TxId(txId)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ TStringBuilder builder;
+
+ TVector<NTable::TTag> tags; // columns requested from the iterator, in this order
+ tags.push_back(KeyColumnId);
+ tags.push_back(ValueColumnId);
+ tags.push_back(Value2ColumnId);
+
+ NTable::EReady ready;
+ auto it = txc.DB.IterateRange(TableId, { }, tags, // default-constructed key range (presumably unbounded — confirm)
+ TRowVersion::Max(),
+ MakeIntrusive<NTable::TSingleTransactionMap>(TxId, TRowVersion::Min())); // per-query map: TxId resolves to TRowVersion::Min()
+
+ while ((ready = it->Next(NTable::ENext::All)) != NTable::EReady::Gone) {
+ if (ready == NTable::EReady::Page) { // iterator could not produce data yet
+ return false; // presumably makes the executor retry this transaction later — TODO confirm
+ }
+
+ const auto& row = it->Row();
+
+ TString key;
+ DbgPrintValue(key, row.Get(0), NScheme::TUint64::TypeId); // position 0 presumably corresponds to tags[0] (key column)
+
+ TString value;
+ DbgPrintValue(value, row.Get(1), NScheme::TString::TypeId);
+
+ TString value2;
+ DbgPrintValue(value2, row.Get(2), NScheme::TString::TypeId);
+
+ builder << "Key " << key << " = " << row.GetRowState() // one text line per row: row state plus cell op and value of both value columns
+ << " value = " << NTable::ECellOp(row.GetCellOp(1)) << " " << value
+ << " value2 = " << NTable::ECellOp(row.GetCellOp(2)) << " " << value2 << Endl;
+ }
+
+ Data = builder; // assigned only after the whole scan completed, so an early `return false` leaves Data untouched
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn); // presumably tells the fake tablet harness that the transaction is done — TODO confirm
+ }
+ };
+
struct TTxCheckRowsUncommitted : public ITransaction {
TString& Data;
@@ -4620,6 +4672,53 @@ Y_UNIT_TEST_SUITE(TFlatTableLongTx) {
}
}
+ Y_UNIT_TEST(MemTableLongTxRead) { // checks that a read with a per-query transaction map sees that transaction's uncommitted writes
+ TMyEnvBase env;
+
+ //env->SetLogPriority(NKikimrServices::RESOURCE_BROKER, NActors::NLog::PRI_DEBUG);
+ //env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
+
+ env.FireDummyTablet(ui32(NFake::TDummy::EFlg::Comp));
+
+ env.SendSync(new NFake::TEvExecute{ new TTxInitSchema });
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(1, "foo") }); // no third argument: visible to plain readers below
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<Value2ColumnId>(2, "bar") });
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<ValueColumnId>(3, "abc", 123) }); // 123 is presumably the long-tx id (matches the read and commit below)
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<Value2ColumnId>(1, "def", 123) });
+ env.SendSync(new NFake::TEvExecute{ new TTxWriteRow<Value2ColumnId>(2, "ghi", 123) });
+
+ // We should see our own uncommitted changes
+ {
+ TString data;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRowsReadTx(data, 123) }); // read with tx 123 passed as the visible transaction map
+ UNIT_ASSERT_VALUES_EQUAL(data,
+ "Key 1 = Upsert value = Set foo value2 = Set def\n"
+ "Key 2 = Upsert value = Empty NULL value2 = Set ghi\n"
+ "Key 3 = Upsert value = Set abc value2 = Empty NULL\n");
+ }
+
+ // Others shouldn't see these changes yet
+ {
+ TString data;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }); // plain read without a per-query transaction map
+ UNIT_ASSERT_VALUES_EQUAL(data,
+ "Key 1 = Upsert value = Set foo value2 = Empty NULL\n"
+ "Key 2 = Upsert value = Empty NULL value2 = Set bar\n");
+ }
+
+ env.SendSync(new NFake::TEvExecute{ new TTxCommitLongTx(123) }); // commit tx 123
+
+ // Once committed everyone will see these changes
+ {
+ TString data;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRows(data) }); // same plain read, now expected to match the tx-123 view above
+ UNIT_ASSERT_VALUES_EQUAL(data,
+ "Key 1 = Upsert value = Set foo value2 = Set def\n"
+ "Key 2 = Upsert value = Empty NULL value2 = Set ghi\n"
+ "Key 3 = Upsert value = Set abc value2 = Empty NULL\n");
+ }
+ }
+
} // Y_UNIT_TEST_SUITE(TFlatTableLongTx)
Y_UNIT_TEST_SUITE(TFlatTableLongTxAndBlobs) {
diff --git a/ydb/core/tablet_flat/flat_iterator.h b/ydb/core/tablet_flat/flat_iterator.h
index edc4bc2eded..e041b397a49 100644
--- a/ydb/core/tablet_flat/flat_iterator.h
+++ b/ydb/core/tablet_flat/flat_iterator.h
@@ -238,7 +238,7 @@ public:
TTableItBase(
const TRowScheme* scheme, TTagsRef tags, ui64 lim = Max<ui64>(),
TRowVersion snapshot = TRowVersion::Max(),
- const NTable::TTransactionMap<TRowVersion>& committedTransactions = {});
+ NTable::ITransactionMapPtr committedTransactions = nullptr);
~TTableItBase();
@@ -340,7 +340,7 @@ private:
const TRowVersion SnapshotVersion;
// A map of currently committed transactions to corresponding row versions
- const NTable::TTransactionMap<TRowVersion> CommittedTransactions;
+ const NTable::ITransactionMapPtr CommittedTransactions;
EStage Stage = EStage::Seek;
EReady Ready = EReady::Gone;
@@ -458,13 +458,13 @@ template<class TIteratorOps>
inline TTableItBase<TIteratorOps>::TTableItBase(
const TRowScheme* scheme, TTagsRef tags, ui64 limit,
TRowVersion snapshot,
- const NTable::TTransactionMap<TRowVersion>& committedTransactions)
+ NTable::ITransactionMapPtr committedTransactions)
: Scheme(scheme)
, Remap(*Scheme, tags)
, Limit(limit)
, State(Remap.Size())
, SnapshotVersion(snapshot)
- , CommittedTransactions(committedTransactions)
+ , CommittedTransactions(std::move(committedTransactions))
, Comparator(Scheme->Keys->Types)
, Active(Iterators.end())
, Inactive(Iterators.end())
diff --git a/ydb/core/tablet_flat/flat_mem_iter.h b/ydb/core/tablet_flat/flat_mem_iter.h
index 8eb790fcbdc..462416e7430 100644
--- a/ydb/core/tablet_flat/flat_mem_iter.h
+++ b/ydb/core/tablet_flat/flat_mem_iter.h
@@ -183,7 +183,7 @@ namespace NTable {
return bool(CurrentVersion);
}
- void Apply(TRowState& row, const NTable::TTransactionMap<TRowVersion>& committedTransactions) const noexcept
+ void Apply(TRowState& row, NTable::ITransactionMapSimplePtr committedTransactions) const noexcept
{
Y_VERIFY(row.Size() == Remap->Size(), "row state doesn't match the remap index");
@@ -225,7 +225,7 @@ namespace NTable {
* Returns false if there is no such version, e.g. current key did not
* exist or didn't have any known updates at this rowVersion.
*/
- bool SkipToRowVersion(TRowVersion rowVersion, const NTable::TTransactionMap<TRowVersion>& committedTransactions) noexcept
+ bool SkipToRowVersion(TRowVersion rowVersion, NTable::ITransactionMapSimplePtr committedTransactions) noexcept
{
Y_VERIFY_DEBUG(IsValid(), "Attempt to access an invalid row");
diff --git a/ydb/core/tablet_flat/flat_mem_warm.h b/ydb/core/tablet_flat/flat_mem_warm.h
index 83b744b113f..b3e5ef95796 100644
--- a/ydb/core/tablet_flat/flat_mem_warm.h
+++ b/ydb/core/tablet_flat/flat_mem_warm.h
@@ -132,7 +132,7 @@ namespace NMem {
{}
void Update(ERowOp rop, TRawVals key_, TOpsRef ops, TArrayRef<TMemGlob> pages, TRowVersion rowVersion,
- const NTable::TTransactionMap<TRowVersion>& committed)
+ NTable::ITransactionMapSimplePtr committed)
{
Y_VERIFY_DEBUG(
rop == ERowOp::Upsert || rop == ERowOp::Erase || rop == ERowOp::Reset,
diff --git a/ydb/core/tablet_flat/flat_part_iter_multi.h b/ydb/core/tablet_flat/flat_part_iter_multi.h
index 0fb46c3040a..4ca3aef2734 100644
--- a/ydb/core/tablet_flat/flat_part_iter_multi.h
+++ b/ydb/core/tablet_flat/flat_part_iter_multi.h
@@ -819,7 +819,7 @@ namespace NTable {
}
EReady SkipToRowVersion(TRowVersion rowVersion,
- const NTable::TTransactionMap<TRowVersion>& committedTransactions) noexcept
+ NTable::ITransactionMapSimplePtr committedTransactions) noexcept
{
Y_VERIFY_DEBUG(Main.IsValid(), "Attempt to use an invalid iterator");
@@ -1013,7 +1013,7 @@ namespace NTable {
}
void Apply(TRowState& row,
- const NTable::TTransactionMap<TRowVersion>& committedTransactions) const noexcept
+ NTable::ITransactionMapSimplePtr committedTransactions) const noexcept
{
Y_VERIFY_DEBUG(IsValid(), "Attempt to apply an invalid row");
@@ -1481,7 +1481,7 @@ namespace NTable {
}
void Apply(TRowState& row,
- const NTable::TTransactionMap<TRowVersion>& committedTransactions) const noexcept
+ NTable::ITransactionMapSimplePtr committedTransactions) const noexcept
{
Y_VERIFY_DEBUG(CurrentIt);
CurrentIt->Apply(row, committedTransactions);
@@ -1495,7 +1495,7 @@ namespace NTable {
EReady SkipToRowVersion(
TRowVersion rowVersion,
- const NTable::TTransactionMap<TRowVersion>& committedTransactions) noexcept
+ NTable::ITransactionMapSimplePtr committedTransactions) noexcept
{
Y_VERIFY_DEBUG(CurrentIt);
auto ready = CurrentIt->SkipToRowVersion(rowVersion, committedTransactions);
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index 11b3b90199b..78c8a4b5d0e 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -444,6 +444,8 @@ void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
auto res = TxStatus.emplace(txStatus->Label, txStatus);
Y_VERIFY(res.second, "Unexpected failure to add a new TTxStatusPart");
+
+ ErasedKeysCache.Reset();
}
void TTable::ProcessCheckTransactions() noexcept
@@ -679,14 +681,17 @@ TMemTable& TTable::MemTable()
*(Mutable ? Mutable : (Mutable = new TMemTable(Scheme, Epoch, Annexed)));
}
-TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts");
const TCelled key(key_, *Scheme->Keys, false);
const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>();
- TAutoPtr<TTableIt> dbIter(new TTableIt(Scheme.Get(), tags, limit, snapshot, CommittedTransactions));
+ TAutoPtr<TTableIt> dbIter(new TTableIt(Scheme.Get(), tags, limit, snapshot,
+ TMergedTransactionMap::Create(visible, CommittedTransactions)));
if (Mutable) {
dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward));
@@ -707,7 +712,7 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES
}
}
- if (EraseCacheEnabled) {
+ if (EraseCacheEnabled && !visible) {
if (!ErasedKeysCache) {
ErasedKeysCache = new TKeyRangeCache(*Scheme->Keys, EraseCacheConfig);
}
@@ -717,14 +722,17 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES
return dbIter;
}
-TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, TRowVersion snapshot) const noexcept
+TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible) const noexcept
{
Y_VERIFY(ColdParts.empty(), "Cannot iterate with cold parts");
const TCelled key(key_, *Scheme->Keys, false);
const ui64 limit = seek == ESeek::Exact ? 1 : Max<ui64>();
- TAutoPtr<TTableReverseIt> dbIter(new TTableReverseIt(Scheme.Get(), tags, limit, snapshot, CommittedTransactions));
+ TAutoPtr<TTableReverseIt> dbIter(new TTableReverseIt(Scheme.Get(), tags, limit, snapshot,
+ TMergedTransactionMap::Create(visible, CommittedTransactions)));
if (Mutable) {
dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse));
@@ -745,7 +753,7 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I
}
}
- if (EraseCacheEnabled) {
+ if (EraseCacheEnabled && !visible) {
if (!ErasedKeysCache) {
ErasedKeysCache = new TKeyRangeCache(*Scheme->Keys, EraseCacheConfig);
}
@@ -757,7 +765,8 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I
TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row,
ui64 flg, TRowVersion snapshot,
- TDeque<TPartSimpleIt>& tempIterators) const noexcept
+ TDeque<TPartSimpleIt>& tempIterators,
+ const ITransactionMapPtr& visible) const noexcept
{
Y_VERIFY(ColdParts.empty(), "Cannot select with cold parts");
Y_VERIFY(key_.size() == Scheme->Keys->Types.size());
@@ -778,15 +787,16 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
TEpoch lastEpoch = TEpoch::Max();
bool snapshotFound = (snapshot == TRowVersion::Max());
+ auto committed = TMergedTransactionMap::Create(visible, CommittedTransactions);
// Mutable has the newest data
if (Mutable) {
lastEpoch = Mutable->Epoch;
if (auto it = TMemIt::Make(*Mutable, Mutable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) {
- if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, CommittedTransactions))) {
+ if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) {
// N.B. stop looking for snapshot after the first hit
snapshotFound = true;
- it->Apply(row, CommittedTransactions);
+ it->Apply(row, committed);
}
result.Invisible += it->InvisibleRowSkips;
}
@@ -798,10 +808,10 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
Y_VERIFY(lastEpoch > memTable->Epoch, "Ordering of epochs is incorrect");
lastEpoch = memTable->Epoch;
if (auto it = TMemIt::Make(*memTable, memTable->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) {
- if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, CommittedTransactions))) {
+ if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, committed))) {
// N.B. stop looking for snapshot after the first hit
snapshotFound = true;
- it->Apply(row, CommittedTransactions);
+ it->Apply(row, committed);
}
result.Invisible += it->InvisibleRowSkips;
}
@@ -825,7 +835,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
Y_VERIFY(lastEpoch > part->Epoch, "Ordering of epochs is incorrect");
lastEpoch = part->Epoch;
if (!snapshotFound) {
- res = it.SkipToRowVersion(snapshot, CommittedTransactions);
+ res = it.SkipToRowVersion(snapshot, committed);
result.Invisible += std::exchange(it.InvisibleRowSkips, 0);
if (res == EReady::Data) {
// N.B. stop looking for snapshot after the first hit
@@ -835,7 +845,7 @@ TTable::TReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowSta
}
if (ready = ready && bool(res)) {
if (res == EReady::Data) {
- it.Apply(row, CommittedTransactions);
+ it.Apply(row, committed);
if (row.IsFinalized()) {
break;
}
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index a1fdeb97093..1b8af67fe74 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -126,10 +126,15 @@ public:
TVector<TIntrusiveConstPtr<TMemTable>> GetMemTables() const noexcept;
- TAutoPtr<TTableIt> Iterate(TRawVals key, TTagsRef tags, IPages* env, ESeek, TRowVersion snapshot) const noexcept;
- TAutoPtr<TTableReverseIt> IterateReverse(TRawVals key, TTagsRef tags, IPages* env, ESeek, TRowVersion snapshot) const noexcept;
+ TAutoPtr<TTableIt> Iterate(TRawVals key, TTagsRef tags, IPages* env, ESeek,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
+ TAutoPtr<TTableReverseIt> IterateReverse(TRawVals key, TTagsRef tags, IPages* env, ESeek,
+ TRowVersion snapshot,
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
TReady Select(TRawVals key, TTagsRef tags, IPages* env, TRowState& row,
- ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators) const noexcept;
+ ui64 flg, TRowVersion snapshot, TDeque<TPartSimpleIt>& tempIterators,
+ const ITransactionMapPtr& visible = nullptr) const noexcept;
TReady Precharge(TRawVals minKey, TRawVals maxKey, TTagsRef tags,
IPages* env, ui64 flg,
@@ -301,7 +306,7 @@ private:
THashSet<ui64> CheckTransactions;
THashMap<ui64, TOpenTransaction> OpenTransactions;
- TTransactionMap<TRowVersion> CommittedTransactions;
+ TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
};
diff --git a/ydb/core/tablet_flat/flat_table_committed.h b/ydb/core/tablet_flat/flat_table_committed.h
index 222b140cc8d..299842e63aa 100644
--- a/ydb/core/tablet_flat/flat_table_committed.h
+++ b/ydb/core/tablet_flat/flat_table_committed.h
@@ -1,6 +1,7 @@
#pragma once
#include "defs.h"
+#include <ydb/core/base/row_version.h>
#include <util/generic/hash.h>
#include <util/generic/ptr.h>
@@ -11,14 +12,147 @@ namespace NKikimr {
namespace NTable {
/**
- * A simple copy-on-write data structure for a TxId -> TValue map
+ * An interface that maps TxId to commit RowVersion
+ */
+ class ITransactionMap : public TThrRefBase { // TThrRefBase base lets implementations be held through TIntrusivePtr
+ public:
+ /**
+ * Returns a pointer to a stored row version for a given txId, or nullptr when it's missing
+ */
+ virtual const TRowVersion* Find(ui64 txId) const = 0; // the only operation implementations must provide
+ };
+
+ /**
+ * Smart pointer that can safely be used to call methods even when it's nullptr
+ */
+ class ITransactionMapPtr : public TIntrusivePtr<ITransactionMap> { // owning pointer that adds a null-tolerant Find()
+ public:
+ using TIntrusivePtr::TIntrusivePtr; // inherit all TIntrusivePtr constructors
+
+ const TRowVersion* Find(ui64 txId) const {
+ if (ITransactionMap* p = Get()) { // forward only when a map is attached
+ return p->Find(txId);
+ }
+ return nullptr; // a null pointer answers like a map with no entries
+ }
+ };
+
+ /**
+ * A simple pointer wrapper that may be useful as a function argument on a hot path
+ *
+ * Unlike a smart pointer it has no destructor and doesn't perform any Ref/UnRef
+ */
+ class ITransactionMapSimplePtr { // raw-pointer view of a transaction map; never touches the reference count
+ public:
+ ITransactionMapSimplePtr(const ITransactionMapPtr& ptr) // implicit on purpose: lets a smart pointer be passed where this type is expected
+ : Ptr(ptr.Get())
+ { }
+
+ ITransactionMapSimplePtr(ITransactionMap* ptr) // implicit conversion from a raw map pointer
+ : Ptr(ptr)
+ { }
+
+ ITransactionMapSimplePtr(std::nullptr_t) // allows passing a literal nullptr for "no map"
+ : Ptr(nullptr)
+ { }
+
+ explicit operator bool() const { // true when a map is attached
+ return bool(Ptr);
+ }
+
+ const TRowVersion* Find(ui64 txId) const {
+ if (Ptr) { // forward only when a map is attached
+ return Ptr->Find(txId);
+ }
+ return nullptr; // no map attached: every txId is reported as missing
+ }
+
+ private:
+ ITransactionMap* Ptr; // not owned (no Ref/UnRef); may be nullptr
+ };
+
+ /**
+ * A special implementation of transaction map with a single transaction
+ */
+ class TSingleTransactionMap final : public ITransactionMap { // map holding exactly one TxId -> row version entry
+ public:
+ TSingleTransactionMap(ui64 txId, TRowVersion value)
+ : TxId(txId)
+ , Value(std::move(value))
+ { }
+
+ const TRowVersion* Find(ui64 txId) const override {
+ if (TxId == txId) { // only the single stored transaction is known
+ return &Value; // points into this object, so it is valid only while the map is alive
+ }
+ return nullptr; // any other txId is reported as missing
+ }
+
+ static ITransactionMapPtr Create(ui64 txId, TRowVersion value) { // convenience factory returning the owning pointer type
+ return new TSingleTransactionMap(txId, value);
+ }
+
+ private:
+ ui64 TxId; // the one transaction this map knows about
+ TRowVersion Value; // row version returned for TxId
+ };
+
+ /**
+ * A special implementation of transaction map that merges two non-empty maps
+ */
+ class TMergedTransactionMap final : public ITransactionMap { // read-only view that consults two maps in order
+ private:
+ TMergedTransactionMap( // private: instances are only produced by Create() below
+ ITransactionMapPtr first,
+ ITransactionMapPtr second)
+ : First(std::move(first))
+ , Second(std::move(second))
+ { }
+
+ public:
+ const TRowVersion* Find(ui64 txId) const override {
+ if (const TRowVersion* value = First.Find(txId)) { // First takes precedence when both maps contain txId
+ return value;
+ }
+ return Second.Find(txId); // fall back to Second; nullptr when neither map has txId
+ }
+
+ static ITransactionMapPtr Create(
+ const ITransactionMapPtr& first,
+ const ITransactionMapPtr& second)
+ {
+ if (first) {
+ if (second) {
+ return new TMergedTransactionMap(first, second); // both present: allocate a merged view
+ }
+ return first; // only first present: return it directly, no wrapper allocated
+ } else {
+ return second; // first is null: return second as-is (may itself be null)
+ }
+ }
+
+ private:
+ ITransactionMapPtr First; // consulted first by Find()
+ ITransactionMapPtr Second; // consulted only when First has no entry
+ };
+
+ /**
+ * A simple copy-on-write data structure for a TxId -> RowVersion map
+ *
+ * Pretends to be an instance of ITransactionMapPtr
*/
- template<class TValue>
class TTransactionMap {
private:
- using TTxMap = std::unordered_map<ui64, TValue>;
+ using TTxMap = std::unordered_map<ui64, TRowVersion>;
- struct TState : public TThrRefBase, public TTxMap {
+ struct TState final : public ITransactionMap, public TTxMap {
+ const TRowVersion* Find(ui64 txId) const override {
+ auto it = this->find(txId);
+ if (it != this->end()) {
+ return &it->second;
+ }
+ return nullptr;
+ }
};
public:
@@ -31,16 +165,9 @@ namespace NTable {
return State_ && !State_->empty();
}
- bool Contains(ui64 txId) const {
- return State_ && State_->contains(txId);
- }
-
- const TValue* Find(ui64 txId) const {
+ const TRowVersion* Find(ui64 txId) const {
if (State_) {
- auto it = State_->find(txId);
- if (it != State_->end()) {
- return &it->second;
- }
+ return State_->Find(txId);
}
return nullptr;
}
@@ -49,7 +176,7 @@ namespace NTable {
State_.Reset();
}
- void Add(ui64 txId, const TValue& value) {
+ void Add(ui64 txId, TRowVersion value) {
Unshare()[txId] = value;
}
@@ -59,6 +186,31 @@ namespace NTable {
}
}
+ operator ITransactionMapPtr() const {
+ if (State_ && !State_->empty()) {
+ return State_;
+ }
+ return nullptr;
+ }
+
+ operator ITransactionMapSimplePtr() const {
+ return static_cast<ITransactionMap*>(State_.Get());
+ }
+
+ ITransactionMap* Get() const {
+ return State_.Get();
+ }
+
+ ITransactionMap& operator*() const {
+ Y_VERIFY(State_);
+ return *State_;
+ }
+
+ ITransactionMap* operator->() const {
+ Y_VERIFY(State_);
+ return State_.Get();
+ }
+
public:
const_iterator begin() const {
if (State_) {
diff --git a/ydb/core/tablet_flat/flat_table_subset.h b/ydb/core/tablet_flat/flat_table_subset.h
index bffbf5a1271..0c7355abfa6 100644
--- a/ydb/core/tablet_flat/flat_table_subset.h
+++ b/ydb/core/tablet_flat/flat_table_subset.h
@@ -116,7 +116,7 @@ namespace NTable {
TVector<TMemTableSnapshot> Frozen;
TVector<TPartView> Flatten;
TVector<TIntrusiveConstPtr<TColdPart>> ColdParts;
- TTransactionMap<TRowVersion> CommittedTransactions;
+ TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
TVector<TIntrusiveConstPtr<TTxStatusPart>> TxStatus;
};
diff --git a/ydb/core/tablet_flat/test/libs/table/misc.cpp b/ydb/core/tablet_flat/test/libs/table/misc.cpp
index 1a2a678fccf..9e2ac54f789 100644
--- a/ydb/core/tablet_flat/test/libs/table/misc.cpp
+++ b/ydb/core/tablet_flat/test/libs/table/misc.cpp
@@ -30,7 +30,7 @@ TString PrintRowImpl(const TRemap& remap, const TIterator& it)
state.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]);
}
- it.Apply(state, /* committed */ {});
+ it.Apply(state, /* committed */ nullptr);
{
TStringStream ss;
diff --git a/ydb/core/tablet_flat/test/libs/table/test_cooker.h b/ydb/core/tablet_flat/test/libs/table/test_cooker.h
index d6a6426a97d..0844c0b6ba1 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_cooker.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_cooker.h
@@ -46,7 +46,8 @@ namespace NTest {
{
auto pair = Tool.Split(tagged, true, rop != ERowOp::Erase);
- return Table->Update(rop, pair.Key, pair.Ops, { }, /* TODO: rowVersion */ TRowVersion::Min(), /* committed */ {}), *this;
+ return Table->Update(rop, pair.Key, pair.Ops, { }, /* TODO: rowVersion */ TRowVersion::Min(),
+ /* committed */ nullptr), *this;
}
private:
diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_part.h b/ydb/core/tablet_flat/test/libs/table/wrap_part.h
index d7538b947ee..a1e9c9bb677 100644
--- a/ydb/core/tablet_flat/test/libs/table/wrap_part.h
+++ b/ydb/core/tablet_flat/test/libs/table/wrap_part.h
@@ -86,7 +86,7 @@ namespace NTest {
EReady SkipToRowVersion(TRowVersion rowVersion) noexcept
{
- Ready = Iter->SkipToRowVersion(rowVersion, /* committed */ {});
+ Ready = Iter->SkipToRowVersion(rowVersion, /* committed */ nullptr);
if (Ready == EReady::Data)
Ready = RollUp();
@@ -144,7 +144,7 @@ namespace NTest {
for (auto &pin: Remap_.KeyPins())
State.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]);
- Iter->Apply(State, /* committed */ {});
+ Iter->Apply(State, /* committed */ nullptr);
return (NoBlobs = State.Need() > 0) ? EReady::Page : EReady::Data;
}
diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_warm.h b/ydb/core/tablet_flat/test/libs/table/wrap_warm.h
index 6d747ecf7dd..4587dd42d41 100644
--- a/ydb/core/tablet_flat/test/libs/table/wrap_warm.h
+++ b/ydb/core/tablet_flat/test/libs/table/wrap_warm.h
@@ -82,7 +82,7 @@ namespace NTest {
for (auto &pin: Remap_.KeyPins())
State.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]);
- Iter->Apply(State, /* committed */ {});
+ Iter->Apply(State, /* committed */ nullptr);
}
return Iter->IsValid() ? EReady::Data : EReady::Gone;
diff --git a/ydb/core/tablet_flat/test/tool/perf/do_mem.h b/ydb/core/tablet_flat/test/tool/perf/do_mem.h
index 384290bd8d5..36e9b6b91c3 100644
--- a/ydb/core/tablet_flat/test/tool/perf/do_mem.h
+++ b/ydb/core/tablet_flat/test/tool/perf/do_mem.h
@@ -64,7 +64,7 @@ namespace NPerf {
for (auto &pin: Remap.KeyPins())
State.Set(pin.Pos, ECellOp::Set, key.Columns[pin.Key]);
- Iter->Apply(State, /* committed */ {});
+ Iter->Apply(State, /* committed */ nullptr);
aggr(State);
}