diff options
author | snaury <snaury@ydb.tech> | 2022-08-10 17:00:32 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-08-10 17:00:32 +0300 |
commit | 42fb891327a4880587bd9617992bf0f75fcf0887 (patch) | |
tree | 76a5663f335eaa4d6ee72479acc9cb680b842a6f | |
parent | e3b22a667ccceb30e2952d1e7ddf1b43edf2ece8 (diff) | |
download | ydb-42fb891327a4880587bd9617992bf0f75fcf0887.tar.gz |
Make local tx changes visible inside transaction.
37 files changed, 2403 insertions, 419 deletions
diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp index 3c972e6961..52a61a3eb2 100644 --- a/ydb/core/engine/mkql_engine_flat.cpp +++ b/ydb/core/engine/mkql_engine_flat.cpp @@ -1367,11 +1367,6 @@ public: NUdf::TUnboxedValue replyValue = runValue.GetElement(0); NUdf::TUnboxedValue writeValue = runValue.GetElement(1); - TEngineFlatApplyContext applyCtx; - applyCtx.Host = Settings.Host; - applyCtx.Env = &Env; - ApplyChanges(writeValue, applyCtx); - TCallableResults replyResults; for (ui32 i = 0; i < replyStruct.GetValuesCount(); ++i) { TRuntimeNode item = replyStruct.GetValue(i); @@ -1387,6 +1382,13 @@ public: } auto replyStr = replyResults.ToString(holderFactory, Env); + + // Note: we must apply side effects even if we reply with an error below + TEngineFlatApplyContext applyCtx; + applyCtx.Host = Settings.Host; + applyCtx.Env = &Env; + ApplyChanges(writeValue, applyCtx); + if (replyStr.size() > MaxDatashardReplySize) { TString error = TStringBuilder() << "Datashard " << pgm.first << ": reply size limit exceeded. 
(" diff --git a/ydb/core/mind/hive/tx__init_scheme.cpp b/ydb/core/mind/hive/tx__init_scheme.cpp index b65451f94a..7548dc09ba 100644 --- a/ydb/core/mind/hive/tx__init_scheme.cpp +++ b/ydb/core/mind/hive/tx__init_scheme.cpp @@ -15,8 +15,9 @@ public: bool Execute(TTransactionContext &txc, const TActorContext&) override { BLOG_D("THive::TTxInitScheme::Execute"); + bool wasEmpty = txc.DB.GetScheme().IsEmpty(); NIceDb::TNiceDb(txc.DB).Materialize<Schema>(); - if (!txc.DB.GetScheme().IsEmpty()) { + if (!wasEmpty) { auto row = NIceDb::TNiceDb(txc.DB).Table<Schema::State>().Key(TSchemeIds::State::DatabaseVersion).Select<Schema::State::Value>(); if (!row.IsReady()) return false; diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 7a44050218..cb99c617ec 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -745,7 +745,7 @@ Y_UNIT_TEST_SUITE(SystemView) { check.Uint64GreaterOrEquals(nowUs); // AccessTime check.DoubleGreaterOrEquals(0.0); // CPUCores check.Uint64(1u); // CoordinatedTxCompleted - check.Uint64(608u); // DataSize + check.Uint64(576u); // DataSize check.Uint64(1u); // ImmediateTxCompleted check.Uint64(0u); // IndexSize check.Uint64(0u); // InFlightTxCount diff --git a/ydb/core/tablet_flat/CMakeLists.txt b/ydb/core/tablet_flat/CMakeLists.txt index 55728be780..c2688b43d1 100644 --- a/ydb/core/tablet_flat/CMakeLists.txt +++ b/ydb/core/tablet_flat/CMakeLists.txt @@ -57,6 +57,7 @@ target_sources(ydb-core-tablet_flat PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_gclogic.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_bio_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_tx_env.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_executor_txloglogic.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_load_blob_queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_mem_warm.cpp diff --git 
a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 965463954c..bc106e823e 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -219,13 +219,43 @@ void TDatabase::Update(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const TUp Y_FAIL("Key index %" PRISZT " validation failure: %s", index, error.c_str()); } } - for (size_t index = 0; index < ops.size(); ++index) { - if (auto error = NScheme::HasUnexpectedValueSize(ops[index].Value)) { - Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, ops[index].Tag, error.c_str()); + + NRedo::IAnnex* annex = Annex.Get(); + NRedo::IAnnex::TLimit limit = annex->Limit(table); + limit.MinExternSize = Max(limit.MinExternSize, DatabaseImpl->AnnexByteLimit()); + + if (ModifiedRefs.size() < ops.size()) { + ModifiedRefs.resize(FastClp2(ops.size())); + } + ModifiedOps.assign(ops.begin(), ops.end()); + for (size_t index = 0; index < ModifiedOps.size(); ++index) { + TUpdateOp& op = ModifiedOps[index]; + if (auto error = NScheme::HasUnexpectedValueSize(op.Value)) { + Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, op.Tag, error.c_str()); + } + + if (op.Value.IsEmpty()) { + // Null values lose type id in redo log + op.Value = {}; + // Null values must change ECellOp::Set to ECellOp::Null in redo log + op.Op = op.NormalizedCellOp(); + } + + Y_VERIFY(op.Op == ELargeObj::Inline, "User provided an invalid ECellOp"); + + TArrayRef<const char> raw = op.AsRef(); + if (limit.IsExtern(raw.size())) { + if (auto got = annex->Place(table, op.Tag, raw)) { + ModifiedRefs[index] = got.Ref; + const auto payload = NUtil::NBin::ToRef(ModifiedRefs[index]); + op.Value = TRawTypeValue(payload, op.Value.Type()); + op.Op = ELargeObj::Extern; + } } } - Redo->EvUpdate(table, rop, key, ops, rowVersion); + Redo->EvUpdate(table, rop, key, ModifiedOps, rowVersion); + RequireForUpdate(table)->Update(rop, key, ModifiedOps, 
Annex->Current(), rowVersion); } void TDatabase::UpdateTx(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const TUpdateOp> ops, ui64 txId) @@ -235,23 +265,41 @@ void TDatabase::UpdateTx(ui32 table, ERowOp rop, TRawVals key, TArrayRef<const T Y_FAIL("Key index %" PRISZT " validation failure: %s", index, error.c_str()); } } - for (size_t index = 0; index < ops.size(); ++index) { - if (auto error = NScheme::HasUnexpectedValueSize(ops[index].Value)) { - Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, ops[index].Tag, error.c_str()); + + ModifiedOps.assign(ops.begin(), ops.end()); + for (size_t index = 0; index < ModifiedOps.size(); ++index) { + TUpdateOp& op = ModifiedOps[index]; + if (auto error = NScheme::HasUnexpectedValueSize(op.Value)) { + Y_FAIL("Op index %" PRISZT " tag %" PRIu32 " validation failure: %s", index, op.Tag, error.c_str()); + } + + if (op.Value.IsEmpty()) { + // Null values lose type id in redo log + op.Value = {}; + // Null values must change ECellOp::Set to ECellOp::Null in redo log + op.Op = op.NormalizedCellOp(); } + + Y_VERIFY(op.Op == ELargeObj::Inline, "User provided an invalid ECellOp"); + + // FIXME: we cannot handle blob references during scans, so we + // avoid creating large objects when they are in deltas } - Redo->EvUpdateTx(table, rop, key, ops, txId); + Redo->EvUpdateTx(table, rop, key, ModifiedOps, txId); + RequireForUpdate(table)->UpdateTx(rop, key, ModifiedOps, Annex->Current(), txId); } void TDatabase::RemoveTx(ui32 table, ui64 txId) { Redo->EvRemoveTx(table, txId); + RequireForUpdate(table)->RemoveTx(txId); } void TDatabase::CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion) { Redo->EvCommitTx(table, txId, rowVersion); + RequireForUpdate(table)->CommitTx(txId, rowVersion); } bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const @@ -259,6 +307,16 @@ bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const return Require(table)->HasOpenTx(txId); } +bool TDatabase::HasCommittedTx(ui32 table, ui64 
txId) const +{ + return Require(table)->HasCommittedTx(txId); +} + +bool TDatabase::HasRemovedTx(ui32 table, ui64 txId) const +{ + return Require(table)->HasRemovedTx(txId); +} + void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper) { if (Y_LIKELY(lower < upper)) { @@ -285,8 +343,9 @@ void TDatabase::Begin(TTxStamp stamp, IPages& env) Y_VERIFY(!Redo, "Transaction already in progress"); Y_VERIFY(!Env); Annex = new TAnnex(*DatabaseImpl->Scheme); - Redo = new NRedo::TWriter{ Annex.Get(), DatabaseImpl->AnnexByteLimit() }; - Change = MakeHolder<TChange>(Stamp = stamp, DatabaseImpl->Serial() + 1); + Redo = new NRedo::TWriter; + DatabaseImpl->BeginTransaction(); + Change = MakeHolder<TChange>(stamp, DatabaseImpl->Serial()); Env = &env; NoMoreReadsFlag = false; } @@ -355,28 +414,34 @@ TDatabase::TChg TDatabase::Head(ui32 table) const noexcept } else { auto &wrap = DatabaseImpl->Get(table, true); - return { wrap.Serial, wrap->Head() }; + // We return numbers as they have been at the start of transaction, + // but possibly after schema changes or memtable flushes. + auto serial = wrap.DataModified && !wrap.EpochSnapshot ? wrap.SerialBackup : wrap.Serial; + auto head = wrap.EpochSnapshot ? 
*wrap.EpochSnapshot : wrap->Head(); + return { serial, head }; } } TString TDatabase::SnapshotToLog(ui32 table, TTxStamp stamp) { + Y_VERIFY(!Redo, "Cannot SnapshotToLog inside a transaction"); + auto scn = DatabaseImpl->Serial() + 1; auto epoch = DatabaseImpl->Get(table, true)->Snapshot(); DatabaseImpl->Rewind(scn); - return - NRedo::TWriter{ } - .EvBegin(ui32(ECompatibility::Head), ui32(ECompatibility::Edge), scn, stamp) - .EvFlush(table, stamp, epoch).Dump(); + NRedo::TWriter writer; + writer.EvBegin(ui32(ECompatibility::Head), ui32(ECompatibility::Edge), scn, stamp); + writer.EvFlush(table, stamp, epoch); + return std::move(writer).Finish(); } -ui32 TDatabase::TxSnapTable(ui32 table) +TEpoch TDatabase::TxSnapTable(ui32 table) { - Require(table); - Change->Snapshots.emplace_back(table); - return Change->Snapshots.size() - 1; + Y_VERIFY(Redo, "Cannot TxSnapTable outside a transaction"); + ++Change->Snapshots; + return DatabaseImpl->FlushTable(table); } TAutoPtr<TSubset> TDatabase::Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const @@ -452,9 +517,8 @@ void TDatabase::Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart> txStatus) TAlter& TDatabase::Alter() { Y_VERIFY(Redo, "Scheme change must be done within a transaction"); - Y_VERIFY(!*Redo, "Scheme change must be done before any data updates"); - return *(Alter_ ? Alter_ : (Alter_ = new TAlter())); + return *(Alter_ ? 
Alter_ : (Alter_ = new TAlter(DatabaseImpl.Get()))); } void TDatabase::DebugDumpTable(ui32 table, IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const { @@ -510,15 +574,15 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator IteratedTables.clear(); } - if (commit && (*Redo || Alter_ || Change->Snapshots || Change->RemovedRowVersions)) { + if (commit && (*Redo || (Alter_ && *Alter_) || Change->Snapshots || Change->RemovedRowVersions)) { Y_VERIFY(stamp >= Change->Stamp); + Y_VERIFY(DatabaseImpl->Serial() == Change->Serial); - /* TODO: Temporary hack fot getting correct Stamp and Serial state - against invocation of SnapshotToLog() between Begin(...) and - Commit(...). Read KIKIMR-5366 for details and progress. */ - + // FIXME: Temporary hack for using up to date change stamp when scan + // is queued inside a transaction. In practice we just need to + // stop using empty commits for scans. See KIKIMR-5366 for + // details. const_cast<TTxStamp&>(Change->Stamp) = stamp; - const_cast<ui64&>(Change->Serial) = DatabaseImpl->Serial() + 1; NRedo::TWriter prefix{ }; @@ -526,12 +590,13 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator const ui32 head = ui32(ECompatibility::Head); const ui32 edge = ui32(ECompatibility::Edge); - prefix.EvBegin(head, edge, Change->Serial, Change->Stamp); + prefix.EvBegin(head, edge, Change->Serial, Change->Stamp); } const auto offset = prefix.Bytes(); /* useful payload starts here */ - if (auto annex = Annex->Unwrap()) { + auto annex = Annex->Unwrap(); + if (annex) { Y_VERIFY(cookieAllocator, "Have to provide TCookieAllocator with enabled annex"); TVector<NPageCollection::TGlobId> blobs; @@ -547,29 +612,18 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator } prefix.EvAnnex(blobs); - DatabaseImpl->Assign(std::move(annex)); } - DatabaseImpl->Switch(Stamp); + DatabaseImpl->CommitTransaction(Change->Stamp, annex, prefix); - if (Alter_) { 
+ if (Alter_ && *Alter_) { auto delta = Alter_->Flush(); - - if (DatabaseImpl->Apply(*delta, &prefix)) - Y_PROTOBUF_SUPPRESS_NODISCARD delta->SerializeToString(&Change->Scheme); + Y_PROTOBUF_SUPPRESS_NODISCARD delta->SerializeToString(&Change->Scheme); } - for (auto &one: Change->Snapshots) { - one.Epoch = Require(one.Table)->Snapshot(); - prefix.EvFlush(one.Table, Stamp, one.Epoch); - } - - prefix.Join(*Redo); + prefix.Join(std::move(*Redo)); - Change->Redo = prefix.Dump(); - - for (auto &entry: prefix.Unwrap()) - DatabaseImpl->ApplyRedo(entry); + Change->Redo = std::move(prefix).Finish(); for (const auto& xpair : Change->RemovedRowVersions) { if (auto& wrap = DatabaseImpl->Get(xpair.first, false)) { @@ -582,7 +636,7 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator Change->Garbage = std::move(DatabaseImpl->Garbage); Change->Deleted = std::move(DatabaseImpl->Deleted); Change->Affects = DatabaseImpl->GrabAffects(); - Change->Annex = DatabaseImpl->GrabAnnex(); + Change->Annex = std::move(annex); if (Change->Redo.size() == offset && !Change->Affects) { std::exchange(Change->Redo, { }); /* omit complete NOOP redo */ @@ -592,13 +646,15 @@ TDatabase::TProd TDatabase::Commit(TTxStamp stamp, bool commit, TCookieAllocator Y_Fail( NFmt::Do(*Change) << " produced " << (Change->Redo.size() - offset) << "b of non technical redo without leaving effects on data"); - } else if (Change->Serial != DatabaseImpl->Serial()) { + } else if (Change->Redo && Change->Serial != DatabaseImpl->Serial()) { Y_Fail( NFmt::Do(*Change) << " serial diverged from current db " << DatabaseImpl->Serial() << " after rolling up redo log"); } else if (Change->Deleted.size() != Change->Garbage.size()) { Y_Fail(NFmt::Do(*Change) << " has inconsistent garbage data"); } + } else { + DatabaseImpl->RollbackTransaction(); } Redo = nullptr; @@ -614,6 +670,11 @@ TTable* TDatabase::Require(ui32 table) const noexcept return DatabaseImpl->Get(table, true).Self.Get(); } +TTable* 
TDatabase::RequireForUpdate(ui32 table) const noexcept +{ + return DatabaseImpl->GetForUpdate(table).Self.Get(); +} + TGarbage TDatabase::RollUp(TTxStamp stamp, TArrayRef<const char> delta, TArrayRef<const char> redo, TMemGlobs annex) { @@ -626,7 +687,7 @@ TGarbage TDatabase::RollUp(TTxStamp stamp, TArrayRef<const char> delta, TArrayRe bool parseOk = ParseFromStringNoSizeLimit(changes, delta); Y_VERIFY(parseOk); - DatabaseImpl->Apply(changes, nullptr); + DatabaseImpl->ApplySchema(changes); } if (redo) { diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 18fdd12d84..06740064e3 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -118,6 +118,8 @@ public: * Returns true when table has an open transaction that is not committed or removed yet */ bool HasOpenTx(ui32 table, ui64 txId) const; + bool HasCommittedTx(ui32 table, ui64 txId) const; + bool HasRemovedTx(ui32 table, ui64 txId) const; /** * Remove row versions [lower, upper) from the given table @@ -139,7 +141,7 @@ public: TAlter& Alter(); /* Begin DDL ALTER script */ - ui32 TxSnapTable(ui32 table); + TEpoch TxSnapTable(ui32 table); const TScheme& GetScheme() const noexcept; @@ -194,11 +196,11 @@ public: private: TTable* Require(ui32 tableId) const noexcept; + TTable* RequireForUpdate(ui32 tableId) const noexcept; private: const THolder<TDatabaseImpl> DatabaseImpl; - ui64 Stamp = Max<ui64>(); bool NoMoreReadsFlag; IPages* Env = nullptr; THolder<TChange> Change; @@ -206,6 +208,9 @@ private: TAutoPtr<TAnnex> Annex; TAutoPtr<NRedo::TWriter> Redo; + TVector<ui32> ModifiedRefs; + TVector<TUpdateOp> ModifiedOps; + mutable TDeque<TPartSimpleIt> TempIterators; // Keeps the last result of Select() valid mutable THashSet<ui32> IteratedTables; }; diff --git a/ydb/core/tablet_flat/flat_dbase_annex.h b/ydb/core/tablet_flat/flat_dbase_annex.h index 1a52eddca9..492e78c74d 100644 --- a/ydb/core/tablet_flat/flat_dbase_annex.h +++ 
b/ydb/core/tablet_flat/flat_dbase_annex.h @@ -25,6 +25,11 @@ namespace NTable { return bool(Blobs); } + TArrayRef<const NPageCollection::TMemGlob> Current() const noexcept + { + return Blobs; + } + private: TLimit Limit(ui32 table) noexcept override { @@ -41,11 +46,12 @@ namespace NTable { auto blob = NPage::TLabelWrapper::Wrap(data, EPage::Opaque, 0); - const TLogoBlobID fake(0, 0, 0, Room->Blobs, blob.size(), 0); + const ui32 ref = Blobs.size(); + const TLogoBlobID fake(0, 0, 0, Room->Blobs, blob.size(), ref); Blobs.emplace_back(TGlobId{ fake, 0 }, std::move(blob)); - return Blobs.size() - 1; + return ref; } bool Lookup(ui32 table) noexcept diff --git a/ydb/core/tablet_flat/flat_dbase_apply.cpp b/ydb/core/tablet_flat/flat_dbase_apply.cpp index f87b315d33..714fc05edc 100644 --- a/ydb/core/tablet_flat/flat_dbase_apply.cpp +++ b/ydb/core/tablet_flat/flat_dbase_apply.cpp @@ -6,8 +6,9 @@ namespace NKikimr { namespace NTable { -TSchemeModifier::TSchemeModifier(TScheme &scheme) +TSchemeModifier::TSchemeModifier(TScheme &scheme, TSchemeRollbackState *rollbackState) : Scheme(scheme) + , RollbackState(rollbackState) { } @@ -38,14 +39,24 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta) } else if (action == TAlterRecord::AddColumnToKey) { changes = AddColumnToKey(table, delta.GetColumnId()); } else if (action == TAlterRecord::AddFamily) { - auto &family = Table(table)->Families[delta.GetFamilyId()]; + auto &tableInfo = *Table(table); + if (!tableInfo.Families.contains(delta.GetFamilyId())) { + PreserveTable(table); + changes = true; + } + auto &family = tableInfo.Families[delta.GetFamilyId()]; const ui32 room = delta.GetRoomId(); - changes = (std::exchange(family.Room, room) != room); + changes = ChangeTableSetting(table, family.Room, room); } else if (action == TAlterRecord::SetFamily) { - auto &family = Table(table)->Families[delta.GetFamilyId()]; + auto &tableInfo = *Table(table); + if (!tableInfo.Families.contains(delta.GetFamilyId())) { + PreserveTable(table); 
+ changes = true; + } + auto &family = tableInfo.Families[delta.GetFamilyId()]; auto codec = delta.HasCodec() ? ECodec(delta.GetCodec()) :family.Codec; @@ -60,51 +71,56 @@ bool TSchemeModifier::Apply(const TAlterRecord &delta) Y_VERIFY(ui32(cache) <= 2, "Invalid pages cache policy value"); - changes = - (std::exchange(family.Cache, cache) != cache) - | (std::exchange(family.Codec, codec) != codec) - | (std::exchange(family.Small, small) != small) - | (std::exchange(family.Large, large) != large); + changes |= ChangeTableSetting(table, family.Cache, cache); + changes |= ChangeTableSetting(table, family.Codec, codec); + changes |= ChangeTableSetting(table, family.Small, small); + changes |= ChangeTableSetting(table, family.Large, large); } else if (action == TAlterRecord::AddColumnToFamily) { changes = AddColumnToFamily(table, delta.GetColumnId(), delta.GetFamilyId()); } else if (action == TAlterRecord::SetRoom) { - auto &room = Table(table)->Rooms[delta.GetRoomId()]; + auto &tableInfo = *Table(table); + if (!tableInfo.Rooms.contains(delta.GetRoomId())) { + PreserveTable(table); + changes = true; + } + auto &room = tableInfo.Rooms[delta.GetRoomId()]; - ui32 main = delta.HasMain() ? delta.GetMain() : room.Main; - ui32 blobs = delta.HasBlobs() ? delta.GetBlobs() : room.Blobs; - ui32 outer = delta.HasOuter() ? delta.GetOuter() : room.Outer; + ui8 main = delta.HasMain() ? delta.GetMain() : room.Main; + ui8 blobs = delta.HasBlobs() ? delta.GetBlobs() : room.Blobs; + ui8 outer = delta.HasOuter() ? delta.GetOuter() : room.Outer; - changes = - (std::exchange(room.Main, main) != main) - | (std::exchange(room.Blobs, blobs) != blobs) - | (std::exchange(room.Outer, outer) != outer); + changes |= ChangeTableSetting(table, room.Main, main); + changes |= ChangeTableSetting(table, room.Blobs, blobs); + changes |= ChangeTableSetting(table, room.Outer, outer); } else if (action == TAlterRecord::SetRedo) { const ui32 annex = delta.HasAnnex() ? 
delta.GetAnnex() : 0; - changes = (std::exchange(Scheme.Redo.Annex, annex) != annex); + changes |= ChangeRedoSetting(Scheme.Redo.Annex, annex); } else if (action == TAlterRecord::SetTable) { + auto &tableInfo = *Table(table); + if (delta.HasByKeyFilter()) { bool enabled = delta.GetByKeyFilter(); - changes |= (std::exchange(Table(table)->ByKeyFilter, enabled) != enabled); + changes |= ChangeTableSetting(table, tableInfo.ByKeyFilter, enabled); } if (delta.HasEraseCacheEnabled()) { bool enabled = delta.GetEraseCacheEnabled(); - changes |= (std::exchange(Table(table)->EraseCacheEnabled, enabled) != enabled); + changes |= ChangeTableSetting(table, tableInfo.EraseCacheEnabled, enabled); if (enabled) { ui32 minRows = delta.GetEraseCacheMinRows(); ui32 maxBytes = delta.GetEraseCacheMaxBytes(); - changes |= (std::exchange(Table(table)->EraseCacheMinRows, minRows) != minRows); - changes |= (std::exchange(Table(table)->EraseCacheMaxBytes, maxBytes) != maxBytes); + changes |= ChangeTableSetting(table, tableInfo.EraseCacheMinRows, minRows); + changes |= ChangeTableSetting(table, tableInfo.EraseCacheMaxBytes, maxBytes); } } if (delta.HasColdBorrow()) { bool enabled = delta.GetColdBorrow(); - changes |= (std::exchange(Table(table)->ColdBorrow, enabled) != enabled); + changes |= ChangeTableSetting(table, tableInfo.ColdBorrow, enabled); } } else if (action == TAlterRecord::UpdateExecutorInfo) { @@ -136,38 +152,66 @@ bool TSchemeModifier::AddColumnToFamily(ui32 tid, ui32 cid, ui32 family) auto* column = Scheme.GetColumnInfo(Table(tid), cid); Y_VERIFY(column); - return std::exchange(column->Family, family) != family; + if (column->Family != family) { + PreserveTable(tid); + column->Family = family; + return true; + } + + return false; } bool TSchemeModifier::AddTable(const TString &name, ui32 id) { + auto it = Scheme.Tables.find(id); auto itName = Scheme.TableNames.find(name); - auto it = Scheme.Tables.emplace(id, TTable(name, id)); + + // We verify id match when a table with this 
name already exists if (itName != Scheme.TableNames.end()) { - Y_VERIFY(!it.second && it.first->second.Name == name && itName->second == it.first->first); + auto describeFailure = [&]() -> TString { + TStringBuilder out; + out << "Table " << id << " '" << name << "'" + << " conflicts with table " << itName->second << " '" << itName->first << "'"; + if (it != Scheme.Tables.end()) { + out << " and table " << it->first << " '" << it->second.Name << "'"; + } + return out; + }; + Y_VERIFY_S(itName->second == id, describeFailure()); + // Sanity check that this table really exists + Y_VERIFY(it != Scheme.Tables.end() && it->second.Name == name); return false; - } else if (!it.second) { - // renaming table - Scheme.TableNames.erase(it.first->second.Name); - Scheme.TableNames.emplace(name, id); - it.first->second.Name = name; - return true; - } else { - Y_VERIFY(it.second); + } + + PreserveTable(id); + + // We assume table is renamed when the same id already exists + if (it != Scheme.Tables.end()) { + Scheme.TableNames.erase(it->second.Name); + it->second.Name = name; Scheme.TableNames.emplace(name, id); - if (id >= 100) { - auto *table = &it.first->second; - // HACK: Force user tables to have some reasonable policy with multiple levels - table->CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy(); - } return true; } + + // Creating a new table + auto pr = Scheme.Tables.emplace(id, TTable(name, id)); + Y_VERIFY(pr.second); + it = pr.first; + Scheme.TableNames.emplace(name, id); + + if (id >= 100) { + // HACK: Force user tables to have some reasonable policy with multiple levels + it->second.CompactionPolicy = NLocalDb::CreateDefaultUserTablePolicy(); + } + + return true; } bool TSchemeModifier::DropTable(ui32 id) { auto it = Scheme.Tables.find(id); if (it != Scheme.Tables.end()) { + PreserveTable(id); Scheme.TableNames.erase(it->second.Name); Scheme.Tables.erase(it); return true; @@ -178,34 +222,51 @@ bool TSchemeModifier::DropTable(ui32 id) bool 
TSchemeModifier::AddColumn(ui32 tid, const TString &name, ui32 id, ui32 type, bool notNull, TCell null) { auto *table = Table(tid); + + auto it = table->Columns.find(id); auto itName = table->ColumnNames.find(name); - bool haveName = itName != table->ColumnNames.end(); - auto it = table->Columns.emplace(id, TColumn(name, id, type, notNull)); - if (it.second) - it.first->second.SetDefault(null); + // We verify ids and types match when column with the same name already exists + if (itName != table->ColumnNames.end()) { + auto describeFailure = [&]() -> TString { + TStringBuilder out; + out << "Table " << tid << " '" << table->Name << "'" + << " adding column " << id << " '" << name << "'" + << " conflicts with column " << itName->second << " '" << itName->first << "'"; + if (it != table->Columns.end()) { + out << " and column " << it->first << " '" << it->second.Name << "'"; + } + return out; + }; + Y_VERIFY_S(itName->second == id, describeFailure()); + // Sanity check that this column exists and types match + Y_VERIFY(it != table->Columns.end() && it->second.Name == name); + Y_VERIFY_S(it->second.PType == type, + "Table " << tid << " '" << table->Name << "' column " << id << " '" << name << "' expected type " << type << ", existing type " << it->second.PType); + return false; + } + + PreserveTable(tid); - if (!it.second && !haveName && it.first->second.PType == type) { - // renaming column - table->ColumnNames.erase(it.first->second.Name); + // We assume column is renamed when the same id already exists + if (it != table->Columns.end()) { + Y_VERIFY_S(it->second.PType == type, + "Table " << tid << " '" << table->Name << "' column " << id << " '" << it->second.Name << "' renamed to '" << name << "'" + << " with type " << type << ", existing type " << it->second.PType); + table->ColumnNames.erase(it->second.Name); + it->second.Name = name; table->ColumnNames.emplace(name, id); - it.first->second.Name = name; return true; - } else { - // do we have inserted a new 
column, OR we already have the same column with the same name? - bool insertedNew = it.second && !haveName; - bool replacedExisting = !it.second && it.first->second.Name == name && haveName && itName->second == it.first->first; - Y_VERIFY_S((insertedNew || replacedExisting), - "NewName: " << name << - " OldName: " << (haveName ? itName->first : it.first->second.Name) << - " NewId: " << id << - " OldId: " << (haveName ? itName->second : it.first->first)); - if (!haveName) { - table->ColumnNames.emplace(name, id); - return true; - } } - return false; + + auto pr = table->Columns.emplace(id, TColumn(name, id, type, notNull)); + Y_VERIFY(pr.second); + it = pr.first; + table->ColumnNames.emplace(name, id); + + it->second.SetDefault(null); + + return true; } bool TSchemeModifier::DropColumn(ui32 tid, ui32 id) @@ -213,6 +274,7 @@ bool TSchemeModifier::DropColumn(ui32 tid, ui32 id) auto *table = Table(tid); auto it = table->Columns.find(id); if (it != table->Columns.end()) { + PreserveTable(tid); table->ColumnNames.erase(it->second.Name); table->Columns.erase(it); return true; @@ -228,6 +290,7 @@ bool TSchemeModifier::AddColumnToKey(ui32 tid, ui32 columnId) auto keyPos = std::find(table->KeyColumns.begin(), table->KeyColumns.end(), column->Id); if (keyPos == table->KeyColumns.end()) { + PreserveTable(tid); column->KeyOrder = table->KeyColumns.size(); table->KeyColumns.push_back(column->Id); return true; @@ -237,32 +300,32 @@ bool TSchemeModifier::AddColumnToKey(ui32 tid, ui32 columnId) bool TSchemeModifier::SetExecutorCacheSize(ui64 size) { - return std::exchange(Scheme.Executor.CacheSize, size) != size; + return ChangeExecutorSetting(Scheme.Executor.CacheSize, size); } bool TSchemeModifier::SetExecutorAllowLogBatching(bool allow) { - return std::exchange(Scheme.Executor.AllowLogBatching, allow) != allow; + return ChangeExecutorSetting(Scheme.Executor.AllowLogBatching, allow); } bool TSchemeModifier::SetExecutorLogFastCommitTactic(bool allow) { - return 
std::exchange(Scheme.Executor.LogFastTactic, allow) != allow; + return ChangeExecutorSetting(Scheme.Executor.LogFastTactic, allow); } bool TSchemeModifier::SetExecutorLogFlushPeriod(TDuration delay) { - return std::exchange(Scheme.Executor.LogFlushPeriod, delay) != delay; + return ChangeExecutorSetting(Scheme.Executor.LogFlushPeriod, delay); } bool TSchemeModifier::SetExecutorLimitInFlyTx(ui32 limit) { - return std::exchange(Scheme.Executor.LimitInFlyTx, limit) != limit; + return ChangeExecutorSetting(Scheme.Executor.LimitInFlyTx, limit); } bool TSchemeModifier::SetExecutorResourceProfile(const TString &name) { - return std::exchange(Scheme.Executor.ResourceProfile, name) != name; + return ChangeExecutorSetting(Scheme.Executor.ResourceProfile, name); } bool TSchemeModifier::SetCompactionPolicy(ui32 tid, const NKikimrSchemeOp::TCompactionPolicy &proto) @@ -271,9 +334,36 @@ bool TSchemeModifier::SetCompactionPolicy(ui32 tid, const NKikimrSchemeOp::TComp TIntrusiveConstPtr<TCompactionPolicy> policy(new TCompactionPolicy(proto)); if (table->CompactionPolicy && *(table->CompactionPolicy) == *policy) return false; + PreserveTable(tid); table->CompactionPolicy = policy; return true; } +void TSchemeModifier::PreserveTable(ui32 tid) noexcept +{ + if (RollbackState && !RollbackState->Tables.contains(tid)) { + auto it = Scheme.Tables.find(tid); + if (it != Scheme.Tables.end()) { + RollbackState->Tables[tid] = it->second; + } else { + RollbackState->Tables[tid] = std::nullopt; + } + } +} + +void TSchemeModifier::PreserveExecutor() noexcept +{ + if (RollbackState && !RollbackState->Executor) { + RollbackState->Executor = Scheme.Executor; + } +} + +void TSchemeModifier::PreserveRedo() noexcept +{ + if (RollbackState && !RollbackState->Redo) { + RollbackState->Redo = Scheme.Redo; + } +} + } } diff --git a/ydb/core/tablet_flat/flat_dbase_apply.h b/ydb/core/tablet_flat/flat_dbase_apply.h index 3502030b71..279a598c1f 100644 --- a/ydb/core/tablet_flat/flat_dbase_apply.h +++ 
b/ydb/core/tablet_flat/flat_dbase_apply.h @@ -15,7 +15,7 @@ namespace NTable { using ECodec = NPage::ECodec; using ECache = NPage::ECache; - explicit TSchemeModifier(TScheme &scheme); + explicit TSchemeModifier(TScheme &scheme, TSchemeRollbackState *rollbackState = nullptr); bool Apply(const TSchemeChanges &delta) { @@ -27,9 +27,9 @@ namespace NTable { return changed; } - protected: bool Apply(const TAlterRecord &delta); + protected: bool AddTable(const TString& name, ui32 id); bool DropTable(ui32 id); bool AddColumn(ui32 table, const TString& name, ui32 id, ui32 type, bool notNull, TCell null = { }); @@ -48,12 +48,47 @@ namespace NTable { TTable* Table(ui32 tid) const noexcept { auto* table = Scheme.GetTableInfo(tid); - Y_VERIFY(table, "Acccessing table that isn't exists"); + Y_VERIFY(table, "Acccessing table that doesn't exist"); return table; } + template<class T> + bool ChangeTableSetting(ui32 tid, T& dst, const T& value) { + if (dst != value) { + PreserveTable(tid); + dst = value; + return true; + } + return false; + } + + template<class T> + bool ChangeExecutorSetting(T& dst, const T& value) { + if (dst != value) { + PreserveExecutor(); + dst = value; + return true; + } + return false; + } + + template<class T> + bool ChangeRedoSetting(T& dst, const T& value) { + if (dst != value) { + PreserveRedo(); + dst = value; + return true; + } + return false; + } + + void PreserveTable(ui32 tid) noexcept; + void PreserveExecutor() noexcept; + void PreserveRedo() noexcept; + public: TScheme &Scheme; + TSchemeRollbackState *RollbackState; THashSet<ui32> Affects; }; diff --git a/ydb/core/tablet_flat/flat_dbase_change.h b/ydb/core/tablet_flat/flat_dbase_change.h index 641e08b63d..857fe29fdc 100644 --- a/ydb/core/tablet_flat/flat_dbase_change.h +++ b/ydb/core/tablet_flat/flat_dbase_change.h @@ -12,13 +12,6 @@ namespace NTable { struct TChange { using TMemGlobs = TVector<NPageCollection::TMemGlob>; - struct TSnapshot { - TSnapshot(ui32 table) : Table(table) { } - - ui32 
Table = Max<ui32>(); - TEpoch Epoch = TEpoch::Max(); - }; - struct TStats { ui64 ChargeSieved = 0; ui64 ChargeWeeded = 0; @@ -63,7 +56,7 @@ namespace NTable { TVector<ui32> Deleted; /* Tables deleted in some alter */ TGarbage Garbage; /* Wiped tables, ids in Deleted */ - TVector<TSnapshot> Snapshots; + ui32 Snapshots = 0; TMap<ui32, TVector<TRemovedRowVersions>> RemovedRowVersions; diff --git a/ydb/core/tablet_flat/flat_dbase_naked.h b/ydb/core/tablet_flat/flat_dbase_naked.h index 493867a7f8..ebf4c26891 100644 --- a/ydb/core/tablet_flat/flat_dbase_naked.h +++ b/ydb/core/tablet_flat/flat_dbase_naked.h @@ -17,8 +17,9 @@ namespace NKikimr { namespace NTable { - class TDatabaseImpl { - + class TDatabaseImpl final + : public IAlterSink + { struct TArgs { ui32 Table; TEpoch Head; @@ -49,7 +50,12 @@ namespace NTable { bool Touch(ui64 edge, ui64 serial) noexcept { - return std::exchange(Serial, serial) <= edge; + ui64 prevSerial = std::exchange(Serial, serial); + if (prevSerial <= edge) { + SerialBackup = prevSerial; + return true; + } + return false; } void Aggr(TDbStats &aggr, bool enter) const noexcept @@ -77,10 +83,40 @@ namespace NTable { } } + void BackupMemStats() noexcept + { + BackupMemTableWaste = Self->GetMemWaste(); + BackupMemTableBytes = Self->GetMemSize(); + BackupMemTableOps = Self->GetOpsCount(); + } + + void RestoreMemStats(TDbStats &aggr) const noexcept + { + NUtil::SubSafe(aggr.MemTableWaste, BackupMemTableWaste); + NUtil::SubSafe(aggr.MemTableBytes, BackupMemTableBytes); + NUtil::SubSafe(aggr.MemTableOps, BackupMemTableOps); + aggr.MemTableWaste += Self->GetMemWaste(); + aggr.MemTableBytes += Self->GetMemSize(); + aggr.MemTableOps += Self->GetOpsCount(); + } + const ui32 Table = Max<ui32>(); const TIntrusivePtr<TTable> Self; const TTxStamp Edge = 0; /* Stamp of last snapshot */ ui64 Serial = 0; + ui64 SerialBackup = 0; + + std::optional<TEpoch> EpochSnapshot; + ui64 BackupMemTableWaste; + ui64 BackupMemTableBytes; + ui64 BackupMemTableOps; + + bool 
Created = false; + bool Dropped = false; + bool SchemePending = false; + bool SchemeModified = false; + bool DataModified = false; + bool RollbackPrepared = false; }; public: @@ -113,23 +149,312 @@ namespace NTable { { auto *wrap = Tables.FindPtr(table); - Y_VERIFY(wrap || !require, "Cannot find given table"); + if (!wrap || wrap->Dropped) { + Y_VERIFY(!require, "Cannot find table %" PRIu32, table); + return Dummy; + } + + if (wrap->SchemePending) { + Y_VERIFY(InTransaction); + + auto* info = Scheme->GetTableInfo(table); + Y_VERIFY(info, "No scheme for existing table %" PRIu32, table); - return wrap ? *wrap : Dummy; + if (!wrap->Created && !wrap->RollbackPrepared) { + wrap->BackupMemStats(); + (*wrap)->PrepareRollback(); + wrap->RollbackPrepared = true; + Prepared.push_back(table); + } + + if (!wrap->EpochSnapshot) { + // We always flush mem table on schema modification, + // which happens at the "start" of transaction. + wrap->EpochSnapshot.emplace((*wrap)->Snapshot()); + // When this is an existing table we also simulate + // EvFlush that is inserted in the redo log. 
+ if (!wrap->Created) { + Flushed.push_back(table); + if (wrap->Touch(Begin_, Serial_)) { + Affects.push_back(table); + } + } + } + + (*wrap)->SetScheme(*info); + + wrap->SchemeModified = true; + wrap->SchemePending = false; + } + + return *wrap; + } + + TTableWrapper& GetForUpdate(ui32 table) noexcept + { + Y_VERIFY(InTransaction); + TTableWrapper& wrap = Get(table, true); + if (!wrap.Created && !wrap.RollbackPrepared) { + wrap.BackupMemStats(); + wrap->PrepareRollback(); + wrap.RollbackPrepared = true; + Prepared.push_back(table); + } + if (wrap.Touch(Begin_, Serial_)) { + Affects.push_back(table); + } + Y_VERIFY(wrap.Created || wrap.RollbackPrepared); + wrap.DataModified = true; + return wrap; } ui64 Rewind(ui64 serial) noexcept { + Y_VERIFY(!InTransaction, "Unexpected rewind inside a transaction"); return std::exchange(Serial_, Max(Serial_, serial)); } + void BeginTransaction() noexcept + { + Y_VERIFY(!InTransaction); + InTransaction = true; + + // We pretend as if we just processed Switch and EvBegin with the next serial + Begin_ = Serial_; + Affects = { }; + Serial_++; + + // Sanity checks + Y_VERIFY_DEBUG(Annex.empty()); + Y_VERIFY_DEBUG(Flushed.empty()); + Y_VERIFY_DEBUG(Prepared.empty()); + } + + TEpoch FlushTable(ui32 tid) noexcept + { + Y_VERIFY(InTransaction); + auto& wrap = Get(tid, true); + Y_VERIFY(!wrap.DataModified, "Cannot flush a modified table"); + if (!wrap.EpochSnapshot) { + Y_VERIFY(!wrap.Created); + wrap.EpochSnapshot.emplace(wrap->Snapshot()); + // Simulate inserting and processing EvFlush + Flushed.push_back(tid); + if (wrap.Touch(Begin_, Serial_)) { + Affects.push_back(tid); + } + } + return *wrap.EpochSnapshot; + } + + void CommitTransaction(TTxStamp stamp, TArrayRef<const TMemGlob> annex, NRedo::TWriter& writer) noexcept + { + Y_VERIFY(Stamp <= stamp, "Executor tx stamp cannot go to the past"); + Stamp = stamp; + + CommitScheme(annex); + + for (ui32 tid : Prepared) { + auto it = Tables.find(tid); + if (it == Tables.end()) { + // Table 
was actually dropped + continue; + } + auto& wrap = it->second; + Y_VERIFY(wrap.RollbackPrepared); + wrap->CommitChanges(annex); + wrap.RestoreMemStats(Stats); + wrap.RollbackPrepared = false; + wrap.DataModified = false; + } + Prepared.clear(); + + THashSet<ui32> dropped; + for (ui32 tid : Flushed) { + auto it = Tables.find(tid); + if (it == Tables.end()) { + // Table was actually dropped + dropped.insert(tid); + continue; + } + auto& wrap = it->second; + Y_VERIFY(wrap.EpochSnapshot); + writer.EvFlush(tid, Stamp - 1, *wrap.EpochSnapshot); + wrap.EpochSnapshot.reset(); + } + Flushed.clear(); + + // Remove dropped tables (if any) from affects + if (!dropped.empty()) { + auto end = std::remove_if( + Affects.begin(), Affects.end(), + [&dropped](ui32 tid) { + return dropped.contains(tid); + }); + Affects.erase(end, Affects.end()); + } + + // We expect database to drop commits without any side-effects + // So we rewind serial to match what it would be after a reboot + if (Affects.empty()) { + Serial_ = Begin_; + } + + Stats.TxCommited++; + InTransaction = false; + } + + void CommitScheme(TArrayRef<const TMemGlob> annex) noexcept + { + if (!SchemeRollbackState.Tables.empty() || SchemeRollbackState.Redo) { + // Table or redo settings have changed + CalculateAnnexEdge(); + } + + TScheme& scheme = *Scheme; + for (auto& pr : SchemeRollbackState.Tables) { + ui32 tid = pr.first; + auto* info = scheme.GetTableInfo(tid); + if (!info) { + // This table doesn't exist in current schema, + // which means it has been dropped. 
+ Y_VERIFY(Tables.contains(tid), "Unexpected drop for a table that doesn't exist"); + auto& wrap = Tables.at(tid); + Y_VERIFY(wrap.Dropped); + Y_VERIFY(!wrap.DataModified, "Unexpected drop of a modified table"); + if (wrap.RollbackPrepared) { + wrap->CommitChanges(annex); + wrap.RestoreMemStats(Stats); + wrap.RollbackPrepared = false; + } + wrap.Aggr(Stats, false /* leave */); + Deleted.emplace_back(tid); + Garbage.emplace_back(wrap->Unwrap()); + Tables.erase(tid); + NUtil::SubSafe(Stats.Tables, ui32(1)); + continue; + } + + // This call will also apply schema changes + auto& wrap = Get(tid, true); + Y_VERIFY(!wrap.Dropped); + Y_VERIFY(!wrap.SchemePending); + Y_VERIFY(wrap.SchemeModified); + + if (wrap.Created) { + // If the table is both created and modified in the same + // transaction, then make sure flags are cleared and the + // table stats are accounted for. + wrap.Created = false; + wrap.DataModified = false; + Y_VERIFY(!wrap.RollbackPrepared); + wrap.EpochSnapshot.reset(); + wrap->CommitNewTable(annex); + wrap.Aggr(Stats, true /* enter */); + } + + wrap.SchemeModified = false; + } + + SchemeRollbackState.Tables.clear(); + SchemeRollbackState.Executor.reset(); + SchemeRollbackState.Redo.reset(); + } + + void RollbackTransaction() noexcept + { + for (ui32 tid : Prepared) { + auto& wrap = Tables.at(tid); + Y_VERIFY(wrap.RollbackPrepared); + wrap->RollbackChanges(); + wrap.RestoreMemStats(Stats); + wrap.RollbackPrepared = false; + wrap.SchemeModified = false; + wrap.DataModified = false; + } + Prepared.clear(); + + for (ui32 tid : Flushed) { + auto& wrap = Tables.at(tid); + Y_VERIFY(wrap.EpochSnapshot); + wrap.EpochSnapshot.reset(); + } + Flushed.clear(); + + for (ui32 tid : Affects) { + auto& wrap = Tables.at(tid); + if (!wrap.Created) { + wrap.Serial = wrap.SerialBackup; + } + } + Affects.clear(); + + RollbackScheme(); + Serial_ = Begin_; + InTransaction = false; + } + + void RollbackScheme() noexcept + { + // Note: we assume schema rollback is very rare, + 
// so it doesn't have to be efficient + TScheme& scheme = *Scheme; + if (SchemeRollbackState.Redo) { + scheme.Redo = *SchemeRollbackState.Redo; + SchemeRollbackState.Redo.reset(); + } + if (SchemeRollbackState.Executor) { + scheme.Executor = *SchemeRollbackState.Executor; + SchemeRollbackState.Executor.reset(); + } + // First pass: we remove all modified tables from schema to handle renames + for (auto& pr : SchemeRollbackState.Tables) { + auto it = scheme.Tables.find(pr.first); + if (it != scheme.Tables.end()) { + scheme.TableNames.erase(it->second.Name); + scheme.Tables.erase(it); + } + } + // Second pass: restore all tables that existed before transaction started + for (auto& pr : SchemeRollbackState.Tables) { + if (pr.second) { + auto res = scheme.Tables.emplace(pr.first, *pr.second); + Y_VERIFY(res.second); + scheme.TableNames.emplace(res.first->second.Name, pr.first); + } + } + // Third pass: we check modified tables and rollback their schema changes + for (auto& pr : SchemeRollbackState.Tables) { + ui32 tid = pr.first; + auto& wrap = Tables.at(tid); + if (wrap.Created) { + // This table didn't exist, just forget about it + Tables.erase(tid); + NUtil::SubSafe(Stats.Tables, ui32(1)); + continue; + } + // By the time schema rollback is called we expect changes to be rolled back already + Y_VERIFY(!wrap.SchemeModified, "Unexpected schema rollback on a modified table"); + Y_VERIFY(!wrap.EpochSnapshot, "Unexpected schema rollback on a flushed table"); + if (wrap.Dropped) { + // This table is no longer dropped + wrap.Dropped = false; + } + wrap.SchemePending = false; + } + SchemeRollbackState.Tables.clear(); + } + TDatabaseImpl& Switch(TTxStamp stamp) noexcept { - if (std::exchange(Stamp, stamp) > stamp) - Y_FAIL("Executor tx stamp cannot go to the past"); + Y_VERIFY(!InTransaction, "Unexpected switch inside a transaction"); + Y_VERIFY(Stamp <= stamp, "Executor tx stamp cannot go to the past"); + Stamp = stamp; - First_ = Max<ui64>(), Begin_ = Serial_; - 
Stats.TxCommited++, Affects = { }; + First_ = Max<ui64>(); + Begin_ = Serial_; + Stats.TxCommited++; + Affects = { }; return *this; } @@ -195,12 +520,12 @@ namespace NTable { wrap.Aggr(Stats, true /* enter */); } - bool Apply(const TSchemeChanges &delta, NRedo::TWriter *writer) + bool ApplySchema(const TSchemeChanges &delta) { TModifier modifier(*Scheme); if (modifier.Apply(delta)) { - Apply(modifier.Affects, writer); + ApplySchema(modifier.Affects); return true; } else { @@ -268,20 +593,12 @@ namespace NTable { return result.first->second; } - void Apply(const THashSet<ui32> &affects, NRedo::TWriter *writer) + void ApplySchema(const THashSet<ui32> &affects) { for (ui32 table : affects) { auto &wrap = Get(table, false); if (auto *info = Scheme->GetTableInfo(table)) { - if (wrap && writer) { - /* Hack keeps table epoches consistent in regular - flow and on db bootstap. Required due to scheme - deltas and redo log async rollup on bootstrap. - */ - - writer->EvFlush(table, Stamp, wrap->Snapshot()); - } (wrap ? wrap : MakeTable(table, { }))->SetScheme(*info); @@ -325,7 +642,7 @@ namespace NTable { Serial_ = serial; } else { Y_Fail("EvBegin{" << serial << " " << NFmt::TStamp(stamp) - << "} is not fits to db state {" << Serial_ << " " + << "} does not match db state {" << Serial_ << " " << NFmt::TStamp(Stamp) << "} (redo log was reordered)"); } @@ -415,11 +732,40 @@ namespace NTable { Legacy log (Evolution < 12) have no EvBegin and progression of serial require hack with virtual insertion of EvBegin here. */ + if (Y_UNLIKELY(Serial_ == Begin_)) { + ++Serial_; + } - if (wrap.Touch(Begin_, Begin_ == Serial_ ? 
++Serial_ : Serial_)) + if (wrap.Touch(Begin_, Serial_)) Affects.emplace_back(table); - return First_ = Min(First_, Serial_), wrap; + First_ = Min(First_, Serial_); + return wrap; + } + + private: + bool ApplyAlterRecord(const TAlterRecord& record) override + { + Y_VERIFY(InTransaction, "Unexpected ApplyAlterRecord outside of transaction"); + TSchemeModifier modifier(*Scheme, &SchemeRollbackState); + bool changes = modifier.Apply(record); + if (changes) { + // There will be at most one table id + for (ui32 tid : modifier.Affects) { + auto* wrap = Tables.FindPtr(tid); + if (!wrap) { + wrap = &MakeTable(tid, { }); + wrap->Created = true; + } + Y_VERIFY(!wrap->DataModified, "Table %" PRIu32 " cannot be altered after being changed", tid); + Y_VERIFY(!wrap->Dropped, "Table %" PRIu32 " cannot be altered after being dropped", tid); + if (!Scheme->GetTableInfo(tid)) { + wrap->Dropped = true; + } + wrap->SchemePending = true; + } + } + return changes; } public: @@ -440,6 +786,11 @@ namespace NTable { NRedo::TPlayer<TDatabaseImpl> Redo; TVector<ui32> Affects; TVector<TMemGlob> Annex; + TVector<ui32> Flushed; + TVector<ui32> Prepared; + + bool InTransaction = false; + TSchemeRollbackState SchemeRollbackState; public: const TAutoPtr<TScheme> Scheme; diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.cpp b/ydb/core/tablet_flat/flat_dbase_scheme.cpp index fa12005009..61303df104 100644 --- a/ydb/core/tablet_flat/flat_dbase_scheme.cpp +++ b/ydb/core/tablet_flat/flat_dbase_scheme.cpp @@ -61,7 +61,17 @@ TAutoPtr<TSchemeChanges> TScheme::GetSnapshot() const { TAlter& TAlter::Merge(const TSchemeChanges &log) { - Log.MutableDelta()->MergeFrom(log.GetDelta()); + Y_VERIFY(&Log != &log, "Cannot merge changes onto itself"); + + int added = log.DeltaSize(); + if (added > 0) { + auto* dst = Log.MutableDelta(); + dst->Reserve(Log.DeltaSize() + added); + for (const auto& delta : log.GetDelta()) { + *dst->Add() = delta; + ApplyLastRecord(); + } + } return *this; } @@ -73,7 +83,7 @@ TAlter& 
TAlter::AddTable(const TString& name, ui32 id) delta.SetTableName(name); delta.SetTableId(id); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::DropTable(ui32 id) @@ -82,7 +92,7 @@ TAlter& TAlter::DropTable(ui32 id) delta.SetDeltaType(TAlterRecord::DropTable); delta.SetTableId(id); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::AddColumn(ui32 table, const TString& name, ui32 id, ui32 type, bool notNull, TCell null) @@ -98,7 +108,7 @@ TAlter& TAlter::AddColumn(ui32 table, const TString& name, ui32 id, ui32 type, b if (!null.IsNull()) delta.SetDefault(null.Data(), null.Size()); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::DropColumn(ui32 table, ui32 id) @@ -108,7 +118,7 @@ TAlter& TAlter::DropColumn(ui32 table, ui32 id) delta.SetTableId(table); delta.SetColumnId(id); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::AddColumnToFamily(ui32 table, ui32 column, ui32 family) @@ -119,7 +129,7 @@ TAlter& TAlter::AddColumnToFamily(ui32 table, ui32 column, ui32 family) delta.SetColumnId(column); delta.SetFamilyId(family); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::AddFamily(ui32 table, ui32 family, ui32 room) @@ -130,7 +140,7 @@ TAlter& TAlter::AddFamily(ui32 table, ui32 family, ui32 room) delta.SetFamilyId(family); delta.SetRoomId(room); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::AddColumnToKey(ui32 table, ui32 column) @@ -140,7 +150,7 @@ TAlter& TAlter::AddColumnToKey(ui32 table, ui32 column) delta.SetTableId(table); delta.SetColumnId(column); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetFamily(ui32 table, ui32 family, ECache cache, ECodec codec) @@ -153,7 +163,7 @@ TAlter& TAlter::SetFamily(ui32 table, ui32 family, ECache cache, ECodec codec) delta.SetCodec(ui32(codec)); delta.SetCache(ui32(cache)); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetFamilyBlobs(ui32 table, ui32 family, ui32 small, ui32 large) @@ -165,7 +175,7 @@ TAlter& 
TAlter::SetFamilyBlobs(ui32 table, ui32 family, ui32 small, ui32 large) delta.SetSmall(small); delta.SetLarge(large); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetRoom(ui32 table, ui32 room, ui32 main, ui32 blobs, ui32 outer) @@ -179,7 +189,7 @@ TAlter& TAlter::SetRoom(ui32 table, ui32 room, ui32 main, ui32 blobs, ui32 outer delta->SetBlobs(blobs); delta->SetOuter(outer); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetRedo(ui32 annex) @@ -189,7 +199,7 @@ TAlter& TAlter::SetRedo(ui32 annex) delta->SetDeltaType(TAlterRecord::SetRedo); delta->SetAnnex(annex); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetExecutorCacheSize(ui64 cacheSize) @@ -198,7 +208,7 @@ TAlter& TAlter::SetExecutorCacheSize(ui64 cacheSize) delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo); delta.SetExecutorCacheSize(cacheSize); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetExecutorFastLogPolicy(bool allow) @@ -207,7 +217,7 @@ TAlter& TAlter::SetExecutorFastLogPolicy(bool allow) delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo); delta.SetExecutorLogFastCommitTactic(allow); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetExecutorAllowLogBatching(bool allow) @@ -216,7 +226,7 @@ TAlter& TAlter::SetExecutorAllowLogBatching(bool allow) delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo); delta.SetExecutorAllowLogBatching(allow); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetExecutorLogFlushPeriod(TDuration flushPeriod) @@ -225,7 +235,7 @@ TAlter& TAlter::SetExecutorLogFlushPeriod(TDuration flushPeriod) delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo); delta.SetExecutorLogFlushPeriod(flushPeriod.MicroSeconds()); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetExecutorLimitInFlyTx(ui32 limitTxInFly) @@ -234,7 +244,7 @@ TAlter& TAlter::SetExecutorLimitInFlyTx(ui32 limitTxInFly) delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo); 
delta.SetExecutorLimitInFlyTx(limitTxInFly); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetExecutorResourceProfile(const TString &name) @@ -243,7 +253,7 @@ TAlter& TAlter::SetExecutorResourceProfile(const TString &name) delta.SetDeltaType(TAlterRecord::UpdateExecutorInfo); delta.SetExecutorResourceProfile(name); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetCompactionPolicy(ui32 tableId, const TCompactionPolicy& newPolicy) @@ -253,7 +263,7 @@ TAlter& TAlter::SetCompactionPolicy(ui32 tableId, const TCompactionPolicy& newPo delta.SetTableId(tableId); newPolicy.Serialize(*delta.MutableCompactionPolicy()); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetByKeyFilter(ui32 tableId, bool enabled) @@ -263,7 +273,7 @@ TAlter& TAlter::SetByKeyFilter(ui32 tableId, bool enabled) delta.SetTableId(tableId); delta.SetByKeyFilter(enabled ? 1 : 0); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetColdBorrow(ui32 tableId, bool enabled) @@ -273,7 +283,7 @@ TAlter& TAlter::SetColdBorrow(ui32 tableId, bool enabled) delta.SetTableId(tableId); delta.SetColdBorrow(enabled); - return *this; + return ApplyLastRecord(); } TAlter& TAlter::SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 maxBytes) @@ -287,7 +297,12 @@ TAlter& TAlter::SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 max delta.SetEraseCacheMaxBytes(maxBytes); } - return *this; + return ApplyLastRecord(); +} + +TAlter::operator bool() const noexcept +{ + return Log.DeltaSize() > 0; } TAutoPtr<TSchemeChanges> TAlter::Flush() @@ -297,5 +312,19 @@ TAutoPtr<TSchemeChanges> TAlter::Flush() return log; } +TAlter& TAlter::ApplyLastRecord() +{ + if (Sink) { + int deltasCount = Log.DeltaSize(); + Y_VERIFY(deltasCount > 0); + + if (!Sink->ApplyAlterRecord(Log.GetDelta(deltasCount - 1))) { + Log.MutableDelta()->RemoveLast(); + } + } + + return *this; +} + } } diff --git a/ydb/core/tablet_flat/flat_dbase_scheme.h 
b/ydb/core/tablet_flat/flat_dbase_scheme.h index 1fd5ed2b48..da4d0f7799 100644 --- a/ydb/core/tablet_flat/flat_dbase_scheme.h +++ b/ydb/core/tablet_flat/flat_dbase_scheme.h @@ -200,12 +200,45 @@ public: TRedo Redo; }; +/** + * This structure holds the rollback state for database schema, which allows + * schema to be rolled back to initial state when transaction decides not to + * commit. As this almost never happens it doesn't need to be very efficient. + */ +struct TSchemeRollbackState { + // This hash map has key for each modified table, where value either holds + // previous table schema, or empty if table didn't exist. + THashMap<ui32, std::optional<TScheme::TTableInfo>> Tables; + // Previous executor settings if modified + std::optional<TScheme::TExecutorInfo> Executor; + // Previous redo settings if modified + std::optional<TScheme::TRedo> Redo; +}; + +/** + * Sink is used to inspect and apply alter records + */ +class IAlterSink { +public: + /** + * Sink attempts to apply the given alter record. When the return value + * is true the record is deemed useful, otherwise it is discarded. 
+ */ + virtual bool ApplyAlterRecord(const TAlterRecord& record) = 0; +}; + // scheme delta class TAlter { public: using ECodec = NPage::ECodec; using ECache = NPage::ECache; + TAlter(IAlterSink* sink = nullptr) + : Sink(sink) + { } + + explicit operator bool() const noexcept; + const TSchemeChanges& operator*() const noexcept { return Log; @@ -235,7 +268,12 @@ public: TAlter& SetEraseCache(ui32 tableId, bool enabled, ui32 minRows, ui32 maxBytes); TAutoPtr<TSchemeChanges> Flush(); + +private: + TAlter& ApplyLastRecord(); + protected: + IAlterSink* Sink; TSchemeChanges Log; }; diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 7e17797173..2dd7869f29 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1579,7 +1579,7 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct THPTimer cpuTimer; - TPageCollectionTxEnv env(*PrivatePageCache); + TPageCollectionTxEnv env(*Database, *PrivatePageCache); TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId); txc.NotEnoughMemory(seat->NotEnoughMemoryCount); @@ -1590,11 +1590,6 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct LWTRACK(TransactionExecuteEnd, seat->Self->Orbit, seat->UniqID, done); seat->CPUExecTime += cpuTimer.PassedReset(); - if (done && !Stats->IsFollower) { /* possible rw commit */ - for (auto one: env.MakeSnap) - Database->TxSnapTable(one.first /* table */); - } - bool failed = false; TString failureReason; if (done && (failed = !Database->ValidateCommit(failureReason))) { @@ -1936,21 +1931,20 @@ void TExecutor::CommitTransactionLog(TAutoPtr<TSeat> seat, TPageCollectionTxEnv Y_VERIFY(!force || commitResult.Commit); auto *commit = commitResult.Commit.Get(); // could be nullptr - Y_VERIFY(env.MakeSnap.size() == change->Snapshots.size()); - - for (auto seq: xrange(env.MakeSnap.size())) { - 
const auto &snap = change->Snapshots[seq]; + for (auto& pr : env.MakeSnap) { + const ui32 table = pr.first; + auto& snap = pr.second; - Y_VERIFY(snap.Epoch != NTable::TEpoch::Max(), "Table was not snapshoted"); + Y_VERIFY(snap.Epoch, "Table was not snapshotted"); - for (auto &context: env.MakeSnap.at(snap.Table).Context) { - auto edge = NTable::TSnapEdge(change->Stamp, snap.Epoch); + for (auto &context: snap.Context) { + auto edge = NTable::TSnapEdge(change->Stamp - 1, *snap.Epoch); if (!context->Impl) context->Impl.Reset(new TTableSnapshotContext::TImpl); - context->Impl->Prepare(snap.Table, edge); - CompactionLogic->PrepareTableSnapshot(snap.Table, edge, context.Get()); + context->Impl->Prepare(table, edge); + CompactionLogic->PrepareTableSnapshot(table, edge, context.Get()); WaitingSnapshots.insert(std::make_pair(context.Get(), context)); } } @@ -2961,8 +2955,10 @@ THolder<TScanSnapshot> TExecutor::PrepareScanSnapshot(ui32 table, const NTable:: subset->ColdParts.insert(subset->ColdParts.end(), params->ColdParts.begin(), params->ColdParts.end()); } - if (*subset && !subset->IsStickedToHead()) { - Y_FAIL("Got table subset with unexpected epoch marker"); + if (*subset) { + Y_VERIFY_S(subset->IsStickedToHead(), + "Got table subset with unexpected head " << subset->Head + << " and epoch " << subset->Epoch()); } } else { // This grabs a volatile snapshot of the mutable table state diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.cpp b/ydb/core/tablet_flat/flat_executor_tx_env.cpp new file mode 100644 index 0000000000..83aad1965d --- /dev/null +++ b/ydb/core/tablet_flat/flat_executor_tx_env.cpp @@ -0,0 +1,25 @@ +#include "flat_executor_tx_env.h" +#include "flat_database.h" + +namespace NKikimr { +namespace NTabletFlatExecutor { + + void TPageCollectionTxEnv::MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) + { + auto tables = snap->TablesToSnapshot(); + Y_VERIFY(tables); + + for (ui32 table : tables) { + auto& entry = MakeSnap[table]; + 
entry.Context.push_back(snap); + auto epoch = DB.TxSnapTable(table); + if (entry.Epoch) { + Y_VERIFY(*entry.Epoch == epoch, "Table snapshot changed unexpectedly"); + } else { + entry.Epoch.emplace(epoch); + } + } + } + +} // namespace NTabletFlatExecutor +} // namespace NKikimr diff --git a/ydb/core/tablet_flat/flat_executor_tx_env.h b/ydb/core/tablet_flat/flat_executor_tx_env.h index e3b54db82e..bfad1f626f 100644 --- a/ydb/core/tablet_flat/flat_executor_tx_env.h +++ b/ydb/core/tablet_flat/flat_executor_tx_env.h @@ -67,6 +67,11 @@ namespace NTabletFlatExecutor { }; struct TPageCollectionTxEnv : public TPageCollectionReadEnv, public IExecuting { + TPageCollectionTxEnv(NTable::TDatabase& db, TPrivatePageCache& cache) + : TPageCollectionReadEnv(cache) + , DB(db) + { } + using TLogoId = TLogoBlobID; struct TBorrowSnap { @@ -118,6 +123,7 @@ namespace NTabletFlatExecutor { struct TSnapshot { TVector<TIntrusivePtr<TTableSnapshotContext>> Context; + std::optional<NTable::TEpoch> Epoch; }; using TPageCollectionReadEnv::TPageCollectionReadEnv; @@ -134,13 +140,7 @@ namespace NTabletFlatExecutor { } protected: /* IExecuting, tx stage func implementation */ - void MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override - { - Y_VERIFY(snap->TablesToSnapshot()); - - for (ui32 table : snap->TablesToSnapshot()) - MakeSnap[table].Context.push_back(snap); - } + void MakeSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override; void DropSnapshot(TIntrusivePtr<TTableSnapshotContext> snap) override { @@ -194,6 +194,9 @@ namespace NTabletFlatExecutor { LoanConfirmation.insert(std::make_pair(bundle, TLoanConfirmation{borrow})); } + protected: + NTable::TDatabase& DB; + public: /*_ Pending database shanshots */ diff --git a/ydb/core/tablet_flat/flat_executor_ut.cpp b/ydb/core/tablet_flat/flat_executor_ut.cpp index d3f008825f..0568b33a50 100644 --- a/ydb/core/tablet_flat/flat_executor_ut.cpp +++ b/ydb/core/tablet_flat/flat_executor_ut.cpp @@ -4891,5 +4891,135 @@ 
Y_UNIT_TEST_SUITE(TFlatTableLongTxAndBlobs) { } // Y_UNIT_TEST_SUITE(TFlatTableLongTxAndBlobs) +Y_UNIT_TEST_SUITE(TFlatTableSnapshotWithCommits) { + + struct TTxMakeSnapshotAndWrite : public ITransaction { + bool Execute(TTransactionContext& txc, const TActorContext&) override { + TIntrusivePtr<TTableSnapshotContext> snapContext = + new TTestTableSnapshotContext({TRowsModel::TableId}); + txc.Env.MakeSnapshot(snapContext); + + for (i64 keyValue = 101; keyValue <= 104; ++keyValue) { + const auto key = NScheme::TInt64::TInstance(keyValue); + + TString str = "value"; + const auto val = NScheme::TString::TInstance(str); + NTable::TUpdateOp ops{ TRowsModel::ColumnValueId, NTable::ECellOp::Set, val }; + + txc.DB.Update(TRowsModel::TableId, NTable::ERowOp::Upsert, { key }, { ops }); + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } + }; + + struct TTxBorrowSnapshot : public ITransactionWithExecutor { + TTxBorrowSnapshot(TString& snapBody, TIntrusivePtr<TTableSnapshotContext> snapContext, ui64 targetTabletId) + : SnapBody(snapBody) + , SnapContext(std::move(snapContext)) + , TargetTabletId(targetTabletId) + { } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + SnapBody = Executor->BorrowSnapshot(TRowsModel::TableId, *SnapContext, { }, { }, TargetTabletId); + txc.Env.DropSnapshot(SnapContext); + return true; + } + + void Complete(const TActorContext& ctx) override { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } + + private: + TString& SnapBody; + TIntrusivePtr<TTableSnapshotContext> SnapContext; + const ui64 TargetTabletId; + }; + + struct TTxCheckRows : public ITransaction { + bool Execute(TTransactionContext &txc, const TActorContext &) override + { + i64 keyId; + TVector<NTable::TTag> tags; + tags.push_back(TRowsModel::ColumnValueId); + TVector<TRawTypeValue> key; + key.emplace_back(&keyId, sizeof(keyId), NScheme::TInt64::TypeId); + + for (keyId = 1; keyId <= 
104; ++keyId) { + NTable::TRowState row; + auto ready = txc.DB.Select(TRowsModel::TableId, key, tags, row); + if (ready == NTable::EReady::Page) + return false; + Y_VERIFY_S(ready == NTable::EReady::Data, "Failed to find key " << keyId); + Y_VERIFY(row.GetRowState() == NTable::ERowOp::Upsert); + TStringBuf selected = row.Get(0).AsBuf(); + TString expected = "value"; + Y_VERIFY(selected == expected); + } + + return true; + } + + void Complete(const TActorContext &ctx) override + { + ctx.Send(ctx.SelfID, new NFake::TEvReturn); + } + }; + + Y_UNIT_TEST(SnapshotWithCommits) { + TMyEnvBase env; + TRowsModel rows; + + env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); + + // Start the first tablet + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { + return new TTestFlatTablet(env.Edge, tablet, info); + }); + env.WaitForWakeUp(); + + // Init schema + { + TIntrusivePtr<TCompactionPolicy> policy = new TCompactionPolicy(); + env.SendSync(rows.MakeScheme(std::move(policy))); + } + + // Insert 100 rows + Cerr << "...inserting rows" << Endl; + env.SendSync(rows.MakeRows(100, 0, 100)); + + Cerr << "...making snapshot and writing to table" << Endl; + env.SendSync(new NFake::TEvExecute{ new TTxMakeSnapshotAndWrite }); + Cerr << "...waiting for snapshot to complete" << Endl; + auto evSnapshot = env.GrabEdgeEvent<TEvTestFlatTablet::TEvSnapshotComplete>(); + Cerr << "...borrowing snapshot" << Endl; + TString snapBody; + env.SendSync(new NFake::TEvExecute{ new TTxBorrowSnapshot(snapBody, evSnapshot->Get()->SnapContext, env.Tablet + 1) }); + + // Check all added rows one-by-one + Cerr << "...checking rows" << Endl; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRows }); + + for (int i = 0; i < 2; ++i) { + Cerr << "...restarting tablet" << Endl; + env.SendSync(new TEvents::TEvPoison, false, true); + env.WaitForGone(); + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo 
*info) { + return new TTestFlatTablet(env.Edge, tablet, info); + }); + env.WaitForWakeUp(); + + // Check all added rows one-by-one + Cerr << "...checking rows" << Endl; + env.SendSync(new NFake::TEvExecute{ new TTxCheckRows }, /* retry */ true); + } + } + +} // Y_UNIT_TEST_SUITE(TFlatTableSnapshotWithCommits) + } // namespace NTabletFlatExecutor } // namespace NKikimr diff --git a/ydb/core/tablet_flat/flat_mem_blobs.h b/ydb/core/tablet_flat/flat_mem_blobs.h index cb29308fe8..25417cb168 100644 --- a/ydb/core/tablet_flat/flat_mem_blobs.h +++ b/ydb/core/tablet_flat/flat_mem_blobs.h @@ -64,7 +64,9 @@ namespace NMem { Store.emplace_back(glob, std::move(data)); Bytes += glob.Logo.BlobSize(); - return Head + (Store.size() - 1); + const ui64 ref = Head + (Store.size() - 1); + + return ref; } void Assign(TArrayRef<NPageCollection::TLoadedPage> pages) noexcept @@ -76,6 +78,33 @@ namespace NMem { } } + void Commit(size_t count, TArrayRef<const NPageCollection::TMemGlob> pages) noexcept + { + if (count > 0) { + size_t currentSize = Store.size(); + Y_VERIFY(count <= currentSize); + Store.Enumerate(currentSize - count, currentSize, [pages](size_t, NPageCollection::TMemGlob& blob) { + Y_VERIFY(blob.GId.Logo.TabletID() == 0); + const ui32 ref = blob.GId.Logo.Cookie(); + const auto& fixedBlob = pages.at(ref); + Y_VERIFY(fixedBlob.GId.Logo.TabletID() != 0); + blob.GId = fixedBlob.GId; + }); + } + } + + void Rollback(size_t count) noexcept + { + if (count > 0) { + size_t currentSize = Store.size(); + Y_VERIFY(count <= currentSize); + Store.Enumerate(currentSize - count, currentSize, [this](size_t, NPageCollection::TMemGlob& blob) { + Bytes -= blob.GId.Logo.BlobSize(); + }); + Store.truncate(currentSize - count); + } + } + public: const ui64 Head; diff --git a/ydb/core/tablet_flat/flat_mem_eggs.h b/ydb/core/tablet_flat/flat_mem_eggs.h index b422598f8d..d25d4cfe94 100644 --- a/ydb/core/tablet_flat/flat_mem_eggs.h +++ b/ydb/core/tablet_flat/flat_mem_eggs.h @@ -67,9 +67,16 @@ namespace 
NMem { void Push(TUpdate* update) { const TUpdate* next = GetFirst(); - while (next && update->RowVersion <= next->RowVersion) { - // Collapse row versions that are no longer reachable - next = next->Next; + // Collapse row versions that are no longer reachable to optimize + // future searches, however deltas are not collapsed because we + // don't know which version they may be committed at. + if (update->RowVersion.Step != Max<ui64>()) { + while (next && + next->RowVersion.Step != Max<ui64>() && + update->RowVersion <= next->RowVersion) + { + next = next->Next; + } } update->Next = next; Chain = update; diff --git a/ydb/core/tablet_flat/flat_mem_warm.cpp b/ydb/core/tablet_flat/flat_mem_warm.cpp index df1cb798e2..627a131d38 100644 --- a/ydb/core/tablet_flat/flat_mem_warm.cpp +++ b/ydb/core/tablet_flat/flat_mem_warm.cpp @@ -31,12 +31,87 @@ NMem::TTreeSnapshot TMemTable::Immediate() const { // 1) When taking a snapshot of a frozen mem table, in that case we know // the tree is frozen and will not be modified, so using an unsafe // snapshot is ok. - // 2) When taking a snapshot of a mutable mem table, but used in table - // iterators. In all those cases we know mem table is not modified - // until transaction is committed, so using unsafe snapshot is ok. + // 2) When taking a snapshot of a mutable mem table, but we can guarantee + // mem table will not be changed while iterator is still valid, for + // example during point reads. 
return NMem::TTreeSnapshot(Tree.UnsafeSnapshot()); } +void TMemTable::PrepareRollback() { + Y_VERIFY(!RollbackState); + auto& state = RollbackState.emplace(); + state.Snapshot = Tree.Snapshot(); + state.OpsCount = OpsCount; + state.RowCount = RowCount; + state.MinRowVersion = MinRowVersion; + state.MaxRowVersion = MaxRowVersion; + Pool.BeginTransaction(); +} + +void TMemTable::RollbackChanges() { + Y_VERIFY(RollbackState); + auto& state = *RollbackState; + Tree.RollbackTo(std::move(state.Snapshot)); + Tree.CollectGarbage(); + Pool.RollbackTransaction(); + Blobs.Rollback(state.AddedBlobs); + OpsCount = state.OpsCount; + RowCount = state.RowCount; + MinRowVersion = state.MinRowVersion; + MaxRowVersion = state.MaxRowVersion; + + struct TApplyUndoOp { + TMemTable* Self; + + void operator()(const TUndoOpUpdateCommitted& op) const { + Self->Committed[op.TxId] = op.Value; + } + + void operator()(const TUndoOpEraseCommitted& op) const { + Self->Committed.erase(op.TxId); + } + + void operator()(const TUndoOpInsertRemoved& op) const { + Self->Removed.insert(op.TxId); + } + + void operator()(const TUndoOpEraseRemoved& op) const { + Self->Removed.erase(op.TxId); + } + + void operator()(const TUndoOpUpdateTxIdStats& op) const { + Self->TxIdStats[op.TxId] = op.Value; + } + + void operator()(const TUndoOpEraseTxIdStats& op) const { + Self->TxIdStats.erase(op.TxId); + } + }; + + while (!UndoBuffer.empty()) { + std::visit(TApplyUndoOp{ this }, UndoBuffer.back()); + UndoBuffer.pop_back(); + } + + RollbackState.reset(); +} + +void TMemTable::CommitChanges(TArrayRef<const TMemGlob> blobs) { + Y_VERIFY(RollbackState); + auto& state = *RollbackState; + state.Snapshot = { }; + Tree.CollectGarbage(); + Pool.CommitTransaction(); + Blobs.Commit(state.AddedBlobs, blobs); + UndoBuffer.clear(); + RollbackState.reset(); +} + +void TMemTable::CommitBlobs(TArrayRef<const TMemGlob> blobs) { + Y_VERIFY(!RollbackState); + Blobs.Commit(Blobs.Size(), blobs); +} + void 
TMemTable::DebugDump(IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const { auto it = Immediate().Iterator(); auto types = Scheme->Keys->BasicTypes(); diff --git a/ydb/core/tablet_flat/flat_mem_warm.h b/ydb/core/tablet_flat/flat_mem_warm.h index b3e5ef9579..9ce331624f 100644 --- a/ydb/core/tablet_flat/flat_mem_warm.h +++ b/ydb/core/tablet_flat/flat_mem_warm.h @@ -9,6 +9,7 @@ #include "flat_page_blobs.h" #include "flat_sausage_solid.h" #include "flat_table_committed.h" +#include "util_pool.h" #include <ydb/core/scheme/scheme_tablecell.h> #include <ydb/core/scheme/scheme_type_id.h> #include <ydb/core/util/btree_cow.h> @@ -78,7 +79,90 @@ namespace NMem { } }; - using TTree = TCowBTree<TTreeKey, TTreeValue, TKeyCmp>; + class TTreeAllocatorState { + struct TFreeItem { + TFreeItem* Next; + }; + + public: + explicit TTreeAllocatorState(size_t pageSize) + : PageSize(pageSize) + { + Y_VERIFY(PageSize >= sizeof(TFreeItem)); + } + + ~TTreeAllocatorState() noexcept { + TFreeItem* head = Head; + while (head) { + TFreeItem* next = head->Next; + ::operator delete(head, PageSize); + head = next; + } + } + + [[nodiscard]] void* Allocate(size_t size) { + if (Y_LIKELY(size == PageSize)) { + if (Head) { + TFreeItem* item = Head; + Head = item->Next; + NSan::Poison(item, PageSize); + return item; + } + } + return ::operator new(size); + } + + void Deallocate(void* p, size_t size) noexcept { + if (Y_LIKELY(size == PageSize)) { + NSan::Poison(p, size); + TFreeItem* item = reinterpret_cast<TFreeItem*>(p); + item->Next = Head; + Head = item; + } else { + ::operator delete(p, size); + } + } + + private: + const size_t PageSize; + TFreeItem* Head = nullptr; + }; + + template<class T> + class TTreeAllocator { + public: + template<class U> + friend class TTreeAllocator; + + using value_type = T; + + TTreeAllocator(TTreeAllocatorState* state) noexcept + : State(state) + { } + + template<class U> + TTreeAllocator(const TTreeAllocator<U>& other) noexcept + : State(other.State) + 
{ } + + [[nodiscard]] T* allocate(size_t n) { + return static_cast<T*>(State->Allocate(sizeof(T) * n)); + } + + void deallocate(T* p, size_t n) noexcept { + State->Deallocate(p, sizeof(T) * n); + } + + template<class U> + bool operator==(const TTreeAllocator<U>& rhs) const noexcept { + return State == rhs.State; + } + + private: + TTreeAllocatorState* State = nullptr; + }; + + using TTree = TCowBTree<TTreeKey, TTreeValue, TKeyCmp, TTreeAllocator<TTreeValue>, 512>; class TTreeIterator; class TTreeSnapshot; @@ -89,27 +173,11 @@ namespace NMem { struct TMemTableSnapshot; + struct TMemTableRollbackState; + class TMemTable : public TThrRefBase { friend class TMemIt; - template <size_t SizeCap = 512*1024, size_t Overhead = 64> - class TMyPolicy : public TMemoryPool::IGrowPolicy { - public: - size_t Next(size_t prev) const noexcept override - { - if (prev >= SizeCap - Overhead) - return SizeCap - Overhead; - - // Use same buckets as LF-alloc (4KB, 6KB, 8KB, 12KB, 16KB ...) - size_t size = FastClp2(prev); - - if (size < prev + prev/3) - size += size/2; - - return size - Overhead; - } - }; - public: struct TTxIdStat { ui64 OpsCount = 0; @@ -127,11 +195,12 @@ namespace NMem { , Scheme(scheme) , Blobs(annex) , Comparator(*Scheme) - , Pool(chunk, &Policy) - , Tree(Comparator) // TODO: support TMemoryPool with caching + , Pool(chunk) + , TreeAllocatorState(TTree::PageSize) + , Tree(Comparator, &TreeAllocatorState) {} - void Update(ERowOp rop, TRawVals key_, TOpsRef ops, TArrayRef<TMemGlob> pages, TRowVersion rowVersion, + void Update(ERowOp rop, TRawVals key_, TOpsRef ops, TArrayRef<const TMemGlob> pages, TRowVersion rowVersion, NTable::ITransactionMapSimplePtr committed) { Y_VERIFY_DEBUG( @@ -302,6 +371,9 @@ namespace NMem { /* Transformation REDO ELargeObj to TBlobs reference */ const auto ref = Blobs.Push(pages.at(cell.AsValue<ui32>())); + if (RollbackState) { + RollbackState->AddedBlobs++; + } cell = TCell::Make<ui64>(ref); @@ -331,6 +403,14 @@ namespace NMem { if 
(rowVersion.Step == Max<ui64>()) { MinRowVersion = TRowVersion::Min(); MaxRowVersion = TRowVersion::Max(); + if (RollbackState) { + auto it = TxIdStats.find(rowVersion.TxId); + if (it != TxIdStats.end()) { + UndoBuffer.push_back(TUndoOpUpdateTxIdStats{ rowVersion.TxId, it->second }); + } else { + UndoBuffer.push_back(TUndoOpEraseTxIdStats{ rowVersion.TxId }); + } + } ++TxIdStats[rowVersion.TxId].OpsCount; } else { MinRowVersion = Min(MinRowVersion, rowVersion); @@ -341,14 +421,14 @@ namespace NMem { size_t GetUsedMem() const noexcept { return - Pool.MemoryAllocated() + Pool.Used() + (Tree.AllocatedPages() - Tree.DroppedPages()) * TTree::PageSize + Blobs.GetBytes(); } size_t GetWastedMem() const noexcept { - return Pool.MemoryWaste(); + return Pool.Wasted(); } ui64 GetOpsCount() const noexcept { return OpsCount; } @@ -369,6 +449,11 @@ namespace NMem { return &Blobs; } + void PrepareRollback(); + void RollbackChanges(); + void CommitChanges(TArrayRef<const TMemGlob> blobs); + void CommitBlobs(TArrayRef<const TMemGlob> blobs); + const TTxIdStats& GetTxIdStats() const { return TxIdStats; } @@ -376,9 +461,22 @@ namespace NMem { void CommitTx(ui64 txId, TRowVersion rowVersion) { auto it = Committed.find(txId); if (it == Committed.end() || it->second > rowVersion) { + if (RollbackState) { + if (it != Committed.end()) { + UndoBuffer.push_back(TUndoOpUpdateCommitted{ txId, it->second }); + } else { + UndoBuffer.push_back(TUndoOpEraseCommitted{ txId }); + } + } Committed[txId] = rowVersion; if (it == Committed.end()) { - Removed.erase(txId); + auto itRemoved = Removed.find(txId); + if (itRemoved != Removed.end()) { + if (RollbackState) { + UndoBuffer.push_back(TUndoOpInsertRemoved{ txId }); + } + Removed.erase(itRemoved); + } } } } @@ -386,7 +484,13 @@ namespace NMem { void RemoveTx(ui64 txId) { auto it = Committed.find(txId); if (it == Committed.end()) { - Removed.insert(txId); + auto itRemoved = Removed.find(txId); + if (itRemoved == Removed.end()) { + if (RollbackState) { 
+ UndoBuffer.push_back(TUndoOpEraseRemoved{ txId }); + } + Removed.insert(txId); + } } } @@ -425,7 +529,7 @@ namespace NMem { { const bool small = TCell::CanInline(size); - return { small ? data : Pool.Append(data, size), size }; + return { small ? data : (const char*)Pool.Append(data, size), size }; } void DebugDump() const; @@ -437,8 +541,8 @@ namespace NMem { private: NMem::TBlobs Blobs; const NMem::TKeyCmp Comparator; - TMyPolicy<> Policy; - TMemoryPool Pool; + NUtil::TMemoryPool Pool; + NMem::TTreeAllocatorState TreeAllocatorState; TTree Tree; ui64 OpsCount = 0; ui64 RowCount = 0; @@ -449,6 +553,52 @@ namespace NMem { THashSet<ui64> Removed; private: + struct TRollbackState { + TTree::TSnapshot Snapshot; + ui64 OpsCount; + ui64 RowCount; + TRowVersion MinRowVersion; + TRowVersion MaxRowVersion; + size_t AddedBlobs = 0; + }; + + std::optional<TRollbackState> RollbackState; + + private: + struct TUndoOpUpdateCommitted { + ui64 TxId; + TRowVersion Value; + }; + struct TUndoOpEraseCommitted { + ui64 TxId; + }; + struct TUndoOpInsertRemoved { + ui64 TxId; + }; + struct TUndoOpEraseRemoved { + ui64 TxId; + }; + struct TUndoOpUpdateTxIdStats { + ui64 TxId; + TTxIdStat Value; + }; + struct TUndoOpEraseTxIdStats { + ui64 TxId; + }; + + using TUndoOp = std::variant< + TUndoOpUpdateCommitted, + TUndoOpEraseCommitted, + TUndoOpInsertRemoved, + TUndoOpEraseRemoved, + TUndoOpUpdateTxIdStats, + TUndoOpEraseTxIdStats>; + + // This buffer is applied in reverse on rollback + // Memory is reused to avoid hot path allocations + std::vector<TUndoOp> UndoBuffer; + + private: // Temporary buffers to avoid hot path allocations using TTagWithPos = std::pair<TTag, ui32>; TSmallVec<TTagWithPos> ScratchUpdateTags; diff --git a/ydb/core/tablet_flat/flat_redo_writer.h b/ydb/core/tablet_flat/flat_redo_writer.h index e768179171..8ec6427dd5 100644 --- a/ydb/core/tablet_flat/flat_redo_writer.h +++ b/ydb/core/tablet_flat/flat_redo_writer.h @@ -50,21 +50,14 @@ namespace NRedo { class TWriter { 
private: - struct TSizeInfo { - ui32 Size; - IAnnex::TLimit Limit; + struct TOutputMark { }; public: using TOpsRef = TArrayRef<const TUpdateOp>; - using IOut = IOutputStream; + using IOut = TOutputMark; - TWriter(IAnnex *annex = nullptr, ui32 edge = 1024) - : Edge(edge) - , Annex(annex) - { - - } + TWriter() = default; explicit operator bool() const noexcept { @@ -76,23 +69,21 @@ namespace NRedo { return TotalSize; } - TList<TString> Unwrap() noexcept - { - return TotalSize = 0, std::move(Events); - } - TWriter& EvBegin(ui32 tail, ui32 head, ui64 serial, ui64 stamp) { Y_VERIFY(tail <= head, "Invalid ABI/API evolution span"); Y_VERIFY(serial > 0, "Serial of EvBegin starts with 1"); - const ui32 size = sizeof(TEvBegin_v1); + const ui32 size = sizeof(TEvBegin_v1); TEvBegin_v1 ev{ { ERedo::Begin, 0, 0x8000, size }, tail, head, serial, stamp }; - void* evBegin = &ev; - return Push(TString(NUtil::NBin::ToByte(evBegin), size), size); + auto out = Begin(size); + + Write(out, &ev, sizeof(ev)); + + return Flush(size); } TWriter& EvFlush(ui32 table, ui64 stamp, TEpoch epoch) @@ -101,8 +92,12 @@ namespace NRedo { TEvFlush ev{ { ERedo::Flush, 0, 0x8000, size }, table, 0, stamp, epoch.ToRedoLog() }; - void* evBegin = &ev; - return Push(TString(NUtil::NBin::ToByte(evBegin), size), size); + + auto out = Begin(size); + + Write(out, &ev, sizeof(ev)); + + return Flush(size); } TWriter& EvAnnex(TArrayRef<const NPageCollection::TGlobId> blobs) @@ -111,7 +106,7 @@ namespace NRedo { Y_VERIFY(blobs.size() <= Max<ui32>(), "Too large blobs catalog"); - const ui32 size = sizeof(TEvAnnex) + SizeOf(blobs); + const ui32 size = sizeof(TEvAnnex) + SizeOf(blobs); TEvAnnex ev{ { ERedo::Annex, 0, 0x8000, size }, ui32(blobs.size()) }; @@ -120,11 +115,11 @@ namespace NRedo { Write(out, &ev, sizeof(ev)); Write(out, static_cast<const void*>(blobs.begin()), SizeOf(blobs)); - return Push(std::move(out.Str()), size); + return Flush(size); } template<class TCallback> - TWriter& EvUpdate(ui32 table, ERowOp 
rop, TRawVals key, TOpsRef ops, ERedo tag, ui32 tailSize, TCallback&& tailCallback, bool isDelta = false) + TWriter& EvUpdate(ui32 table, ERowOp rop, TRawVals key, TOpsRef ops, ERedo tag, ui32 tailSize, TCallback&& tailCallback) { if (TCellOp::HaveNoOps(rop) && ops) { Y_FAIL("Given ERowOp cannot have update operations"); @@ -132,9 +127,7 @@ namespace NRedo { Y_FAIL("Too large key or too many operations in one ops"); } - const auto sizeInfo = CalcSize(key, ops, table, isDelta); - - const ui32 size = sizeof(TEvUpdate) + tailSize + sizeInfo.Size; + const ui32 size = sizeof(TEvUpdate) + tailSize + CalcSize(key, ops); auto out = Begin(size); TEvUpdate ev{ { tag, 0, 0x8000, size }, @@ -145,9 +138,9 @@ namespace NRedo { tailCallback(out); Write(out, key); - Write(out, ops, table, sizeInfo.Limit); + Write(out, ops); - return Push(std::move(out.Str()), size); + return Flush(size); } TWriter& EvUpdate(ui32 table, ERowOp rop, TRawVals key, TOpsRef ops, TRowVersion rowVersion) @@ -170,8 +163,7 @@ namespace NRedo { [&](auto& out) { TEvUpdateTx tail{ txId }; Write(out, &tail, sizeof(tail)); - }, - /* isDelta */ true); + }); } TWriter& EvRemoveTx(ui32 table, ui64 txId) @@ -181,8 +173,11 @@ namespace NRedo { TEvRemoveTx ev{ { ERedo::RemoveTx, 0, 0x8000, size }, table, 0, txId }; - void* evBegin = &ev; - return Push(TString(NUtil::NBin::ToByte(evBegin), size), size); + auto out = Begin(size); + + Write(out, &ev, sizeof(ev)); + + return Flush(size); } TWriter& EvCommitTx(ui32 table, ui64 txId, TRowVersion rowVersion) @@ -192,97 +187,74 @@ namespace NRedo { TEvCommitTx ev{ { ERedo::CommitTx, 0, 0x8000, size }, table, 0, txId, rowVersion.Step, rowVersion.TxId }; - void* evBegin = &ev; - return Push(TString(NUtil::NBin::ToByte(evBegin), size), size); + auto out = Begin(size); + + Write(out, &ev, sizeof(ev)); + + return Flush(size); } - TWriter& Join(TWriter &log) + TWriter& Join(TWriter &&log) { TotalSize += std::exchange(log.TotalSize, 0); - Events.splice(Events.end(), log.Events); + 
Events.append(log.Events); + log.Events.clear(); return *this; } - TString Dump() const noexcept + TString Finish() && noexcept { - auto out = Begin(TotalSize); - - for (auto one : Events) - out.Write(one); - - Y_VERIFY(out.Str().size() == TotalSize); + TString events; + events.swap(Events); - NSan::CheckMemIsInitialized(out.Str().data(), out.Str().size()); + NSan::CheckMemIsInitialized(events.data(), events.size()); - return std::move(out.Str()); + TotalSize = 0; + return events; } private: - static TStringStream Begin(size_t bytes) + TOutputMark Begin(size_t bytes) { - TStringStream out; + if ((Events.capacity() - Events.size()) < bytes) { + Events.reserve(FastClp2(Events.size() + bytes)); + } - return out.Reserve(bytes), out; + return TOutputMark{}; } - TWriter& Push(TString buf, size_t bytes) + TWriter& Flush(size_t bytes) { - Y_VERIFY(buf.size() == bytes, "Got an invalid redo entry buffer"); - - TotalSize += buf.size(); + TotalSize += bytes; - Events.emplace_back(std::move(buf)); + Y_VERIFY(Events.size() == TotalSize, "Got an inconsistent redo entry size"); return *this; } - IAnnex::TLimit GetLimit(ui32 table, bool isDelta) const noexcept - { - IAnnex::TLimit limit; - // FIXME: we cannot handle blob references during scans, so we - // avoid creating large objects when they are in deltas - if (!isDelta && Annex && table != Max<ui32>()) { - limit = Annex->Limit(table); - } else { - limit = { Max<ui32>(), Max<ui32>() }; - } - limit.MinExternSize = Max(limit.MinExternSize, Edge); - return limit; - } - - TSizeInfo CalcSize(TRawVals key, TOpsRef ops, ui32 table, bool isDelta) const noexcept + ui32 CalcSize(TRawVals key) const noexcept { - auto limit = GetLimit(table, isDelta); - ui32 size = 0; - for (const auto &one : ops) { - /* hack for annex blobs, its replaced by 4-byte reference */ - auto valueSize = one.Value.Size(); - if (Annex && table != Max<ui32>() && limit.IsExtern(valueSize)) { - valueSize = 4; - } - - size += sizeof(TUpdate) + valueSize; - } - - size += 
CalcSize(key); + for (const auto &one: key) + size += sizeof(TValue) + one.Size(); - return TSizeInfo{ size, limit }; + return size; } - ui32 CalcSize(TRawVals key) const noexcept + ui32 CalcSize(TRawVals key, TOpsRef ops) const noexcept { - ui32 size = 0; + ui32 size = CalcSize(key); - for (const auto &one: key) - size += sizeof(TValue) + one.Size(); + for (const auto &one : ops) { + size += sizeof(TUpdate) + one.Value.Size(); + } return size; } - void Write(IOut &out, TRawVals key) const noexcept + void Write(IOut &out, TRawVals key) noexcept { for (const auto &one : key) { /* The only way of converting nulls in keys now */ @@ -296,45 +268,30 @@ namespace NRedo { } } - void Write(IOut &out, TOpsRef ops, ui32 table, const IAnnex::TLimit &limit) const noexcept + void Write(IOut &out, TOpsRef ops) noexcept { for (const auto &one: ops) { /* Log enty cannot represent this ECellOp types with payload */ Y_VERIFY(!(one.Value && TCellOp::HaveNoPayload(one.Op))); - /* Log entry cannot recover nulls written as ECellOp::Set with - null cell. Nulls was encoded with hacky TypeId = 0, but - the correct way is to use ECellOp::Null. - */ - - const ui16 type = one.Value ? one.Value.Type() : 0; - - auto cellOp = one.NormalizedCellOp(); + const ui16 type = one.Value.Type(); - if (cellOp != ELargeObj::Inline) { - Y_FAIL("User supplied cell value has an invalid ECellOp"); - } else if (auto got = Place(table, limit, one.Tag, one.AsRef())) { - const auto payload = NUtil::NBin::ToRef(got.Ref); - - Write(out, cellOp = ELargeObj::Extern, one.Tag, type, payload); - - } else { - Write(out, cellOp, one.Tag, type, one.Value.AsRef()); + if (one.Value.IsEmpty()) { + // Log entry cannot recover null value type, since we + // store null values using a special 0 type id. + Y_VERIFY(type == 0, "Cannot write typed null values"); + // Null value cannot have ECellOp::Set as its op, since we + // don't have the necessary type id, instead we expect + // it to be either ECellOp::Null or ECellOp::Reset. 
+ Y_VERIFY(one.Op != ECellOp::Set, "Cannot write ECellOp::Set with a null value"); } - } - } - IAnnex::TResult Place(ui32 table, const IAnnex::TLimit &limit, TTag tag, TArrayRef<const char> raw) const noexcept - { - if (Annex && table != Max<ui32>() && limit.IsExtern(raw.size())) { - return Annex->Place(table, tag, raw); - } else { - return { }; + Write(out, one.Op, one.Tag, type, one.Value.AsRef()); } } - static void Write(IOut &out, TCellOp cellOp, TTag tag, ui16 type, TArrayRef<const char> raw) + void Write(IOut &out, TCellOp cellOp, TTag tag, ui16 type, TArrayRef<const char> raw) { TUpdate up = { cellOp, tag, { type , ui32(raw.size()) } }; @@ -342,18 +299,15 @@ namespace NRedo { Write(out, raw.data(), raw.size()); } - static void Write(IOut &out, const void *ptr, size_t size) + void Write(IOut &, const void *ptr, size_t size) { NSan::CheckMemIsInitialized(ptr, size); - - out.Write(ptr, size); + Events.append(reinterpret_cast<const char*>(ptr), size); } private: - const ui32 Edge = 1024; - IAnnex * const Annex = nullptr; size_t TotalSize = 0; - TList<TString> Events; + TString Events; }; } diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index d7cc19e635..ca3f4de632 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -20,6 +20,131 @@ TTable::TTable(TEpoch epoch) : Epoch(epoch) { } TTable::~TTable() { } +void TTable::PrepareRollback() +{ + Y_VERIFY(!RollbackState); + auto& state = RollbackState.emplace(Epoch); + state.Annexed = Annexed; + state.Scheme = Scheme; + state.EraseCacheEnabled = EraseCacheEnabled; + state.EraseCacheConfig = EraseCacheConfig; + state.MutableExisted = bool(Mutable); + state.MutableUpdated = false; +} + +void TTable::RollbackChanges() +{ + Y_VERIFY(RollbackState, "PrepareRollback needed to rollback changes"); + auto& state = *RollbackState; + + while (!RollbackOps.empty()) { + struct TApplyRollbackOp { + TTable* Self; + + void operator()(const 
TRollbackRemoveOpenTx& op) const { + Self->OpenTransactions.erase(op.TxId); + } + + void operator()(const TRollbackRemoveOpenTxMem& op) const { + Self->OpenTransactions[op.TxId].Mem.erase(op.Mem); + } + + void operator()(const TRollbackAddCommittedTx& op) const { + Self->CommittedTransactions.Add(op.TxId, op.RowVersion); + } + + void operator()(const TRollbackRemoveCommittedTx& op) const { + Self->CommittedTransactions.Remove(op.TxId); + } + + void operator()(const TRollbackAddRemovedTx& op) const { + Self->RemovedTransactions.Add(op.TxId); + } + + void operator()(const TRollbackRemoveRemovedTx& op) const { + Self->RemovedTransactions.Remove(op.TxId); + } + }; + + std::visit(TApplyRollbackOp{ this }, RollbackOps.back()); + RollbackOps.pop_back(); + } + + if (Epoch != state.Epoch) { + // We performed a snapshot, roll it back + if (Mutable) { + ErasedKeysCache.Reset(); + Mutable = nullptr; + } + Y_VERIFY(MutableBackup, "Previous mem table missing"); + Mutable = std::move(MutableBackup); + } else if (!state.MutableExisted) { + // New memtable doesn't need rollback + if (Mutable) { + ErasedKeysCache.Reset(); + Mutable = nullptr; + } + } else if (state.MutableUpdated) { + ErasedKeysCache.Reset(); + Y_VERIFY(Mutable, "Mutable was updated, but it is missing"); + Mutable->RollbackChanges(); + } + Y_VERIFY(!MutableBackup); + + Epoch = state.Epoch; + Annexed = state.Annexed; + if (state.Scheme) { + Levels.Reset(); + ErasedKeysCache.Reset(); + Scheme = std::move(state.Scheme); + EraseCacheEnabled = state.EraseCacheEnabled; + EraseCacheConfig = state.EraseCacheConfig; + } + RollbackState.reset(); +} + +void TTable::CommitChanges(TArrayRef<const TMemGlob> blobs) +{ + Y_VERIFY(RollbackState, "PrepareRollback needed to rollback changes"); + auto& state = *RollbackState; + + RollbackOps.clear(); + + if (Epoch != state.Epoch) { + if (Mutable && blobs) { + Mutable->CommitBlobs(blobs); + } + // We performed a snapshot, move it to Frozen + Y_VERIFY(MutableBackup, "Mem table snaphot 
missing"); + Frozen.insert(MutableBackup); + Stat_.FrozenWaste += MutableBackup->GetWastedMem(); + Stat_.FrozenSize += MutableBackup->GetUsedMem(); + Stat_.FrozenOps += MutableBackup->GetOpsCount(); + Stat_.FrozenRows += MutableBackup->GetRowCount(); + MutableBackup = nullptr; + } else if (!state.MutableExisted) { + // Fresh mem table is not prepared for rollback + if (Mutable && blobs) { + Mutable->CommitBlobs(blobs); + } + } else if (state.MutableUpdated) { + Y_VERIFY(Mutable, "Mutable was updated, but it is missing"); + Mutable->CommitChanges(blobs); + } + Y_VERIFY(!MutableBackup); + + RollbackState.reset(); +} + +void TTable::CommitNewTable(TArrayRef<const TMemGlob> blobs) +{ + Y_VERIFY(!RollbackState, "CommitBlobs must only be used for new tables without rollback"); + + if (Mutable && blobs) { + Mutable->CommitBlobs(blobs); + } +} + void TTable::SetScheme(const TScheme::TTableInfo &table) { Snapshot(); @@ -29,6 +154,12 @@ void TTable::SetScheme(const TScheme::TTableInfo &table) Y_VERIFY(!Mutable && table.Columns); + if (RollbackState && !RollbackState->Scheme) { + RollbackState->Scheme = Scheme; + RollbackState->EraseCacheEnabled = EraseCacheEnabled; + RollbackState->EraseCacheConfig = EraseCacheConfig; + } + auto to = TRowScheme::Make(table.Columns, NUtil::TSecond()); if (auto was = std::exchange(Scheme, to)) @@ -66,6 +197,9 @@ TAutoPtr<TSubset> TTable::Subset(TArrayRef<const TLogoBlobID> bundle, TEpoch hea subset->Frozen.emplace_back(x, x->Immediate()); } } + if (MutableBackup && MutableBackup->Epoch < head) { + subset->Frozen.emplace_back(MutableBackup, MutableBackup->Immediate()); + } for (const auto &pr : TxStatus) { if (pr.second->Epoch < head) { subset->TxStatus.emplace_back(pr.second); @@ -114,6 +248,10 @@ TAutoPtr<TSubset> TTable::Subset(TEpoch head) const noexcept if (it->Epoch < head) subset->Frozen.emplace_back(it, it->Immediate()); + if (MutableBackup && MutableBackup->Epoch < head) { + subset->Frozen.emplace_back(MutableBackup, 
MutableBackup->Immediate()); + } + // This method is normally used when we want to take some state snapshot // However it can still theoretically be used for iteration or compaction subset->CommittedTransactions = CommittedTransactions; @@ -141,6 +279,13 @@ bool TTable::HasBorrowed(ui64 selfTabletId) const noexcept TAutoPtr<TSubset> TTable::ScanSnapshot(TRowVersion snapshot) noexcept { + if (RollbackState) { + Y_VERIFY(Epoch == RollbackState->Epoch && + RollbackState->MutableExisted == bool(Mutable) && + !RollbackState->MutableUpdated, + "Cannot take scan snapshot of a modified table"); + } + TAutoPtr<TSubset> subset = new TSubset(Epoch, Scheme); // TODO: we could filter LSM by the provided snapshot version, but it @@ -198,6 +343,8 @@ TBundleSlicesMap TTable::LookupSlices(TArrayRef<const TLogoBlobID> bundles) cons void TTable::ReplaceSlices(TBundleSlicesMap slices) noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + for (auto &kv : slices) { auto it = Flatten.find(kv.first); Y_VERIFY(it != Flatten.end(), "Got an unknown TPart in ReplaceSlices"); @@ -213,6 +360,8 @@ void TTable::ReplaceSlices(TBundleSlicesMap slices) noexcept void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset) noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + for (const auto &partView : partViews) { Y_VERIFY(partView, "Replace(...) shouldn't get empty parts"); Y_VERIFY(!partView.Screen, "Replace(...) 
shouldn't get screened parts"); @@ -341,6 +490,8 @@ void TTable::Replace(TArrayRef<const TPartView> partViews, const TSubset &subset void TTable::ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> newTxStatus, const TSubset &subset) noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + for (auto &part : subset.TxStatus) { Y_VERIFY(part, "Unexpected empty TTxStatusPart in TSubset"); @@ -367,6 +518,8 @@ void TTable::ReplaceTxStatus(TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>> void TTable::Merge(TPartView partView) noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + Y_VERIFY(partView, "Merge(...) shouldn't get empty part"); Y_VERIFY(partView.Slices, "Merge(...) shouldn't get parts without slices"); @@ -397,6 +550,8 @@ void TTable::Merge(TPartView partView) noexcept void TTable::Merge(TIntrusiveConstPtr<TColdPart> part) noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + Y_VERIFY(part, "Merge(...) 
shouldn't get empty parts"); if (Mutable && part->Epoch >= Mutable->Epoch) { @@ -424,6 +579,8 @@ void TTable::Merge(TIntrusiveConstPtr<TColdPart> part) noexcept void TTable::Merge(TIntrusiveConstPtr<TTxStatusPart> txStatus) noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + Y_VERIFY(txStatus, "Unexpected empty TTxStatusPart"); for (auto& item : txStatus->TxStatusPage->GetCommittedItems()) { @@ -517,6 +674,8 @@ ui64 TTable::GetSearchHeight() const noexcept TVector<TIntrusiveConstPtr<TMemTable>> TTable::GetMemTables() const noexcept { + Y_VERIFY(!RollbackState, "Cannot perform this in a transaction"); + TVector<TIntrusiveConstPtr<TMemTable>> vec(Frozen.begin(), Frozen.end()); if (Mutable) @@ -530,11 +689,21 @@ TEpoch TTable::Snapshot() noexcept if (Mutable) { Annexed = Mutable->GetBlobs()->Tail(); - Frozen.insert(Mutable); - Stat_.FrozenWaste += Mutable->GetWastedMem(); - Stat_.FrozenSize += Mutable->GetUsedMem(); - Stat_.FrozenOps += Mutable->GetOpsCount(); - Stat_.FrozenRows += Mutable->GetRowCount(); + if (RollbackState) { + Y_VERIFY( + RollbackState->Epoch == Mutable->Epoch && + RollbackState->MutableExisted && + !RollbackState->MutableUpdated, + "Cannot snapshot a modified table"); + Y_VERIFY(!MutableBackup, "Another mutable backup already exists"); + MutableBackup = std::move(Mutable); + } else { + Frozen.insert(Mutable); + Stat_.FrozenWaste += Mutable->GetWastedMem(); + Stat_.FrozenSize += Mutable->GetUsedMem(); + Stat_.FrozenOps += Mutable->GetOpsCount(); + Stat_.FrozenRows += Mutable->GetRowCount(); + } Mutable = nullptr; /* have to make new TMemTable on next update */ @@ -637,7 +806,7 @@ EReady TTable::Precharge(TRawVals minKey_, TRawVals maxKey_, TTagsRef tags, return ready ? 
EReady::Data : EReady::Page; } -void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> apart, TRowVersion rowVersion) +void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, TRowVersion rowVersion) { Y_VERIFY(!(ops && TCellOp::HaveNoOps(rop)), "Given ERowOp can't have ops"); @@ -652,12 +821,38 @@ void TTable::Update(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> a MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions); } -void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<TMemGlob> apart, ui64 txId) +TTable::TOpenTransaction& TTable::AddOpenTransaction(ui64 txId) +{ + TOpenTransactions::insert_ctx ctx = nullptr; + TOpenTransactions::iterator it = OpenTransactions.find(txId, ctx); + + if (it == OpenTransactions.end()) { + if (RollbackState) { + RollbackOps.emplace_back(TRollbackRemoveOpenTx{ txId }); + } + + it = OpenTransactions.emplace_direct( + ctx, + std::piecewise_construct, + std::forward_as_tuple(txId), + std::forward_as_tuple()); + } + + return it->second; +} + +void TTable::UpdateTx(ERowOp rop, TRawVals key, TOpsRef ops, TArrayRef<const TMemGlob> apart, ui64 txId) { // Use a special row version that marks this update as uncommitted TRowVersion rowVersion(Max<ui64>(), txId); MemTable().Update(rop, key, ops, apart, rowVersion, CommittedTransactions); - OpenTransactions[txId].Mem.insert(Mutable); + + Y_VERIFY_DEBUG(Mutable->GetTxIdStats().contains(txId)); + + auto& openTx = AddOpenTransaction(txId); + if (openTx.Mem.insert(Mutable).second && RollbackState) { + RollbackOps.emplace_back(TRollbackRemoveOpenTxMem{ txId, Mutable }); + } } void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) @@ -668,8 +863,18 @@ void TTable::CommitTx(ui64 txId, TRowVersion rowVersion) // Note: it is possible to have multiple CommitTx for the same TxId but at // different row versions. The commit with the minimum row version wins. 
if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev) || *prev > rowVersion) { + if (RollbackState) { + if (prev) { + RollbackOps.emplace_back(TRollbackAddCommittedTx{ txId, *prev }); + } else { + RollbackOps.emplace_back(TRollbackRemoveCommittedTx{ txId }); + } + } CommittedTransactions.Add(txId, rowVersion); if (!prev) { + if (RollbackState && RemovedTransactions.Contains(txId)) { + RollbackOps.emplace_back(TRollbackAddRemovedTx{ txId }); + } RemovedTransactions.Remove(txId); } } @@ -687,6 +892,9 @@ void TTable::RemoveTx(ui64 txId) // due to complicated split/merge shard interactions. The commit actually // wins over removes in all cases. if (const auto* prev = CommittedTransactions.Find(txId); Y_LIKELY(!prev)) { + if (RollbackState && !RemovedTransactions.Contains(txId)) { + RollbackOps.emplace_back(TRollbackRemoveRemovedTx{ txId }); + } RemovedTransactions.Add(txId); } } @@ -700,10 +908,28 @@ bool TTable::HasOpenTx(ui64 txId) const return false; } +bool TTable::HasCommittedTx(ui64 txId) const +{ + return CommittedTransactions.Find(txId); +} + +bool TTable::HasRemovedTx(ui64 txId) const +{ + return RemovedTransactions.Contains(txId); +} + TMemTable& TTable::MemTable() { - return - *(Mutable ? 
Mutable : (Mutable = new TMemTable(Scheme, Epoch, Annexed))); + if (!Mutable) { + Mutable = new TMemTable(Scheme, Epoch, Annexed); + } + if (RollbackState && Epoch == RollbackState->Epoch && RollbackState->MutableExisted) { + if (!RollbackState->MutableUpdated) { + RollbackState->MutableUpdated = true; + Mutable->PrepareRollback(); + } + } + return *Mutable; } TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ESeek seek, @@ -721,7 +947,11 @@ TAutoPtr<TTableIt> TTable::Iterate(TRawVals key_, TTagsRef tags, IPages* env, ES observer)); if (Mutable) { - dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward)); + dbIter->Push(TMemIt::Make(*Mutable, Mutable->Snapshot(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward)); + } + + if (MutableBackup) { + dbIter->Push(TMemIt::Make(*MutableBackup, MutableBackup->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Forward)); } for (auto& fti : Frozen) { @@ -764,7 +994,11 @@ TAutoPtr<TTableReverseIt> TTable::IterateReverse(TRawVals key_, TTagsRef tags, I observer)); if (Mutable) { - dbIter->Push(TMemIt::Make(*Mutable, Mutable->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse)); + dbIter->Push(TMemIt::Make(*Mutable, Mutable->Snapshot(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse)); + } + + if (MutableBackup) { + dbIter->Push(TMemIt::Make(*MutableBackup, MutableBackup->Immediate(), key, seek, Scheme->Keys, &dbIter->Remap, env, EDirection::Reverse)); } for (auto& fti : Frozen) { @@ -832,6 +1066,18 @@ EReady TTable::Select(TRawVals key_, TTagsRef tags, IPages* env, TRowState& row, } } + // Mutable data that is transitioning to frozen + if (MutableBackup && !row.IsFinalized()) { + lastEpoch = MutableBackup->Epoch; + if (auto it = TMemIt::Make(*MutableBackup, MutableBackup->Immediate(), key, ESeek::Exact, Scheme->Keys, &remap, env, 
EDirection::Forward)) { + if (it->IsValid() && (snapshotFound || it->SkipToRowVersion(snapshot, stats, committed, observer))) { + // N.B. stop looking for snapshot after the first hit + snapshotFound = true; + it->Apply(row, committed); + } + } + } + // Frozen are sorted by epoch, apply in reverse order for (auto pos = Frozen.rbegin(); !row.IsFinalized() && pos != Frozen.rend(); ++pos) { const auto& memTable = *pos; @@ -926,6 +1172,8 @@ void TTable::DebugDump(IOutputStream& str, IPages* env, const NScheme::TTypeRegi if (Mutable) Mutable->DebugDump(str, reg); + if (MutableBackup) + MutableBackup->DebugDump(str, reg); for (const auto& it : Frozen) { str << "Frozen " << it->Epoch << " dump: " << Endl; it->DebugDump(str, reg); diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index 36493d77d9..14f0ebf262 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -65,6 +65,11 @@ public: explicit TTable(TEpoch); ~TTable(); + void PrepareRollback(); + void RollbackChanges(); + void CommitChanges(TArrayRef<const TMemGlob> blobs); + void CommitNewTable(TArrayRef<const TMemGlob> blobs); + void SetScheme(const TScheme::TTableInfo& tableScheme); TIntrusiveConstPtr<TRowScheme> GetScheme() const noexcept; @@ -148,9 +153,9 @@ public: ui64 itemsLimit, ui64 bytesLimit, EDirection direction, TRowVersion snapshot, TSelectStats& stats) const; - void Update(ERowOp, TRawVals key, TOpsRef, TArrayRef<TMemGlob> apart, TRowVersion rowVersion); + void Update(ERowOp, TRawVals key, TOpsRef, TArrayRef<const TMemGlob> apart, TRowVersion rowVersion); - void UpdateTx(ERowOp, TRawVals key, TOpsRef, TArrayRef<TMemGlob> apart, ui64 txId); + void UpdateTx(ERowOp, TRawVals key, TOpsRef, TArrayRef<const TMemGlob> apart, ui64 txId); void CommitTx(ui64 txId, TRowVersion rowVersion); void RemoveTx(ui64 txId); @@ -158,6 +163,8 @@ public: * Returns true when table has an open transaction that is not committed or removed yet */ bool 
HasOpenTx(ui64 txId) const; + bool HasCommittedTx(ui64 txId) const; + bool HasRemovedTx(ui64 txId) const; TPartView GetPartView(const TLogoBlobID &bundle) const { @@ -217,7 +224,9 @@ public: ui64 GetMemSize(TEpoch epoch = TEpoch::Max()) const noexcept { if (Y_LIKELY(epoch == TEpoch::Max())) { - return Stat_.FrozenSize + (Mutable ? Mutable->GetUsedMem() : 0); + return Stat_.FrozenSize + + (Mutable ? Mutable->GetUsedMem() : 0) + + (MutableBackup ? MutableBackup->GetUsedMem() : 0); } ui64 size = 0; @@ -228,6 +237,10 @@ public: } } + if (MutableBackup && MutableBackup->Epoch < epoch) { + size += MutableBackup->GetUsedMem(); + } + if (Mutable && Mutable->Epoch < epoch) { size += Mutable->GetUsedMem(); } @@ -237,17 +250,23 @@ public: ui64 GetMemWaste() const noexcept { - return Stat_.FrozenWaste + (Mutable ? Mutable->GetWastedMem() : 0); + return Stat_.FrozenWaste + + (Mutable ? Mutable->GetWastedMem() : 0) + + (MutableBackup ? MutableBackup->GetWastedMem() : 0); } ui64 GetMemRowCount() const noexcept { - return Stat_.FrozenRows + (Mutable ? Mutable->GetRowCount() : 0); + return Stat_.FrozenRows + + (Mutable ? Mutable->GetRowCount() : 0) + + (MutableBackup ? MutableBackup->GetRowCount() : 0); } ui64 GetOpsCount() const noexcept { - return Stat_.FrozenOps + (Mutable ? Mutable->GetOpsCount() : 0); + return Stat_.FrozenOps + + (Mutable ? Mutable->GetOpsCount() : 0) + + (MutableBackup ? MutableBackup->GetOpsCount() : 0); } ui64 GetPartsCount() const noexcept @@ -257,8 +276,12 @@ public: ui64 EstimateRowSize() const noexcept { - ui64 size = Stat_.FrozenSize + (Mutable ? Mutable->GetUsedMem() : 0); - ui64 rows = Stat_.FrozenRows + (Mutable ? Mutable->GetRowCount() : 0); + ui64 size = Stat_.FrozenSize + + (Mutable ? Mutable->GetUsedMem() : 0) + + (MutableBackup ? MutableBackup->GetUsedMem() : 0); + ui64 rows = Stat_.FrozenRows + + (Mutable ? Mutable->GetRowCount() : 0) + + (MutableBackup ? 
MutableBackup->GetRowCount() : 0); for (const auto& flat : Flatten) { if (const TPartView &partView = flat.second) { @@ -297,6 +320,10 @@ private: THashSet<TIntrusiveConstPtr<TPart>> Parts; }; + using TOpenTransactions = THashMap<ui64, TOpenTransaction>; + + TOpenTransaction& AddOpenTransaction(ui64 txId); + private: TEpoch Epoch; /* Monotonic table change number, with holes */ ui64 Annexed = 0; /* Monotonic serial of attached external blobs */ @@ -317,9 +344,62 @@ private: TRowVersionRanges RemovedRowVersions; THashSet<ui64> CheckTransactions; - THashMap<ui64, TOpenTransaction> OpenTransactions; + TOpenTransactions OpenTransactions; TTransactionMap CommittedTransactions; TTransactionSet RemovedTransactions; + +private: + struct TRollbackRemoveOpenTx { + ui64 TxId; + }; + + struct TRollbackRemoveOpenTxMem { + ui64 TxId; + TIntrusiveConstPtr<TMemTable> Mem; + }; + + struct TRollbackAddCommittedTx { + ui64 TxId; + TRowVersion RowVersion; + }; + + struct TRollbackRemoveCommittedTx { + ui64 TxId; + }; + + struct TRollbackAddRemovedTx { + ui64 TxId; + }; + + struct TRollbackRemoveRemovedTx { + ui64 TxId; + }; + + using TRollbackOp = std::variant< + TRollbackRemoveOpenTx, + TRollbackRemoveOpenTxMem, + TRollbackAddCommittedTx, + TRollbackRemoveCommittedTx, + TRollbackAddRemovedTx, + TRollbackRemoveRemovedTx>; + + struct TRollbackState { + TEpoch Epoch; + TIntrusiveConstPtr<TRowScheme> Scheme; + ui64 Annexed; + TKeyRangeCacheConfig EraseCacheConfig; + bool EraseCacheEnabled; + bool MutableExisted; + bool MutableUpdated; + + TRollbackState(TEpoch epoch) + : Epoch(epoch) + { } + }; + + std::optional<TRollbackState> RollbackState; + std::vector<TRollbackOp> RollbackOps; + TIntrusivePtr<TMemTable> MutableBackup; }; } diff --git a/ydb/core/tablet_flat/test/libs/table/test_dbase.h b/ydb/core/tablet_flat/test/libs/table/test_dbase.h index bb846d1057..07cb17101a 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_dbase.h +++ b/ydb/core/tablet_flat/test/libs/table/test_dbase.h 
@@ -82,6 +82,15 @@ namespace NTest { TDbExec& Reject() { return DoCommit(true, false); } TDbExec& Relax() { return DoCommit(false, true); } + TDbExec& Cleanup() { + if (OnTx == EOnTx::Auto) { + DoCommit(false, false); + } + + Y_VERIFY(OnTx == EOnTx::None); + return *this; + } + TDbExec& ReadVer(TRowVersion readVersion) { DoBegin(false); @@ -90,10 +99,44 @@ namespace NTest { return *this; } + TDbExec& ReadTx(ui64 txId) { + DoBegin(false); + + ReadTxId = txId; + + return *this; + } + TDbExec& WriteVer(TRowVersion writeVersion) { Y_VERIFY(OnTx != EOnTx::None); WriteVersion = writeVersion; + WriteTxId = 0; + + return *this; + } + + TDbExec& WriteTx(ui64 txId) { + Y_VERIFY(OnTx != EOnTx::None); + + WriteVersion = TRowVersion::Min(); + WriteTxId = txId; + + return *this; + } + + TDbExec& CommitTx(ui32 table, ui64 txId) { + Y_VERIFY(OnTx != EOnTx::None); + + Base->CommitTx(table, txId, WriteVersion); + + return *this; + } + + TDbExec& RemoveTx(ui32 table, ui64 txId) { + Y_VERIFY(OnTx != EOnTx::None); + + Base->RemoveTx(table, txId); return *this; } @@ -103,7 +146,11 @@ namespace NTest { const NTest::TRowTool tool(RowSchemeFor(table)); auto pair = tool.Split(row, true, rop != ERowOp::Erase); - Base->Update(table, rop, pair.Key, pair.Ops, WriteVersion); + if (WriteTxId != 0) { + Base->UpdateTx(table, rop, pair.Key, pair.Ops, WriteTxId); + } else { + Base->Update(table, rop, pair.Key, pair.Ops, WriteVersion); + } return *this; } @@ -119,6 +166,12 @@ namespace NTest { return Add(table, row, ERowOp::Upsert); } + template<typename ...TArgs> + inline TDbExec& PutN(ui32 table, TArgs&&... 
args) + { + return Put(table, *SchemedCookRow(table).Col(std::forward<TArgs>(args)...)); + } + TDbExec& Apply(const TSchemeChanges &delta) { Last = Max<ui32>(), Altered = true; @@ -126,9 +179,9 @@ namespace NTest { return Base->Alter().Merge(delta), *this; } - TDbExec& Snapshot(ui32 table) + TEpoch Snapshot(ui32 table) { - return Base->TxSnapTable(table), *this; + return Base->TxSnapTable(table); } NTest::TSchemedCookRow SchemedCookRow(ui32 table) noexcept @@ -140,7 +193,7 @@ namespace NTest { { DoBegin(false), RowSchemeFor(table); - TCheckIter check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion }; + TCheckIter check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion, ReadTxId }; return check.To(CurrentStep()), check; } @@ -149,7 +202,7 @@ namespace NTest { { DoBegin(false), RowSchemeFor(table); - TCheckIter check{ *Base, { nullptr, 0, true }, table, Scheme, ReadVersion, ENext::Data }; + TCheckIter check{ *Base, { nullptr, 0, true }, table, Scheme, ReadVersion, ReadTxId, ENext::Data }; return check.To(CurrentStep()), check; } @@ -158,13 +211,15 @@ namespace NTest { { DoBegin(false), RowSchemeFor(table); - TCheckSelect check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion }; + TCheckSelect check{ *Base, { nullptr, 0, erased }, table, Scheme, ReadVersion, ReadTxId }; return check.To(CurrentStep()), check; } TDbExec& Snap(ui32 table) { + Cleanup(); + const auto scn = Base->Head().Serial + 1; RedoLog.emplace_back(new TChange({ Gen, ++Step }, scn)); @@ -225,6 +280,8 @@ namespace NTest { { Y_VERIFY(OnTx != EOnTx::Real, "Commit TX before replaying"); + Cleanup(); + const ui64 serial = Base->Head().Serial; Birth(), Base = nullptr, OnTx = EOnTx::None; @@ -275,17 +332,10 @@ namespace NTest { const TRowScheme& RowSchemeFor(ui32 table) noexcept { - if (std::exchange(Last, table) == table) { - /* Safetly can use row scheme from cache */ - } else if (Altered) { - TScheme temp(Base->GetScheme()); - TSchemeModifier(temp).Apply(*Base->Alter()); - - 
const auto *info = temp.GetTableInfo(table); - - Scheme = TRowScheme::Make(info->Columns, NUtil::TSecond()); - } else { - Scheme = Base->Subset(table, { }, TEpoch::Zero())->Scheme; + if (std::exchange(Last, table) != table) { + // Note: it's ok if the table has been altered, since + // we should observe updated schema on all reads. + Scheme = Base->GetRowScheme(table); } return *Scheme; @@ -415,6 +465,8 @@ namespace NTest { ReadVersion = TRowVersion::Max(); WriteVersion = TRowVersion::Min(); + ReadTxId = 0; + WriteTxId = 0; return *this; } @@ -432,6 +484,8 @@ namespace NTest { TAutoPtr<TSteppedCookieAllocator> Annex; TRowVersion ReadVersion = TRowVersion::Max(); TRowVersion WriteVersion = TRowVersion::Min(); + ui64 ReadTxId = 0; + ui64 WriteTxId = 0; }; } diff --git a/ydb/core/tablet_flat/test/libs/table/test_iter.h b/ydb/core/tablet_flat/test/libs/table/test_iter.h index c274725613..c5e04a48a2 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_iter.h +++ b/ydb/core/tablet_flat/test/libs/table/test_iter.h @@ -120,6 +120,12 @@ namespace NTest { return erased ? Is(EReady::Gone) : Is(row, true, ERowOp::Erase); } + template<typename ...TArgs> + inline TChecker& NoKeyN(TArgs&&... 
args) + { + return NoKey(*TSchemedCookRow(Scheme).Col(std::forward<TArgs>(args)...)); + } + template<typename TListType> TChecker& IsTheSame(const TListType &list) { diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h b/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h index f800e51c25..6d89a40644 100644 --- a/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h +++ b/ydb/core/tablet_flat/test/libs/table/wrap_dbase.h @@ -11,11 +11,13 @@ namespace NTest { TWrapDbIterImpl(TDatabase &base, ui32 table, TIntrusiveConstPtr<TRowScheme> scheme, TRowVersion snapshot = TRowVersion::Max(), + ui64 readTxId = 0, ENext mode = ENext::All) : Scheme(std::move(scheme)) , Base(base) , Table(table) , Snapshot(snapshot) + , ReadTxId(readTxId) , Mode(mode) { @@ -68,7 +70,12 @@ namespace NTest { swap(range.MinInclusive, range.MaxInclusive); } - Iter = Base.IterateRangeGeneric<TIter>(Table, range, Scheme->Tags(), Snapshot); + ITransactionMapPtr txMap; + if (ReadTxId != 0 && Base.HasOpenTx(Table, ReadTxId)) { + txMap = new TSingleTransactionMap(ReadTxId, TRowVersion::Min()); + } + + Iter = Base.IterateRangeGeneric<TIter>(Table, range, Scheme->Tags(), Snapshot, txMap); return Iter->Next(Mode); } @@ -90,6 +97,7 @@ namespace NTest { private: const ui32 Table = Max<ui32>(); const TRowVersion Snapshot; + const ui64 ReadTxId; const ENext Mode; TAutoPtr<TIter> Iter; }; diff --git a/ydb/core/tablet_flat/test/libs/table/wrap_select.h b/ydb/core/tablet_flat/test/libs/table/wrap_select.h index 06069a8d01..ac91e59205 100644 --- a/ydb/core/tablet_flat/test/libs/table/wrap_select.h +++ b/ydb/core/tablet_flat/test/libs/table/wrap_select.h @@ -10,12 +10,14 @@ namespace NTest { struct TWrapDbSelect { TWrapDbSelect(TDatabase &base, ui32 table, TIntrusiveConstPtr<TRowScheme> scheme, - TRowVersion snapshot = TRowVersion::Max()) + TRowVersion snapshot = TRowVersion::Max(), + ui64 readTxId = 0) : Scheme(std::move(scheme)) , Remap_(TRemap::Full(*Scheme)) , Base(base) , Table(table) , Snapshot(snapshot) + 
, ReadTxId(readTxId) { } @@ -44,7 +46,12 @@ namespace NTest { { Y_VERIFY(seek == ESeek::Exact, "Db Select(...) is a point lookup"); - return (Ready = Base.Select(Table, key, Scheme->Tags(), State, /* readFlags */ 0, Snapshot)); + ITransactionMapPtr txMap; + if (ReadTxId != 0 && Base.HasOpenTx(Table, ReadTxId)) { + txMap = new TSingleTransactionMap(ReadTxId, TRowVersion::Min()); + } + + return (Ready = Base.Select(Table, key, Scheme->Tags(), State, /* readFlags */ 0, Snapshot, txMap)); } EReady Next() noexcept @@ -65,6 +72,7 @@ namespace NTest { private: const ui32 Table = Max<ui32>(); const TRowVersion Snapshot; + const ui64 ReadTxId; EReady Ready = EReady::Gone; TRowState State; }; diff --git a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt index f3f8405b23..de5d414efe 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.darwin.txt @@ -48,6 +48,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/flat_test_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/util_pool_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_self.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_iterator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_memtable.cpp diff --git a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt index 86d11475bd..846c4ab036 100644 --- a/ydb/core/tablet_flat/ut/CMakeLists.linux.txt +++ b/ydb/core/tablet_flat/ut/CMakeLists.linux.txt @@ -52,6 +52,7 @@ target_sources(ydb-core-tablet_flat-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/flat_table_part_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/flat_test_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/shared_handle_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/util_pool_ut.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_self.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_iterator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tablet_flat/ut/ut_memtable.cpp diff --git a/ydb/core/tablet_flat/ut/ut_db_iface.cpp b/ydb/core/tablet_flat/ut/ut_db_iface.cpp index 1e2c439af2..2e408c25fd 100644 --- a/ydb/core/tablet_flat/ut/ut_db_iface.cpp +++ b/ydb/core/tablet_flat/ut/ut_db_iface.cpp @@ -306,11 +306,13 @@ Y_UNIT_TEST_SUITE(DBase) { me.To(30).Begin().Apply(*TAlter().DropTable(1)); me.To(31).Commit().Affects(0, { }); me.To(32).Begin().Add(2, row).Commit().Affects(0, { 2 }); - me.To(33).Begin().Snapshot(2).Commit().Affects(0, { 2 }); + auto epoch = me.To(33).Begin().Snapshot(2); + UNIT_ASSERT(epoch == TEpoch::FromIndex(2)); + me.Commit().Affects(0, { 2 }); me.To(34).Compact(2); UNIT_ASSERT(me->Counters().MemTableOps == 0); - UNIT_ASSERT(me.BackLog().Snapshots.at(0).Epoch == TEpoch::FromIndex(2)); + UNIT_ASSERT(me.BackLog().Snapshots == 1); { const auto subset = me->Subset(2, TEpoch::Max(), { }, { }); @@ -808,6 +810,193 @@ Y_UNIT_TEST_SUITE(DBase) { .Next().Is(EReady::Gone); } + Y_UNIT_TEST(AlterAndUpsertChangesVisibility) { + TDbExec me; + + const ui32 table1 = 1; + const ui32 table2 = 2; + me.To(10).Begin(); + + me.To(20).Apply(*TAlter() + .AddTable("me_1", table1) + .AddColumn(table1, "key", 1, ETypes::Uint64, false) + .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64)) + .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64)) + .AddColumnToKey(table1, 1)); + me.To(21).PutN(table1, 1_u64, 11_u64, 12_u64); + me.To(22).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(23).Select(table1).NoKeyN(2_u64); + + me.To(30).Apply(*TAlter() + .AddTable("me_2", table2) + .AddColumn(table2, "key", 1, ETypes::Uint64, false) + .AddColumn(table2, "arg1", 4, ETypes::Uint64, false, Cimple(20004_u64)) + .AddColumn(table2, "arg2", 5, ETypes::Uint64, false, Cimple(20005_u64)) + .AddColumnToKey(table2, 1)); + me.To(31).PutN(table2, 2_u64, 
21_u64, 22_u64); + me.To(32).Select(table2).NoKeyN(1_u64); + me.To(33).Select(table2).HasN(2_u64, 21_u64, 22_u64); + + me.Commit(); + + me.To(40).Begin(); + me.To(41).Apply(*TAlter() + .DropColumn(table2, 5) + .AddColumn(table2, "arg3", 6, ETypes::Uint64, false, Cimple(20006_u64))); + me.To(42).Select(table2).HasN(2_u64, 21_u64, 20006_u64); + me.To(43).PutN(table2, 2_u64, ECellOp::Empty, 23_u64); + me.To(44).Select(table2).HasN(2_u64, 21_u64, 23_u64); + me.Reject(); + + me.To(50).Begin(); + me.To(51).Select(table2).HasN(2_u64, 21_u64, 22_u64); + me.To(52).PutN(table2, 2_u64, 24_u64, ECellOp::Empty); + me.To(53).Select(table2).HasN(2_u64, 24_u64, 22_u64); + me.Commit(); + + me.To(60).Replay(EPlay::Boot); + me.To(61).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(62).Select(table2).HasN(2_u64, 24_u64, 22_u64); + me.To(63).Replay(EPlay::Redo); + me.To(64).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(65).Select(table2).HasN(2_u64, 24_u64, 22_u64); + } + + Y_UNIT_TEST(UncommittedChangesVisibility) { + TDbExec me; + + const ui32 table1 = 1; + + me.To(10).Begin(); + me.To(11).Apply(*TAlter() + .AddTable("me_1", table1) + .AddColumn(table1, "key", 1, ETypes::Uint64, false) + .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64)) + .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64)) + .AddColumnToKey(table1, 1)); + me.To(12).PutN(table1, 1_u64, 11_u64, 12_u64); + me.To(13).WriteTx(123).PutN(table1, 1_u64, ECellOp::Empty, 13_u64); + UNIT_ASSERT(me->HasOpenTx(table1, 123)); + me.To(14).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(15).ReadTx(123).Select(table1).HasN(1_u64, 11_u64, 13_u64); + me.To(16).Commit(); + + me.To(20).Begin(); + me.To(21).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(22).CommitTx(table1, 123); + UNIT_ASSERT(!me->HasOpenTx(table1, 123)); + UNIT_ASSERT(me->HasCommittedTx(table1, 123)); + me.To(23).Select(table1).HasN(1_u64, 11_u64, 13_u64); + me.To(24).Reject(); + + 
UNIT_ASSERT(me->HasOpenTx(table1, 123)); + UNIT_ASSERT(!me->HasCommittedTx(table1, 123)); + + me.To(30).Begin(); + me.To(31).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(32).RemoveTx(table1, 123); + UNIT_ASSERT(!me->HasOpenTx(table1, 123)); + UNIT_ASSERT(me->HasRemovedTx(table1, 123)); + me.To(33).Reject(); + + UNIT_ASSERT(me->HasOpenTx(table1, 123)); + UNIT_ASSERT(!me->HasRemovedTx(table1, 123)); + + me.To(40).Begin(); + me.To(41).CommitTx(table1, 123); + me.To(42).Commit(); + + UNIT_ASSERT(!me->HasOpenTx(table1, 123)); + UNIT_ASSERT(me->HasCommittedTx(table1, 123)); + + me.To(50).Select(table1).HasN(1_u64, 11_u64, 13_u64); + me.To(51).Snap(table1).Compact(table1); + + UNIT_ASSERT(!me->HasOpenTx(table1, 123)); + UNIT_ASSERT(!me->HasCommittedTx(table1, 123)); + + me.To(52).Select(table1).HasN(1_u64, 11_u64, 13_u64); + } + + Y_UNIT_TEST(ReplayNewTable) { + TDbExec me; + + const ui32 table1 = 1; + + me.To(10).Begin(); + me.To(11).Apply(*TAlter() + .AddTable("me_1", table1) + .AddColumn(table1, "key", 1, ETypes::Uint64, false) + .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64)) + .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64)) + .AddColumnToKey(table1, 1)); + me.To(13).Commit(); + me.To(14).Affects(0, { }); + + me.To(21).Replay(EPlay::Boot); + me.To(22).Replay(EPlay::Redo); + } + + Y_UNIT_TEST(SnapshotNewTable) { + TDbExec me; + + const ui32 table1 = 1; + + me.To(10).Begin(); + me.To(11).Apply(*TAlter() + .AddTable("me_1", table1) + .AddColumn(table1, "key", 1, ETypes::Uint64, false) + .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64)) + .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64)) + .AddColumnToKey(table1, 1)); + me.To(12).Snapshot(table1); + me.To(13).Commit(); + me.To(14).Affects(0, { }); + + me.To(21).Replay(EPlay::Boot); + me.To(22).Replay(EPlay::Redo); + } + + Y_UNIT_TEST(DropModifiedTable) { + TDbExec me; + + const ui32 table1 = 1; + + me.To(10).Begin(); + 
me.To(11).Apply(*TAlter() + .AddTable("me_1", table1) + .AddColumn(table1, "key", 1, ETypes::Uint64, false) + .AddColumn(table1, "arg1", 4, ETypes::Uint64, false, Cimple(10004_u64)) + .AddColumn(table1, "arg2", 5, ETypes::Uint64, false, Cimple(10005_u64)) + .AddColumnToKey(table1, 1)); + me.To(12).Commit(); + + me.To(20).Begin(); + me.To(21).PutN(table1, 1_u64, 11_u64, 12_u64); + me.To(22).Commit(); + + me.To(30).Begin(); + me.To(31).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(32).Apply(*TAlter() + .AddColumn(table1, "arg3", 6, ETypes::Uint64, false, Cimple(10006_u64))); + me.To(33).Select(table1).HasN(1_u64, 11_u64, 12_u64, 10006_u64); + me.To(34).Apply(*TAlter() + .DropTable(table1)); + me.To(35).Reject(); + + me.To(40).Begin(); + me.To(41).Select(table1).HasN(1_u64, 11_u64, 12_u64); + me.To(42).Apply(*TAlter() + .AddColumn(table1, "arg3", 6, ETypes::Uint64, false, Cimple(10006_u64))); + me.To(43).Select(table1).HasN(1_u64, 11_u64, 12_u64, 10006_u64); + me.To(44).Apply(*TAlter() + .DropTable(table1)); + me.To(45).Commit(); + + me.To(51).Replay(EPlay::Boot); + me.To(52).Replay(EPlay::Redo); + } + } } diff --git a/ydb/core/tablet_flat/ut/ut_redo.cpp b/ydb/core/tablet_flat/ut/ut_redo.cpp index 95523535f0..2ba7829372 100644 --- a/ydb/core/tablet_flat/ut/ut_redo.cpp +++ b/ydb/core/tablet_flat/ut/ut_redo.cpp @@ -41,7 +41,7 @@ Y_UNIT_TEST_SUITE(Redo) { ui32 serial = 0; for (auto &one: changes) { - me.To(++serial)->RollUp(one->Stamp, one->Scheme, one->Redo, { }); + me.To(++serial).Cleanup()->RollUp(one->Stamp, one->Scheme, one->Redo, { }); if (1 == serial) { /* Just applied initial alter script, nothing to check */ diff --git a/ydb/core/tablet_flat/util_pool.h b/ydb/core/tablet_flat/util_pool.h new file mode 100644 index 0000000000..e3cb17a5ee --- /dev/null +++ b/ydb/core/tablet_flat/util_pool.h @@ -0,0 +1,269 @@ +#pragma once + +#include <util/generic/utility.h> +#include <util/system/align.h> +#include <util/system/yassert.h> +#include <memory> +#include 
<optional> + +namespace NKikimr::NUtil { + + /** + * Memory pool with support for transactions + */ + class TMemoryPool + : private std::allocator<char> + { + using allocator_base = std::allocator<char>; + using allocator_traits = std::allocator_traits<allocator_base>; + + struct TChunk { + TChunk* Next; + char* Ptr; + size_t Left; + + TChunk(size_t size) noexcept + : Next(nullptr) + { + Y_VERIFY_DEBUG((((uintptr_t)this) % PLATFORM_DATA_ALIGN) == 0); + Y_VERIFY_DEBUG(size >= ChunkHeaderSize()); + Ptr = DataStart(); + Left = size - ChunkHeaderSize(); + } + + /** + * Aligns size to the specified power of two + */ + static constexpr size_t AlignSizeUp(size_t size, size_t alignment) { + return size + ((-size) & (alignment - 1)); + } + + /** + * Aligned chunk header size that takes expected alignment into account + */ + static constexpr size_t ChunkHeaderSize() noexcept { + return AlignSizeUp(sizeof(TChunk), PLATFORM_DATA_ALIGN); + } + + char* ChunkStart() noexcept { + return reinterpret_cast<char*>(this); + } + + const char* ChunkStart() const noexcept { + return reinterpret_cast<const char*>(this); + } + + size_t ChunkSize() const noexcept { + return Left + (Ptr - ChunkStart()); + } + + char* DataStart() noexcept { + return ChunkStart() + ChunkHeaderSize(); + } + + const char* DataStart() const noexcept { + return ChunkStart() + ChunkHeaderSize(); + } + + size_t Used() const noexcept { + return Ptr - DataStart(); + } + + size_t Wasted() const noexcept { + return ChunkHeaderSize() + Left; + } + + void Reset() noexcept { + Rollback(DataStart()); + } + + void Rollback(char* to) noexcept { + char* data = Ptr; + Ptr = to; + Left += data - to; + } + + inline char* Allocate(size_t size) noexcept { + if (Left >= size) { + char* ptr = Ptr; + + Ptr += size; + Left -= size; + + return ptr; + } + + return nullptr; + } + + inline char* Allocate(size_t size, size_t align) noexcept { + size_t pad = AlignUp(Ptr, align) - Ptr; + + if (char* ret = Allocate(pad + size)) { + return ret + 
pad; + } + + return nullptr; + } + }; + + public: + TMemoryPool(size_t initial) + : First(AllocateChunk(Max(initial, TChunk::ChunkHeaderSize() + PLATFORM_DATA_ALIGN))) + , Current(First) + { } + + ~TMemoryPool() { + TChunk* chunk = First; + while (chunk) { + TChunk* next = chunk->Next; + FreeChunk(chunk); + chunk = next; + } + } + + void* Allocate(size_t size) { + return DoAllocate(AlignUp<size_t>(size, PLATFORM_DATA_ALIGN)); + } + + void* Allocate(size_t size, size_t align) { + return DoAllocate(AlignUp<size_t>(size, PLATFORM_DATA_ALIGN), align); + } + + void* Append(const void* data, size_t len) { + void* ptr = this->Allocate(len); + if (len > 0) { + ::memcpy(ptr, data, len); + } + return ptr; + } + + void BeginTransaction() noexcept { + Y_VERIFY(!RollbackState_); + auto& state = RollbackState_.emplace(); + state.Chunk = Current; + state.Ptr = Current->Ptr; + } + + void CommitTransaction() noexcept { + Y_VERIFY(RollbackState_); + RollbackState_.reset(); + } + + void RollbackTransaction() noexcept { + Y_VERIFY(RollbackState_); + auto& state = *RollbackState_; + DoRollback(state.Chunk, state.Ptr); + RollbackState_.reset(); + } + + size_t Used() const noexcept { + return Used_ + Current->Used(); + } + + size_t Wasted() const noexcept { + return Wasted_ + Current->Wasted(); + } + + size_t Available() const noexcept { + return Available_ + Current->Left; + } + + private: + TChunk* AllocateChunk(size_t hint) { + size_t chunkSize = FastClp2(hint); + if (chunkSize - hint >= (chunkSize >> 2)) { + chunkSize -= chunkSize >> 2; + } + char* ptr = allocator_traits::allocate(*this, chunkSize); + return new (ptr) TChunk(chunkSize); + } + + void FreeChunk(TChunk* chunk) { + char* ptr = chunk->ChunkStart(); + size_t chunkSize = chunk->ChunkSize(); + chunk->~TChunk(); + allocator_traits::deallocate(*this, ptr, chunkSize); + } + + TChunk* AddChunk(size_t size) { + Y_VERIFY(!Current->Next); + size_t hint = Max(AlignUp<size_t>(sizeof(TChunk), PLATFORM_DATA_ALIGN) + size, 
Current->ChunkSize() + 1); + TChunk* next = AllocateChunk(hint); + Used_ += Current->Used(); + Wasted_ += Current->Wasted(); + Current->Next = next; + Current = next; + return next; + } + + bool NextChunk() { + if (Current->Next) { + Used_ += Current->Used(); + Wasted_ += Current->Wasted(); + Current = Current->Next; + Y_VERIFY_DEBUG(Current->Ptr == Current->DataStart()); + Wasted_ -= Current->ChunkSize(); + Available_ -= Current->Left; + return true; + } + + return false; + } + + void DoRollback(TChunk* target, char* ptr) { + if (target != Current) { + TChunk* chunk = target; + do { + // Remove previously added stats + Used_ -= chunk->Used(); + Wasted_ -= chunk->Wasted(); + // Switch to the next chunk in the chain + chunk = chunk->Next; + Y_VERIFY(chunk, "Rollback cannot find current chunk in the chain"); + // Reset chunk and add it to stats as wasted/free space + chunk->Reset(); + Wasted_ += chunk->ChunkSize(); + Available_ += chunk->Left; + } while (chunk != Current); + Current = target; + } + target->Rollback(ptr); + } + + void* DoAllocate(size_t size) { + do { + if (auto* ptr = Current->Allocate(size)) { + return ptr; + } + } while (NextChunk()); + + return AddChunk(size)->Allocate(size); + } + + void* DoAllocate(size_t size, size_t align) { + do { + if (auto* ptr = Current->Allocate(size, align)) { + return ptr; + } + } while (NextChunk()); + + return AddChunk(size + align - 1)->Allocate(size, align); + } + + private: + struct TRollbackState { + TChunk* Chunk; + char* Ptr; + }; + + private: + TChunk* First; + TChunk* Current; + size_t Used_ = 0; + size_t Wasted_ = 0; + size_t Available_ = 0; + std::optional<TRollbackState> RollbackState_; + }; + +} // namespace NKikimr::NUtil diff --git a/ydb/core/tablet_flat/util_pool_ut.cpp b/ydb/core/tablet_flat/util_pool_ut.cpp new file mode 100644 index 0000000000..5f21e7af07 --- /dev/null +++ b/ydb/core/tablet_flat/util_pool_ut.cpp @@ -0,0 +1,73 @@ +#include "util_pool.h" + +#include 
<library/cpp/testing/unittest/registar.h> +#include <util/random/random.h> +#include <string.h> + +namespace NKikimr::NUtil { + +Y_UNIT_TEST_SUITE(TMemoryPoolTest) { + + Y_UNIT_TEST(AllocOneByte) { + TMemoryPool pool(1); + UNIT_ASSERT_C(pool.Used() == 0, pool.Used()); + UNIT_ASSERT_C(pool.Wasted() > 0, pool.Wasted()); + UNIT_ASSERT_C(pool.Available() >= 1, pool.Available()); + size_t initialWasted = pool.Wasted(); + size_t initialAvailable = pool.Available(); + void* ptr = pool.Allocate(1); + Y_UNUSED(ptr); + UNIT_ASSERT_C(pool.Used() >= 1, pool.Used()); + UNIT_ASSERT_C(pool.Used() + pool.Wasted() == initialWasted, pool.Used() << " + " << pool.Wasted()); + UNIT_ASSERT_C(pool.Used() + pool.Available() == initialAvailable, pool.Used() << " + " << pool.Available()); + } + + Y_UNIT_TEST(AppendString) { + TMemoryPool pool(128); + const char* str = "Hello, world!"; + char* ptr = (char*)pool.Append(str, ::strlen(str) + 1); + UNIT_ASSERT(::strcmp(ptr, str) == 0); + } + + void DoTransactions(size_t align = 0) { + TMemoryPool pool(128); + size_t expectedUsed = 0; + size_t expectedSize = 0; + for (int i = 0; i < 1000; ++i) { + size_t initialUsed = pool.Used(); + pool.BeginTransaction(); + ui32 r = RandomNumber<ui32>(20); + int count = (r >> 1) + 1; + bool commit = r & 1; + for (int i = 0; i < count; ++i) { + if (align) { + void* ptr = pool.Allocate(1, align); + UNIT_ASSERT(ptr); + } else { + void* ptr = pool.Allocate(1); + UNIT_ASSERT(ptr); + } + } + expectedSize = Max(expectedSize, pool.Used() + pool.Wasted()); + if (commit) { + expectedUsed += pool.Used() - initialUsed; + pool.CommitTransaction(); + } else { + pool.RollbackTransaction(); + } + } + UNIT_ASSERT_C(pool.Used() == expectedUsed, pool.Used() << " != " << expectedUsed); + UNIT_ASSERT_C(pool.Used() + pool.Wasted() == expectedSize, pool.Used() << " + " << pool.Wasted() << " != " << expectedSize); + } + + Y_UNIT_TEST(Transactions) { + DoTransactions(); + } + + Y_UNIT_TEST(TransactionsWithAlignment) { + 
DoTransactions(16); + } + +} // Y_UNIT_TEST_SUITE(TMemoryPoolTest) + +} // namespace NKikimr::NUtil diff --git a/ydb/core/tablet_flat/util_store.h b/ydb/core/tablet_flat/util_store.h index b61062d1d6..f5bf0ccb91 100644 --- a/ydb/core/tablet_flat/util_store.h +++ b/ydb/core/tablet_flat/util_store.h @@ -108,14 +108,6 @@ namespace NUtil { TConcurrentStore() { } ~TConcurrentStore() noexcept { - clear(); - } - - TConstIterator Iterator() const noexcept { - return TConstIterator(this); - } - - void clear() noexcept { size_t count = Count.exchange(0, std::memory_order_release); Head.store(nullptr, std::memory_order_release); auto* tail = Tail.exchange(nullptr, std::memory_order_release); @@ -133,6 +125,10 @@ namespace NUtil { } } + TConstIterator Iterator() const noexcept { + return TConstIterator(this); + } + /** * Emplaces a new element, not thread safe */ @@ -142,7 +138,8 @@ namespace NUtil { size_t index = Count.load(std::memory_order_relaxed); // Allocate a new chunk if necessary - auto* tail = Tail.load(std::memory_order_relaxed); + // Note: we acquire for the tail->Prev pointer here + auto* tail = Tail.load(std::memory_order_acquire); if (!tail || tail->EndOffset() <= index) { size_t offset = tail ? tail->EndOffset() : 0; size_t bytes = tail ? tail->Bytes * 2 : 512; @@ -158,16 +155,46 @@ namespace NUtil { Tail.store(tail, std::memory_order_release); } - Y_VERIFY_DEBUG(tail->Offset <= index); + // It is possible for new index to be located on some earlier + // chunk after we do a truncation. + void* ptr = FindPtr(tail, index); - // Construct new value and publish it by releasing the new count - void* ptr = tail->Values() + (index - tail->Offset); + // Construct a new value and publish the new count. Note that this + // does not actually publish value contents, since count is not + // acquired on access, but it's useful for iteration. + // Items must be synchronized externally. 
T* value = new (ptr) T(std::forward<TArgs>(args)...); Count.store(index + 1, std::memory_order_release); return *value; } /** + * Truncates store to a smaller size, not thread safe + */ + void truncate(size_t new_size) { + size_t prev_size = Count.load(std::memory_order_relaxed); + Y_VERIFY(new_size <= prev_size); + + if (new_size < prev_size) { + auto* tail = Tail.load(std::memory_order_acquire); + while (tail && new_size < tail->EndOffset()) { + // We want to call destructor for all items + // that are between new_size and prev_size + if (tail->Offset < prev_size) { + size_t fromIndex = std::max(new_size, tail->Offset) - tail->Offset; + size_t toIndex = std::min(prev_size, tail->EndOffset()) - tail->Offset; + T* values = tail->Values() + fromIndex; + for (size_t index = fromIndex; index < toIndex; ++index, ++values) { + values->~T(); + } + } + tail = tail->Prev; + } + Count.store(new_size, std::memory_order_release); + } + } + + /** * Returns a thread-safe size of the container */ size_t size() const { @@ -192,6 +219,41 @@ namespace NUtil { return *FindPtr(Tail.load(std::memory_order_acquire), index); } + /** + * Runs callback(index, value) for each value between index and endIndex, not thread safe + */ + template<class TCallback> + void Enumerate(size_t index, size_t endIndex, TCallback&& callback) { + Y_VERIFY(index <= endIndex); + if (index == endIndex) { + return; + } + + size_t count = Count.load(std::memory_order_acquire); + Y_VERIFY(endIndex <= count); + + auto* tail = Tail.load(std::memory_order_acquire); + while (tail && index < tail->Offset) { + tail = tail->Prev; + } + + do { + Y_VERIFY_DEBUG(tail); + auto endOffset = tail->EndOffset(); + Y_VERIFY_DEBUG(tail->Offset <= index && index < endOffset); + T* values = tail->Values() + (index - tail->Offset); + while (index < endOffset && index < endIndex) { + callback(index, *values); + ++index; + ++values; + } + if (index == endIndex) { + break; + } + tail = tail->Next.load(std::memory_order_acquire); + } 
while (tail && index < tail->EndOffset()); + } + private: static T* FindPtr(TChunk* tail, size_t index) { while (tail && index < tail->Offset) { diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp index e4501f3d6d..f1b90a0cd1 100644 --- a/ydb/core/tx/datashard/datashard_ut_order.cpp +++ b/ydb/core/tx/datashard/datashard_ut_order.cpp @@ -983,6 +983,7 @@ public: ui32 key = 0; TString body = ProgramTextSwap(); + for (ui32 point : Writes) { body += Sprintf(writePattern, key, point, key, TxId); ++key; @@ -1007,6 +1008,9 @@ public: } ui64 GetTxId() const { return TxId; } + const TVector<ui32>& GetWrites() const { return Writes; } + const TVector<ui32>& GetReads() const { return Reads; } + const TVector<TSimpleRange>& GetRanges() const { return Ranges; } const TMap<ui32, ui32>& GetResults() const { return Results; } void SetResults(const TVector<ui32>& kv) { diff --git a/ydb/core/tx/schemeshard/ut_serverless.cpp b/ydb/core/tx/schemeshard/ut_serverless.cpp index fc96dd2ebe..76d0d9d213 100644 --- a/ydb/core/tx/schemeshard/ut_serverless.cpp +++ b/ydb/core/tx/schemeshard/ut_serverless.cpp @@ -218,7 +218,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardServerLess) { waitMeteringMessage(); { - TString meteringData = R"({"usage":{"start":1600452120,"quantity":59,"finish":1600452179,"type":"delta","unit":"byte*second"},"tags":{"ydb_size":11728},"id":"8751008-3-1600452120-1600452179-11728","cloud_id":"CLOUD_ID_VAL","source_wt":1600452180,"source_id":"sless-docapi-ydb-storage","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; + TString meteringData = 
R"({"usage":{"start":1600452120,"quantity":59,"finish":1600452179,"type":"delta","unit":"byte*second"},"tags":{"ydb_size":11664},"id":"8751008-3-1600452120-1600452179-11664","cloud_id":"CLOUD_ID_VAL","source_wt":1600452180,"source_id":"sless-docapi-ydb-storage","resource_id":"DATABASE_ID_VAL","schema":"ydb.serverless.v1","folder_id":"FOLDER_ID_VAL","version":"1.0.0"})"; meteringData += "\n"; UNIT_ASSERT_NO_DIFF(meteringMessages, meteringData); } |