aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-08-10 17:00:32 +0300
committersnaury <snaury@ydb.tech>2022-08-10 17:00:32 +0300
commit42fb891327a4880587bd9617992bf0f75fcf0887 (patch)
tree76a5663f335eaa4d6ee72479acc9cb680b842a6f
parente3b22a667ccceb30e2952d1e7ddf1b43edf2ece8 (diff)
downloadydb-42fb891327a4880587bd9617992bf0f75fcf0887.tar.gz
Make local tx changes visible inside transaction,
-rw-r--r--ydb/core/engine/mkql_engine_flat.cpp12
-rw-r--r--ydb/core/mind/hive/tx__init_scheme.cpp3
-rw-r--r--ydb/core/sys_view/ut_kqp.cpp2
-rw-r--r--ydb/core/tablet_flat/CMakeLists.txt1
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp157
-rw-r--r--ydb/core/tablet_flat/flat_database.h9
-rw-r--r--ydb/core/tablet_flat/flat_dbase_annex.h10
-rw-r--r--ydb/core/tablet_flat/flat_dbase_apply.cpp224
-rw-r--r--ydb/core/tablet_flat/flat_dbase_apply.h41
-rw-r--r--ydb/core/tablet_flat/flat_dbase_change.h9
-rw-r--r--ydb/core/tablet_flat/flat_dbase_naked.h397
-rw-r--r--ydb/core/tablet_flat/flat_dbase_scheme.cpp73
-rw-r--r--ydb/core/tablet_flat/flat_dbase_scheme.h38
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp30
-rw-r--r--ydb/core/tablet_flat/flat_executor_tx_env.cpp25
-rw-r--r--ydb/core/tablet_flat/flat_executor_tx_env.h17
-rw-r--r--ydb/core/tablet_flat/flat_executor_ut.cpp130
-rw-r--r--ydb/core/tablet_flat/flat_mem_blobs.h31
-rw-r--r--ydb/core/tablet_flat/flat_mem_eggs.h13
-rw-r--r--ydb/core/tablet_flat/flat_mem_warm.cpp81
-rw-r--r--ydb/core/tablet_flat/flat_mem_warm.h208
-rw-r--r--ydb/core/tablet_flat/flat_redo_writer.h194
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp272
-rw-r--r--ydb/core/tablet_flat/flat_table.h98
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_dbase.h88
-rw-r--r--ydb/core/tablet_flat/test/libs/table/test_iter.h6
-rw-r--r--ydb/core/tablet_flat/test/libs/table/wrap_dbase.h10
-rw-r--r--ydb/core/tablet_flat/test/libs/table/wrap_select.h12
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tablet_flat/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tablet_flat/ut/ut_db_iface.cpp193
-rw-r--r--ydb/core/tablet_flat/ut/ut_redo.cpp2
-rw-r--r--ydb/core/tablet_flat/util_pool.h269
-rw-r--r--ydb/core/tablet_flat/util_pool_ut.cpp73
-rw-r--r--ydb/core/tablet_flat/util_store.h86
-rw-r--r--ydb/core/tx/datashard/datashard_ut_order.cpp4
-rw-r--r--ydb/core/tx/schemeshard/ut_serverless.cpp2
37 files changed, 2403 insertions, 419 deletions
diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp
index 3c972e6961..52a61a3eb2 100644
--- a/ydb/core/engine/mkql_engine_flat.cpp
+++ b/ydb/core/engine/mkql_engine_flat.cpp
@@ -1367,11 +1367,6 @@ public:
NUdf::TUnboxedValue replyValue = runValue.GetElement(0);
NUdf::TUnboxedValue writeValue = runValue.GetElement(1);
- TEngineFlatApplyContext applyCtx;
- applyCtx.Host = Settings.Host;
- applyCtx.Env = &Env;
- ApplyChanges(writeValue, applyCtx);
-
TCallableResults replyResults;
for (ui32 i = 0; i < replyStruct.GetValuesCount(); ++i) {
TRuntimeNode item = replyStruct.GetValue(i);
@@ -1387,6 +1382,13 @@ public:
}
auto replyStr = replyResults.ToString(holderFactory, Env);
+
+ // Note: we must apply side effects even if we reply with an error below
+ TEngineFlatApplyContext applyCtx;
+ applyCtx.Host = Settings.Host;
+ applyCtx.Env = &Env;
+ ApplyChanges(writeValue, applyCtx);
+
if (replyStr.size() > MaxDatashardReplySize) {
TString error = TStringBuilder() << "Datashard " << pgm.first
<< ": reply size limit exceeded. ("
diff --git a/ydb/core/mind/hive/tx__init_scheme.cpp b/ydb/core/mind/hive/tx__init_scheme.cpp
index b65451f94a..7548dc09ba 100644
--- a/ydb/core/mind/hive/tx__init_scheme.cpp
+++ b/ydb/core/mind/hive/tx__init_scheme.cpp
@@ -15,8 +15,9 @@ public:
bool Execute(TTransactionContext &txc, const TActorContext&) override {
BLOG_D("THive::TTxInitScheme::Execute");
+ bool wasEmpty = txc.DB.GetScheme().IsEmpty();
NIceDb::TNiceDb(txc.DB).Materialize<Schema>();
- if (!txc.DB.GetScheme().IsEmpty()) {
+ if (!wasEmpty) {
auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::State>().Key(TSchemeIds::State::DatabaseVersion).Select<Schema::State::Value>();
if (!row.IsReady())
return false;
diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp
index 7a44050218..cb99c617ec 100644
--- a/ydb/core/sys_view/ut_kqp.cpp
+++ b/ydb/core/sys_view/ut_kqp.cpp
@@ -745,7 +745,7 @@ Y_UNIT_TEST_SUITE(SystemView) {
check.Uint64GreaterOrEquals(nowUs); // AccessTime
check.DoubleGreaterOrEquals(0.0); // CPUCores
check.Uint64(1u); // CoordinatedTxCompleted
- check.Uint64(608u); // DataSize
+ check.Uint64(576u); // DataSize
check.Uint64(1u); // ImmediateTxCompleted
check.Uint64(0u); // IndexSize
check.Uint64(0u); // InFlightTxCount
diff --git a/ydb/core/tablet_flat/CMakeLists.txt b/ydb/core/tablet_flat/CMakeLists.txt
index 55728be780..c2688b43d1 100644
--- a/ydb/core/tablet_flat/CMakeLists.txt
+++ b/ydb/core/tablet_flat/CMakeLists.txt
@@ -57,6 +57,7 @@ target_sources(ydb-core-tablet_flat PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_gclogic.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_bio_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_tx_env.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_txloglogic.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_load_blob_queue.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_mem_warm.cpp
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 965463954c..bc106e823e 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -219,13 +219,43 @@ void TDatabase::Update(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const TUp
Y_FAIL("Key index %" PRISZT " validation failure: %s", index, error.c_str());
}
}
- for (size_t index = 0; index < ops.size(); ++index) {
- if (auto error = NScheme::HasUnexpectedValueSize(ops[index].Value)) {
- Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, ops[index].Tag, error.c_str());
+
+ NRedo::IAnnex* annex = Annex.Get();
+ NRedo::IAnnex::TLimit limit = annex->Limit(table);
+ limit.MinExternSize = Max(limit.MinExternSize, DatabaseImpl->AnnexByteLimit());
+
+ if (ModifiedRefs.size() < ops.size()) {
+ ModifiedRefs.resize(FastClp2(ops.size()));
+ }
+ ModifiedOps.assign(ops.begin(), ops.end());
+ for (size_t index = 0; index < ModifiedOps.size(); ++index) {
+ TUpdateOp& op = ModifiedOps[index];
+ if (auto error = NScheme::HasUnexpectedValueSize(op.Value)) {
+ Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, op.Tag, error.c_str());
+ }
+
+ if (op.Value.IsEmpty()) {
+ // Null values lose type id in redo log
+ op.Value = {};
+ // Null values must change ECellOp::Set to ECellOp::Null in redo log
+ op.Op = op.NormalizedCellOp();
+ }
+
+ Y_VERIFY(op.Op == ELargeObj::Inline, "User provided an invalid ECellOp");
+
+ TArrayRef<const char> raw = op.AsRef();
+ if (limit.IsExtern(raw.size())) {
+ if (auto got = annex->Place(table, op.Tag, raw)) {
+ ModifiedRefs[index] = got.Ref;
+ const auto payload = NUtil::NBin::ToRef(ModifiedRefs[index]);
+ op.Value = TRawTypeValue(payload, op.Value.Type());
+ op.Op = ELargeObj::Extern;
+ }
}
}
- Redo->EvUpdate(table, rop, key, ops, rowVersion);
+ Redo->EvUpdate(table, rop, key, ModifiedOps, rowVersion);
+ RequireForUpdate(table)->Update(rop, key, ModifiedOps, Annex->Current(), rowVersion);
}
void TDatabase::UpdateTx(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const TUpdateOp> ops, ui64 txId)
@@ -235,23 +265,41 @@ void TDatabase::UpdateTx(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const T
Y_FAIL("Key index %" PRISZT " validation failure: %s", index, error.c_str());
}
}
- for (size_t index = 0; index < ops.size(); ++index) {
- if (auto error = NScheme::HasUnexpectedValueSize(ops[index].Value)) {
- Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, ops[index].Tag, error.c_str());
+
+ ModifiedOps.assign(ops.begin(), ops.end());
+ for (size_t index = 0; index < ModifiedOps.size(); ++index) {
+ TUpdateOp& op = ModifiedOps[index];
+ if (auto error = NScheme::HasUnexpectedValueSize(op.Value)) {
+ Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, op.Tag, error.c_str());
+ }
+
+ if (op.Value.IsEmpty()) {
+ // Null values lose type id in redo log
+ op.Value = {};
+ // Null values must change ECellOp::Set to ECellOp::Null in redo log
+ op.Op = op.NormalizedCellOp();
}
+
+ Y_VERIFY(op.Op == ELargeObj::Inline, "User provided an invalid ECellOp");
+
+ // FIXME: we cannot handle blob references during scans, so we
+ // avoid creating large objects when they are in deltas
}
- Redo->EvUpdateTx(table, rop, key, ops, txId);
+ Redo->EvUpdateTx(table, rop, key, ModifiedOps, txId);
+ RequireForUpdate(table)->UpdateTx(rop, key, ModifiedOps, Annex->Current(), txId);
}
void TDatabase::RemoveTx(ui32 table, ui64 txId)
{
Redo->EvRemoveTx(table, txId);
+ RequireForUpdate(table)->RemoveTx(txId);
}
void TDatabase::CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion)
{
Redo->EvCommitTx(table, txId, rowVersion);
+ RequireForUpdate(table)->CommitTx(txId, rowVersion);
}
bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const
@@ -259,6 +307,16 @@ bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const
return Require(table)->HasOpenTx(txId);
}
+bool TDatabase::HasCommittedTx(ui32 table, ui64 txId) const
+{
+ return Require(table)->HasCommittedTx(txId);
+}
+
+bool TDatabase::HasRemovedTx(ui32 table, ui64 txId) const
+{
+ return Require(table)->HasRemovedTx(txId);
+}
+
void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper)
{
if (Y_LIKELY(lower < upper)) {
@@ -285,8 +343,9 @@ void TDatabase::Begin(TTxStamp stamp, IPages& env)
Y_VERIFY(!Redo, "Transaction already in progress");
Y_VERIFY(!Env);
Annex = new TAnnex(*DatabaseImpl->Scheme);
- Redo = new NRedo::TWriter{ Annex.Get(), DatabaseImpl->AnnexByteLimit() };
- Change = MakeHolder<TChange>(Stamp = stamp, DatabaseImpl->Serial() + 1);
+ Redo = new NRedo::TWriter;
+ DatabaseImpl->BeginTransaction();
+ Change = MakeHolder<TChange>(stamp, DatabaseImpl->Serial());
Env = &env;
NoMoreReadsFlag = false;
}
@@ -355,28 +414,34 @@ TDatabase::TChg TDatabase::Head(ui32 table) const noexcept
} else {
auto &wrap = DatabaseImpl->Get(table, true);
- return { wrap.Serial, wrap->Head() };
+ // We return numbers as they have been at the start of transaction,
+ // but possibly after schema changes or memtable flushes.
+ auto serial = wrap.DataModified && !wrap.EpochSnapshot ? wrap.SerialBackup : wrap.Serial;
+ auto head = wrap.EpochSnapshot ? *wrap.EpochSnapshot : wrap->Head();
+ return { serial, head };
}
}
TString TDatabase::SnapshotToLog(ui32 table, TTxStamp stamp)
{
+ Y_VERIFY(!Redo, "Cannot SnapshotToLog inside a transaction");
+
auto scn = DatabaseImpl->Serial() + 1;
auto epoch = DatabaseImpl->Get(table, true)->Snapshot();
DatabaseImpl->Rewind(scn);
- return
- NRedo::TWriter{ }
- .EvBegin(ui32(ECompatibility::Head), ui32(ECompatibility::Edge), scn, stamp)
- .EvFlush(table, stamp, epoch).Dump();
+ NRedo::TWriter writer;
+ writer.EvBegin(ui32(ECompatibility::Head), ui32(ECompatibility::Edge), scn, stamp);
+ writer.EvFlush(table, stamp, epoch);
+ return std::move(writer).Finish();
}
-ui32 TDatabase::TxSnapTable(ui32 table)
+TEpoch TDatabase::TxSnapTable(ui32 table)
{
- Require(table);
- Change->Snapshots.emplace_back(table);
- return Change->Snapshots.size() - 1;
+ Y_VERIFY(Redo, "Cannot TxSnapTable outside a transaction");
+ ++Change->Snapshots;
+ return DatabaseImpl->FlushTable(table);
}
TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const
@@ -452,9 +517,8 @@ void TDatabase::Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart> txStatus)
TAlter& TDatabase::Alter()
{
Y_VERIFY(Redo, "Scheme change must be done within a transaction");
- Y_VERIFY(!*Redo, "Scheme change must be done before any data updates");
- return *(Alter_ ? Alter_ : (Alter_ = new TAlter()));
+ return *(Alter_ ? Alter_ : (Alter_ = new TAlter(DatabaseImpl.Get())));
}
void TDatabase::DebugDumpTable(ui32 table, IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const {
@@ -510,15 +574,15 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
IteratedTables.clear();
}
- if (commit && (*Redo || Alter_ || Change->Snapshots || Change->RemovedRowVersions)) {
+ if (commit && (*Redo || (Alter_ && *Alter_) || Change->Snapshots || Change->RemovedRowVersions)) {
Y_VERIFY(stamp >= Change->Stamp);
+ Y_VERIFY(DatabaseImpl->Serial() == Change->Serial);
- /* TODO: Temporary hack fot getting correct Stamp and Serial state
- against invocation of SnapshotToLog() between Begin(...) and
- Commit(...). Read KIKIMR-5366 for details and progress. */
-
+ // FIXME: Temporary hack for using up to date change stamp when scan
+ // is queued inside a transaction. In practice we just need to
+ // stop using empty commits for scans. See KIKIMR-5366 for
+ // details.
const_cast<TTxStamp&>(Change->Stamp) = stamp;
- const_cast<ui64&>(Change->Serial) = DatabaseImpl->Serial() + 1;
NRedo::TWriter prefix{ };
@@ -526,12 +590,13 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
const ui32 head = ui32(ECompatibility::Head);
const ui32 edge = ui32(ECompatibility::Edge);
- prefix.EvBegin(head, edge, Change->Serial, Change->Stamp);
+ prefix.EvBegin(head, edge, Change->Serial, Change->Stamp);
}
const auto offset = prefix.Bytes(); /* useful payload starts here */
- if (auto annex = Annex->Unwrap()) {
+ auto annex = Annex->Unwrap();
+ if (annex) {
Y_VERIFY(cookieAllocator, "Have to provide TCookieAllocator with enabled annex");
TVector<NPageCollection::TGlobId> blobs;
@@ -547,29 +612,18 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
}
prefix.EvAnnex(blobs);
- DatabaseImpl->Assign(std::move(annex));
}
- DatabaseImpl->Switch(Stamp);
+ DatabaseImpl->CommitTransaction(Change->Stamp, annex, prefix);
- if (Alter_) {
+ if (Alter_ && *Alter_) {
auto delta = Alter_->Flush();
-
- if (DatabaseImpl->Apply(*delta, &prefix))
- Y_PROTOBUF_SUPPRESS_NODISCARD delta->SerializeToString(&Change->Scheme);
+ Y_PROTOBUF_SUPPRESS_NODISCARD delta->SerializeToString(&Change->Scheme);
}
- for (auto &one: Change->Snapshots) {
- one.Epoch = Require(one.Table)->Snapshot();
- prefix.EvFlush(one.Table, Stamp, one.Epoch);
- }
-
- prefix.Join(*Redo);
+ prefix.Join(std::move(*Redo));
- Change->Redo = prefix.Dump();
-
- for (auto &entry: prefix.Unwrap())
- DatabaseImpl->ApplyRedo(entry);
+ Change->Redo = std::move(prefix).Finish();
for (const auto& xpair : Change->RemovedRowVersions) {
if (auto& wrap = DatabaseImpl->Get(xpair.first, false)) {
@@ -582,7 +636,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
Change->Garbage = std::move(DatabaseImpl->Garbage);
Change->Deleted = std::move(DatabaseImpl->Deleted);
Change->Affects = DatabaseImpl->GrabAffects();
- Change->Annex = DatabaseImpl->GrabAnnex();
+ Change->Annex = std::move(annex);
if (Change->Redo.size() == offset && !Change->Affects) {
std::exchange(Change->Redo, { }); /* omit complete NOOP redo */
@@ -592,13 +646,15 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator
Y_Fail(
NFmt::Do(*Change) << " produced " << (Change->Redo.size() - offset)
<< "b of non technical redo without leaving effects on data");
- } else if (Change->Serial != DatabaseImpl->Serial()) {
+ } else if (Change->Redo && Change->Serial != DatabaseImpl->Serial()) {
Y_Fail(
NFmt::Do(*Change) << " serial diverged from current db "
<< DatabaseImpl->Serial() << " after rolling up redo log");
} else if (Change->Deleted.size() != Change->Garbage.size()) {
Y_Fail(NFmt::Do(*Change) << " has inconsistent garbage data");
}
+ } else {
+ DatabaseImpl->RollbackTransaction();
}
Redo = nullptr;
@@ -614,6 +670,11 @@ TTable* TDatabase::Require(ui32 table) const noexcept
return DatabaseImpl->Get(table, true).Self.Get();
}
+TTable* TDatabase::RequireForUpdate(ui32 table) const noexcept
+{
+ return DatabaseImpl->GetForUpdate(table).Self.Get();
+}
+
TGarbage TDatabase::RollUp(TTxStamp stamp, TArrayRef<const char> delta, TArrayRef<const char> redo,
TMemGlobs annex)
{
@@ -626,7 +687,7 @@ TGarbage TDatabase::RollUp(TTxStamp stamp, TArrayRef<const char> delta, TArrayRe
bool parseOk = ParseFromStringNoSizeLimit(changes, delta);
Y_VERIFY(parseOk);
- DatabaseImpl->Apply(changes, nullptr);
+ DatabaseImpl->ApplySchema(changes);
}
if (redo) {
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 18fdd12d84..06740064e3 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -118,6 +118,8 @@ public:
* Returns true when table has an open transaction that is not committed or removed yet
*/
bool HasOpenTx(ui32 table, ui64 txId) const;
+ bool HasCommittedTx(ui32 table, ui64 txId) const;
+ bool HasRemovedTx(ui32 table, ui64 txId) const;
/**
* Remove row versions [lower, upper) from the given table
@@ -139,7 +141,7 @@ public:
TAlter& Alter(); /* Begin DDL ALTER script */
- ui32 TxSnapTable(ui32 table);
+ TEpoch TxSnapTable(ui32 table);
const TScheme& GetScheme() const noexcept;
@@ -194,11 +196,11 @@ public:
private:
TTable* Require(ui32 tableId) const noexcept;
+ TTable* RequireForUpdate(ui32 tableId) const noexcept;
private:
const THolder<TDatabaseImpl> DatabaseImpl;
- ui64 Stamp = Max<ui64>();
bool NoMoreReadsFlag;
IPages* Env = nullptr;
THolder<TChange> Change;
@@ -206,6 +208,9 @@ private:
TAutoPtr<TAnnex> Annex;
TAutoPtr<NRedo::TWriter> Redo;
+ TVector<ui32> ModifiedRefs;
+ TVector<TUpdateOp> ModifiedOps;
+
mutable TDeque<TPartSimpleIt> TempIterators; // Keeps the last result of Select() valid
mutable THashSet<ui32> IteratedTables;
};
diff --git a/ydb/core/tablet_flat/flat_dbase_annex.h b/ydb/core/tablet_flat/flat_dbase_annex.h
index 1a52eddca9..492e78c74d 100644
--- a/ydb/core/tablet_flat/flat_dbase_annex.h
+++ b/ydb/core/tablet_flat/flat_dbase_annex.h
@@ -25,6 +25,11 @@ namespace NTable {
return bool(Blobs);
}
+ TArrayRef<const NPageCollection::TMemGlob> Current() const noexcept
+ {
+ return Blobs;
+ }
+
private:
TLimit Limit(ui32 table) noexcept override
{
@@ -41,11 +46,12 @@ namespace NTable {
auto blob = NPage::TLabelWrapper::Wrap(data, EPage::Opaque, 0);
- const TLogoBlobID fake(0, 0, 0, Room->Blobs, blob.size(), 0);
+ const ui32 ref = Blobs.size();
+ const TLogoBlobID fake(0, 0, 0, Room->Blobs, blob.size(), ref);
Blobs.emplace_back(TGlobId{ fake, 0 }, std::move(blob));
- return Blobs.size() - 1;
+ return ref;
}
bool Lookup(ui32 table) noexcept
diff --git a/ydb/core/tablet_flat/flat_dbase_apply.cpp b/ydb/core/tablet_flat/flat_dbase_apply.cpp
index f87b315d33..714fc05edc 100644
--- a/ydb/core/tablet_flat/flat_dbase_apply.cpp
+++ b/ydb/core/tablet_flat/flat_dbase_apply.cpp
@@ -6,8 +6,9 @@ namespace NKikimr {
namespace NTable {
-TSchemeModifier::TSchemeModifier(TScheme &scheme)
+TSchemeModifier::TSchemeModifier(TScheme &scheme, TSchemeRollbackState *rollbackState)
: Scheme(scheme)
+ , RollbackState(rollbackState)
{
}
@@ -38,14 +39,24 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta)
} else if (action == TAlterRecord::AddColumnToKey) {
changes = AddColumnToKey(table, delta.GetColumnId());
} else if (action == TAlterRecord::AddFamily) {
- auto &family = Table(table)->Families[delta.GetFamilyId()];
+ auto &tableInfo = *Table(table);
+ if (!tableInfo.Families.contains(delta.GetFamilyId())) {
+ PreserveTable(table);
+ changes = true;
+ }
+ auto &family = tableInfo.Families[delta.GetFamilyId()];
const ui32 room = delta.GetRoomId();
- changes = (std::exchange(family.Room, room) != room);
+ changes = ChangeTableSetting(table, family.Room, room);
} else if (action == TAlterRecord::SetFamily) {
- auto &family = Table(table)->Families[delta.GetFamilyId()];
+ auto &tableInfo = *Table(table);
+ if (!tableInfo.Families.contains(delta.GetFamilyId())) {
+ PreserveTable(table);
+ changes = true;
+ }
+ auto &family = tableInfo.Families[delta.GetFamilyId()];
auto codec = delta.HasCodec() ? ECodec(delta.GetCodec()) :family.Codec;
@@ -60,51 +71,56 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta)
Y_VERIFY(ui32(cache) <= 2, "Invalid pages cache policy value");
- changes =
- (std::exchange(family.Cache, cache) != cache)
- | (std::exchange(family.Codec, codec) != codec)
- | (std::exchange(family.Small, small) != small)
- | (std::exchange(family.Large, large) != large);
+ changes |= ChangeTableSetting(table, family.Cache, cache);
+ changes |= ChangeTableSetting(table, family.Codec, codec);
+ changes |= ChangeTableSetting(table, family.Small, small);
+ changes |= ChangeTableSetting(table, family.Large, large);
} else if (action == TAlterRecord::AddColumnToFamily) {
changes = AddColumnToFamily(table, delta.GetColumnId(), delta.GetFamilyId());
} else if (action == TAlterRecord::SetRoom) {
- auto &room = Table(table)->Rooms[delta.GetRoomId()];
+ auto &tableInfo = *Table(table);
+ if (!tableInfo.Rooms.contains(delta.GetRoomId())) {
+ PreserveTable(table);
+ changes = true;
+ }
+ auto &room = tableInfo.Rooms[delta.GetRoomId()];
- ui32 main = delta.HasMain() ? delta.GetMain() : room.Main;
- ui32 blobs = delta.HasBlobs() ? delta.GetBlobs() : room.Blobs;
- ui32 outer = delta.HasOuter() ? delta.GetOuter() : room.Outer;
+ ui8 main = delta.HasMain() ? delta.GetMain() : room.Main;
+ ui8 blobs = delta.HasBlobs() ? delta.GetBlobs() : room.Blobs;
+ ui8 outer = delta.HasOuter() ? delta.GetOuter() : room.Outer;
- changes =
- (std::exchange(room.Main, main) != main)
- | (std::exchange(room.Blobs, blobs) != blobs)
- | (std::exchange(room.Outer, outer) != outer);
+ changes |= ChangeTableSetting(table, room.Main, main);
+ changes |= ChangeTableSetting(table, room.Blobs, blobs);
+ changes |= ChangeTableSetting(table, room.Outer, outer);
} else if (action == TAlterRecord::SetRedo) {
const ui32 annex = delta.HasAnnex() ? delta.GetAnnex() : 0;
- changes = (std::exchange(Scheme.Redo.Annex, annex) != annex);
+ changes |= ChangeRedoSetting(Scheme.Redo.Annex, annex);
} else if (action == TAlterRecord::SetTable) {
+ auto &tableInfo = *Table(table);
+
if (delta.HasByKeyFilter()) {
bool enabled = delta.GetByKeyFilter();
- changes |= (std::exchange(Table(table)->ByKeyFilter, enabled) != enabled);
+ changes |= ChangeTableSetting(table, tableInfo.ByKeyFilter, enabled);
}
if (delta.HasEraseCacheEnabled()) {
bool enabled = delta.GetEraseCacheEnabled();
- changes |= (std::exchange(Table(table)->EraseCacheEnabled, enabled) != enabled);
+ changes |= ChangeTableSetting(table, tableInfo.EraseCacheEnabled, enabled);
if (enabled) {
ui32 minRows = delta.GetEraseCacheMinRows();
ui32 maxBytes = delta.GetEraseCacheMaxBytes();
- changes |= (std::exchange(Table(table)->EraseCacheMinRows, minRows) != minRows);
- changes |= (std::exchange(Table(table)->EraseCacheMaxBytes, maxBytes) != maxBytes);
+ changes |= ChangeTableSetting(table, tableInfo.EraseCacheMinRows, minRows);
+ changes |= ChangeTableSetting(table, tableInfo.EraseCacheMaxBytes, maxBytes);
}
}
if (delta.HasColdBorrow()) {
bool enabled = delta.GetColdBorrow();
- changes |= (std::exchange(Table(table)->ColdBorrow, enabled) != enabled);
+ changes |= ChangeTableSetting(table, tableInfo.ColdBorrow, enabled);
}
} else if (action == TAlterRecord::UpdateExecutorInfo) {
@@ -136,38 +152,66 @@ bool TSchemeModifier::AddColumnToFamily(ui32 tid, ui32 cid, ui32 family)
auto* column = Scheme.GetColumnInfo(Table(tid), cid);
Y_VERIFY(column);
- return std::exchange(column->Family, family) != family;
+ if (column->Family != family) {
+ PreserveTable(tid);
+ column->Family = family;
+ return true;
+ }
+
+ return false;
}
bool TSchemeModifier::AddTable(const TString &name, ui32 id)
{
+ auto it = Scheme.Tables.find(id);
auto itName = Scheme.TableNames.find(name);
- auto it = Scheme.Tables.emplace(id, TTable(name, id));
+
+ // We verify id match when a table with this name already exists
if (itName != Scheme.TableNames.end()) {
- Y_VERIFY(!it.second && it.first->second.Name == name && itName->second == it.first->first);
+ auto describeFailure = [&]() -> TString {
+ TStringBuilder out;
+ out << "Table " << id << " '" << name << "'"
+ << " conflicts with table " << itName->second << " '" << itName->first << "'";
+ if (it != Scheme.Tables.end()) {
+ out << " and table " << it->first << " '" << it->second.Name << "'";
+ }
+ return out;
+ };
+ Y_VERIFY_S(itName->second == id, describeFailure());
+ // Sanity check that this table really exists
+ Y_VERIFY(it != Scheme.Tables.end() && it->second.Name == name);
return false;
- } else if (!it.second) {
- // renaming table
- Scheme.TableNames.erase(it.first->second.Name);
- Scheme.TableNames.emplace(name, id);
- it.first->second.Name = name;
- return true;
- } else {
- Y_VERIFY(it.second);
+ }
+
+ PreserveTable(id);
+
+ // We assume table is renamed when the same id already exists
+ if (it != Scheme.Tables.end()) {
+ Scheme.TableNames.erase(it->second.Name);
+ it->second.Name = name;
Scheme.TableNames.emplace(name, id);
- if (id >= 100) {
- auto *table = &it.first->second;
- // HACK: Force user tables to have some reasonable policy with multiple levels
- table->CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy();
- }
return true;
}
+
+ // Creating a new table
+ auto pr = Scheme.Tables.emplace(id, TTable(name, id));
+ Y_VERIFY(pr.second);
+ it = pr.first;
+ Scheme.TableNames.emplace(name, id);
+
+ if (id >= 100) {
+ // HACK: Force user tables to have some reasonable policy with multiple levels
+ it->second.CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy();
+ }
+
+ return true;
}
bool TSchemeModifier::DropTable(ui32 id)
{
auto it = Scheme.Tables.find(id);
if (it != Scheme.Tables.end()) {
+ PreserveTable(id);
Scheme.TableNames.erase(it->second.Name);
Scheme.Tables.erase(it);
return true;
@@ -178,34 +222,51 @@ bool TSchemeModifier::DropTable(ui32 id)
bool TSchemeModifier::AddColumn(ui32 tid, const TString &name, ui32 id, ui32 type, bool notNull, TCell null)
{
auto *table = Table(tid);
+
+ auto it = table->Columns.find(id);
auto itName = table->ColumnNames.find(name);
- bool haveName = itName != table->ColumnNames.end();
- auto it = table->Columns.emplace(id, TColumn(name, id, type, notNull));
- if (it.second)
- it.first->second.SetDefault(null);
+ // We verify ids and types match when column with the same name already exists
+ if (itName != table->ColumnNames.end()) {
+ auto describeFailure = [&]() -> TString {
+ TStringBuilder out;
+ out << "Table " << tid << " '" << table->Name << "'"
+ << " adding column " << id << " '" << name << "'"
+ << " conflicts with column " << itName->second << " '" << itName->first << "'";
+ if (it != table->Columns.end()) {
+ out << " and column " << it->first << " '" << it->second.Name << "'";
+ }
+ return out;
+ };
+ Y_VERIFY_S(itName->second == id, describeFailure());
+ // Sanity check that this column exists and types match
+ Y_VERIFY(it != table->Columns.end() && it->second.Name == name);
+ Y_VERIFY_S(it->second.PType == type,
+ "Table " << tid << " '" << table->Name << "' column " << id << " '" << name << "' expected type " << type << ", existing type " << it->second.PType);
+ return false;
+ }
+
+ PreserveTable(tid);
- if (!it.second && !haveName && it.first->second.PType == type) {
- // renaming column
- table->ColumnNames.erase(it.first->second.Name);
+ // We assume column is renamed when the same id already exists
+ if (it != table->Columns.end()) {
+ Y_VERIFY_S(it->second.PType == type,
+ "Table " << tid << " '" << table->Name << "' column " << id << " '" << it->second.Name << "' renamed to '" << name << "'"
+ << " with type " << type << ", existing type " << it->second.PType);
+ table->ColumnNames.erase(it->second.Name);
+ it->second.Name = name;
table->ColumnNames.emplace(name, id);
- it.first->second.Name = name;
return true;
- } else {
- // do we have inserted a new column, OR we already have the same column with the same name?
- bool insertedNew = it.second && !haveName;
- bool replacedExisting = !it.second && it.first->second.Name == name && haveName && itName->second == it.first->first;
- Y_VERIFY_S((insertedNew || replacedExisting),
- "NewName: " << name <<
- " OldName: " << (haveName ? itName->first : it.first->second.Name) <<
- " NewId: " << id <<
- " OldId: " << (haveName ? itName->second : it.first->first));
- if (!haveName) {
- table->ColumnNames.emplace(name, id);
- return true;
- }
}
- return false;
+
+ auto pr = table->Columns.emplace(id, TColumn(name, id, type, notNull));
+ Y_VERIFY(pr.second);
+ it = pr.first;
+ table->ColumnNames.emplace(name, id);
+
+ it->second.SetDefault(null);
+
+ return true;
}
bool TSchemeModifier::DropColumn(ui32 tid, ui32 id)
@@ -213,6 +274,7 @@ bool TSchemeModifier::DropColumn(ui32 tid, ui32 id)
auto *table = Table(tid);
auto it = table->Columns.find(id);
if (it != table->Columns.end()) {
+ PreserveTable(tid);
table->ColumnNames.erase(it->second.Name);
table->Columns.erase(it);
return true;
@@ -228,6 +290,7 @@ bool TSchemeModifier::AddColumnToKey(ui32 tid, ui32 columnId)
auto keyPos = std::find(table->KeyColumns.begin(), table->KeyColumns.end(), column->Id);
if (keyPos == table->KeyColumns.end()) {
+ PreserveTable(tid);
column->KeyOrder = table->KeyColumns.size();
table->KeyColumns.push_back(column->Id);
return true;
@@ -237,32 +300,32 @@ bool TSchemeModifier::AddColumnToKey(ui32 tid, ui32 columnId)
bool TSchemeModifier::SetExecutorCacheSize(ui64 size)
{
- return std::exchange(Scheme.Executor.CacheSize, size) != size;
+ return ChangeExecutorSetting(Scheme.Executor.CacheSize, size);
}
bool TSchemeModifier::SetExecutorAllowLogBatching(bool allow)
{
- return std::exchange(Scheme.Executor.AllowLogBatching, allow) != allow;
+ return ChangeExecutorSetting(Scheme.Executor.AllowLogBatching, allow);
}
bool TSchemeModifier::SetExecutorLogFastCommitTactic(bool allow)
{
- return std::exchange(Scheme.Executor.LogFastTactic, allow) != allow;
+ return ChangeExecutorSetting(Scheme.Executor.LogFastTactic, allow);
}
bool TSchemeModifier::SetExecutorLogFlushPeriod(TDuration delay)
{
- return std::exchange(Scheme.Executor.LogFlushPeriod, delay) != delay;
+ return ChangeExecutorSetting(Scheme.Executor.LogFlushPeriod, delay);
}
bool TSchemeModifier::SetExecutorLimitInFlyTx(ui32 limit)
{
- return std::exchange(Scheme.Executor.LimitInFlyTx, limit) != limit;
+ return ChangeExecutorSetting(Scheme.Executor.LimitInFlyTx, limit);
}
bool TSchemeModifier::SetExecutorResourceProfile(const TString &name)
{
- return std::exchange(Scheme.Executor.ResourceProfile, name) != name;
+ return ChangeExecutorSetting(Scheme.Executor.ResourceProfile, name);
}
bool TSchemeModifier::SetCompactionPolicy(ui32 tid, const NKikimrSchemeOp::TCompactionPolicy &proto)
@@ -271,9 +334,36 @@ bool TSchemeModifier::SetCompactionPolicy(ui32 tid, const NKikimrSchemeOp::TComp
TIntrusiveConstPtr<TCompactionPolicy> policy(new TCompactionPolicy(proto));
if (table->CompactionPolicy && *(table->CompactionPolicy) == *policy)
return false;
+ PreserveTable(tid);
table->CompactionPolicy = policy;
return true;
}
+void TSchemeModifier::PreserveTable(ui32 tid) noexcept
+{
+ if (RollbackState && !RollbackState->Tables.contains(tid)) {
+ auto it = Scheme.Tables.find(tid);
+ if (it != Scheme.Tables.end()) {
+ RollbackState->Tables[tid] = it->second;
+ } else {
+ RollbackState->Tables[tid] = std::nullopt;
+ }
+ }
+}
+
+void TSchemeModifier::PreserveExecutor() noexcept
+{
+ if (RollbackState && !RollbackState->Executor) {
+ RollbackState->Executor = Scheme.Executor;
+ }
+}
+
+void TSchemeModifier::PreserveRedo() noexcept
+{
+ if (RollbackState && !RollbackState->Redo) {
+ RollbackState->Redo = Scheme.Redo;
+ }
+}
+
}
}
diff --git a/ydb/core/tablet_flat/flat_dbase_apply.h b/ydb/core/tablet_flat/flat_dbase_apply.h
index 3502030b71..279a598c1f 100644
--- a/ydb/core/tablet_flat/flat_dbase_apply.h
+++ b/ydb/core/tablet_flat/flat_dbase_apply.h
@@ -15,7 +15,7 @@ namespace NTable {
using ECodec = NPage::ECodec;
using ECache = NPage::ECache;
- explicit TSchemeModifier(TScheme &scheme);
+ explicit TSchemeModifier(TScheme &scheme, TSchemeRollbackState *rollbackState = nullptr);
bool Apply(const TSchemeChanges &delta)
{
@@ -27,9 +27,9 @@ namespace NTable {
return changed;
}
- protected:
bool Apply(const TAlterRecord &delta);
+ protected:
bool AddTable(const TString& name, ui32 id);
bool DropTable(ui32 id);
bool AddColumn(ui32 table, const TString& name, ui32 id, ui32 type, bool notNull, TCell null = { });
@@ -48,12 +48,47 @@ namespace NTable {
TTable* Table(ui32 tid) const noexcept
{
auto* table = Scheme.GetTableInfo(tid);
- Y_VERIFY(table, "Acccessing table that isn't exists");
+ Y_VERIFY(table, "Acccessing table that doesn't exist");
return table;
}
+ template<class T>
+ bool ChangeTableSetting(ui32 tid, T& dst, const T& value) {
+ if (dst != value) {
+ PreserveTable(tid);
+ dst = value;
+ return true;
+ }
+ return false;
+ }
+
+ template<class T>
+ bool ChangeExecutorSetting(T& dst, const T& value) {
+ if (dst != value) {
+ PreserveExecutor();
+ dst = value;
+ return true;
+ }
+ return false;
+ }
+
+ template<class T>
+ bool ChangeRedoSetting(T& dst, const T& value) {
+ if (dst != value) {
+ PreserveRedo();
+ dst = value;
+ return true;
+ }
+ return false;
+ }
+
+ void PreserveTable(ui32 tid) noexcept;
+ void PreserveExecutor() noexcept;
+ void PreserveRedo() noexcept;
+
public:
TScheme &Scheme;
+ TSchemeRollbackState *RollbackState;
THashSet<ui32> Affects;
};
diff --git a/ydb/core/tablet_flat/flat_dbase_change.h b/ydb/core/tablet_flat/flat_dbase_change.h
index 641e08b63d..857fe29fdc 100644
--- a/ydb/core/tablet_flat/flat_dbase_change.h
+++ b/ydb/core/tablet_flat/flat_dbase_change.h
@@ -12,13 +12,6 @@ namespace NTable {
struct TChange {
using TMemGlobs = TVector<NPageCollection::TMemGlob>;
- struct TSnapshot {
- TSnapshot(ui32 table) : Table(table) { }
-
- ui32 Table = Max<ui32>();
- TEpoch Epoch = TEpoch::Max();
- };
-
struct TStats {
ui64 ChargeSieved = 0;
ui64 ChargeWeeded = 0;
@@ -63,7 +56,7 @@ namespace NTable {
TVector<ui32> Deleted; /* Tables deleted in some alter */
TGarbage Garbage; /* Wiped tables, ids in Deleted */
- TVector<TSnapshot> Snapshots;
+ ui32 Snapshots = 0;
TMap<ui32, TVector<TRemovedRowVersions>> RemovedRowVersions;
diff --git a/ydb/core/tablet_flat/flat_dbase_naked.h b/ydb/core/tablet_flat/flat_dbase_naked.h
index 493867a7f8..ebf4c26891 100644
--- a/ydb/core/tablet_flat/flat_dbase_naked.h
+++ b/ydb/core/tablet_flat/flat_dbase_naked.h
@@ -17,8 +17,9 @@
namespace NKikimr {
namespace NTable {
- class TDatabaseImpl {
-
+ class TDatabaseImpl final
+ : public IAlterSink
+ {
struct TArgs {
ui32 Table;
TEpoch Head;
@@ -49,7 +50,12 @@ namespace NTable {
bool Touch(ui64 edge, ui64 serial) noexcept
{
- return std::exchange(Serial, serial) <= edge;
+ ui64 prevSerial = std::exchange(Serial, serial);
+ if (prevSerial <= edge) {
+ SerialBackup = prevSerial;
+ return true;
+ }
+ return false;
}
void Aggr(TDbStats &aggr, bool enter) const noexcept
@@ -77,10 +83,40 @@ namespace NTable {
}
}
+ void BackupMemStats() noexcept
+ {
+ BackupMemTableWaste = Self->GetMemWaste();
+ BackupMemTableBytes = Self->GetMemSize();
+ BackupMemTableOps = Self->GetOpsCount();
+ }
+
+ void RestoreMemStats(TDbStats &aggr) const noexcept
+ {
+ NUtil::SubSafe(aggr.MemTableWaste, BackupMemTableWaste);
+ NUtil::SubSafe(aggr.MemTableBytes, BackupMemTableBytes);
+ NUtil::SubSafe(aggr.MemTableOps, BackupMemTableOps);
+ aggr.MemTableWaste += Self->GetMemWaste();
+ aggr.MemTableBytes += Self->GetMemSize();
+ aggr.MemTableOps += Self->GetOpsCount();
+ }
+
const ui32 Table = Max<ui32>();
const TIntrusivePtr<TTable> Self;
const TTxStamp Edge = 0; /* Stamp of last snapshot */
ui64 Serial = 0;
+ ui64 SerialBackup = 0;
+
+ std::optional<TEpoch> EpochSnapshot;
+ ui64 BackupMemTableWaste;
+ ui64 BackupMemTableBytes;
+ ui64 BackupMemTableOps;
+
+ bool Created = false;
+ bool Dropped = false;
+ bool SchemePending = false;
+ bool SchemeModified = false;
+ bool DataModified = false;
+ bool RollbackPrepared = false;
};
public:
@@ -113,23 +149,312 @@ namespace NTable {
{
auto *wrap = Tables.FindPtr(table);
- Y_VERIFY(wrap || !require, "Cannot find given table");
+ if (!wrap || wrap->Dropped) {
+ Y_VERIFY(!require, "Cannot find table %" PRIu32, table);
+ return Dummy;
+ }
+
+ if (wrap->SchemePending) {
+ Y_VERIFY(InTransaction);
+
+ auto* info = Scheme->GetTableInfo(table);
+ Y_VERIFY(info, "No scheme for existing table %" PRIu32, table);
- return wrap ? *wrap : Dummy;
+ if (!wrap->Created && !wrap->RollbackPrepared) {
+ wrap->BackupMemStats();
+ (*wrap)->PrepareRollback();
+ wrap->RollbackPrepared = true;
+ Prepared.push_back(table);
+ }
+
+ if (!wrap->EpochSnapshot) {
+ // We always flush mem table on schema modification,
+ // which happens at the "start" of transaction.
+ wrap->EpochSnapshot.emplace((*wrap)->Snapshot());
+ // When this is an existing table we also simulate
+ // EvFlush that is inserted in the redo log.
+ if (!wrap->Created) {
+ Flushed.push_back(table);
+ if (wrap->Touch(Begin_, Serial_)) {
+ Affects.push_back(table);
+ }
+ }
+ }
+
+ (*wrap)->SetScheme(*info);
+
+ wrap->SchemeModified = true;
+ wrap->SchemePending = false;
+ }
+
+ return *wrap;
+ }
+
+ TTableWrapper& GetForUpdate(ui32 table) noexcept
+ {
+ Y_VERIFY(InTransaction);
+ TTableWrapper& wrap = Get(table, true);
+ if (!wrap.Created && !wrap.RollbackPrepared) {
+ wrap.BackupMemStats();
+ wrap->PrepareRollback();
+ wrap.RollbackPrepared = true;
+ Prepared.push_back(table);
+ }
+ if (wrap.Touch(Begin_, Serial_)) {
+ Affects.push_back(table);
+ }
+ Y_VERIFY(wrap.Created || wrap.RollbackPrepared);
+ wrap.DataModified = true;
+ return wrap;
}
ui64 Rewind(ui64 serial) noexcept
{
+ Y_VERIFY(!InTransaction, "Unexpected rewind inside a transaction");
return std::exchange(Serial_, Max(Serial_, serial));
}
+ void BeginTransaction() noexcept
+ {
+ Y_VERIFY(!InTransaction);
+ InTransaction = true;
+
+ // We pretend as if we just processed Switch and EvBegin with the next serial
+ Begin_ = Serial_;
+ Affects = { };
+ Serial_++;
+
+ // Sanity checks
+ Y_VERIFY_DEBUG(Annex.empty());
+ Y_VERIFY_DEBUG(Flushed.empty());
+ Y_VERIFY_DEBUG(Prepared.empty());
+ }
+
+ TEpoch FlushTable(ui32 tid) noexcept
+ {
+ Y_VERIFY(InTransaction);
+ auto& wrap = Get(tid, true);
+ Y_VERIFY(!wrap.DataModified, "Cannot flush a modified table");
+ if (!wrap.EpochSnapshot) {
+ Y_VERIFY(!wrap.Created);
+ wrap.EpochSnapshot.emplace(wrap->Snapshot());
+ // Simulate inserting and processing EvFlush
+ Flushed.push_back(tid);
+ if (wrap.Touch(Begin_, Serial_)) {
+ Affects.push_back(tid);
+ }
+ }
+ return *wrap.EpochSnapshot;
+ }
+
+ void CommitTransaction(TTxStamp stamp, TArrayRef<const TMemGlob> annex, NRedo::TWriter& writer) noexcept
+ {
+ Y_VERIFY(Stamp <= stamp, "Executor tx stamp cannot go to the past");
+ Stamp = stamp;
+
+ CommitScheme(annex);
+
+ for (ui32 tid : Prepared) {
+ auto it = Tables.find(tid);
+ if (it == Tables.end()) {
+ // Table was actually dropped
+ continue;
+ }
+ auto& wrap = it->second;
+ Y_VERIFY(wrap.RollbackPrepared);
+ wrap->CommitChanges(annex);
+ wrap.RestoreMemStats(Stats);
+ wrap.RollbackPrepared = false;
+ wrap.DataModified = false;
+ }
+ Prepared.clear();
+
+ THashSet<ui32> dropped;
+ for (ui32 tid : Flushed) {
+ auto it = Tables.find(tid);
+ if (it == Tables.end()) {
+ // Table was actually dropped
+ dropped.insert(tid);
+ continue;
+ }
+ auto& wrap = it->second;
+ Y_VERIFY(wrap.EpochSnapshot);
+ writer.EvFlush(tid, Stamp - 1, *wrap.EpochSnapshot);
+ wrap.EpochSnapshot.reset();
+ }
+ Flushed.clear();
+
+ // Remove dropped tables (if any) from affects
+ if (!dropped.empty()) {
+ auto end = std::remove_if(
+ Affects.begin(), Affects.end(),
+ [&dropped](ui32 tid) {
+ return dropped.contains(tid);
+ });
+ Affects.erase(end, Affects.end());
+ }
+
+ // We expect database to drop commits without any side-effects
+ // So we rewind serial to match what it would be after a reboot
+ if (Affects.empty()) {
+ Serial_ = Begin_;
+ }
+
+ Stats.TxCommited++;
+ InTransaction = false;
+ }
+
+ void CommitScheme(TArrayRef<const TMemGlob> annex) noexcept
+ {
+ if (!SchemeRollbackState.Tables.empty() || SchemeRollbackState.Redo) {
+ // Table or redo settings have changed
+ CalculateAnnexEdge();
+ }
+
+ TScheme& scheme = *Scheme;
+ for (auto& pr : SchemeRollbackState.Tables) {
+ ui32 tid = pr.first;
+ auto* info = scheme.GetTableInfo(tid);
+ if (!info) {
+ // This table doesn't exist in current schema,
+ // which means it has been dropped.
+ Y_VERIFY(Tables.contains(tid), "Unexpected drop for a table that doesn't exist");
+ auto& wrap = Tables.at(tid);
+ Y_VERIFY(wrap.Dropped);
+ Y_VERIFY(!wrap.DataModified, "Unexpected drop of a modified table");
+ if (wrap.RollbackPrepared) {
+ wrap->CommitChanges(annex);
+ wrap.RestoreMemStats(Stats);
+ wrap.RollbackPrepared = false;
+ }
+ wrap.Aggr(Stats, false /* leave */);
+ Deleted.emplace_back(tid);
+ Garbage.emplace_back(wrap->Unwrap());
+ Tables.erase(tid);
+ NUtil::SubSafe(Stats.Tables, ui32(1));
+ continue;
+ }
+
+ // This call will also apply schema changes
+ auto& wrap = Get(tid, true);
+ Y_VERIFY(!wrap.Dropped);
+ Y_VERIFY(!wrap.SchemePending);
+ Y_VERIFY(wrap.SchemeModified);
+
+ if (wrap.Created) {
+ // If the table is both created and modified in the same
+ // transaction, then make sure flags are cleared and the
+ // table stats are accounted for.
+ wrap.Created = false;
+ wrap.DataModified = false;
+ Y_VERIFY(!wrap.RollbackPrepared);
+ wrap.EpochSnapshot.reset();
+ wrap->CommitNewTable(annex);
+ wrap.Aggr(Stats, true /* enter */);
+ }
+
+ wrap.SchemeModified = false;
+ }
+
+ SchemeRollbackState.Tables.clear();
+ SchemeRollbackState.Executor.reset();
+ SchemeRollbackState.Redo.reset();
+ }
+
+ void RollbackTransaction() noexcept
+ {
+ for (ui32 tid : Prepared) {
+ auto& wrap = Tables.at(tid);
+ Y_VERIFY(wrap.RollbackPrepared);
+ wrap->RollbackChanges();
+ wrap.RestoreMemStats(Stats);
+ wrap.RollbackPrepared = false;
+ wrap.SchemeModified = false;
+ wrap.DataModified = false;
+ }
+ Prepared.clear();
+
+ for (ui32 tid : Flushed) {
+ auto& wrap = Tables.at(tid);
+ Y_VERIFY(wrap.EpochSnapshot);
+ wrap.EpochSnapshot.reset();
+ }
+ Flushed.clear();
+
+ for (ui32 tid : Affects) {
+ auto& wrap = Tables.at(tid);
+ if (!wrap.Created) {
+ wrap.Serial = wrap.SerialBackup;
+ }
+ }
+ Affects.clear();
+
+ RollbackScheme();
+ Serial_ = Begin_;
+ InTransaction = false;
+ }
+
+ void RollbackScheme() noexcept
+ {
+ // Note: we assume schema rollback is very rare,
+ // so it doesn't have to be efficient
+ TScheme& scheme = *Scheme;
+ if (SchemeRollbackState.Redo) {
+ scheme.Redo = *SchemeRollbackState.Redo;
+ SchemeRollbackState.Redo.reset();
+ }
+ if (SchemeRollbackState.Executor) {
+ scheme.Executor = *SchemeRollbackState.Executor;
+ SchemeRollbackState.Executor.reset();
+ }
+ // First pass: we remove all modified tables from schema to handle renames
+ for (auto& pr : SchemeRollbackState.Tables) {
+ auto it = scheme.Tables.find(pr.first);
+ if (it != scheme.Tables.end()) {
+ scheme.TableNames.erase(it->second.Name);
+ scheme.Tables.erase(it);
+ }
+ }
+ // Second pass: restore all tables that existed before transaction started
+ for (auto& pr : SchemeRollbackState.Tables) {
+ if (pr.second) {
+ auto res = scheme.Tables.emplace(pr.first, *pr.second);
+ Y_VERIFY(res.second);
+ scheme.TableNames.emplace(res.first->second.Name, pr.first);
+ }
+ }
+ // Third pass: we check modified tables and rollback their schema changes
+ for (auto& pr : SchemeRollbackState.Tables) {
+ ui32 tid = pr.first;
+ auto& wrap = Tables.at(tid);
+ if (wrap.Created) {
+ // This table didn't exist, just forget about it
+ Tables.erase(tid);
+ NUtil::SubSafe(Stats.Tables, ui32(1));
+ continue;
+ }
+ // By the time schema rollback is called we expect changes to be rolled back already
+ Y_VERIFY(!wrap.SchemeModified, "Unexpected schema rollback on a modified table");
+ Y_VERIFY(!wrap.EpochSnapshot, "Unexpected schema rollback on a flushed table");
+ if (wrap.Dropped) {
+ // This table is no longer dropped
+ wrap.Dropped = false;
+ }
+ wrap.SchemePending = false;
+ }
+ SchemeRollbackState.Tables.clear();
+ }
+
TDatabaseImpl& Switch(TTxStamp stamp) noexcept
{
- if (std::exchange(Stamp, stamp) > stamp)
- Y_FAIL("Executor tx stamp cannot go to the past");
+ Y_VERIFY(!InTransaction, "Unexpected switch inside a transaction");
+ Y_VERIFY(Stamp <= stamp, "Executor tx stamp cannot go to the past");
+ Stamp = stamp;
- First_ = Max<ui64>(), Begin_ = Serial_;
- Stats.TxCommited++, Affects = { };
+ First_ = Max<ui64>();
+ Begin_ = Serial_;
+ Stats.TxCommited++;
+ Affects = { };
return *this;
}
@@ -195,12 +520,12 @@ namespace NTable {
wrap.Aggr(Stats, true /* enter */);
}
- bool Apply(const TSchemeChanges &delta, NRedo::TWriter *writer)
+ bool ApplySchema(const TSchemeChanges &delta)
{
TModifier modifier(*Scheme);
if (modifier.Apply(delta)) {
- Apply(modifier.Affects, writer);
+ ApplySchema(modifier.Affects);
return true;
} else {
@@ -268,20 +593,12 @@ namespace NTable {
return result.first->second;
}
- void Apply(const THashSet<ui32> &affects, NRedo::TWriter *writer)
+ void ApplySchema(const THashSet<ui32> &affects)
{
for (ui32 table : affects) {
auto &wrap = Get(table, false);
if (auto *info = Scheme->GetTableInfo(table)) {
- if (wrap && writer) {
- /* Hack keeps table epoches consistent in regular
- flow and on db bootstap. Required due to scheme
- deltas and redo log async rollup on bootstrap.
- */
-
- writer->EvFlush(table, Stamp, wrap->Snapshot());
- }
(wrap ? wrap : MakeTable(table, { }))->SetScheme(*info);
@@ -325,7 +642,7 @@ namespace NTable {
Serial_ = serial;
} else {
Y_Fail("EvBegin{" << serial << " " << NFmt::TStamp(stamp)
- << "} is not fits to db state {" << Serial_ << " "
+ << "} does not match db state {" << Serial_ << " "
<< NFmt::TStamp(Stamp) << "} (redo log was reordered)");
}
@@ -415,11 +732,40 @@ namespace NTable {
Legacy log (Evolution < 12) have no EvBegin and progression
of serial require hack with virtual insertion of EvBegin here.
*/
+ if (Y_UNLIKELY(Serial_ == Begin_)) {
+ ++Serial_;
+ }
- if (wrap.Touch(Begin_, Begin_ == Serial_ ? ++Serial_ : Serial_))
+ if (wrap.Touch(Begin_, Serial_))
Affects.emplace_back(table);
- return First_ = Min(First_, Serial_), wrap;
+ First_ = Min(First_, Serial_);
+ return wrap;
+ }
+
+ private:
+ bool ApplyAlterRecord(const TAlterRecord& record) override
+ {
+ Y_VERIFY(InTransaction, "Unexpected ApplyAlterRecord outside of transaction");
+ TSchemeModifier modifier(*Scheme, &SchemeRollbackState);
+ bool changes = modifier.Apply(record);
+ if (changes) {
+ // There will be at most one table id
+ for (ui32 tid : modifier.Affects) {
+ auto* wrap = Tables.FindPtr(tid);
+ if (!wrap) {
+ wrap = &MakeTable(tid, { });
+ wrap->Created = true;
+ }
+ Y_VERIFY(!wrap->DataModified, "Table %" PRIu32 " cannot be altered after being changed", tid);
+ Y_VERIFY(!wrap->Dropped, "Table %" PRIu32 " cannot be altered after being dropped", tid);
+ if (!Scheme->GetTableInfo(tid)) {
+ wrap->Dropped = true;
+ }
+ wrap->SchemePending = true;
+ }
+ }
+ return changes;
}
public:
@@ -440,6 +786,11 @@ namespace NTable {
NRedo::TPlayer<TDatabaseImpl> Redo;
TVector<ui32> Affects;
TVector<TMemGlob> Annex;
+ TVector<ui32> Flushed;
+ TVector<ui32> Prepared;
+
+ bool InTransaction = false;
+ TSchemeRollbackState SchemeRollbackState;
public:
const TAutoPtr<TScheme> Scheme;
diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.cpp b/ydb/core/tablet_flat/flat_dbase_scheme.cpp
index fa12005009..61303df104 100644
--- a/ydb/core/tablet_flat/flat_dbase_scheme.cpp
+++ b/ydb/core/tablet_flat/flat_dbase_scheme.cpp
@@ -61,7 +61,17 @@ TAutoPtr<TSchemeChanges> TScheme::GetSnapshot() const {
TAlter& TAlter::Merge(const TSchemeChanges &log)
{
- Log.MutableDelta()->MergeFrom(log.GetDelta());
+ Y_VERIFY(&Log != &log, "Cannot merge changes onto itself");
+
+ int added = log.DeltaSize();
+ if (added > 0) {
+ auto* dst = Log.MutableDelta();
+ dst->Reserve(Log.DeltaSize() + added);
+ for (const auto& delta : log.GetDelta()) {
+ *dst->Add() = delta;
+ ApplyLastRecord();
+ }
+ }
return *this;
}
@@ -73,7 +83,7 @@ TAlter& TAlter::AddTable(const TString& name, ui32 id)
delta.SetTableName(name);
delta.SetTableId(id);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::DropTable(ui32 id)
@@ -82,7 +92,7 @@ TAlter& TAlter::DropTable(ui32 id)
delta.SetDeltaType(TAlterRecord::DropTable);
delta.SetTableId(id);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::AddColumn(ui32 table, const TString& name, ui32 id, ui32 type, bool notNull, TCell null)
@@ -98,7 +108,7 @@ TAlter& TAlter::AddColumn(ui32 table, const TString& name, ui32 id, ui32 type, b
if (!null.IsNull())
delta.SetDefault(null.Data(), null.Size());
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::DropColumn(ui32 table, ui32 id)
@@ -108,7 +118,7 @@ TAlter& TAlter::DropColumn(ui32 table, ui32 id)
delta.SetTableId(table);
delta.SetColumnId(id);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::AddColumnToFamily(ui32 table, ui32 column, ui32 family)
@@ -119,7 +129,7 @@ TAlter& TAlter::AddColumnToFamily(ui32 table, ui32 column, ui32 family)
delta.SetColumnId(column);
delta.SetFamilyId(family);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::AddFamily(ui32 table, ui32 family, ui32 room)
@@ -130,7 +140,7 @@ TAlter& TAlter::AddFamily(ui32 table, ui32 family, ui32 room)
delta.SetFamilyId(family);
delta.SetRoomId(room);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::AddColumnToKey(ui32 table, ui32 column)
@@ -140,7 +150,7 @@ TAlter& TAlter::AddColumnToKey(ui32 table, ui32 column)
delta.SetTableId(table);
delta.SetColumnId(column);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetFamily(ui32 table, ui32 family, ECache cache, ECodec codec)
@@ -153,7 +163,7 @@ TAlter& TAlter::SetFamily(ui32 table, ui32 family, ECache cache, ECodec codec)
delta.SetCodec(ui32(codec));
delta.SetCache(ui32(cache));
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetFamilyBlobs(ui32 table, ui32 family, ui32 small, ui32 large)
@@ -165,7 +175,7 @@ TAlter& TAlter::SetFamilyBlobs(ui32 table, ui32 family, ui32 small, ui32 large)
delta.SetSmall(small);
delta.SetLarge(large);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetRoom(ui32 table, ui32 room, ui32 main, ui32 blobs, ui32 outer)
@@ -179,7 +189,7 @@ TAlter& TAlter::SetRoom(ui32 table, ui32 room, ui32 main, ui32 blobs, ui32 outer
delta->SetBlobs(blobs);
delta->SetOuter(outer);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetRedo(ui32 annex)
@@ -189,7 +199,7 @@ TAlter& TAlter::SetRedo(ui32 annex)
delta->SetDeltaType(TAlterRecord::SetRedo);
delta->SetAnnex(annex);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetExecutorCacheSize(ui64 cacheSize)
@@ -198,7 +208,7 @@ TAlter& TAlter::SetExecutorCacheSize(ui64 cacheSize)
delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo);
delta.SetExecutorCacheSize(cacheSize);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetExecutorFastLogPolicy(bool allow)
@@ -207,7 +217,7 @@ TAlter& TAlter::SetExecutorFastLogPolicy(bool allow)
delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo);
delta.SetExecutorLogFastCommitTactic(allow);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetExecutorAllowLogBatching(bool allow)
@@ -216,7 +226,7 @@ TAlter& TAlter::SetExecutorAllowLogBatching(bool allow)
delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo);
delta.SetExecutorAllowLogBatching(allow);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetExecutorLogFlushPeriod(TDuration flushPeriod)
@@ -225,7 +235,7 @@ TAlter& TAlter::SetExecutorLogFlushPeriod(TDuration flushPeriod)
delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo);
delta.SetExecutorLogFlushPeriod(flushPeriod.MicroSeconds());
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetExecutorLimitInFlyTx(ui32 limitTxInFly)
@@ -234,7 +244,7 @@ TAlter& TAlter::SetExecutorLimitInFlyTx(ui32 limitTxInFly)
delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo);
delta.SetExecutorLimitInFlyTx(limitTxInFly);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetExecutorResourceProfile(const TString &name)
@@ -243,7 +253,7 @@ TAlter& TAlter::SetExecutorResourceProfile(const TString &name)
delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo);
delta.SetExecutorResourceProfile(name);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetCompactionPolicy(ui32 tableId, const TCompactionPolicy& newPolicy)
@@ -253,7 +263,7 @@ TAlter& TAlter::SetCompactionPolicy(ui32 tableId, const TCompactionPolicy& newPo
delta.SetTableId(tableId);
newPolicy.Serialize(*delta.MutableCompactionPolicy());
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetByKeyFilter(ui32 tableId, bool enabled)
@@ -263,7 +273,7 @@ TAlter& TAlter::SetByKeyFilter(ui32 tableId, bool enabled)
delta.SetTableId(tableId);
delta.SetByKeyFilter(enabled ? 1 : 0);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetColdBorrow(ui32 tableId, bool enabled)
@@ -273,7 +283,7 @@ TAlter& TAlter::SetColdBorrow(ui32 tableId, bool enabled)
delta.SetTableId(tableId);
delta.SetColdBorrow(enabled);
- return *this;
+ return ApplyLastRecord();
}
TAlter& TAlter::SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 maxBytes)
@@ -287,7 +297,12 @@ TAlter& TAlter::SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 max
delta.SetEraseCacheMaxBytes(maxBytes);
}
- return *this;
+ return ApplyLastRecord();
+}
+
+TAlter::operator bool() const noexcept
+{
+ return Log.DeltaSize() > 0;
}
TAutoPtr<TSchemeChanges> TAlter::Flush()
@@ -297,5 +312,19 @@ TAutoPtr<TSchemeChanges> TAlter::Flush()
return log;
}
+TAlter& TAlter::ApplyLastRecord()
+{
+ if (Sink) {
+ int deltasCount = Log.DeltaSize();
+ Y_VERIFY(deltasCount > 0);
+
+ if (!Sink->ApplyAlterRecord(Log.GetDelta(deltasCount - 1))) {
+ Log.MutableDelta()->RemoveLast();
+ }
+ }
+
+ return *this;
+}
+
}
}
diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.h b/ydb/core/tablet_flat/flat_dbase_scheme.h
index 1fd5ed2b48..da4d0f7799 100644
--- a/ydb/core/tablet_flat/flat_dbase_scheme.h
+++ b/ydb/core/tablet_flat/flat_dbase_scheme.h
@@ -200,12 +200,45 @@ public:
TRedo Redo;
};
+/**
+ * This structure holds the rollback state for database schema, which allows
+ * schema to be rolled back to initial state when transaction decides not to
+ * commit. As this almost never happens it doesn't need to be very efficient.
+ */
+struct TSchemeRollbackState {
+ // This hash map has key for each modified table, where value either holds
+ // previous table schema, or empty if table didn't exist.
+ THashMap<ui32, std::optional<TScheme::TTableInfo>> Tables;
+ // Previous executor settings if modified
+ std::optional<TScheme::TExecutorInfo> Executor;
+ // Previous redo settings if modified
+ std::optional<TScheme::TRedo> Redo;
+};
+
+/**
+ * Sink is used to inspect and apply alter records
+ */
+class IAlterSink {
+public:
+ /**
+ * Sink attempts to apply the given alter record. When the return value
+ * is true the record is deemed useful, otherwise it is discarded.
+ */
+ virtual bool ApplyAlterRecord(const TAlterRecord& record) = 0;
+};
+
// scheme delta
class TAlter {
public:
using ECodec = NPage::ECodec;
using ECache = NPage::ECache;
+ TAlter(IAlterSink* sink = nullptr)
+ : Sink(sink)
+ { }
+
+ explicit operator bool() const noexcept;
+
const TSchemeChanges& operator*() const noexcept
{
return Log;
@@ -235,7 +268,12 @@ public:
TAlter& SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 maxBytes);
TAutoPtr<TSchemeChanges> Flush();
+
+private:
+ TAlter& ApplyLastRecord();
+
protected:
+ IAlterSink* Sink;
TSchemeChanges Log;
};
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 7e17797173..2dd7869f29 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1579,7 +1579,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
THPTimer cpuTimer;
- TPageCollectionTxEnv env(*PrivatePageCache);
+ TPageCollectionTxEnv env(*Database, *PrivatePageCache);
TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
txc.NotEnoughMemory(seat->NotEnoughMemoryCount);
@@ -1590,11 +1590,6 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
LWTRACK(TransactionExecuteEnd, seat->Self->Orbit, seat->UniqID, done);
seat->CPUExecTime += cpuTimer.PassedReset();
- if (done && !Stats->IsFollower) { /* possible rw commit */
- for (auto one: env.MakeSnap)
- Database->TxSnapTable(one.first /* table */);
- }
-
bool failed = false;
TString failureReason;
if (done && (failed = !Database->ValidateCommit(failureReason))) {
@@ -1936,21 +1931,20 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv
Y_VERIFY(!force || commitResult.Commit);
auto *commit = commitResult.Commit.Get(); // could be nullptr
- Y_VERIFY(env.MakeSnap.size() == change->Snapshots.size());
-
- for (auto seq: xrange(env.MakeSnap.size())) {
- const auto &snap = change->Snapshots[seq];
+ for (auto& pr : env.MakeSnap) {
+ const ui32 table = pr.first;
+ auto& snap = pr.second;
- Y_VERIFY(snap.Epoch != NTable::TEpoch::Max(), "Table was not snapshoted");
+ Y_VERIFY(snap.Epoch, "Table was not snapshotted");
- for (auto &context: env.MakeSnap.at(snap.Table).Context) {
- auto edge = NTable::TSnapEdge(change->Stamp, snap.Epoch);
+ for (auto &context: snap.Context) {
+ auto edge = NTable::TSnapEdge(change->Stamp - 1, *snap.Epoch);
if (!context->Impl)
context->Impl.Reset(new TTableSnapshotContext::TImpl);
- context->Impl->Prepare(snap.Table, edge);
- CompactionLogic->PrepareTableSnapshot(snap.Table, edge, context.Get());
+ context->Impl->Prepare(table, edge);
+ CompactionLogic->PrepareTableSnapshot(table, edge, context.Get());
WaitingSnapshots.insert(std::make_pair(context.Get(), context));
}
}
@@ -2961,8 +2955,10 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable::
subset->ColdParts.insert(subset->ColdParts.end(), params->ColdParts.begin(), params->ColdParts.end());
}
- if (*subset && !subset->IsStickedToHead()) {
- Y_FAIL("Got table subset with unexpected epoch marker");
+ if (*subset) {
+ Y_VERIFY_S(subset->IsStickedToHead(),
+ "Got table subset with unexpected head " << subset->Head
+ << " and epoch " << subset->Epoch());
}
} else {
// This grabs a volatile snapshot of the mutable table state
diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.cpp b/ydb/core/tablet_flat/flat_executor_tx_env.cpp
new file mode 100644
index 0000000000..83aad1965d
--- /dev/null
+++ b/ydb/core/tablet_flat/flat_executor_tx_env.cpp
@@ -0,0 +1,25 @@
+#include "flat_executor_tx_env.h"
+#include "flat_database.h"
+
+namespace NKikimr {
+namespace NTabletFlatExecutor {
+
+ void TPageCollectionTxEnv::MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap)
+ {
+ auto tables = snap->TablesToSnapshot();
+ Y_VERIFY(tables);
+
+ for (ui32 table : tables) {
+ auto& entry = MakeSnap[table];
+ entry.Context.push_back(snap);
+ auto epoch = DB.TxSnapTable(table);
+ if (entry.Epoch) {
+ Y_VERIFY(*entry.Epoch == epoch, "Table snapshot changed unexpectedly");
+ } else {
+ entry.Epoch.emplace(epoch);
+ }
+ }
+ }
+
+} // namespace NTabletFlatExecutor
+} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.h b/ydb/core/tablet_flat/flat_executor_tx_env.h
index e3b54db82e..bfad1f626f 100644
--- a/ydb/core/tablet_flat/flat_executor_tx_env.h
+++ b/ydb/core/tablet_flat/flat_executor_tx_env.h
@@ -67,6 +67,11 @@ namespace NTabletFlatExecutor {
};
struct TPageCollectionTxEnv : public TPageCollectionReadEnv, public IExecuting {
+ TPageCollectionTxEnv(NTable::TDatabase& db, TPrivatePageCache& cache)
+ : TPageCollectionReadEnv(cache)
+ , DB(db)
+ { }
+
using TLogoId = TLogoBlobID;
struct TBorrowSnap {
@@ -118,6 +123,7 @@ namespace NTabletFlatExecutor {
struct TSnapshot {
TVector<TIntrusivePtr<TTableSnapshotContext>> Context;
+ std::optional<NTable::TEpoch> Epoch;
};
using TPageCollectionReadEnv::TPageCollectionReadEnv;
@@ -134,13 +140,7 @@ namespace NTabletFlatExecutor {
}
protected: /* IExecuting, tx stage func implementation */
- void MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override
- {
- Y_VERIFY(snap->TablesToSnapshot());
-
- for (ui32 table : snap->TablesToSnapshot())
- MakeSnap[table].Context.push_back(snap);
- }
+ void MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override;
void DropSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override
{
@@ -194,6 +194,9 @@ namespace NTabletFlatExecutor {
LoanConfirmation.insert(std::make_pair(bundle, TLoanConfirmation{borrow}));
}
+ protected:
+ NTable::TDatabase& DB;
+
public:
/*_ Pending database shanshots */
diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp
index d3f008825f..0568b33a50 100644
--- a/ydb/core/tablet_flat/flat_executor_ut.cpp
+++ b/ydb/core/tablet_flat/flat_executor_ut.cpp
@@ -4891,5 +4891,135 @@ Y_UNIT_TEST_SUITE(TFlatTableLongTxAndBlobs) {
} // Y_UNIT_TEST_SUITE(TFlatTableLongTxAndBlobs)
+Y_UNIT_TEST_SUITE(TFlatTableSnapshotWithCommits) {
+
+ struct TTxMakeSnapshotAndWrite : public ITransaction {
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ TIntrusivePtr<TTableSnapshotContext> snapContext =
+ new TTestTableSnapshotContext({TRowsModel::TableId});
+ txc.Env.MakeSnapshot(snapContext);
+
+ for (i64 keyValue = 101; keyValue <= 104; ++keyValue) {
+ const auto key = NScheme::TInt64::TInstance(keyValue);
+
+ TString str = "value";
+ const auto val = NScheme::TString::TInstance(str);
+ NTable::TUpdateOp ops{ TRowsModel::ColumnValueId, NTable::ECellOp::Set, val };
+
+ txc.DB.Update(TRowsModel::TableId, NTable::ERowOp::Upsert, { key }, { ops });
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+ };
+
+ struct TTxBorrowSnapshot : public ITransactionWithExecutor {
+ TTxBorrowSnapshot(TString& snapBody, TIntrusivePtr<TTableSnapshotContext> snapContext, ui64 targetTabletId)
+ : SnapBody(snapBody)
+ , SnapContext(std::move(snapContext))
+ , TargetTabletId(targetTabletId)
+ { }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ SnapBody = Executor->BorrowSnapshot(TRowsModel::TableId, *SnapContext, { }, { }, TargetTabletId);
+ txc.Env.DropSnapshot(SnapContext);
+ return true;
+ }
+
+ void Complete(const TActorContext& ctx) override {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+
+ private:
+ TString& SnapBody;
+ TIntrusivePtr<TTableSnapshotContext> SnapContext;
+ const ui64 TargetTabletId;
+ };
+
+ struct TTxCheckRows : public ITransaction {
+ bool Execute(TTransactionContext &txc, const TActorContext &) override
+ {
+ i64 keyId;
+ TVector<NTable::TTag> tags;
+ tags.push_back(TRowsModel::ColumnValueId);
+ TVector<TRawTypeValue> key;
+ key.emplace_back(&keyId, sizeof(keyId), NScheme::TInt64::TypeId);
+
+ for (keyId = 1; keyId <= 104; ++keyId) {
+ NTable::TRowState row;
+ auto ready = txc.DB.Select(TRowsModel::TableId, key, tags, row);
+ if (ready == NTable::EReady::Page)
+ return false;
+ Y_VERIFY_S(ready == NTable::EReady::Data, "Failed to find key " << keyId);
+ Y_VERIFY(row.GetRowState() == NTable::ERowOp::Upsert);
+ TStringBuf selected = row.Get(0).AsBuf();
+ TString expected = "value";
+ Y_VERIFY(selected == expected);
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext &ctx) override
+ {
+ ctx.Send(ctx.SelfID, new NFake::TEvReturn);
+ }
+ };
+
+ Y_UNIT_TEST(SnapshotWithCommits) {
+ TMyEnvBase env;
+ TRowsModel rows;
+
+ env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG);
+
+ // Start the first tablet
+ env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
+ return new TTestFlatTablet(env.Edge, tablet, info);
+ });
+ env.WaitForWakeUp();
+
+ // Init schema
+ {
+ TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy();
+ env.SendSync(rows.MakeScheme(std::move(policy)));
+ }
+
+ // Insert 100 rows
+ Cerr << "...inserting rows" << Endl;
+ env.SendSync(rows.MakeRows(100, 0, 100));
+
+ Cerr << "...making snapshot and writing to table" << Endl;
+ env.SendSync(new NFake::TEvExecute{ new TTxMakeSnapshotAndWrite });
+ Cerr << "...waiting for snapshot to complete" << Endl;
+ auto evSnapshot = env.GrabEdgeEvent<TEvTestFlatTablet::TEvSnapshotComplete>();
+ Cerr << "...borrowing snapshot" << Endl;
+ TString snapBody;
+ env.SendSync(new NFake::TEvExecute{ new TTxBorrowSnapshot(snapBody, evSnapshot->Get()->SnapContext, env.Tablet + 1) });
+
+ // Check all added rows one-by-one
+ Cerr << "...checking rows" << Endl;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRows });
+
+ for (int i = 0; i < 2; ++i) {
+ Cerr << "...restarting tablet" << Endl;
+ env.SendSync(new TEvents::TEvPoison, false, true);
+ env.WaitForGone();
+ env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
+ return new TTestFlatTablet(env.Edge, tablet, info);
+ });
+ env.WaitForWakeUp();
+
+ // Check all added rows one-by-one
+ Cerr << "...checking rows" << Endl;
+ env.SendSync(new NFake::TEvExecute{ new TTxCheckRows }, /* retry */ true);
+ }
+ }
+
+} // Y_UNIT_TEST_SUITE(TFlatTableSnapshotWithCommits)
+
} // namespace NTabletFlatExecutor
} // namespace NKikimr
diff --git a/ydb/core/tablet_flat/flat_mem_blobs.h b/ydb/core/tablet_flat/flat_mem_blobs.h
index cb29308fe8..25417cb168 100644
--- a/ydb/core/tablet_flat/flat_mem_blobs.h
+++ b/ydb/core/tablet_flat/flat_mem_blobs.h
@@ -64,7 +64,9 @@ namespace NMem {
Store.emplace_back(glob, std::move(data));
Bytes += glob.Logo.BlobSize();
- return Head + (Store.size() - 1);
+ const ui64 ref = Head + (Store.size() - 1);
+
+ return ref;
}
void Assign(TArrayRef<NPageCollection::TLoadedPage> pages) noexcept
@@ -76,6 +78,33 @@ namespace NMem {
}
}
+ void Commit(size_t count, TArrayRef<const NPageCollection::TMemGlob> pages) noexcept
+ {
+ if (count > 0) {
+ size_t currentSize = Store.size();
+ Y_VERIFY(count <= currentSize);
+ Store.Enumerate(currentSize - count, currentSize, [pages](size_t, NPageCollection::TMemGlob& blob) {
+ Y_VERIFY(blob.GId.Logo.TabletID() == 0);
+ const ui32 ref = blob.GId.Logo.Cookie();
+ const auto& fixedBlob = pages.at(ref);
+ Y_VERIFY(fixedBlob.GId.Logo.TabletID() != 0);
+ blob.GId = fixedBlob.GId;
+ });
+ }
+ }
+
+ void Rollback(size_t count) noexcept
+ {
+ if (count > 0) {
+ size_t currentSize = Store.size();
+ Y_VERIFY(count <= currentSize);
+ Store.Enumerate(currentSize - count, currentSize, [this](size_t, NPageCollection::TMemGlob& blob) {
+ Bytes -= blob.GId.Logo.BlobSize();
+ });
+ Store.truncate(currentSize - count);
+ }
+ }
+
public:
const ui64 Head;
diff --git a/ydb/core/tablet_flat/flat_mem_eggs.h b/ydb/core/tablet_flat/flat_mem_eggs.h
index b422598f8d..d25d4cfe94 100644
--- a/ydb/core/tablet_flat/flat_mem_eggs.h
+++ b/ydb/core/tablet_flat/flat_mem_eggs.h
@@ -67,9 +67,16 @@ namespace NMem {
void Push(TUpdate* update) {
const TUpdate* next = GetFirst();
- while (next && update->RowVersion <= next->RowVersion) {
- // Collapse row versions that are no longer reachable
- next = next->Next;
+ // Collapse row versions that are no longer reachable to optimize
+ // future searches, however deltas are not collapsed because we
+ // don't know which version they may be committed at.
+ if (update->RowVersion.Step != Max<ui64>()) {
+ while (next &&
+ next->RowVersion.Step != Max<ui64>() &&
+ update->RowVersion <= next->RowVersion)
+ {
+ next = next->Next;
+ }
}
update->Next = next;
Chain = update;
diff --git a/ydb/core/tablet_flat/flat_mem_warm.cpp b/ydb/core/tablet_flat/flat_mem_warm.cpp
index df1cb798e2..627a131d38 100644
--- a/ydb/core/tablet_flat/flat_mem_warm.cpp
+++ b/ydb/core/tablet_flat/flat_mem_warm.cpp
@@ -31,12 +31,87 @@ NMem::TTreeSnapshot TMemTable::Immediate() const {
// 1) When taking a snapshot of a frozen mem table, in that case we know
// the tree is frozen and will not be modified, so using an unsafe
// snapshot is ok.
- // 2) When taking a snapshot of a mutable mem table, but used in table
- // iterators. In all those cases we know mem table is not modified
- // until transaction is committed, so using unsafe snapshot is ok.
+ // 2) When taking a snapshot of a mutable mem table, but we can guarantee
+ // mem table will not be changed while iterator is still valid, for
+ // example during point reads.
return NMem::TTreeSnapshot(Tree.UnsafeSnapshot());
}
+void TMemTable::PrepareRollback() {
+ Y_VERIFY(!RollbackState);
+ auto& state = RollbackState.emplace();
+ state.Snapshot = Tree.Snapshot();
+ state.OpsCount = OpsCount;
+ state.RowCount = RowCount;
+ state.MinRowVersion = MinRowVersion;
+ state.MaxRowVersion = MaxRowVersion;
+ Pool.BeginTransaction();
+}
+
+void TMemTable::RollbackChanges() {
+ Y_VERIFY(RollbackState);
+ auto& state = *RollbackState;
+ Tree.RollbackTo(std::move(state.Snapshot));
+ Tree.CollectGarbage();
+ Pool.RollbackTransaction();
+ Blobs.Rollback(state.AddedBlobs);
+ OpsCount = state.OpsCount;
+ RowCount = state.RowCount;
+ MinRowVersion = state.MinRowVersion;
+ MaxRowVersion = state.MaxRowVersion;
+
+ struct TApplyUndoOp {
+ TMemTable* Self;
+
+ void operator()(const TUndoOpUpdateCommitted& op) const {
+ Self->Committed[op.TxId] = op.Value;
+ }
+
+ void operator()(const TUndoOpEraseCommitted& op) const {
+ Self->Committed.erase(op.TxId);
+ }
+
+ void operator()(const TUndoOpInsertRemoved& op) const {
+ Self->Removed.insert(op.TxId);
+ }
+
+ void operator()(const TUndoOpEraseRemoved& op) const {
+ Self->Removed.erase(op.TxId);
+ }
+
+ void operator()(const TUndoOpUpdateTxIdStats& op) const {
+ Self->TxIdStats[op.TxId] = op.Value;
+ }
+
+ void operator()(const TUndoOpEraseTxIdStats& op) const {
+ Self->TxIdStats.erase(op.TxId);
+ }
+ };
+
+ while (!UndoBuffer.empty()) {
+ std::visit(TApplyUndoOp{ this }, UndoBuffer.back());
+ UndoBuffer.pop_back();
+ }
+
+ RollbackState.reset();
+}
+
+void TMemTable::CommitChanges(TArrayRef<const TMemGlob> blobs) {
+ Y_VERIFY(RollbackState);
+ auto& state = *RollbackState;
+ state.Snapshot = { };
+ Tree.CollectGarbage();
+ Pool.CommitTransaction();
+ Blobs.Commit(state.AddedBlobs, blobs);
+ UndoBuffer.clear();
+ RollbackState.reset();
+}
+
+void TMemTable::CommitBlobs(TArrayRef<const TMemGlob> blobs) {
+ Y_VERIFY(!RollbackState);
+ Blobs.Commit(Blobs.Size(), blobs);
+}
+
void TMemTable::DebugDump(IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const {
auto it = Immediate().Iterator();
auto types = Scheme->Keys->BasicTypes();
diff --git a/ydb/core/tablet_flat/flat_mem_warm.h b/ydb/core/tablet_flat/flat_mem_warm.h
index b3e5ef9579..9ce331624f 100644
--- a/ydb/core/tablet_flat/flat_mem_warm.h
+++ b/ydb/core/tablet_flat/flat_mem_warm.h
@@ -9,6 +9,7 @@
#include "flat_page_blobs.h"
#include "flat_sausage_solid.h"
#include "flat_table_committed.h"
+#include "util_pool.h"
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/scheme/scheme_type_id.h>
#include <ydb/core/util/btree_cow.h>
@@ -78,7 +79,90 @@ namespace NMem {
}
};
- using TTree = TCowBTree<TTreeKey, TTreeValue, TKeyCmp>;
+ class TTreeAllocatorState {
+ struct TFreeItem {
+ TFreeItem* Next;
+ };
+
+ public:
+ explicit TTreeAllocatorState(size_t pageSize)
+ : PageSize(pageSize)
+ {
+ Y_VERIFY(PageSize >= sizeof(TFreeItem));
+ }
+
+ ~TTreeAllocatorState() noexcept {
+ TFreeItem* head = Head;
+ while (head) {
+ TFreeItem* next = head->Next;
+ ::operator delete(head, PageSize);
+ head = next;
+ }
+ }
+
+ [[nodiscard]] void* Allocate(size_t size) {
+ if (Y_LIKELY(size == PageSize)) {
+ if (Head) {
+ TFreeItem* item = Head;
+ Head = item->Next;
+ NSan::Poison(item, PageSize);
+ return item;
+ }
+ }
+ return ::operator new(size);
+ }
+
+ void Deallocate(void* p, size_t size) noexcept {
+ if (Y_LIKELY(size == PageSize)) {
+ NSan::Poison(p, size);
+ TFreeItem* item = reinterpret_cast<TFreeItem*>(p);
+ item->Next = Head;
+ Head = item;
+ } else {
+ ::operator delete(p, size);
+ }
+ }
+
+ private:
+ const size_t PageSize;
+ TFreeItem* Head = nullptr;
+ };
+
+ template<class T>
+ class TTreeAllocator {
+ public:
+ template<class U>
+ friend class TTreeAllocator;
+
+ using value_type = T;
+
+ TTreeAllocator(TTreeAllocatorState* state) noexcept
+ : State(state)
+ { }
+
+ template<class U>
+ TTreeAllocator(const TTreeAllocator<U>& other) noexcept
+ : State(other.State)
+ { }
+
+ [[nodiscard]] T* allocate(size_t n) {
+ return static_cast<T*>(State->Allocate(sizeof(T) * n));
+ }
+
+ void deallocate(T* p, size_t n) noexcept {
+ State->Deallocate(p, sizeof(T) * n);
+ }
+
+ template<class U>
+ bool operator==(const TTreeAllocator<U>& rhs) const noexcept {
+ return State == rhs.State;
+ }
+
+ private:
+ TTreeAllocatorState* State = nullptr;
+ };
+
+ using TTree = TCowBTree<TTreeKey, TTreeValue, TKeyCmp, TTreeAllocator<TTreeValue>, 512>;
class TTreeIterator;
class TTreeSnapshot;
@@ -89,27 +173,11 @@ namespace NMem {
struct TMemTableSnapshot;
+ struct TMemTableRollbackState;
+
class TMemTable : public TThrRefBase {
friend class TMemIt;
- template <size_t SizeCap = 512*1024, size_t Overhead = 64>
- class TMyPolicy : public TMemoryPool::IGrowPolicy {
- public:
- size_t Next(size_t prev) const noexcept override
- {
- if (prev >= SizeCap - Overhead)
- return SizeCap - Overhead;
-
- // Use same buckets as LF-alloc (4KB, 6KB, 8KB, 12KB, 16KB ...)
- size_t size = FastClp2(prev);
-
- if (size < prev + prev/3)
- size += size/2;
-
- return size - Overhead;
- }
- };
-
public:
struct TTxIdStat {
ui64 OpsCount = 0;
@@ -127,11 +195,12 @@ namespace NMem {
, Scheme(scheme)
, Blobs(annex)
, Comparator(*Scheme)
- , Pool(chunk, &Policy)
- , Tree(Comparator) // TODO: support TMemoryPool with caching
+ , Pool(chunk)
+ , TreeAllocatorState(TTree::PageSize)
+ , Tree(Comparator, &TreeAllocatorState)
{}
- void Update(ERowOp rop, TRawVals key_, TOpsRef ops, TArrayRef<TMemGlob> pages, TRowVersion rowVersion,
+ void Update(ERowOp rop, TRawVals key_, TOpsRef ops, TArrayRef<const TMemGlob> pages, TRowVersion rowVersion,
NTable::ITransactionMapSimplePtr committed)
{
Y_VERIFY_DEBUG(
@@ -302,6 +371,9 @@ namespace NMem {
/* Transformation REDO ELargeObj to TBlobs reference */
const auto ref = Blobs.Push(pages.at(cell.AsValue<ui32>()));
+ if (RollbackState) {
+ RollbackState->AddedBlobs++;
+ }
cell = TCell::Make<ui64>(ref);
@@ -331,6 +403,14 @@ namespace NMem {
if (rowVersion.Step == Max<ui64>()) {
MinRowVersion = TRowVersion::Min();
MaxRowVersion = TRowVersion::Max();
+ if (RollbackState) {
+ auto it = TxIdStats.find(rowVersion.TxId);
+ if (it != TxIdStats.end()) {
+ UndoBuffer.push_back(TUndoOpUpdateTxIdStats{ rowVersion.TxId, it->second });
+ } else {
+ UndoBuffer.push_back(TUndoOpEraseTxIdStats{ rowVersion.TxId });
+ }
+ }
++TxIdStats[rowVersion.TxId].OpsCount;
} else {
MinRowVersion = Min(MinRowVersion, rowVersion);
@@ -341,14 +421,14 @@ namespace NMem {
size_t GetUsedMem() const noexcept
{
return
- Pool.MemoryAllocated()
+ Pool.Used()
+ (Tree.AllocatedPages() - Tree.DroppedPages()) * TTree::PageSize
+ Blobs.GetBytes();
}
size_t GetWastedMem() const noexcept
{
- return Pool.MemoryWaste();
+ return Pool.Wasted();
}
ui64 GetOpsCount() const noexcept { return OpsCount; }
@@ -369,6 +449,11 @@ namespace NMem {
return &Blobs;
}
+ void PrepareRollback();
+ void RollbackChanges();
+ void CommitChanges(TArrayRef<const TMemGlob> blobs);
+ void CommitBlobs(TArrayRef<const TMemGlob> blobs);
+
const TTxIdStats& GetTxIdStats() const {
return TxIdStats;
}
@@ -376,9 +461,22 @@ namespace NMem {
void CommitTx(ui64 txId, TRowVersion rowVersion) {
auto it = Committed.find(txId);
if (it == Committed.end() || it->second > rowVersion) {
+ if (RollbackState) {
+ if (it != Committed.end()) {
+ UndoBuffer.push_back(TUndoOpUpdateCommitted{ txId, it->second });
+ } else {
+ UndoBuffer.push_back(TUndoOpEraseCommitted{ txId });
+ }
+ }
Committed[txId] = rowVersion;
if (it == Committed.end()) {
- Removed.erase(txId);
+ auto itRemoved = Removed.find(txId);
+ if (itRemoved != Removed.end()) {
+ if (RollbackState) {
+ UndoBuffer.push_back(TUndoOpInsertRemoved{ txId });
+ }
+ Removed.erase(itRemoved);
+ }
}
}
}
@@ -386,7 +484,13 @@ namespace NMem {
void RemoveTx(ui64 txId) {
auto it = Committed.find(txId);
if (it == Committed.end()) {
- Removed.insert(txId);
+ auto itRemoved = Removed.find(txId);
+ if (itRemoved == Removed.end()) {
+ if (RollbackState) {
+ UndoBuffer.push_back(TUndoOpEraseRemoved{ txId });
+ }
+ Removed.insert(txId);
+ }
}
}
@@ -425,7 +529,7 @@ namespace NMem {
{
const bool small = TCell::CanInline(size);
- return { small ? data : Pool.Append(data, size), size };
+ return { small ? data : (const char*)Pool.Append(data, size), size };
}
void DebugDump() const;
@@ -437,8 +541,8 @@ namespace NMem {
private:
NMem::TBlobs Blobs;
const NMem::TKeyCmp Comparator;
- TMyPolicy<> Policy;
- TMemoryPool Pool;
+ NUtil::TMemoryPool Pool;
+ NMem::TTreeAllocatorState TreeAllocatorState;
TTree Tree;
ui64 OpsCount = 0;
ui64 RowCount = 0;
@@ -449,6 +553,52 @@ namespace NMem {
THashSet<ui64> Removed;
private:
+ struct TRollbackState {
+ TTree::TSnapshot Snapshot;
+ ui64 OpsCount;
+ ui64 RowCount;
+ TRowVersion MinRowVersion;
+ TRowVersion MaxRowVersion;
+ size_t AddedBlobs = 0;
+ };
+
+ std::optional<TRollbackState> RollbackState;
+
+ private:
+ struct TUndoOpUpdateCommitted {
+ ui64 TxId;
+ TRowVersion Value;
+ };
+ struct TUndoOpEraseCommitted {
+ ui64 TxId;
+ };
+ struct TUndoOpInsertRemoved {
+ ui64 TxId;
+ };
+ struct TUndoOpEraseRemoved {
+ ui64 TxId;
+ };
+ struct TUndoOpUpdateTxIdStats {
+ ui64 TxId;
+ TTxIdStat Value;
+ };
+ struct TUndoOpEraseTxIdStats {
+ ui64 TxId;
+ };
+
+ using TUndoOp = std::variant<
+ TUndoOpUpdateCommitted,
+ TUndoOpEraseCommitted,
+ TUndoOpInsertRemoved,
+ TUndoOpEraseRemoved,
+ TUndoOpUpdateTxIdStats,
+ TUndoOpEraseTxIdStats>;
+
+ // This buffer is applied in reverse on rollback
+ // Memory is reused to avoid hot path allocations
+ std::vector<TUndoOp> UndoBuffer;
+
+ private:
// Temporary buffers to avoid hot path allocations
using TTagWithPos = std::pair<TTag, ui32>;
TSmallVec<TTagWithPos> ScratchUpdateTags;
diff --git a/ydb/core/tablet_flat/flat_redo_writer.h b/ydb/core/tablet_flat/flat_redo_writer.h
index e768179171..8ec6427dd5 100644
--- a/ydb/core/tablet_flat/flat_redo_writer.h
+++ b/ydb/core/tablet_flat/flat_redo_writer.h
@@ -50,21 +50,14 @@ namespace NRedo {
class TWriter {
private:
- struct TSizeInfo {
- ui32 Size;
- IAnnex::TLimit Limit;
+ struct TOutputMark {
};
public:
using TOpsRef = TArrayRef<const TUpdateOp>;
- using IOut = IOutputStream;
+ using IOut = TOutputMark;
- TWriter(IAnnex *annex = nullptr, ui32 edge = 1024)
- : Edge(edge)
- , Annex(annex)
- {
-
- }
+ TWriter() = default;
explicit operator bool() const noexcept
{
@@ -76,23 +69,21 @@ namespace NRedo {
return TotalSize;
}
- TList<TString> Unwrap() noexcept
- {
- return TotalSize = 0, std::move(Events);
- }
-
TWriter& EvBegin(ui32 tail, ui32 head, ui64 serial, ui64 stamp)
{
Y_VERIFY(tail <= head, "Invalid ABI/API evolution span");
Y_VERIFY(serial > 0, "Serial of EvBegin starts with 1");
- const ui32 size = sizeof(TEvBegin_v1);
+ const ui32 size = sizeof(TEvBegin_v1);
TEvBegin_v1 ev{ { ERedo::Begin, 0, 0x8000, size },
tail, head, serial, stamp };
- void* evBegin = &ev;
- return Push(TString(NUtil::NBin::ToByte(evBegin), size), size);
+ auto out = Begin(size);
+
+ Write(out, &ev, sizeof(ev));
+
+ return Flush(size);
}
TWriter& EvFlush(ui32 table, ui64 stamp, TEpoch epoch)
@@ -101,8 +92,12 @@ namespace NRedo {
TEvFlush ev{ { ERedo::Flush, 0, 0x8000, size },
table, 0, stamp, epoch.ToRedoLog() };
- void* evBegin = &ev;
- return Push(TString(NUtil::NBin::ToByte(evBegin), size), size);
+
+ auto out = Begin(size);
+
+ Write(out, &ev, sizeof(ev));
+
+ return Flush(size);
}
TWriter& EvAnnex(TArrayRef<const NPageCollection::TGlobId> blobs)
@@ -111,7 +106,7 @@ namespace NRedo {
Y_VERIFY(blobs.size() <= Max<ui32>(), "Too large blobs catalog");
- const ui32 size = sizeof(TEvAnnex) + SizeOf(blobs);
+ const ui32 size = sizeof(TEvAnnex) + SizeOf(blobs);
TEvAnnex ev{ { ERedo::Annex, 0, 0x8000, size }, ui32(blobs.size()) };
@@ -120,11 +115,11 @@ namespace NRedo {
Write(out, &ev, sizeof(ev));
Write(out, static_cast<const void*>(blobs.begin()), SizeOf(blobs));
- return Push(std::move(out.Str()), size);
+ return Flush(size);
}
template<class TCallback>
- TWriter& EvUpdate(ui32 table, ERowOp rop, TRawVals key, TOpsRef ops, ERedo tag, ui32 tailSize, TCallback&& tailCallback, bool isDelta = false)
+ TWriter& EvUpdate(ui32 table, ERowOp rop, TRawVals key, TOpsRef ops, ERedo tag, ui32 tailSize, TCallback&& tailCallback)
{
if (TCellOp::HaveNoOps(rop) && ops) {
Y_FAIL("Given ERowOp cannot have update operations");
@@ -132,9 +127,7 @@ namespace NRedo {
Y_FAIL("Too large key or too many operations in one ops");
}
- const auto sizeInfo = CalcSize(key, ops, table, isDelta);
-
- const ui32 size = sizeof(TEvUpdate) + tailSize + sizeInfo.Size;
+ const ui32 size = sizeof(TEvUpdate) + tailSize + CalcSize(key, ops);
auto out = Begin(size);
TEvUpdate ev{ { tag, 0, 0x8000, size },
@@ -145,9 +138,9 @@ namespace NRedo {
tailCallback(out);
Write(out, key);
- Write(out, ops, table, sizeInfo.Limit);
+ Write(out, ops);
- return Push(std::move(out.Str()), size);
+ return Flush(size);
}
TWriter& EvUpdate(ui32 table, ERowOp rop, TRawVals key, TOpsRef ops, TRowVersion rowVersion)
@@ -170,8 +163,7 @@ namespace NRedo {
[&](auto& out) {
TEvUpdateTx tail{ txId };
Write(out, &tail, sizeof(tail));
- },
- /* isDelta */ true);
+ });
}
TWriter& EvRemoveTx(ui32 table, ui64 txId)
@@ -181,8 +173,11 @@ namespace NRedo {
TEvRemoveTx ev{ { ERedo::RemoveTx, 0, 0x8000, size },
table, 0, txId };
- void* evBegin = &ev;
- return Push(TString(NUtil::NBin::ToByte(evBegin), size), size);
+ auto out = Begin(size);
+
+ Write(out, &ev, sizeof(ev));
+
+ return Flush(size);
}
TWriter& EvCommitTx(ui32 table, ui64 txId, TRowVersion rowVersion)
@@ -192,97 +187,74 @@ namespace NRedo {
TEvCommitTx ev{ { ERedo::CommitTx, 0, 0x8000, size },
table, 0, txId, rowVersion.Step, rowVersion.TxId };
- void* evBegin = &ev;
- return Push(TString(NUtil::NBin::ToByte(evBegin), size), size);
+ auto out = Begin(size);
+
+ Write(out, &ev, sizeof(ev));
+
+ return Flush(size);
}
- TWriter& Join(TWriter &log)
+ TWriter& Join(TWriter &&log)
{
TotalSize += std::exchange(log.TotalSize, 0);
- Events.splice(Events.end(), log.Events);
+ Events.append(log.Events);
+ log.Events.clear();
return *this;
}
- TString Dump() const noexcept
+ TString Finish() && noexcept
{
- auto out = Begin(TotalSize);
-
- for (auto one : Events)
- out.Write(one);
-
- Y_VERIFY(out.Str().size() == TotalSize);
+ TString events;
+ events.swap(Events);
- NSan::CheckMemIsInitialized(out.Str().data(), out.Str().size());
+ NSan::CheckMemIsInitialized(events.data(), events.size());
- return std::move(out.Str());
+ TotalSize = 0;
+ return events;
}
private:
- static TStringStream Begin(size_t bytes)
+ TOutputMark Begin(size_t bytes)
{
- TStringStream out;
+ if ((Events.capacity() - Events.size()) < bytes) {
+ Events.reserve(FastClp2(Events.size() + bytes));
+ }
- return out.Reserve(bytes), out;
+ return TOutputMark{};
}
- TWriter& Push(TString buf, size_t bytes)
+ TWriter& Flush(size_t bytes)
{
- Y_VERIFY(buf.size() == bytes, "Got an invalid redo entry buffer");
-
- TotalSize += buf.size();
+ TotalSize += bytes;
- Events.emplace_back(std::move(buf));
+ Y_VERIFY(Events.size() == TotalSize, "Got an inconsistent redo entry size");
return *this;
}
- IAnnex::TLimit GetLimit(ui32 table, bool isDelta) const noexcept
- {
- IAnnex::TLimit limit;
- // FIXME: we cannot handle blob references during scans, so we
- // avoid creating large objects when they are in deltas
- if (!isDelta && Annex && table != Max<ui32>()) {
- limit = Annex->Limit(table);
- } else {
- limit = { Max<ui32>(), Max<ui32>() };
- }
- limit.MinExternSize = Max(limit.MinExternSize, Edge);
- return limit;
- }
-
- TSizeInfo CalcSize(TRawVals key, TOpsRef ops, ui32 table, bool isDelta) const noexcept
+ ui32 CalcSize(TRawVals key) const noexcept
{
- auto limit = GetLimit(table, isDelta);
-
ui32 size = 0;
- for (const auto &one : ops) {
- /* hack for annex blobs, its replaced by 4-byte reference */
- auto valueSize = one.Value.Size();
- if (Annex && table != Max<ui32>() && limit.IsExtern(valueSize)) {
- valueSize = 4;
- }
-
- size += sizeof(TUpdate) + valueSize;
- }
-
- size += CalcSize(key);
+ for (const auto &one: key)
+ size += sizeof(TValue) + one.Size();
- return TSizeInfo{ size, limit };
+ return size;
}
- ui32 CalcSize(TRawVals key) const noexcept
+ ui32 CalcSize(TRawVals key, TOpsRef ops) const noexcept
{
- ui32 size = 0;
+ ui32 size = CalcSize(key);
- for (const auto &one: key)
- size += sizeof(TValue) + one.Size();
+ for (const auto &one : ops) {
+ size += sizeof(TUpdate) + one.Value.Size();
+ }
return size;
}
- void Write(IOut &out, TRawVals key) const noexcept
+ void Write(IOut &out, TRawVals key) noexcept
{
for (const auto &one : key) {
/* The only way of converting nulls in keys now */
@@ -296,45 +268,30 @@ namespace NRedo {
}
}
- void Write(IOut &out, TOpsRef ops, ui32 table, const IAnnex::TLimit &limit) const noexcept
+ void Write(IOut &out, TOpsRef ops) noexcept
{
for (const auto &one: ops) {
/* Log enty cannot represent this ECellOp types with payload */
Y_VERIFY(!(one.Value && TCellOp::HaveNoPayload(one.Op)));
- /* Log entry cannot recover nulls written as ECellOp::Set with
- null cell. Nulls was encoded with hacky TypeId = 0, but
- the correct way is to use ECellOp::Null.
- */
-
- const ui16 type = one.Value ? one.Value.Type() : 0;
-
- auto cellOp = one.NormalizedCellOp();
+ const ui16 type = one.Value.Type();
- if (cellOp != ELargeObj::Inline) {
- Y_FAIL("User supplied cell value has an invalid ECellOp");
- } else if (auto got = Place(table, limit, one.Tag, one.AsRef())) {
- const auto payload = NUtil::NBin::ToRef(got.Ref);
-
- Write(out, cellOp = ELargeObj::Extern, one.Tag, type, payload);
-
- } else {
- Write(out, cellOp, one.Tag, type, one.Value.AsRef());
+ if (one.Value.IsEmpty()) {
+ // Log entry cannot recover null value type, since we
+ // store null values using a special 0 type id.
+ Y_VERIFY(type == 0, "Cannot write typed null values");
+ // Null value cannot have ECellOp::Set as its op, since we
+ // don't have the necessary type id, instead we expect
+ // it to be either ECellOp::Null or ECellOp::Reset.
+ Y_VERIFY(one.Op != ECellOp::Set, "Cannot write ECellOp::Set with a null value");
}
- }
- }
- IAnnex::TResult Place(ui32 table, const IAnnex::TLimit &limit, TTag tag, TArrayRef<const char> raw) const noexcept
- {
- if (Annex && table != Max<ui32>() && limit.IsExtern(raw.size())) {
- return Annex->Place(table, tag, raw);
- } else {
- return { };
+ Write(out, one.Op, one.Tag, type, one.Value.AsRef());
}
}
- static void Write(IOut &out, TCellOp cellOp, TTag tag, ui16 type, TArrayRef<const char> raw)
+ void Write(IOut &out, TCellOp cellOp, TTag tag, ui16 type, TArrayRef<const char> raw)
{
TUpdate up = { cellOp, tag, { type , ui32(raw.size()) } };
@@ -342,18 +299,15 @@ namespace NRedo {
Write(out, raw.data(), raw.size());
}
- static void Write(IOut &out, const void *ptr, size_t size)
+ void Write(IOut &, const void *ptr, size_t size)
{
NSan::CheckMemIsInitialized(ptr, size);
-
- out.Write(ptr, size);
+ Events.append(reinterpret_cast<const char*>(ptr), size);
}
private:
- const ui32 Edge = 1024;
- IAnnex * const Annex = nullptr;
size_t TotalSize = 0;
- TList<TString> Events;
+ TString Events;
};
}
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index d7cc19e635..ca3f4de632 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -20,6 +20,131 @@ TTable::TTable(TEpoch epoch) : Epoch(epoch) { }
TTable::~TTable() { }
+void TTable::PrepareRollback()
+{
+ Y_VERIFY(!RollbackState);
+ auto& state = RollbackState.emplace(Epoch);
+ state.Annexed = Annexed;
+ state.Scheme = Scheme;
+ state.EraseCacheEnabled = EraseCacheEnabled;
+ state.EraseCacheConfig = EraseCacheConfig;
+ state.MutableExisted = bool(Mutable);
+ state.MutableUpdated = false;
+}
+
+void TTable::RollbackChanges()
+{
+ Y_VERIFY(RollbackState, "PrepareRollback needed to rollback changes");
+ auto& state = *RollbackState;
+
+ while (!RollbackOps.empty()) {
+ struct TApplyRollbackOp {
+ TTable* Self;
+
+ void operator()(const TRollbackRemoveOpenTx& op) const {
+ Self->OpenTransactions.erase(op.TxId);
+ }
+
+ void operator()(const TRollbackRemoveOpenTxMem& op) const {
+ Self->OpenTransactions[op.TxId].Mem.erase(op.Mem);
+ }
+
+ void operator()(const TRollbackAddCommittedTx& op) const {
+ Self->CommittedTransactions.Add(op.TxId, op.RowVersion);
+ }
+
+ void operator()(const TRollbackRemoveCommittedTx& op) const {
+ Self->CommittedTransactions.Remove(op.TxId);
+ }
+
+ void operator()(const TRollbackAddRemovedTx& op) const {
+ Self->RemovedTransactions.Add(op.TxId);
+ }
+
+ void operator()(const TRollbackRemoveRemovedTx& op) const {
+ Self->RemovedTransactions.Remove(op.TxId);
+ }
+ };
+
+ std::visit(TApplyRollbackOp{ this }, RollbackOps.back());
+ RollbackOps.pop_back();
+ }
+
+ if (Epoch != state.Epoch) {
+ // We performed a snapshot, roll it back
+ if (Mutable) {
+ ErasedKeysCache.Reset();
+ Mutable = nullptr;
+ }
+ Y_VERIFY(MutableBackup, "Previous mem table missing");
+ Mutable = std::move(MutableBackup);
+ } else if (!state.MutableExisted) {
+ // New memtable doesn't need rollback
+ if (Mutable) {
+ ErasedKeysCache.Reset();
+ Mutable = nullptr;
+ }
+ } else if (state.MutableUpdated) {
+ ErasedKeysCache.Reset();
+ Y_VERIFY(Mutable, "Mutable was updated, but it is missing");
+ Mutable->RollbackChanges();
+ }
+ Y_VERIFY(!MutableBackup);
+
+ Epoch = state.Epoch;
+ Annexed = state.Annexed;
+ if (state.Scheme) {
+ Levels.Reset();
+ ErasedKeysCache.Reset();
+ Scheme = std::move(state.Scheme);
+ EraseCacheEnabled = state.EraseCacheEnabled;
+ EraseCacheConfig = state.EraseCacheConfig;
+ }
+ RollbackState.reset();
+}
+
+void TTable::CommitChanges(TArrayRef<const TMemGlob> blobs)
+{
+ Y_VERIFY(RollbackState, "PrepareRollback needed to rollback changes");
+ auto& state = *RollbackState;
+
+ RollbackOps.clear();
+
+ if (Epoch != state.Epoch) {
+ if (Mutable && blobs) {
+ Mutable->CommitBlobs(blobs);
+ }
+ // We performed a snapshot, move it to Frozen
+ Y_VERIFY(MutableBackup, "Mem table snaphot missing");
+ Frozen.insert(MutableBackup);
+ Stat_.FrozenWaste += MutableBackup->GetWastedMem();
+ Stat_.FrozenSize += MutableBackup->GetUsedMem();
+ Stat_.FrozenOps += MutableBackup->GetOpsCount();
+ Stat_.FrozenRows += MutableBackup->GetRowCount();
+ MutableBackup = nullptr;
+ } else if (!state.MutableExisted) {
+ // Fresh mem table is not prepared for rollback
+ if (Mutable && blobs) {
+ Mutable->CommitBlobs(blobs);
+ }
+ } else if (state.MutableUpdated) {
+ Y_VERIFY(Mutable, "Mutable was updated, but it is missing");
+ Mutable->CommitChanges(blobs);
+ }
+ Y_VERIFY(!MutableBackup);
+
+ RollbackState.reset();
+}
+
+void TTable::CommitNewTable(TArrayRef<const TMemGlob> blobs)
+{
+ Y_VERIFY(!RollbackState, "CommitBlobs must only be used for new tables without rollback");
+
+ if (Mutable && blobs) {
+ Mutable->CommitBlobs(blobs);
+ }
+}
+
void TTable::SetScheme(const TScheme::TTableInfo &table)
{
Snapshot();
@@ -29,6 +154,12 @@ void TTable::SetScheme(const TScheme::TTableInfo &table)
Y_VERIFY(!Mutable && table.Columns);
+ if (RollbackState && !RollbackState->Scheme) {
+ RollbackState->Scheme = Scheme;
+ RollbackState->EraseCacheEnabled = EraseCacheEnabled;
+ RollbackState->EraseCacheConfig = EraseCacheConfig;
+ }
+
auto to = TRowScheme::Make(table.Columns, NUtil::TSecond());
if (auto was = std::exchange(Scheme, to))
@@ -66,6 +197,9 @@ TAutoPtr<TSubset> TTable::Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch hea
subset->Frozen.emplace_back(x, x->Immediate());
}
}
+ if (MutableBackup && MutableBackup->Epoch < head) {
+ subset->Frozen.emplace_back(MutableBackup, MutableBackup->Immediate());
+ }
for (const auto &pr : TxStatus) {
if (pr.second->Epoch < head) {
subset->TxStatus.emplace_back(pr.second);
@@ -114,6 +248,10 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept
if (it->Epoch < head)
subset->Frozen.emplace_back(it, it->Immediate());
+ if (MutableBackup && MutableBackup->Epoch < head) {
+ subset->Frozen.emplace_back(MutableBackup, MutableBackup->Immediate());
+ }
+
// This method is normally used when we want to take some state snapshot
// However it can still theoretically be used for iteration or compaction
subset->CommittedTransactions = CommittedTransactions;
@@ -141,6 +279,13 @@ bool TTable::HasBorrowed(ui64 selfTabletId) const noexcept
TAutoPtr<TSubset> TTable::ScanSnapshot(TRowVersion snapshot) noexcept
{
+ if (RollbackState) {
+ Y_VERIFY(Epoch == RollbackState->Epoch &&
+ RollbackState->MutableExisted == bool(Mutable) &&
+ !RollbackState->MutableUpdated,
+ "Cannot take scan snapshot of a modified table");
+ }
+
TAutoPtr<TSubset> subset = new TSubset(Epoch, Scheme);
// TODO: we could filter LSM by the provided snapshot version, but it
@@ -198,6 +343,8 @@ TBundleSlicesMap TTable::LookupSlices(TArrayRef<const TLogoBlobID> bundles) cons
void TTable::ReplaceSlices(TBundleSlicesMap slices) noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
for (auto &kv : slices) {
auto it = Flatten.find(kv.first);
Y_VERIFY(it != Flatten.end(), "Got an unknown TPart in ReplaceSlices");
@@ -213,6 +360,8 @@ void TTable::ReplaceSlices(TBundleSlicesMap slices) noexcept
void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
for (const auto &partView : partViews) {
Y_VERIFY(partView, "Replace(...) shouldn't get empty parts");
Y_VERIFY(!partView.Screen, "Replace(...) shouldn't get screened parts");
@@ -341,6 +490,8 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset
void TTable::ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus, const TSubset &subset) noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
for (auto &part : subset.TxStatus) {
Y_VERIFY(part, "Unexpected empty TTxStatusPart in TSubset");
@@ -367,6 +518,8 @@ void TTable::ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>
void TTable::Merge(TPartView partView) noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
Y_VERIFY(partView, "Merge(...) shouldn't get empty part");
Y_VERIFY(partView.Slices, "Merge(...) shouldn't get parts without slices");
@@ -397,6 +550,8 @@ void TTable::Merge(TPartView partView) noexcept
void TTable::Merge(TIntrusiveConstPtr<TColdPart> part) noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
Y_VERIFY(part, "Merge(...) shouldn't get empty parts");
if (Mutable && part->Epoch >= Mutable->Epoch) {
@@ -424,6 +579,8 @@ void TTable::Merge(TIntrusiveConstPtr<TColdPart> part) noexcept
void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
Y_VERIFY(txStatus, "Unexpected empty TTxStatusPart");
for (auto& item : txStatus->TxStatusPage->GetCommittedItems()) {
@@ -517,6 +674,8 @@ ui64 TTable::GetSearchHeight() const noexcept
TVector<TIntrusiveConstPtr<TMemTable>> TTable::GetMemTables() const noexcept
{
+ Y_VERIFY(!RollbackState, "Cannot perform this in a transaction");
+
TVector<TIntrusiveConstPtr<TMemTable>> vec(Frozen.begin(), Frozen.end());
if (Mutable)
@@ -530,11 +689,21 @@ TEpoch TTable::Snapshot() noexcept
if (Mutable) {
Annexed = Mutable->GetBlobs()->Tail();
- Frozen.insert(Mutable);
- Stat_.FrozenWaste += Mutable->GetWastedMem();
- Stat_.FrozenSize += Mutable->GetUsedMem();
- Stat_.FrozenOps += Mutable->GetOpsCount();
- Stat_.FrozenRows += Mutable->GetRowCount();
+ if (RollbackState) {
+ Y_VERIFY(
+ RollbackState->Epoch == Mutable->Epoch &&
+ RollbackState->MutableExisted &&
+ !RollbackState->MutableUpdated,
+ "Cannot snapshot a modified table");
+ Y_VERIFY(!MutableBackup, "Another mutable backup already exists");
+ MutableBackup = std::move(Mutable);
+ } else {
+ Frozen.insert(Mutable);
+ Stat_.FrozenWaste += Mutable->GetWastedMem();
+ Stat_.FrozenSize += Mutable->GetUsedMem();
+ Stat_.FrozenOps += Mutable->GetOpsCount();
+ Stat_.FrozenRows += Mutable->GetRowCount();
+ }
Mutable = nullptr; /* have to make new TMemTable on next update */
@@ -637,7 +806,7 @@ EReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef tags,
return ready ? EReady::Data : EReady::Page;
}
-void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> apart, TRowVersion rowVersion)
+void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, TRowVersion rowVersion)
{
Y_VERIFY(!(ops && TCellOp::HaveNoOps(rop)), "Given ERowOp can't have ops");
@@ -652,12 +821,38 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> a
MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions);
}
-void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> apart, ui64 txId)
+TTable::TOpenTransaction& TTable::AddOpenTransaction(ui64 txId)
+{
+ TOpenTransactions::insert_ctx ctx = nullptr;
+ TOpenTransactions::iterator it = OpenTransactions.find(txId, ctx);
+
+ if (it == OpenTransactions.end()) {
+ if (RollbackState) {
+ RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId });
+ }
+
+ it = OpenTransactions.emplace_direct(
+ ctx,
+ std::piecewise_construct,
+ std::forward_as_tuple(txId),
+ std::forward_as_tuple());
+ }
+
+ return it->second;
+}
+
+void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, ui64 txId)
{
// Use a special row version that marks this update as uncommitted
TRowVersion rowVersion(Max<ui64>(), txId);
MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions);
- OpenTransactions[txId].Mem.insert(Mutable);
+
+ Y_VERIFY_DEBUG(Mutable->GetTxIdStats().contains(txId));
+
+ auto& openTx = AddOpenTransaction(txId);
+ if (openTx.Mem.insert(Mutable).second && RollbackState) {
+ RollbackOps.emplace_back(TRollbackRemoveOpenTxMem{ txId, Mutable });
+ }
}
void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
@@ -668,8 +863,18 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion)
// Note: it is possible to have multiple CommitTx for the same TxId but at
// different row versions. The commit with the minimum row version wins.
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev) || *prev > rowVersion) {
+ if (RollbackState) {
+ if (prev) {
+ RollbackOps.emplace_back(TRollbackAddCommittedTx{ txId, *prev });
+ } else {
+ RollbackOps.emplace_back(TRollbackRemoveCommittedTx{ txId });
+ }
+ }
CommittedTransactions.Add(txId, rowVersion);
if (!prev) {
+ if (RollbackState && RemovedTransactions.Contains(txId)) {
+ RollbackOps.emplace_back(TRollbackAddRemovedTx{ txId });
+ }
RemovedTransactions.Remove(txId);
}
}
@@ -687,6 +892,9 @@ void TTable::RemoveTx(ui64 txId)
// due to complicated split/merge shard interactions. The commit actually
// wins over removes in all cases.
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev)) {
+ if (RollbackState && !RemovedTransactions.Contains(txId)) {
+ RollbackOps.emplace_back(TRollbackRemoveRemovedTx{ txId });
+ }
RemovedTransactions.Add(txId);
}
}
@@ -700,10 +908,28 @@ bool TTable::HasOpenTx(ui64 txId) const
return false;
}
+bool TTable::HasCommittedTx(ui64 txId) const
+{
+ return CommittedTransactions.Find(txId);
+}
+
+bool TTable::HasRemovedTx(ui64 txId) const
+{
+ return RemovedTransactions.Contains(txId);
+}
+
TMemTable& TTable::MemTable()
{
- return
- *(Mutable ? Mutable : (Mutable = new TMemTable(Scheme, Epoch, Annexed)));
+ if (!Mutable) {
+ Mutable = new TMemTable(Scheme, Epoch, Annexed);
+ }
+ if (RollbackState && Epoch == RollbackState->Epoch && RollbackState->MutableExisted) {
+ if (!RollbackState->MutableUpdated) {
+ RollbackState->MutableUpdated = true;
+ Mutable->PrepareRollback();
+ }
+ }
+ return *Mutable;
}
TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek,
@@ -721,7 +947,11 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES
observer));
if (Mutable) {
- dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward));
+ dbIter->Push(TMemIt::Make(*Mutable, Mutable->Snapshot(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward));
+ }
+
+ if (MutableBackup) {
+ dbIter->Push(TMemIt::Make(*MutableBackup, MutableBackup->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward));
}
for (auto& fti : Frozen) {
@@ -764,7 +994,11 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I
observer));
if (Mutable) {
- dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse));
+ dbIter->Push(TMemIt::Make(*Mutable, Mutable->Snapshot(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse));
+ }
+
+ if (MutableBackup) {
+ dbIter->Push(TMemIt::Make(*MutableBackup, MutableBackup->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse));
}
for (auto& fti : Frozen) {
@@ -832,6 +1066,18 @@ EReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row,
}
}
+ // Mutable data that is transitioning to frozen
+ if (MutableBackup && !row.IsFinalized()) {
+ lastEpoch = MutableBackup->Epoch;
+ if (auto it = TMemIt::Make(*MutableBackup, MutableBackup->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, EDirection::Forward)) {
+ if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, stats, committed, observer))) {
+ // N.B. stop looking for snapshot after the first hit
+ snapshotFound = true;
+ it->Apply(row, committed);
+ }
+ }
+ }
+
// Frozen are sorted by epoch, apply in reverse order
for (auto pos = Frozen.rbegin(); !row.IsFinalized() && pos != Frozen.rend(); ++pos) {
const auto& memTable = *pos;
@@ -926,6 +1172,8 @@ void TTable::DebugDump(IOutputStream& str, IPages* env, const NScheme::TTypeRegi
if (Mutable)
Mutable->DebugDump(str, reg);
+ if (MutableBackup)
+ MutableBackup->DebugDump(str, reg);
for (const auto& it : Frozen) {
str << "Frozen " << it->Epoch << " dump: " << Endl;
it->DebugDump(str, reg);
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index 36493d77d9..14f0ebf262 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -65,6 +65,11 @@ public:
explicit TTable(TEpoch);
~TTable();
+ void PrepareRollback();
+ void RollbackChanges();
+ void CommitChanges(TArrayRef<const TMemGlob> blobs);
+ void CommitNewTable(TArrayRef<const TMemGlob> blobs);
+
void SetScheme(const TScheme::TTableInfo& tableScheme);
TIntrusiveConstPtr<TRowScheme> GetScheme() const noexcept;
@@ -148,9 +153,9 @@ public:
ui64 itemsLimit, ui64 bytesLimit,
EDirection direction, TRowVersion snapshot, TSelectStats& stats) const;
- void Update(ERowOp, TRawVals key, TOpsRef, TArrayRef<TMemGlob> apart, TRowVersion rowVersion);
+ void Update(ERowOp, TRawVals key, TOpsRef, TArrayRef<const TMemGlob> apart, TRowVersion rowVersion);
- void UpdateTx(ERowOp, TRawVals key, TOpsRef, TArrayRef<TMemGlob> apart, ui64 txId);
+ void UpdateTx(ERowOp, TRawVals key, TOpsRef, TArrayRef<const TMemGlob> apart, ui64 txId);
void CommitTx(ui64 txId, TRowVersion rowVersion);
void RemoveTx(ui64 txId);
@@ -158,6 +163,8 @@ public:
* Returns true when table has an open transaction that is not committed or removed yet
*/
bool HasOpenTx(ui64 txId) const;
+ bool HasCommittedTx(ui64 txId) const;
+ bool HasRemovedTx(ui64 txId) const;
TPartView GetPartView(const TLogoBlobID &bundle) const
{
@@ -217,7 +224,9 @@ public:
ui64 GetMemSize(TEpoch epoch = TEpoch::Max()) const noexcept
{
if (Y_LIKELY(epoch == TEpoch::Max())) {
- return Stat_.FrozenSize + (Mutable ? Mutable->GetUsedMem() : 0);
+ return Stat_.FrozenSize
+ + (Mutable ? Mutable->GetUsedMem() : 0)
+ + (MutableBackup ? MutableBackup->GetUsedMem() : 0);
}
ui64 size = 0;
@@ -228,6 +237,10 @@ public:
}
}
+ if (MutableBackup && MutableBackup->Epoch < epoch) {
+ size += MutableBackup->GetUsedMem();
+ }
+
if (Mutable && Mutable->Epoch < epoch) {
size += Mutable->GetUsedMem();
}
@@ -237,17 +250,23 @@ public:
ui64 GetMemWaste() const noexcept
{
- return Stat_.FrozenWaste + (Mutable ? Mutable->GetWastedMem() : 0);
+ return Stat_.FrozenWaste
+ + (Mutable ? Mutable->GetWastedMem() : 0)
+ + (MutableBackup ? MutableBackup->GetWastedMem() : 0);
}
ui64 GetMemRowCount() const noexcept
{
- return Stat_.FrozenRows + (Mutable ? Mutable->GetRowCount() : 0);
+ return Stat_.FrozenRows
+ + (Mutable ? Mutable->GetRowCount() : 0)
+ + (MutableBackup ? MutableBackup->GetRowCount() : 0);
}
ui64 GetOpsCount() const noexcept
{
- return Stat_.FrozenOps + (Mutable ? Mutable->GetOpsCount() : 0);
+ return Stat_.FrozenOps
+ + (Mutable ? Mutable->GetOpsCount() : 0)
+ + (MutableBackup ? MutableBackup->GetOpsCount() : 0);
}
ui64 GetPartsCount() const noexcept
@@ -257,8 +276,12 @@ public:
ui64 EstimateRowSize() const noexcept
{
- ui64 size = Stat_.FrozenSize + (Mutable ? Mutable->GetUsedMem() : 0);
- ui64 rows = Stat_.FrozenRows + (Mutable ? Mutable->GetRowCount() : 0);
+ ui64 size = Stat_.FrozenSize
+ + (Mutable ? Mutable->GetUsedMem() : 0)
+ + (MutableBackup ? MutableBackup->GetUsedMem() : 0);
+ ui64 rows = Stat_.FrozenRows
+ + (Mutable ? Mutable->GetRowCount() : 0)
+ + (MutableBackup ? MutableBackup->GetRowCount() : 0);
for (const auto& flat : Flatten) {
if (const TPartView &partView = flat.second) {
@@ -297,6 +320,10 @@ private:
THashSet<TIntrusiveConstPtr<TPart>> Parts;
};
+ using TOpenTransactions = THashMap<ui64, TOpenTransaction>;
+
+ TOpenTransaction& AddOpenTransaction(ui64 txId);
+
private:
TEpoch Epoch; /* Monotonic table change number, with holes */
ui64 Annexed = 0; /* Monotonic serial of attached external blobs */
@@ -317,9 +344,62 @@ private:
TRowVersionRanges RemovedRowVersions;
THashSet<ui64> CheckTransactions;
- THashMap<ui64, TOpenTransaction> OpenTransactions;
+ TOpenTransactions OpenTransactions;
TTransactionMap CommittedTransactions;
TTransactionSet RemovedTransactions;
+
+private:
+ struct TRollbackRemoveOpenTx {
+ ui64 TxId;
+ };
+
+ struct TRollbackRemoveOpenTxMem {
+ ui64 TxId;
+ TIntrusiveConstPtr<TMemTable> Mem;
+ };
+
+ struct TRollbackAddCommittedTx {
+ ui64 TxId;
+ TRowVersion RowVersion;
+ };
+
+ struct TRollbackRemoveCommittedTx {
+ ui64 TxId;
+ };
+
+ struct TRollbackAddRemovedTx {
+ ui64 TxId;
+ };
+
+ struct TRollbackRemoveRemovedTx {
+ ui64 TxId;
+ };
+
+ using TRollbackOp = std::variant<
+ TRollbackRemoveOpenTx,
+ TRollbackRemoveOpenTxMem,
+ TRollbackAddCommittedTx,
+ TRollbackRemoveCommittedTx,
+ TRollbackAddRemovedTx,
+ TRollbackRemoveRemovedTx>;
+
+ struct TRollbackState {
+ TEpoch Epoch;
+ TIntrusiveConstPtr<TRowScheme> Scheme;
+ ui64 Annexed;
+ TKeyRangeCacheConfig EraseCacheConfig;
+ bool EraseCacheEnabled;
+ bool MutableExisted;
+ bool MutableUpdated;
+
+ TRollbackState(TEpoch epoch)
+ : Epoch(epoch)
+ { }
+ };
+
+ std::optional<TRollbackState> RollbackState;
+ std::vector<TRollbackOp> RollbackOps;
+ TIntrusivePtr<TMemTable> MutableBackup;
};
}
diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
index bb846d1057..07cb17101a 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h
@@ -82,6 +82,15 @@ namespace NTest {
TDbExec& Reject() { return DoCommit(true, false); }
TDbExec& Relax() { return DoCommit(false, true); }
+ TDbExec& Cleanup() {
+ if (OnTx == EOnTx::Auto) {
+ DoCommit(false, false);
+ }
+
+ Y_VERIFY(OnTx == EOnTx::None);
+ return *this;
+ }
+
TDbExec& ReadVer(TRowVersion readVersion) {
DoBegin(false);
@@ -90,10 +99,44 @@ namespace NTest {
return *this;
}
+ TDbExec& ReadTx(ui64 txId) {
+ DoBegin(false);
+
+ ReadTxId = txId;
+
+ return *this;
+ }
+
TDbExec& WriteVer(TRowVersion writeVersion) {
Y_VERIFY(OnTx != EOnTx::None);
WriteVersion = writeVersion;
+ WriteTxId = 0;
+
+ return *this;
+ }
+
+ TDbExec& WriteTx(ui64 txId) {
+ Y_VERIFY(OnTx != EOnTx::None);
+
+ WriteVersion = TRowVersion::Min();
+ WriteTxId = txId;
+
+ return *this;
+ }
+
+ TDbExec& CommitTx(ui32 table, ui64 txId) {
+ Y_VERIFY(OnTx != EOnTx::None);
+
+ Base->CommitTx(table, txId, WriteVersion);
+
+ return *this;
+ }
+
+ TDbExec& RemoveTx(ui32 table, ui64 txId) {
+ Y_VERIFY(OnTx != EOnTx::None);
+
+ Base->RemoveTx(table, txId);
return *this;
}
@@ -103,7 +146,11 @@ namespace NTest {
const NTest::TRowTool tool(RowSchemeFor(table));
auto pair = tool.Split(row, true, rop != ERowOp::Erase);
- Base->Update(table, rop, pair.Key, pair.Ops, WriteVersion);
+ if (WriteTxId != 0) {
+ Base->UpdateTx(table, rop, pair.Key, pair.Ops, WriteTxId);
+ } else {
+ Base->Update(table, rop, pair.Key, pair.Ops, WriteVersion);
+ }
return *this;
}
@@ -119,6 +166,12 @@ namespace NTest {
return Add(table, row, ERowOp::Upsert);
}
+ template<typename ...TArgs>
+ inline TDbExec& PutN(ui32 table, TArgs&&... args)
+ {
+ return Put(table, *SchemedCookRow(table).Col(std::forward<TArgs>(args)...));
+ }
+
TDbExec& Apply(const TSchemeChanges &delta)
{
Last = Max<ui32>(), Altered = true;
@@ -126,9 +179,9 @@ namespace NTest {
return Base->Alter().Merge(delta), *this;
}
- TDbExec& Snapshot(ui32 table)
+ TEpoch Snapshot(ui32 table)
{
- return Base->TxSnapTable(table), *this;
+ return Base->TxSnapTable(table);
}
NTest::TSchemedCookRow SchemedCookRow(ui32 table) noexcept
@@ -140,7 +193,7 @@ namespace NTest {
{
DoBegin(false), RowSchemeFor(table);
- TCheckIter check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion };
+ TCheckIter check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion, ReadTxId };
return check.To(CurrentStep()), check;
}
@@ -149,7 +202,7 @@ namespace NTest {
{
DoBegin(false), RowSchemeFor(table);
- TCheckIter check{ *Base, { nullptr, 0, true }, table, Scheme, ReadVersion, ENext::Data };
+ TCheckIter check{ *Base, { nullptr, 0, true }, table, Scheme, ReadVersion, ReadTxId, ENext::Data };
return check.To(CurrentStep()), check;
}
@@ -158,13 +211,15 @@ namespace NTest {
{
DoBegin(false), RowSchemeFor(table);
- TCheckSelect check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion };
+ TCheckSelect check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion, ReadTxId };
return check.To(CurrentStep()), check;
}
TDbExec& Snap(ui32 table)
{
+ Cleanup();
+
const auto scn = Base->Head().Serial + 1;
RedoLog.emplace_back(new TChange({ Gen, ++Step }, scn));
@@ -225,6 +280,8 @@ namespace NTest {
{
Y_VERIFY(OnTx != EOnTx::Real, "Commit TX before replaying");
+ Cleanup();
+
const ui64 serial = Base->Head().Serial;
Birth(), Base = nullptr, OnTx = EOnTx::None;
@@ -275,17 +332,10 @@ namespace NTest {
const TRowScheme& RowSchemeFor(ui32 table) noexcept
{
- if (std::exchange(Last, table) == table) {
- /* Safetly can use row scheme from cache */
- } else if (Altered) {
- TScheme temp(Base->GetScheme());
- TSchemeModifier(temp).Apply(*Base->Alter());
-
- const auto *info = temp.GetTableInfo(table);
-
- Scheme = TRowScheme::Make(info->Columns, NUtil::TSecond());
- } else {
- Scheme = Base->Subset(table, { }, TEpoch::Zero())->Scheme;
+ if (std::exchange(Last, table) != table) {
+ // Note: it's ok if the table has been altered, since
+ // we should observe updated schema on all reads.
+ Scheme = Base->GetRowScheme(table);
}
return *Scheme;
@@ -415,6 +465,8 @@ namespace NTest {
ReadVersion = TRowVersion::Max();
WriteVersion = TRowVersion::Min();
+ ReadTxId = 0;
+ WriteTxId = 0;
return *this;
}
@@ -432,6 +484,8 @@ namespace NTest {
TAutoPtr<TSteppedCookieAllocator> Annex;
TRowVersion ReadVersion = TRowVersion::Max();
TRowVersion WriteVersion = TRowVersion::Min();
+ ui64 ReadTxId = 0;
+ ui64 WriteTxId = 0;
};
}
diff --git a/ydb/core/tablet_flat/test/libs/table/test_iter.h b/ydb/core/tablet_flat/test/libs/table/test_iter.h
index c274725613..c5e04a48a2 100644
--- a/ydb/core/tablet_flat/test/libs/table/test_iter.h
+++ b/ydb/core/tablet_flat/test/libs/table/test_iter.h
@@ -120,6 +120,12 @@ namespace NTest {
return erased ? Is(EReady::Gone) : Is(row, true, ERowOp::Erase);
}
+ template<typename ...TArgs>
+ inline TChecker& NoKeyN(TArgs&&... args)
+ {
+ return NoKey(*TSchemedCookRow(Scheme).Col(std::forward<TArgs>(args)...));
+ }
+
template<typename TListType>
TChecker& IsTheSame(const TListType &list)
{
diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h b/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h
index f800e51c25..6d89a40644 100644
--- a/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h
+++ b/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h
@@ -11,11 +11,13 @@ namespace NTest {
TWrapDbIterImpl(TDatabase &base, ui32 table, TIntrusiveConstPtr<TRowScheme> scheme,
TRowVersion snapshot = TRowVersion::Max(),
+ ui64 readTxId = 0,
ENext mode = ENext::All)
: Scheme(std::move(scheme))
, Base(base)
, Table(table)
, Snapshot(snapshot)
+ , ReadTxId(readTxId)
, Mode(mode)
{
@@ -68,7 +70,12 @@ namespace NTest {
swap(range.MinInclusive, range.MaxInclusive);
}
- Iter = Base.IterateRangeGeneric<TIter>(Table, range, Scheme->Tags(), Snapshot);
+ ITransactionMapPtr txMap;
+ if (ReadTxId != 0 && Base.HasOpenTx(Table, ReadTxId)) {
+ txMap = new TSingleTransactionMap(ReadTxId, TRowVersion::Min());
+ }
+
+ Iter = Base.IterateRangeGeneric<TIter>(Table, range, Scheme->Tags(), Snapshot, txMap);
return Iter->Next(Mode);
}
@@ -90,6 +97,7 @@ namespace NTest {
private:
const ui32 Table = Max<ui32>();
const TRowVersion Snapshot;
+ const ui64 ReadTxId;
const ENext Mode;
TAutoPtr<TIter> Iter;
};
diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_select.h b/ydb/core/tablet_flat/test/libs/table/wrap_select.h
index 06069a8d01..ac91e59205 100644
--- a/ydb/core/tablet_flat/test/libs/table/wrap_select.h
+++ b/ydb/core/tablet_flat/test/libs/table/wrap_select.h
@@ -10,12 +10,14 @@ namespace NTest {
struct TWrapDbSelect {
TWrapDbSelect(TDatabase &base, ui32 table, TIntrusiveConstPtr<TRowScheme> scheme,
- TRowVersion snapshot = TRowVersion::Max())
+ TRowVersion snapshot = TRowVersion::Max(),
+ ui64 readTxId = 0)
: Scheme(std::move(scheme))
, Remap_(TRemap::Full(*Scheme))
, Base(base)
, Table(table)
, Snapshot(snapshot)
+ , ReadTxId(readTxId)
{
}
@@ -44,7 +46,12 @@ namespace NTest {
{
Y_VERIFY(seek == ESeek::Exact, "Db Select(...) is a point lookup");
- return (Ready = Base.Select(Table, key, Scheme->Tags(), State, /* readFlags */ 0, Snapshot));
+ ITransactionMapPtr txMap;
+ if (ReadTxId != 0 && Base.HasOpenTx(Table, ReadTxId)) {
+ txMap = new TSingleTransactionMap(ReadTxId, TRowVersion::Min());
+ }
+
+ return (Ready = Base.Select(Table, key, Scheme->Tags(), State, /* readFlags */ 0, Snapshot, txMap));
}
EReady Next() noexcept
@@ -65,6 +72,7 @@ namespace NTest {
private:
const ui32 Table = Max<ui32>();
const TRowVersion Snapshot;
+ const ui64 ReadTxId;
EReady Ready = EReady::Gone;
TRowState State;
};
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt
index f3f8405b23..de5d414efe 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt
@@ -48,6 +48,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/flat_test_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/util_pool_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_self.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_iterator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_memtable.cpp
diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt
index 86d11475bd..846c4ab036 100644
--- a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt
+++ b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt
@@ -52,6 +52,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/flat_test_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/util_pool_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_self.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_iterator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_memtable.cpp
diff --git a/ydb/core/tablet_flat/ut/ut_db_iface.cpp b/ydb/core/tablet_flat/ut/ut_db_iface.cpp
index 1e2c439af2..2e408c25fd 100644
--- a/ydb/core/tablet_flat/ut/ut_db_iface.cpp
+++ b/ydb/core/tablet_flat/ut/ut_db_iface.cpp
@@ -306,11 +306,13 @@ Y_UNIT_TEST_SUITE(DBase) {
me.To(30).Begin().Apply(*TAlter().DropTable(1));
me.To(31).Commit().Affects(0, { });
me.To(32).Begin().Add(2, row).Commit().Affects(0, { 2 });
- me.To(33).Begin().Snapshot(2).Commit().Affects(0, { 2 });
+ auto epoch = me.To(33).Begin().Snapshot(2);
+ UNIT_ASSERT(epoch == TEpoch::FromIndex(2));
+ me.Commit().Affects(0, { 2 });
me.To(34).Compact(2);
UNIT_ASSERT(me->Counters().MemTableOps == 0);
- UNIT_ASSERT(me.BackLog().Snapshots.at(0).Epoch == TEpoch::FromIndex(2));
+ UNIT_ASSERT(me.BackLog().Snapshots == 1);
{
const auto subset = me->Subset(2, TEpoch::Max(), { }, { });
@@ -808,6 +810,193 @@ Y_UNIT_TEST_SUITE(DBase) {
.Next().Is(EReady::Gone);
}
+ Y_UNIT_TEST(AlterAndUpsertChangesVisibility) {
+ TDbExec me;
+
+ const ui32 table1 = 1;
+ const ui32 table2 = 2;
+ me.To(10).Begin();
+
+ me.To(20).Apply(*TAlter()
+ .AddTable("me_1", table1)
+ .AddColumn(table1, "key", 1, ETypes::Uint64, false)
+ .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64))
+ .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64))
+ .AddColumnToKey(table1, 1));
+ me.To(21).PutN(table1, 1_u64, 11_u64, 12_u64);
+ me.To(22).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(23).Select(table1).NoKeyN(2_u64);
+
+ me.To(30).Apply(*TAlter()
+ .AddTable("me_2", table2)
+ .AddColumn(table2, "key", 1, ETypes::Uint64, false)
+ .AddColumn(table2, "arg1", 4, ETypes::Uint64, false, Cimple(20004_u64))
+ .AddColumn(table2, "arg2", 5, ETypes::Uint64, false, Cimple(20005_u64))
+ .AddColumnToKey(table2, 1));
+ me.To(31).PutN(table2, 2_u64, 21_u64, 22_u64);
+ me.To(32).Select(table2).NoKeyN(1_u64);
+ me.To(33).Select(table2).HasN(2_u64, 21_u64, 22_u64);
+
+ me.Commit();
+
+ me.To(40).Begin();
+ me.To(41).Apply(*TAlter()
+ .DropColumn(table2, 5)
+ .AddColumn(table2, "arg3", 6, ETypes::Uint64, false, Cimple(20006_u64)));
+ me.To(42).Select(table2).HasN(2_u64, 21_u64, 20006_u64);
+ me.To(43).PutN(table2, 2_u64, ECellOp::Empty, 23_u64);
+ me.To(44).Select(table2).HasN(2_u64, 21_u64, 23_u64);
+ me.Reject();
+
+ me.To(50).Begin();
+ me.To(51).Select(table2).HasN(2_u64, 21_u64, 22_u64);
+ me.To(52).PutN(table2, 2_u64, 24_u64, ECellOp::Empty);
+ me.To(53).Select(table2).HasN(2_u64, 24_u64, 22_u64);
+ me.Commit();
+
+ me.To(60).Replay(EPlay::Boot);
+ me.To(61).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(62).Select(table2).HasN(2_u64, 24_u64, 22_u64);
+ me.To(63).Replay(EPlay::Redo);
+ me.To(64).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(65).Select(table2).HasN(2_u64, 24_u64, 22_u64);
+ }
+
+ Y_UNIT_TEST(UncommittedChangesVisibility) {
+ TDbExec me;
+
+ const ui32 table1 = 1;
+
+ me.To(10).Begin();
+ me.To(11).Apply(*TAlter()
+ .AddTable("me_1", table1)
+ .AddColumn(table1, "key", 1, ETypes::Uint64, false)
+ .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64))
+ .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64))
+ .AddColumnToKey(table1, 1));
+ me.To(12).PutN(table1, 1_u64, 11_u64, 12_u64);
+ me.To(13).WriteTx(123).PutN(table1, 1_u64, ECellOp::Empty, 13_u64);
+ UNIT_ASSERT(me->HasOpenTx(table1, 123));
+ me.To(14).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(15).ReadTx(123).Select(table1).HasN(1_u64, 11_u64, 13_u64);
+ me.To(16).Commit();
+
+ me.To(20).Begin();
+ me.To(21).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(22).CommitTx(table1, 123);
+ UNIT_ASSERT(!me->HasOpenTx(table1, 123));
+ UNIT_ASSERT(me->HasCommittedTx(table1, 123));
+ me.To(23).Select(table1).HasN(1_u64, 11_u64, 13_u64);
+ me.To(24).Reject();
+
+ UNIT_ASSERT(me->HasOpenTx(table1, 123));
+ UNIT_ASSERT(!me->HasCommittedTx(table1, 123));
+
+ me.To(30).Begin();
+ me.To(31).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(32).RemoveTx(table1, 123);
+ UNIT_ASSERT(!me->HasOpenTx(table1, 123));
+ UNIT_ASSERT(me->HasRemovedTx(table1, 123));
+ me.To(33).Reject();
+
+ UNIT_ASSERT(me->HasOpenTx(table1, 123));
+ UNIT_ASSERT(!me->HasRemovedTx(table1, 123));
+
+ me.To(40).Begin();
+ me.To(41).CommitTx(table1, 123);
+ me.To(42).Commit();
+
+ UNIT_ASSERT(!me->HasOpenTx(table1, 123));
+ UNIT_ASSERT(me->HasCommittedTx(table1, 123));
+
+ me.To(50).Select(table1).HasN(1_u64, 11_u64, 13_u64);
+ me.To(51).Snap(table1).Compact(table1);
+
+ UNIT_ASSERT(!me->HasOpenTx(table1, 123));
+ UNIT_ASSERT(!me->HasCommittedTx(table1, 123));
+
+ me.To(52).Select(table1).HasN(1_u64, 11_u64, 13_u64);
+ }
+
+ Y_UNIT_TEST(ReplayNewTable) {
+ TDbExec me;
+
+ const ui32 table1 = 1;
+
+ me.To(10).Begin();
+ me.To(11).Apply(*TAlter()
+ .AddTable("me_1", table1)
+ .AddColumn(table1, "key", 1, ETypes::Uint64, false)
+ .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64))
+ .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64))
+ .AddColumnToKey(table1, 1));
+ me.To(13).Commit();
+ me.To(14).Affects(0, { });
+
+ me.To(21).Replay(EPlay::Boot);
+ me.To(22).Replay(EPlay::Redo);
+ }
+
+ Y_UNIT_TEST(SnapshotNewTable) {
+ TDbExec me;
+
+ const ui32 table1 = 1;
+
+ me.To(10).Begin();
+ me.To(11).Apply(*TAlter()
+ .AddTable("me_1", table1)
+ .AddColumn(table1, "key", 1, ETypes::Uint64, false)
+ .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64))
+ .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64))
+ .AddColumnToKey(table1, 1));
+ me.To(12).Snapshot(table1);
+ me.To(13).Commit();
+ me.To(14).Affects(0, { });
+
+ me.To(21).Replay(EPlay::Boot);
+ me.To(22).Replay(EPlay::Redo);
+ }
+
+ Y_UNIT_TEST(DropModifiedTable) {
+ TDbExec me;
+
+ const ui32 table1 = 1;
+
+ me.To(10).Begin();
+ me.To(11).Apply(*TAlter()
+ .AddTable("me_1", table1)
+ .AddColumn(table1, "key", 1, ETypes::Uint64, false)
+ .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64))
+ .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64))
+ .AddColumnToKey(table1, 1));
+ me.To(12).Commit();
+
+ me.To(20).Begin();
+ me.To(21).PutN(table1, 1_u64, 11_u64, 12_u64);
+ me.To(22).Commit();
+
+ me.To(30).Begin();
+ me.To(31).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(32).Apply(*TAlter()
+ .AddColumn(table1, "arg3", 6, ETypes::Uint64, false, Cimple(10006_u64)));
+ me.To(33).Select(table1).HasN(1_u64, 11_u64, 12_u64, 10006_u64);
+ me.To(34).Apply(*TAlter()
+ .DropTable(table1));
+ me.To(35).Reject();
+
+ me.To(40).Begin();
+ me.To(41).Select(table1).HasN(1_u64, 11_u64, 12_u64);
+ me.To(42).Apply(*TAlter()
+ .AddColumn(table1, "arg3", 6, ETypes::Uint64, false, Cimple(10006_u64)));
+ me.To(43).Select(table1).HasN(1_u64, 11_u64, 12_u64, 10006_u64);
+ me.To(44).Apply(*TAlter()
+ .DropTable(table1));
+ me.To(45).Commit();
+
+ me.To(51).Replay(EPlay::Boot);
+ me.To(52).Replay(EPlay::Redo);
+ }
+
}
}
diff --git a/ydb/core/tablet_flat/ut/ut_redo.cpp b/ydb/core/tablet_flat/ut/ut_redo.cpp
index 95523535f0..2ba7829372 100644
--- a/ydb/core/tablet_flat/ut/ut_redo.cpp
+++ b/ydb/core/tablet_flat/ut/ut_redo.cpp
@@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(Redo) {
ui32 serial = 0;
for (auto &one: changes) {
- me.To(++serial)->RollUp(one->Stamp, one->Scheme, one->Redo, { });
+ me.To(++serial).Cleanup()->RollUp(one->Stamp, one->Scheme, one->Redo, { });
if (1 == serial) {
/* Just applied initial alter script, nothing to check */
diff --git a/ydb/core/tablet_flat/util_pool.h b/ydb/core/tablet_flat/util_pool.h
new file mode 100644
index 0000000000..e3cb17a5ee
--- /dev/null
+++ b/ydb/core/tablet_flat/util_pool.h
@@ -0,0 +1,269 @@
+#pragma once
+
+#include <util/generic/utility.h>
+#include <util/system/align.h>
+#include <util/system/yassert.h>
+#include <memory>
+#include <optional>
+
+namespace NKikimr::NUtil {
+
+ /**
+ * Memory pool with support for transactions
+ */
+ class TMemoryPool
+ : private std::allocator<char>
+ {
+ using allocator_base = std::allocator<char>;
+ using allocator_traits = std::allocator_traits<allocator_base>;
+
+ struct TChunk {
+ TChunk* Next;
+ char* Ptr;
+ size_t Left;
+
+ TChunk(size_t size) noexcept
+ : Next(nullptr)
+ {
+ Y_VERIFY_DEBUG((((uintptr_t)this) % PLATFORM_DATA_ALIGN) == 0);
+ Y_VERIFY_DEBUG(size >= ChunkHeaderSize());
+ Ptr = DataStart();
+ Left = size - ChunkHeaderSize();
+ }
+
+ /**
+ * Aligns size to the specified power of two
+ */
+ static constexpr size_t AlignSizeUp(size_t size, size_t alignment) {
+ return size + ((-size) & (alignment - 1));
+ }
+
+ /**
+ * Aligned chunk header size that takes expected alignment into account
+ */
+ static constexpr size_t ChunkHeaderSize() noexcept {
+ return AlignSizeUp(sizeof(TChunk), PLATFORM_DATA_ALIGN);
+ }
+
+ char* ChunkStart() noexcept {
+ return reinterpret_cast<char*>(this);
+ }
+
+ const char* ChunkStart() const noexcept {
+ return reinterpret_cast<const char*>(this);
+ }
+
+ size_t ChunkSize() const noexcept {
+ return Left + (Ptr - ChunkStart());
+ }
+
+ char* DataStart() noexcept {
+ return ChunkStart() + ChunkHeaderSize();
+ }
+
+ const char* DataStart() const noexcept {
+ return ChunkStart() + ChunkHeaderSize();
+ }
+
+ size_t Used() const noexcept {
+ return Ptr - DataStart();
+ }
+
+ size_t Wasted() const noexcept {
+ return ChunkHeaderSize() + Left;
+ }
+
+ void Reset() noexcept {
+ Rollback(DataStart());
+ }
+
+ void Rollback(char* to) noexcept {
+ char* data = Ptr;
+ Ptr = to;
+ Left += data - to;
+ }
+
+ inline char* Allocate(size_t size) noexcept {
+ if (Left >= size) {
+ char* ptr = Ptr;
+
+ Ptr += size;
+ Left -= size;
+
+ return ptr;
+ }
+
+ return nullptr;
+ }
+
+ inline char* Allocate(size_t size, size_t align) noexcept {
+ size_t pad = AlignUp(Ptr, align) - Ptr;
+
+ if (char* ret = Allocate(pad + size)) {
+ return ret + pad;
+ }
+
+ return nullptr;
+ }
+ };
+
+ public:
+ TMemoryPool(size_t initial)
+ : First(AllocateChunk(Max(initial, TChunk::ChunkHeaderSize() + PLATFORM_DATA_ALIGN)))
+ , Current(First)
+ { }
+
+ ~TMemoryPool() {
+ TChunk* chunk = First;
+ while (chunk) {
+ TChunk* next = chunk->Next;
+ FreeChunk(chunk);
+ chunk = next;
+ }
+ }
+
+ void* Allocate(size_t size) {
+ return DoAllocate(AlignUp<size_t>(size, PLATFORM_DATA_ALIGN));
+ }
+
+ void* Allocate(size_t size, size_t align) {
+ return DoAllocate(AlignUp<size_t>(size, PLATFORM_DATA_ALIGN), align);
+ }
+
+ void* Append(const void* data, size_t len) {
+ void* ptr = this->Allocate(len);
+ if (len > 0) {
+ ::memcpy(ptr, data, len);
+ }
+ return ptr;
+ }
+
+ void BeginTransaction() noexcept {
+ Y_VERIFY(!RollbackState_);
+ auto& state = RollbackState_.emplace();
+ state.Chunk = Current;
+ state.Ptr = Current->Ptr;
+ }
+
+ void CommitTransaction() noexcept {
+ Y_VERIFY(RollbackState_);
+ RollbackState_.reset();
+ }
+
+ void RollbackTransaction() noexcept {
+ Y_VERIFY(RollbackState_);
+ auto& state = *RollbackState_;
+ DoRollback(state.Chunk, state.Ptr);
+ RollbackState_.reset();
+ }
+
+ size_t Used() const noexcept {
+ return Used_ + Current->Used();
+ }
+
+ size_t Wasted() const noexcept {
+ return Wasted_ + Current->Wasted();
+ }
+
+ size_t Available() const noexcept {
+ return Available_ + Current->Left;
+ }
+
+ private:
+ TChunk* AllocateChunk(size_t hint) {
+ size_t chunkSize = FastClp2(hint);
+ if (chunkSize - hint >= (chunkSize >> 2)) {
+ chunkSize -= chunkSize >> 2;
+ }
+ char* ptr = allocator_traits::allocate(*this, chunkSize);
+ return new (ptr) TChunk(chunkSize);
+ }
+
+ void FreeChunk(TChunk* chunk) {
+ char* ptr = chunk->ChunkStart();
+ size_t chunkSize = chunk->ChunkSize();
+ chunk->~TChunk();
+ allocator_traits::deallocate(*this, ptr, chunkSize);
+ }
+
+ TChunk* AddChunk(size_t size) {
+ Y_VERIFY(!Current->Next);
+ size_t hint = Max(AlignUp<size_t>(sizeof(TChunk), PLATFORM_DATA_ALIGN) + size, Current->ChunkSize() + 1);
+ TChunk* next = AllocateChunk(hint);
+ Used_ += Current->Used();
+ Wasted_ += Current->Wasted();
+ Current->Next = next;
+ Current = next;
+ return next;
+ }
+
+ bool NextChunk() {
+ if (Current->Next) {
+ Used_ += Current->Used();
+ Wasted_ += Current->Wasted();
+ Current = Current->Next;
+ Y_VERIFY_DEBUG(Current->Ptr == Current->DataStart());
+ Wasted_ -= Current->ChunkSize();
+ Available_ -= Current->Left;
+ return true;
+ }
+
+ return false;
+ }
+
+ void DoRollback(TChunk* target, char* ptr) {
+ if (target != Current) {
+ TChunk* chunk = target;
+ do {
+ // Remove previously added stats
+ Used_ -= chunk->Used();
+ Wasted_ -= chunk->Wasted();
+ // Switch to the next chunk in the chain
+ chunk = chunk->Next;
+ Y_VERIFY(chunk, "Rollback cannot find current chunk in the chain");
+ // Reset chunk and add it to stats as wasted/free space
+ chunk->Reset();
+ Wasted_ += chunk->ChunkSize();
+ Available_ += chunk->Left;
+ } while (chunk != Current);
+ Current = target;
+ }
+ target->Rollback(ptr);
+ }
+
+ void* DoAllocate(size_t size) {
+ do {
+ if (auto* ptr = Current->Allocate(size)) {
+ return ptr;
+ }
+ } while (NextChunk());
+
+ return AddChunk(size)->Allocate(size);
+ }
+
+ void* DoAllocate(size_t size, size_t align) {
+ do {
+ if (auto* ptr = Current->Allocate(size, align)) {
+ return ptr;
+ }
+ } while (NextChunk());
+
+ return AddChunk(size + align - 1)->Allocate(size, align);
+ }
+
+ private:
+ struct TRollbackState {
+ TChunk* Chunk;
+ char* Ptr;
+ };
+
+ private:
+ TChunk* First;
+ TChunk* Current;
+ size_t Used_ = 0;
+ size_t Wasted_ = 0;
+ size_t Available_ = 0;
+ std::optional<TRollbackState> RollbackState_;
+ };
+
+} // namespace NKikimr::NUtil
diff --git a/ydb/core/tablet_flat/util_pool_ut.cpp b/ydb/core/tablet_flat/util_pool_ut.cpp
new file mode 100644
index 0000000000..5f21e7af07
--- /dev/null
+++ b/ydb/core/tablet_flat/util_pool_ut.cpp
@@ -0,0 +1,73 @@
+#include "util_pool.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/random/random.h>
+#include <string.h>
+
+namespace NKikimr::NUtil {
+
+Y_UNIT_TEST_SUITE(TMemoryPoolTest) {
+
+ Y_UNIT_TEST(AllocOneByte) {
+ TMemoryPool pool(1);
+ UNIT_ASSERT_C(pool.Used() == 0, pool.Used());
+ UNIT_ASSERT_C(pool.Wasted() > 0, pool.Wasted());
+ UNIT_ASSERT_C(pool.Available() >= 1, pool.Available());
+ size_t initialWasted = pool.Wasted();
+ size_t initialAvailable = pool.Available();
+ void* ptr = pool.Allocate(1);
+ Y_UNUSED(ptr);
+ UNIT_ASSERT_C(pool.Used() >= 1, pool.Used());
+ UNIT_ASSERT_C(pool.Used() + pool.Wasted() == initialWasted, pool.Used() << " + " << pool.Wasted());
+ UNIT_ASSERT_C(pool.Used() + pool.Available() == initialAvailable, pool.Used() << " + " << pool.Available());
+ }
+
+ Y_UNIT_TEST(AppendString) {
+ TMemoryPool pool(128);
+ const char* str = "Hello, world!";
+ char* ptr = (char*)pool.Append(str, ::strlen(str) + 1);
+ UNIT_ASSERT(::strcmp(ptr, str) == 0);
+ }
+
+ void DoTransactions(size_t align = 0) {
+ TMemoryPool pool(128);
+ size_t expectedUsed = 0;
+ size_t expectedSize = 0;
+ for (int i = 0; i < 1000; ++i) {
+ size_t initialUsed = pool.Used();
+ pool.BeginTransaction();
+ ui32 r = RandomNumber<ui32>(20);
+ int count = (r >> 1) + 1;
+ bool commit = r & 1;
+ for (int i = 0; i < count; ++i) {
+ if (align) {
+ void* ptr = pool.Allocate(1, align);
+ UNIT_ASSERT(ptr);
+ } else {
+ void* ptr = pool.Allocate(1);
+ UNIT_ASSERT(ptr);
+ }
+ }
+ expectedSize = Max(expectedSize, pool.Used() + pool.Wasted());
+ if (commit) {
+ expectedUsed += pool.Used() - initialUsed;
+ pool.CommitTransaction();
+ } else {
+ pool.RollbackTransaction();
+ }
+ }
+ UNIT_ASSERT_C(pool.Used() == expectedUsed, pool.Used() << " != " << expectedUsed);
+ UNIT_ASSERT_C(pool.Used() + pool.Wasted() == expectedSize, pool.Used() << " + " << pool.Wasted() << " != " << expectedSize);
+ }
+
+ Y_UNIT_TEST(Transactions) {
+ DoTransactions();
+ }
+
+ Y_UNIT_TEST(TransactionsWithAlignment) {
+ DoTransactions(16);
+ }
+
+} // Y_UNIT_TEST_SUITE(TMemoryPoolTest)
+
+} // namespace NKikimr::NUtil
diff --git a/ydb/core/tablet_flat/util_store.h b/ydb/core/tablet_flat/util_store.h
index b61062d1d6..f5bf0ccb91 100644
--- a/ydb/core/tablet_flat/util_store.h
+++ b/ydb/core/tablet_flat/util_store.h
@@ -108,14 +108,6 @@ namespace NUtil {
TConcurrentStore() { }
~TConcurrentStore() noexcept {
- clear();
- }
-
- TConstIterator Iterator() const noexcept {
- return TConstIterator(this);
- }
-
- void clear() noexcept {
size_t count = Count.exchange(0, std::memory_order_release);
Head.store(nullptr, std::memory_order_release);
auto* tail = Tail.exchange(nullptr, std::memory_order_release);
@@ -133,6 +125,10 @@ namespace NUtil {
}
}
+ TConstIterator Iterator() const noexcept {
+ return TConstIterator(this);
+ }
+
/**
* Emplaces a new element, not thread safe
*/
@@ -142,7 +138,8 @@ namespace NUtil {
size_t index = Count.load(std::memory_order_relaxed);
// Allocate a new chunk if necessary
- auto* tail = Tail.load(std::memory_order_relaxed);
+ // Note: we acquire for the tail->Prev pointer here
+ auto* tail = Tail.load(std::memory_order_acquire);
if (!tail || tail->EndOffset() <= index) {
size_t offset = tail ? tail->EndOffset() : 0;
size_t bytes = tail ? tail->Bytes * 2 : 512;
@@ -158,16 +155,46 @@ namespace NUtil {
Tail.store(tail, std::memory_order_release);
}
- Y_VERIFY_DEBUG(tail->Offset <= index);
+ // It is possible for new index to be located on some earlier
+ // chunk after we do a truncation.
+ void* ptr = FindPtr(tail, index);
- // Construct new value and publish it by releasing the new count
- void* ptr = tail->Values() + (index - tail->Offset);
+ // Construct a new value and publish the new count. Note that this
+ // does not actually publish value contents, since count is not
+ // acquired on access, but it's useful for iteration.
+ // Items must be synchronized externally.
T* value = new (ptr) T(std::forward<TArgs>(args)...);
Count.store(index + 1, std::memory_order_release);
return *value;
}
/**
+ * Truncates store to a smaller size, not thread safe
+ */
+ void truncate(size_t new_size) {
+ size_t prev_size = Count.load(std::memory_order_relaxed);
+ Y_VERIFY(new_size <= prev_size);
+
+ if (new_size < prev_size) {
+ auto* tail = Tail.load(std::memory_order_acquire);
+ while (tail && new_size < tail->EndOffset()) {
+ // We want to call destructor for all items
+ // that are between new_size and prev_size
+ if (tail->Offset < prev_size) {
+ size_t fromIndex = std::max(new_size, tail->Offset) - tail->Offset;
+ size_t toIndex = std::min(prev_size, tail->EndOffset()) - tail->Offset;
+ T* values = tail->Values() + fromIndex;
+ for (size_t index = fromIndex; index < toIndex; ++index, ++values) {
+ values->~T();
+ }
+ }
+ tail = tail->Prev;
+ }
+ Count.store(new_size, std::memory_order_release);
+ }
+ }
+
+ /**
* Returns a thread-safe size of the container
*/
size_t size() const {
@@ -192,6 +219,41 @@ namespace NUtil {
return *FindPtr(Tail.load(std::memory_order_acquire), index);
}
+ /**
+ * Runs callback(index, value) for each value between index and endIndex, not thread safe
+ */
+ template<class TCallback>
+ void Enumerate(size_t index, size_t endIndex, TCallback&& callback) {
+ Y_VERIFY(index <= endIndex);
+ if (index == endIndex) {
+ return;
+ }
+
+ size_t count = Count.load(std::memory_order_acquire);
+ Y_VERIFY(endIndex <= count);
+
+ auto* tail = Tail.load(std::memory_order_acquire);
+ while (tail && index < tail->Offset) {
+ tail = tail->Prev;
+ }
+
+ do {
+ Y_VERIFY_DEBUG(tail);
+ auto endOffset = tail->EndOffset();
+ Y_VERIFY_DEBUG(tail->Offset <= index && index < endOffset);
+ T* values = tail->Values() + (index - tail->Offset);
+ while (index < endOffset && index < endIndex) {
+ callback(index, *values);
+ ++index;
+ ++values;
+ }
+ if (index == endIndex) {
+ break;
+ }
+ tail = tail->Next.load(std::memory_order_acquire);
+ } while (tail && index < tail->EndOffset());
+ }
+
private:
static T* FindPtr(TChunk* tail, size_t index) {
while (tail && index < tail->Offset) {
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index e4501f3d6d..f1b90a0cd1 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -983,6 +983,7 @@ public:
ui32 key = 0;
TString body = ProgramTextSwap();
+
for (ui32 point : Writes) {
body += Sprintf(writePattern, key, point, key, TxId);
++key;
@@ -1007,6 +1008,9 @@ public:
}
ui64 GetTxId() const { return TxId; }
+ const TVector<ui32>& GetWrites() const { return Writes; }
+ const TVector<ui32>& GetReads() const { return Reads; }
+ const TVector<TSimpleRange>& GetRanges() const { return Ranges; }
const TMap<ui32, ui32>& GetResults() const { return Results; }
void SetResults(const TVector<ui32>& kv) {
diff --git a/ydb/core/tx/schemeshard/ut_serverless.cpp b/ydb/core/tx/schemeshard/ut_serverless.cpp
index fc96dd2ebe..76d0d9d213 100644
--- a/ydb/core/tx/schemeshard/ut_serverless.cpp
+++ b/ydb/core/tx/schemeshard/ut_serverless.cpp
@@ -218,7 +218,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardServerLess) {
waitMeteringMessage();
{
- TString meteringData = R"({"usage":{"start":1600452120,"quantity":59,"finish":1600452179,"type":"delta","unit":"byte*second"},"tags":{"ydb_size":11728},"id":"8751008-3-1600452120-1600452179-11728","cloud_id":"CLOUD_ID_VAL","source_wt":1600452180,"source_id":"sless-docapi-ydb-storage","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})";
+ TString meteringData = R"({"usage":{"start":1600452120,"quantity":59,"finish":1600452179,"type":"delta","unit":"byte*second"},"tags":{"ydb_size":11664},"id":"8751008-3-1600452120-1600452179-11664","cloud_id":"CLOUD_ID_VAL","source_wt":1600452180,"source_id":"sless-docapi-ydb-storage","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})";
meteringData += "\n";
UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData);
}