diff options
author | snaury <snaury@ydb.tech> | 2022-10-31 13:47:56 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2022-10-31 13:47:56 +0300 |
commit | 1a8010a735a218f84480e8ef1e6ffd5a45033c94 (patch) | |
tree | bf27ab00c48d9141d35c3748545f602654dd3cba | |
parent | 18dbfee5bb16564165534f8008728df7cc8d595f (diff) | |
download | ydb-1a8010a735a218f84480e8ef1e6ffd5a45033c94.tar.gz |
Support uncommitted changes in cdc
39 files changed, 1441 insertions, 406 deletions
diff --git a/ydb/core/engine/minikql/change_collector_iface.h b/ydb/core/engine/minikql/change_collector_iface.h index c96ffaa3ce..10d167bc97 100644 --- a/ydb/core/engine/minikql/change_collector_iface.h +++ b/ydb/core/engine/minikql/change_collector_iface.h @@ -9,16 +9,17 @@ namespace NMiniKQL { class IChangeCollector { public: // basic change record's info - struct TChange: public std::tuple<ui64, TPathId, ui64, TPathId, ui64> { - using std::tuple<ui64, TPathId, ui64, TPathId, ui64>::tuple; - - ui64 Order() const { return std::get<0>(*this); } - const TPathId& PathId() const { return std::get<1>(*this); } - ui64 BodySize() const { return std::get<2>(*this); } - const TPathId& TableId() const { return std::get<3>(*this); } - ui64 SchemaVersion() const { return std::get<4>(*this); } - - void SetPathId(const TPathId& value) { std::get<1>(*this) = value; } + struct TChange { + ui64 Order; + ui64 Group; + ui64 Step; + ui64 TxId; + TPathId PathId; + ui64 BodySize; + TPathId TableId; + ui64 SchemaVersion; + ui64 LockId = 0; + ui64 LockOffset = 0; }; public: @@ -27,6 +28,7 @@ public: virtual bool NeedToReadKeys() const = 0; virtual void SetReadVersion(const TRowVersion& readVersion) = 0; virtual void SetWriteVersion(const TRowVersion& writeVersion) = 0; + virtual void SetWriteTxId(ui64 txId) = 0; virtual bool Collect(const TTableId& tableId, NTable::ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TUpdateOp> updates) = 0; @@ -41,10 +43,10 @@ public: Y_DECLARE_OUT_SPEC(inline, NKikimr::NMiniKQL::IChangeCollector::TChange, o, x) { o << "{" - << " Order: " << x.Order() - << " PathId: " << x.PathId() - << " BodySize: " << x.BodySize() - << " TableId: " << x.TableId() - << " SchemaVersion: " << x.SchemaVersion() + << " Order: " << x.Order + << " PathId: " << x.PathId + << " BodySize: " << x.BodySize + << " TableId: " << x.TableId + << " SchemaVersion: " << x.SchemaVersion << " }"; } diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp index 84cf7fa3b2..4e5a4688c3 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.cpp +++ b/ydb/core/engine/minikql/minikql_engine_host.cpp @@ -897,22 +897,25 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell const ui64 writeTxId = GetWriteTxId(tableId); - if (writeTxId == 0) { - if (auto collector = GetChangeCollector(tableId)) { + if (auto collector = GetChangeCollector(tableId)) { + if (writeTxId == 0) { collector->SetWriteVersion(GetWriteVersion(tableId)); - if (collector->NeedToReadKeys()) { - collector->SetReadVersion(GetReadVersion(tableId)); - } + } else { + collector->SetWriteTxId(writeTxId); + } + if (collector->NeedToReadKeys()) { + collector->SetReadVersion(GetReadVersion(tableId)); + } - if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) { - collector->Reset(); - throw TNotReadyTabletException(); - } + if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) { + collector->Reset(); + throw TNotReadyTabletException(); } + } + if (writeTxId == 0) { Db.Update(localTid, NTable::ERowOp::Upsert, key, ops, GetWriteVersion(tableId)); } else { - // TODO: integrate with change collector somehow Db.UpdateTx(localTid, NTable::ERowOp::Upsert, key, ops, writeTxId); } @@ -932,22 +935,25 @@ void TEngineHost::EraseRow(const TTableId& tableId, const TArrayRef<const TCell> const ui64 writeTxId = GetWriteTxId(tableId); - if (writeTxId == 0) { - if (auto collector = GetChangeCollector(tableId)) { + if (auto collector = GetChangeCollector(tableId)) { + if (writeTxId == 0) { collector->SetWriteVersion(GetWriteVersion(tableId)); - if (collector->NeedToReadKeys()) { - collector->SetReadVersion(GetReadVersion(tableId)); - } + } else { + collector->SetWriteTxId(writeTxId); + } + if (collector->NeedToReadKeys()) { + collector->SetReadVersion(GetReadVersion(tableId)); + } - if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) { - collector->Reset(); - throw TNotReadyTabletException(); - } + if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) { + collector->Reset(); + throw TNotReadyTabletException(); } + } + if (writeTxId == 0) { Db.Update(localTid, NTable::ERowOp::Erase, key, { }, GetWriteVersion(tableId)); } else { - // TODO: integrate with change collector somehow Db.UpdateTx(localTid, NTable::ERowOp::Erase, key, { }, writeTxId); } diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 790319ef41..de095f9a4b 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -415,4 +415,5 @@ enum ETxTypes { TXTYPE_READ = 69 [(TxTypeOpts) = {Name: "TxRead"}]; TXTYPE_SPLIT_REPLICATION_SOURCE_OFFSETS = 70 [(TxTypeOpts) = {Name: "TxSplitReplicationSourceOffsets"}]; TXTYPE_REMOVE_LOCK = 71 [(TxTypeOpts) = {Name: "TxRemoveLock"}]; + TXTYPE_REMOVE_LOCK_CHANGE_RECORDS = 72 [(TxTypeOpts) = {Name: "TxRemoveLockChangeRecords"}]; } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 8e88e6ccd1..d73ebbb707 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -1641,6 +1641,9 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct } else if (done) { Y_VERIFY(!txc.IsRescheduled()); Y_VERIFY(!seat->RequestedMemory); + for (auto it = txc.OnCommit_.begin(); it != txc.OnCommit_.end(); ++it) { + (*it)(); + } seat->OnCommitted = std::move(txc.OnCommitted_); CommitTransactionLog(seat, env, prod.Change, cpuTimer, ctx); } else { @@ -1649,6 +1652,9 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type " << NFmt::Do(*seat->Self) << " postoned w/o demands"); } + for (auto it = txc.OnRollback_.rbegin(); it != txc.OnRollback_.rend(); ++it) { + (*it)(); + } PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx); } diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h index e3a145dcbc..35d584e38f 100644 --- a/ydb/core/tablet_flat/tablet_flat_executor.h +++ b/ydb/core/tablet_flat/tablet_flat_executor.h @@ -211,6 +211,14 @@ public: ~TTransactionContext() {} + void OnCommit(std::function<void()> callback) { + OnCommit_.emplace_back(std::move(callback)); + } + + void OnRollback(std::function<void()> callback) { + OnRollback_.emplace_back(std::move(callback)); + } + void OnCommitted(std::function<void()> callback) { OnCommitted_.emplace_back(std::move(callback)); } @@ -236,6 +244,8 @@ public: NTable::TDatabase &DB; private: + TVector<std::function<void()>> OnCommit_; + TVector<std::function<void()>> OnRollback_; TVector<std::function<void()>> OnCommitted_; bool Rescheduled_ = false; }; diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index f7f36a6afb..1ec1742b8a 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -92,7 +92,6 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp @@ -170,6 +169,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_schema_snapshots.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_snapshots.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_unsafe_upload.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_user_db.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_user_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_kqp_compute.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_kqp_effects.cpp @@ -222,6 +222,7 @@ target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_lock_change_records.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_locks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_avl_tree.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_ops.cpp diff --git a/ydb/core/tx/datashard/change_collector.cpp b/ydb/core/tx/datashard/change_collector.cpp index 216d0876a9..939f1476be 100644 --- a/ydb/core/tx/datashard/change_collector.cpp +++ b/ydb/core/tx/datashard/change_collector.cpp @@ -2,6 +2,7 @@ #include "change_collector_async_index.h" #include "change_collector_cdc_stream.h" #include "datashard_impl.h" +#include "datashard_user_db.h" #include <util/generic/vector.h> @@ -10,9 +11,17 @@ namespace NDataShard { using namespace NMiniKQL; -class TChangeCollectorProxy: public IChangeCollector { +class TChangeCollectorProxy: public IDataShardChangeCollector { public: - void AddUnderlying(THolder<IChangeCollector> collector) { + TChangeCollectorProxy(TDataShard& dataShard, bool isImmediateTx) + : DataShard(dataShard) + { + if (!isImmediateTx) { + Group = 0; + } + } + + void AddUnderlying(THolder<IBaseChangeCollector> collector) { Underlying.emplace_back(std::move(collector)); } @@ -33,11 +42,18 @@ public: } void SetWriteVersion(const TRowVersion& writeVersion) override { + WriteVersion = writeVersion; for (auto& collector : Underlying) { collector->SetWriteVersion(writeVersion); } } + void SetWriteTxId(ui64 txId) override { + for (auto& collector : Underlying) { + collector->SetWriteTxId(txId); + } + } + bool Collect(const TTableId& tableId, NTable::ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TUpdateOp> updates) override { @@ -53,6 +69,10 @@ public: const TVector<TChange>& GetCollected() const override { CollectedBuf.clear(); + if (!LockChanges.empty()) { + std::copy(LockChanges.begin(), LockChanges.end(), std::back_inserter(CollectedBuf)); + } + for (const auto& collector : Underlying) { const auto& collected = collector->GetCollected(); std::copy(collected.begin(), collected.end(), std::back_inserter(CollectedBuf)); @@ -64,6 +84,10 @@ public: TVector<TChange>&& GetCollected() override { CollectedBuf.clear(); + if (!LockChanges.empty()) { + std::move(LockChanges.begin(), LockChanges.end(), std::back_inserter(CollectedBuf)); + } + for (auto& collector : Underlying) { auto collected = std::move(collector->GetCollected()); std::move(collected.begin(), collected.end(), std::back_inserter(CollectedBuf)); @@ -80,13 +104,47 @@ public: CollectedBuf.clear(); } + void CommitLockChanges(ui64 lockId, const TVector<TChange>& changes, TTransactionContext& txc) override { + if (changes.empty()) { + return; + } + + NIceDb::TNiceDb db(txc.DB); + + ui64 count = changes.back().LockOffset + 1; + ui64 order = DataShard.AllocateChangeRecordOrder(db, count); + + if (!Group) { + Group = DataShard.AllocateChangeRecordGroup(db); + for (auto& collector : Underlying) { + collector->SetGroup(*Group); + } + } + + LockChanges.reserve(LockChanges.size() + changes.size()); + for (const auto& change : changes) { + TChange fixed = change; + fixed.Order = order + change.LockOffset; + fixed.Group = *Group; + fixed.Step = WriteVersion.Step; + fixed.TxId = WriteVersion.TxId; + LockChanges.push_back(fixed); + } + + DataShard.PersistCommitLockChangeRecords(txc, order, lockId, *Group, WriteVersion); + } + private: - TVector<THolder<IChangeCollector>> Underlying; + TDataShard& DataShard; + TMaybe<ui64> Group; + TRowVersion WriteVersion = TRowVersion::Min(); + TVector<THolder<IBaseChangeCollector>> Underlying; + TVector<TChange> LockChanges; mutable TVector<TChange> CollectedBuf; }; // TChangeCollectorProxy -IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) { +IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) { const bool hasAsyncIndexes = table.HasAsyncIndexes(); const bool hasCdcStreams = table.HasCdcStreams(); @@ -94,23 +152,23 @@ IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase return nullptr; } - auto proxy = MakeHolder<TChangeCollectorProxy>(); + auto proxy = MakeHolder<TChangeCollectorProxy>(dataShard, isImmediateTx); if (hasAsyncIndexes) { - proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, db, isImmediateTx)); + proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, userDb, db, isImmediateTx)); } if (hasCdcStreams) { - proxy->AddUnderlying(MakeHolder<TCdcStreamChangeCollector>(&dataShard, db, isImmediateTx)); + proxy->AddUnderlying(MakeHolder<TCdcStreamChangeCollector>(&dataShard, userDb, db, isImmediateTx)); } return proxy.Release(); } -IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx) { +IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx) { Y_VERIFY(dataShard.GetUserTables().contains(tableId)); const TUserTable& tableInfo = *dataShard.GetUserTables().at(tableId); - return CreateChangeCollector(dataShard, db, tableInfo, isImmediateTx); + return CreateChangeCollector(dataShard, userDb, db, tableInfo, isImmediateTx); } } // NDataShard diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h index 7b24a8d46b..cad92e8109 100644 --- a/ydb/core/tx/datashard/change_collector.h +++ b/ydb/core/tx/datashard/change_collector.h @@ -1,7 +1,7 @@ #pragma once #include <ydb/core/engine/minikql/change_collector_iface.h> -#include <ydb/core/tablet_flat/flat_database.h> +#include <ydb/core/tablet_flat/tablet_flat_executor.h> namespace NKikimr { namespace NDataShard { @@ -9,8 +9,15 @@ namespace NDataShard { class TDataShard; struct TUserTable; -NMiniKQL::IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx); -NMiniKQL::IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx); +class IDataShardUserDb; + +class IDataShardChangeCollector : public NMiniKQL::IChangeCollector { +public: + virtual void CommitLockChanges(ui64 lockId, const TVector<TChange>& changes, NTabletFlatExecutor::TTransactionContext& txc) = 0; +}; + +IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx); +IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx); } // NDataShard } // NKikimr diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp index d2f5400366..a5828f9852 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.cpp +++ b/ydb/core/tx/datashard/change_collector_async_index.cpp @@ -1,5 +1,6 @@ #include "change_collector_async_index.h" #include "datashard_impl.h" +#include "datashard_user_db.h" #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -13,12 +14,6 @@ class TCachedTagsBuilder { using TCachedTags = TAsyncIndexChangeCollector::TCachedTags; public: - void MakeKeyTagToPos(const TVector<TTag>& keyTags) { - for (TPos pos = 0; pos < keyTags.size(); ++pos) { - KeyTagToPos.emplace(keyTags.at(pos), pos); - } - } - void AddIndexTags(const TVector<TTag>& tags) { IndexTags.insert(tags.begin(), tags.end()); } @@ -40,13 +35,11 @@ public: Y_VERIFY(!tags.empty()); Y_VERIFY(!IndexTags.empty()); - Y_VERIFY(!KeyTagToPos.empty()); - return TCachedTags(std::move(KeyTagToPos), std::move(tags), std::make_pair(0, IndexTags.size() - 1)); + return TCachedTags(std::move(tags), std::make_pair(0, IndexTags.size() - 1)); } private: - THashMap<TTag, TPos> KeyTagToPos; THashSet<TTag> IndexTags; THashSet<TTag> DataTags; }; @@ -76,7 +69,6 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates) { Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId); - const auto localTableId = Self->GetLocalTableId(tableId); auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId); Y_VERIFY_S(key.size() == userTable->KeyColumnIds.size(), "Count doesn't match" @@ -92,11 +84,10 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop)); } - const auto& keyTagToPos = GetKeyTagToPos(tableId); const auto tagsToSelect = GetTagsToSelect(tableId, rop); TRowState row; - const auto ready = RowsCache.SelectRow(Db, localTableId, key, keyTagToPos, tagsToSelect, row, ReadVersion); + const auto ready = UserDb.SelectRow(tableId, key, tagsToSelect, row); if (ready == EReady::Page) { return false; @@ -183,13 +174,11 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } } - RowsCache.UpdateCachedRow(localTableId, rop, key, updates); return true; } void TAsyncIndexChangeCollector::Reset() { TBaseChangeCollector::Reset(); - RowsCache.Reset(); } auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const { @@ -197,7 +186,6 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const { auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId); TCachedTagsBuilder builder; - builder.MakeKeyTagToPos(userTable->KeyColumnIds); for (const auto& [_, index] : userTable->Indexes) { if (index.Type != TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) { @@ -211,15 +199,6 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const { return CachedTags.emplace(tableId, builder.Build()).first; } -const THashMap<TTag, TPos>& TAsyncIndexChangeCollector::GetKeyTagToPos(const TTableId& tableId) const { - auto it = CachedTags.find(tableId); - if (it == CachedTags.end()) { - it = CacheTags(tableId); - } - - return it->second.KeyTagToPos; -} - TArrayRef<TTag> TAsyncIndexChangeCollector::GetTagsToSelect(const TTableId& tableId, ERowOp rop) const { auto it = CachedTags.find(tableId); if (it == CachedTags.end()) { diff --git a/ydb/core/tx/datashard/change_collector_async_index.h b/ydb/core/tx/datashard/change_collector_async_index.h index 273e231601..614cb0334c 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.h +++ b/ydb/core/tx/datashard/change_collector_async_index.h @@ -1,7 +1,6 @@ #pragma once #include "change_collector_base.h" -#include "change_collector_helpers.h" #include <util/generic/hash.h> #include <util/generic/hash_set.h> @@ -17,22 +16,18 @@ class TAsyncIndexChangeCollector: public TBaseChangeCollector { struct TCachedTags { explicit TCachedTags( - THashMap<NTable::TTag, NTable::TPos>&& keyTagToPos, TVector<NTable::TTag>&& columns, const std::pair<ui32, ui32>& indexRange) - : KeyTagToPos(std::move(keyTagToPos)) - , Columns(std::move(columns)) + : Columns(std::move(columns)) , IndexColumns(&Columns.at(indexRange.first), indexRange.second + 1) { } - THashMap<NTable::TTag, NTable::TPos> KeyTagToPos; TVector<NTable::TTag> Columns; // Index + Data TArrayRef<NTable::TTag> IndexColumns; }; auto CacheTags(const TTableId& tableId) const; - const THashMap<NTable::TTag, NTable::TPos>& GetKeyTagToPos(const TTableId& tableId) const; TArrayRef<NTable::TTag> GetTagsToSelect(const TTableId& tableId, NTable::ERowOp rop) const; void FillKeyFromRowState(NTable::TTag tag, NTable::TPos pos, const NTable::TRowState& rowState, NScheme::TTypeInfo type); @@ -63,7 +58,6 @@ private: TRowVersion ReadVersion; mutable THashMap<TTableId, TCachedTags> CachedTags; - TRowsCache RowsCache; // reused between Collect() calls, cleared after every Clear() call THashSet<NTable::TTag> TagsSeen; diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp index 29fca25676..2bbe8daf6e 100644 --- a/ydb/core/tx/datashard/change_collector_base.cpp +++ b/ydb/core/tx/datashard/change_collector_base.cpp @@ -1,5 +1,6 @@ #include "change_collector_base.h" #include "datashard_impl.h" +#include "datashard_user_db.h" #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/util/yverify_stream.h> @@ -10,8 +11,9 @@ namespace NDataShard { using namespace NMiniKQL; using namespace NTable; -TBaseChangeCollector::TBaseChangeCollector(TDataShard* self, TDatabase& db, bool isImmediateTx) +TBaseChangeCollector::TBaseChangeCollector(TDataShard* self, IDataShardUserDb& userDb, TDatabase& db, bool isImmediateTx) : Self(self) + , UserDb(userDb) , Db(db) { if (!isImmediateTx) { @@ -31,6 +33,16 @@ void TBaseChangeCollector::SetWriteVersion(const TRowVersion& writeVersion) { WriteVersion = writeVersion; } +void TBaseChangeCollector::SetWriteTxId(ui64 txId) { + WriteTxId = txId; +} + +void TBaseChangeCollector::SetGroup(ui64 group) { + if (!Group) { + Group = group; + } +} + const TVector<IChangeCollector::TChange>& TBaseChangeCollector::GetCollected() const { return Collected; } @@ -129,19 +141,28 @@ void TBaseChangeCollector::Persist( { NIceDb::TNiceDb db(Db); - if (!Group) { - Group = Self->AllocateChangeRecordGroup(db); - } - Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId); auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId); Y_VERIFY(userTable->GetTableSchemaVersion()); - auto record = TChangeRecordBuilder(kind) - .WithOrder(Self->AllocateChangeRecordOrder(db)) - .WithGroup(*Group) - .WithStep(WriteVersion.Step) - .WithTxId(WriteVersion.TxId) + TChangeRecordBuilder builder(kind); + if (!WriteTxId) { + if (!Group) { + Group = Self->AllocateChangeRecordGroup(db); + } + builder + .WithOrder(Self->AllocateChangeRecordOrder(db)) + .WithGroup(*Group) + .WithStep(WriteVersion.Step) + .WithTxId(WriteVersion.TxId); + } else { + ui64 lockOffset = Self->GetNextChangeRecordLockOffset(WriteTxId) + Collected.size(); + builder + .WithLockId(WriteTxId) + .WithLockOffset(lockOffset); + } + + auto record = builder .WithPathId(pathId) .WithTableId(tableId.PathId) .WithSchemaVersion(userTable->GetTableSchemaVersion()) @@ -149,13 +170,18 @@ void TBaseChangeCollector::Persist( .Build(); Self->PersistChangeRecord(db, record); - Collected.emplace_back( - record.GetOrder(), - record.GetPathId(), - record.GetBody().size(), - record.GetTableId(), - record.GetSchemaVersion() - ); + Collected.push_back(TChange{ + .Order = record.GetOrder(), + .Group = record.GetGroup(), + .Step = record.GetStep(), + .TxId = record.GetTxId(), + .PathId = record.GetPathId(), + .BodySize = record.GetBody().size(), + .TableId = record.GetTableId(), + .SchemaVersion = record.GetSchemaVersion(), + .LockId = record.GetLockId(), + .LockOffset = record.GetLockOffset(), + }); } } // NDataShard diff --git a/ydb/core/tx/datashard/change_collector_base.h b/ydb/core/tx/datashard/change_collector_base.h index 1e582ac74e..82ba562d5d 100644 --- a/ydb/core/tx/datashard/change_collector_base.h +++ b/ydb/core/tx/datashard/change_collector_base.h @@ -12,8 +12,14 @@ namespace NKikimr { namespace NDataShard { class TDataShard; +class IDataShardUserDb; -class TBaseChangeCollector: public NMiniKQL::IChangeCollector { +class IBaseChangeCollector : public NMiniKQL::IChangeCollector { +public: + virtual void SetGroup(ui64 group) = 0; +}; + +class TBaseChangeCollector: public IBaseChangeCollector { using TDataChange = NKikimrChangeExchange::TChangeRecord::TDataChange; using TSerializedCells = TDataChange::TSerializedCells; @@ -31,11 +37,13 @@ protected: void Persist(const TTableId& tableId, const TPathId& pathId, TChangeRecord::EKind kind, const TDataChange& body); public: - explicit TBaseChangeCollector(TDataShard* self, NTable::TDatabase& db, bool isImmediateTx); + explicit TBaseChangeCollector(TDataShard* self, IDataShardUserDb& userDb, NTable::TDatabase& db, bool isImmediateTx); bool NeedToReadKeys() const override; void SetReadVersion(const TRowVersion& readVersion) override; void SetWriteVersion(const TRowVersion& writeVersion) override; + void SetWriteTxId(ui64 txId) override; + void SetGroup(ui64 group) override; const TVector<TChange>& GetCollected() const override; TVector<TChange>&& GetCollected() override; @@ -45,9 +53,11 @@ public: protected: TDataShard* Self; + IDataShardUserDb& UserDb; NTable::TDatabase& Db; TRowVersion WriteVersion; + ui64 WriteTxId = 0; TMaybe<ui64> Group; TVector<TChange> Collected; diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index 0f4c6de400..c47e343139 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -1,5 +1,6 @@ #include "change_collector_cdc_stream.h" #include "datashard_impl.h" +#include "datashard_user_db.h" #include <ydb/core/tablet_flat/flat_cxx_database.h> @@ -48,10 +49,11 @@ namespace { case ERowOp::Upsert: case ERowOp::Reset: return state; + case ERowOp::Absent: case ERowOp::Erase: return nullptr; default: - Y_FAIL_S("Unexpected row op: " << static_cast<ui8>(state->GetRowState())); + Y_FAIL_S("Unexpected row op: " << static_cast<int>(state->GetRowState())); } } @@ -96,7 +98,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates) { Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId); - const auto localTableId = Self->GetLocalTableId(tableId); auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId); const auto& keyTags = userTable->KeyColumnIds; @@ -114,8 +115,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop)); } - bool read = false; - for (const auto& [pathId, stream] : userTable->CdcStreams) { if (stream.State == NKikimrSchemeOp::ECdcStreamStateDisabled) { continue; @@ -132,7 +131,7 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, case NKikimrSchemeOp::ECdcStreamModeOldImage: case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: { const auto valueTags = MakeValueTags(userTable->Columns); - const auto oldState = GetCurrentState(localTableId, key, keyTags, valueTags); + const auto oldState = GetCurrentState(tableId, key, valueTags); if (!oldState) { return false; @@ -150,7 +149,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } } - read = true; break; } default: @@ -158,23 +156,18 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } } - if (read) { - RowsCache.UpdateCachedRow(localTableId, rop, key, updates); - } - return true; } void TCdcStreamChangeCollector::Reset() { TBaseChangeCollector::Reset(); - RowsCache.Reset(); } -TMaybe<TRowState> TCdcStreamChangeCollector::GetCurrentState(ui32 tid, TArrayRef<const TRawTypeValue> key, - TArrayRef<const TTag> keyTags, TArrayRef<const TTag> valueTags) +TMaybe<TRowState> TCdcStreamChangeCollector::GetCurrentState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key, + TArrayRef<const TTag> valueTags) { TRowState row; - const auto ready = RowsCache.SelectRow(Db, tid, key, MakeTagToPos(keyTags), valueTags, row, ReadVersion); + const auto ready = UserDb.SelectRow(tableId, key, valueTags, row); if (ready == EReady::Page) { return Nothing(); @@ -198,9 +191,11 @@ TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowO auto it = updates.find(tag); if (it != updates.end()) { newState.Set(pos, it->second.Op, it->second.AsCell()); - } else if (rop == ERowOp::Upsert && oldState.GetRowState() != ERowOp::Erase) { - newState.Set(pos, oldState.GetCellOp(pos), oldState.Get(pos)); + } else if (rop == ERowOp::Upsert) { + // Copy value from the old state, this also handles schema default values + newState.Set(pos, ECellOp::Set, oldState.Get(pos)); } else { + // FIXME: reset fills columns with schema defaults, which are currently always null newState.Set(pos, ECellOp::Null, TCell()); } } diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.h b/ydb/core/tx/datashard/change_collector_cdc_stream.h index 83dd37376c..a307da39d0 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.h +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.h @@ -1,15 +1,14 @@ #pragma once #include "change_collector_base.h" -#include "change_collector_helpers.h" #include "datashard_user_table.h" namespace NKikimr { namespace NDataShard { class TCdcStreamChangeCollector: public TBaseChangeCollector { - TMaybe<NTable::TRowState> GetCurrentState(ui32 tid, TArrayRef<const TRawTypeValue> key, - TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TTag> valueTags); + TMaybe<NTable::TRowState> GetCurrentState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key, + TArrayRef<const NTable::TTag> valueTags); static NTable::TRowState PatchState(const NTable::TRowState& oldState, NTable::ERowOp rop, const THashMap<NTable::TTag, NTable::TPos>& tagToPos, const THashMap<NTable::TTag, NTable::TUpdateOp>& updates); @@ -34,7 +33,6 @@ private: TRowVersion ReadVersion; mutable TMaybe<bool> CachedNeedToReadKeys; - TRowsCache RowsCache; }; // TCdcStreamChangeCollector diff --git a/ydb/core/tx/datashard/change_collector_helpers.cpp b/ydb/core/tx/datashard/change_collector_helpers.cpp deleted file mode 100644 index 2a13eb57ff..0000000000 --- a/ydb/core/tx/datashard/change_collector_helpers.cpp +++ /dev/null @@ -1,148 +0,0 @@ -#include "change_collector_helpers.h" - -namespace NKikimr { -namespace NDataShard { - -using namespace NTable; - -namespace { - - TString SerializeKey(TArrayRef<const TRawTypeValue> key) { - TVector<TCell> cells(Reserve(key.size())); - for (const auto& k : key) { - cells.emplace_back(k.AsRef()); - } - - return TSerializedCellVec::Serialize(cells); - } - -} // anonymous - -EReady TRowsCache::SelectRow(TDatabase& db, ui32 tid, - TArrayRef<const TRawTypeValue> key, const THashMap<TTag, TPos>& keyTagToPos, - TArrayRef<const TTag> tags, TRowState& rowState, const TRowVersion& readVersion) -{ - const auto* row = FindCachedRow(tid, key); - - if (!row) { - TRowState selected; - const auto ready = db.Select(tid, key, tags, selected, 0, readVersion); - - if (ready == EReady::Page) { - return ready; - } - - row = CacheRow(tid, ready, key, tags, selected); - } - - rowState.Init(tags.size()); - rowState.Touch(row->Rop); - - for (TPos pos = 0; pos < tags.size(); ++pos) { - auto it = keyTagToPos.find(tags.at(pos)); - if (it == keyTagToPos.end()) { - continue; - } - - Y_VERIFY(it->second < key.size()); - rowState.Set(pos, ECellOp::Set, key.at(it->second).AsRef()); - } - - switch (row->Ready) { - case EReady::Data: - switch (row->Rop) { - case ERowOp::Upsert: - case ERowOp::Reset: - for (TPos pos = 0; pos < tags.size(); ++pos) { - const auto tag = tags.at(pos); - if (keyTagToPos.contains(tag)) { - continue; - } - - if (const auto* cell = row->Cells.FindPtr(tag)) { - rowState.Set(pos, ECellOp::Set, TCell(cell->data(), cell->size())); - } else { - rowState.Set(pos, ECellOp::Null, TCell()); - } - } - break; - default: - Y_FAIL("unreachable"); - } - break; - case EReady::Gone: - break; - default: - Y_FAIL("unreachable"); - } - - Y_VERIFY(rowState.IsFinalized()); - return row->Ready; -} - -void TRowsCache::UpdateCachedRow(ui32 tid, ERowOp rop, - TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates) -{ - auto& row = Rows[tid][SerializeKey(key)]; - - row.Rop = rop; - if (rop == ERowOp::Erase) { - row.Ready = EReady::Gone; - row.Cells.clear(); - return; - } - - row.Ready = EReady::Data; - if (rop == ERowOp::Reset) { - row.Cells.clear(); - } - - for (const auto& update : updates) { - if (update.Value.IsEmpty()) { - continue; - } - - row.Cells[update.Tag] = update.Value.ToStringBuf(); - } -} - -void TRowsCache::Reset() { - Rows.clear(); -} - -const TRowsCache::TRow* TRowsCache::CacheRow(ui32 tid, EReady ready, - TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> tags, const TRowState& rowState) -{ - Y_VERIFY(ready != EReady::Page); - Y_VERIFY(tags.size() == rowState.Size()); - - auto& row = Rows[tid][SerializeKey(key)]; - - row.Ready = ready; - if (ready == EReady::Gone) { - row.Rop = ERowOp::Erase; - return &row; - } - - row.Rop = rowState.GetRowState(); - for (TPos pos = 0; pos < tags.size(); ++pos) { - const auto& cell = rowState.Get(pos); - if (cell.IsNull()) { - continue; - } - - row.Cells[tags.at(pos)] = cell.AsBuf(); - } - - return &row; -} - -const TRowsCache::TRow* TRowsCache::FindCachedRow(ui32 tid, TArrayRef<const TRawTypeValue> key) const { - const auto* cached = Rows.FindPtr(tid); - return cached - ? cached->FindPtr(SerializeKey(key)) - : nullptr; -} - -} // NDataShard -} // NKikimr diff --git a/ydb/core/tx/datashard/change_collector_helpers.h b/ydb/core/tx/datashard/change_collector_helpers.h deleted file mode 100644 index 54bba063cd..0000000000 --- a/ydb/core/tx/datashard/change_collector_helpers.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include <ydb/core/tablet_flat/flat_database.h> - -#include <util/generic/hash.h> - -namespace NKikimr { -namespace NDataShard { - -class TRowsCache { - struct TRow { - NTable::EReady Ready; - NTable::ERowOp Rop; - THashMap<NTable::TTag, TString> Cells; - }; - - const TRow* CacheRow(ui32 tid, NTable::EReady ready, - TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> tags, const NTable::TRowState& rowState); - - const TRow* FindCachedRow(ui32 tid, TArrayRef<const TRawTypeValue> key) const; - -public: - NTable::EReady SelectRow(NTable::TDatabase& db, ui32 tid, - TArrayRef<const TRawTypeValue> key, const THashMap<NTable::TTag, NTable::TPos>& keyTagToPos, - TArrayRef<const NTable::TTag> tags, NTable::TRowState& rowState, const TRowVersion& readVersion); - - void UpdateCachedRow(ui32 tid, NTable::ERowOp rop, - TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TUpdateOp> updates); - - void Reset(); - -private: - THashMap<ui32, THashMap<TString, TRow>> Rows; - -}; // TRowsCache - -} // NDataShard -} // NKikimr diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 622b6fcb4a..ad96a77331 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -268,6 +268,8 @@ void TChangeRecord::Out(IOutputStream& out) const { << " Body: " << Body.size() << "b" << " TableId: " << TableId << " SchemaVersion: " << SchemaVersion + << " LockId: " << LockId + << " LockOffset: " << LockOffset << " }"; } @@ -275,6 +277,16 @@ TChangeRecordBuilder::TChangeRecordBuilder(EKind kind) { Record.Kind = kind; } +TChangeRecordBuilder& TChangeRecordBuilder::WithLockId(ui64 lockId) { + Record.LockId = lockId; + return *this; +} + +TChangeRecordBuilder& TChangeRecordBuilder::WithLockOffset(ui64 lockOffset) { + Record.LockOffset = lockOffset; + return *this; +} + TChangeRecordBuilder& TChangeRecordBuilder::WithOrder(ui64 order) { Record.Order = order; return *this; diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index db7d920ddb..5c4f746942 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -33,6 +33,8 @@ public: ui64 GetGroup() const { return Group; } ui64 GetStep() const { return Step; } ui64 GetTxId() const { return TxId; } + ui64 GetLockId() const { return LockId; } + ui64 GetLockOffset() const { return LockOffset; } const TPathId& GetPathId() const { return PathId; } EKind GetKind() const { return Kind; } const TString& GetBody() const { return Body; } @@ -51,10 +53,12 @@ public: void Out(IOutputStream& out) const; private: - ui64 Order; - ui64 Group; - ui64 Step; - ui64 TxId; + ui64 Order = Max<ui64>(); + ui64 Group = 0; + ui64 Step = 0; + ui64 TxId = 0; + ui64 LockId = 0; + ui64 LockOffset = 0; TPathId PathId; EKind Kind; TString Body; @@ -74,6 +78,9 @@ class TChangeRecordBuilder { public: explicit TChangeRecordBuilder(EKind kind); + TChangeRecordBuilder& WithLockId(ui64 lockId); + TChangeRecordBuilder& WithLockOffset(ui64 lockOffset); + TChangeRecordBuilder& WithOrder(ui64 order); TChangeRecordBuilder& WithGroup(ui64 group); TChangeRecordBuilder& WithStep(ui64 step); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index a60dc40a2b..eeb299a46b 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -512,8 +512,9 @@ void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, TEvDat stats.SetCpuTimeUsec(totalCpuTime.MicroSeconds()); } -ui64 TDataShard::AllocateChangeRecordOrder(NIceDb::TNiceDb& db) { - const ui64 result = NextChangeRecordOrder++; +ui64 TDataShard::AllocateChangeRecordOrder(NIceDb::TNiceDb& db, ui64 count) { + const ui64 result = NextChangeRecordOrder; + NextChangeRecordOrder = result + count; PersistSys(db, Schema::Sys_NextChangeRecordOrder, NextChangeRecordOrder); return result; @@ -529,24 +530,83 @@ ui64 TDataShard::AllocateChangeRecordGroup(NIceDb::TNiceDb& db) { return result; } +ui64 TDataShard::GetNextChangeRecordLockOffset(ui64 lockId) { + auto it = LockChangeRecords.find(lockId); + if (it == LockChangeRecords.end() || it->second.empty()) { + return 0; + } + + return it->second.back().LockOffset + 1; +} + void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PersistChangeRecord" << ": record: " << record << ", at tablet: " << TabletID()); - db.Table<Schema::ChangeRecords>().Key(record.GetOrder()).Update( - NIceDb::TUpdate<Schema::ChangeRecords::Group>(record.GetGroup()), - NIceDb::TUpdate<Schema::ChangeRecords::PlanStep>(record.GetStep()), - NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()), - NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId), - NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId), - NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()), - NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()), - NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId), - NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId)); - db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update( - NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()), - NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody())); + if (record.GetLockId() == 0) { + db.Table<Schema::ChangeRecords>().Key(record.GetOrder()).Update( + NIceDb::TUpdate<Schema::ChangeRecords::Group>(record.GetGroup()), + NIceDb::TUpdate<Schema::ChangeRecords::PlanStep>(record.GetStep()), + NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()), + NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId), + NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId), + NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()), + NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()), + NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId), + NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId)); + db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update( + NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()), + NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody())); + } else { + db.Table<Schema::LockChangeRecords>().Key(record.GetLockId(), record.GetLockOffset()).Update( + NIceDb::TUpdate<Schema::LockChangeRecords::PathOwnerId>(record.GetPathId().OwnerId), + NIceDb::TUpdate<Schema::LockChangeRecords::LocalPathId>(record.GetPathId().LocalPathId), + NIceDb::TUpdate<Schema::LockChangeRecords::BodySize>(record.GetBody().size()), + NIceDb::TUpdate<Schema::LockChangeRecords::SchemaVersion>(record.GetSchemaVersion()), + NIceDb::TUpdate<Schema::LockChangeRecords::TableOwnerId>(record.GetTableId().OwnerId), + NIceDb::TUpdate<Schema::LockChangeRecords::TablePathId>(record.GetTableId().LocalPathId)); + db.Table<Schema::LockChangeRecordDetails>().Key(record.GetLockId(), record.GetLockOffset()).Update( + NIceDb::TUpdate<Schema::LockChangeRecordDetails::Kind>(record.GetKind()), + NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody())); + } +} + +void TDataShard::PersistCommitLockChangeRecords(TTransactionContext& txc, ui64 order, ui64 lockId, ui64 group, const TRowVersion& rowVersion) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PersistCommitLockChangeRecords" + << ": order# " << order + << ", lockId# " << lockId + << ", group# " << group + << ", version# " << rowVersion + << ", at tablet: " << TabletID()); + + auto it = LockChangeRecords.find(lockId); + Y_VERIFY_S(it != LockChangeRecords.end() && !it->second.empty(), "Cannot commit lock " << lockId << " change records: there are no pending change records"); + + auto& entry = CommittedLockChangeRecords[lockId]; + Y_VERIFY_S(entry.Order == Max<ui64>(), "Cannot commit lock " << lockId << " change records multiple times"); + entry.Order = order; + entry.Group = group; + entry.Step = rowVersion.Step; + entry.TxId = rowVersion.TxId; + entry.Count = it->second.size(); + + NIceDb::TNiceDb db(txc.DB); + + db.Table<Schema::ChangeRecordCommits>().Key(order).Update( + NIceDb::TUpdate<Schema::ChangeRecordCommits::LockId>(lockId), + NIceDb::TUpdate<Schema::ChangeRecordCommits::Group>(group), + NIceDb::TUpdate<Schema::ChangeRecordCommits::PlanStep>(rowVersion.Step), + NIceDb::TUpdate<Schema::ChangeRecordCommits::TxId>(rowVersion.TxId)); + + txc.OnCommit([this, lockId]() { + // We expect operation to enqueue transformed change records, + // so we no longer need original uncommitted records. + LockChangeRecords.erase(lockId); + }); + txc.OnRollback([this, lockId]() { + CommittedLockChangeRecords.erase(lockId); + }); } void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId) { @@ -560,14 +620,23 @@ void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(pathId.LocalPathId)); } +void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId) { + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "MoveChangeRecord" + << ": lockId: " << lockId + << ", lockOffset: " << lockOffset + << ": pathId: " << pathId + << ", at tablet: " << TabletID()); + + db.Table<Schema::LockChangeRecords>().Key(lockId, lockOffset).Update( + NIceDb::TUpdate<Schema::LockChangeRecords::PathOwnerId>(pathId.OwnerId), + NIceDb::TUpdate<Schema::LockChangeRecords::LocalPathId>(pathId.LocalPathId)); +} + void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "RemoveChangeRecord" << ": order: " << order << ", at tablet: " << TabletID()); - db.Table<Schema::ChangeRecords>().Key(order).Delete(); - db.Table<Schema::ChangeRecordDetails>().Key(order).Delete(); - auto it = ChangesQueue.find(order); if (it == ChangesQueue.end()) { Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order); @@ -576,6 +645,24 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { const auto& record = it->second; + if (record.LockId) { + db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete(); + db.Table<Schema::LockChangeRecordDetails>().Key(record.LockId, record.LockOffset).Delete(); + // Delete ChangeRecordCommits row when the last record is removed + auto it = CommittedLockChangeRecords.find(record.LockId); + if (it != CommittedLockChangeRecords.end()) { + Y_VERIFY_DEBUG(it->second.Count > 0); + if (it->second.Count > 0 && 0 == --it->second.Count) { + db.Table<Schema::ChangeRecordCommits>().Key(it->second.Order).Delete(); + CommittedLockChangeRecords.erase(it); + LockChangeRecords.erase(record.LockId); + } + } + } else { + db.Table<Schema::ChangeRecords>().Key(order).Delete(); + db.Table<Schema::ChangeRecordDetails>().Key(order).Delete(); + } + Y_VERIFY(record.BodySize <= ChangesQueueBytes); ChangesQueueBytes -= record.BodySize; @@ -625,15 +712,15 @@ void TDataShard::EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChang TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size())); for (const auto& record : records) { - forward.emplace_back(record.Order(), record.PathId(), record.BodySize()); + forward.emplace_back(record.Order, record.PathId, record.BodySize); - if (auto res = ChangesQueue.emplace(record.Order(), record); res.second) { - Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize())); - ChangesQueueBytes += record.BodySize(); + if (auto res = ChangesQueue.emplace(record.Order, record); res.second) { + Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize)); + ChangesQueueBytes += record.BodySize; - if (record.SchemaVersion()) { + if (record.SchemaVersion) { res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference( - TSchemaSnapshotKey(record.TableId(), record.SchemaVersion())); + TSchemaSnapshotKey(record.TableId, record.SchemaVersion)); } } } @@ -645,6 +732,48 @@ void TDataShard::EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChang Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); } +void TDataShard::AddLockChangeRecords(ui64 lockId, TVector<NMiniKQL::IChangeCollector::TChange>&& records) { + if (!records) { + return; + } + + auto orderedByLockOffset = [](auto& records) -> bool { + auto it = records.begin(); + ui64 prevOffset = it->LockOffset; + while (++it != records.end()) { + ui64 offset = it->LockOffset; + if (!(prevOffset < offset)) { + return false; + } + } + return true; + }; + Y_VERIFY_DEBUG(orderedByLockOffset(records)); + + auto& lockChanges = LockChangeRecords[lockId]; + if (lockChanges.empty()) { + lockChanges = std::move(records); + return; + } + + Y_VERIFY_DEBUG(lockChanges.back().LockOffset < records.front().LockOffset); + + lockChanges.reserve(lockChanges.size() + records.size()); + for (auto& record : records) { + lockChanges.emplace_back(std::move(record)); + } +} + +const TVector<NMiniKQL::IChangeCollector::TChange>& TDataShard::GetLockChangeRecords(ui64 lockId) const { + auto it = LockChangeRecords.find(lockId); + if (it == LockChangeRecords.end()) { + static TVector<NMiniKQL::IChangeCollector::TChange> empty; + return empty; + } + + return it->second; +} + void TDataShard::CreateChangeSender(const TActorContext& ctx) { Y_VERIFY(!OutChangeSender); OutChangeSender = Register(NDataShard::CreateChangeSender(this)); @@ -727,6 +856,9 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChang while (!rowset.EndOfSet()) { const ui64 order = rowset.GetValue<Schema::ChangeRecords::Order>(); + const ui64 group = rowset.GetValue<Schema::ChangeRecords::Group>(); + const ui64 step = rowset.GetValue<Schema::ChangeRecords::PlanStep>(); + const ui64 txId = rowset.GetValue<Schema::ChangeRecords::TxId>(); const ui64 bodySize = rowset.GetValue<Schema::ChangeRecords::BodySize>(); const ui64 schemaVersion = rowset.GetValue<Schema::ChangeRecords::SchemaVersion>(); const auto pathId = TPathId( @@ -738,7 +870,63 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChang rowset.GetValue<Schema::ChangeRecords::TablePathId>() ); - records.emplace_back(order, pathId, bodySize, tableId, schemaVersion); + records.push_back(NMiniKQL::IChangeCollector::TChange{ + .Order = order, + .Group = group, + .Step = step, + .TxId = txId, + .PathId = pathId, + .BodySize = bodySize, + .TableId = tableId, + .SchemaVersion = schemaVersion, + }); + + if (!rowset.Next()) { + return false; + } + } + + return true; +} + +bool TDataShard::LoadLockChangeRecords(NIceDb::TNiceDb& db) { + using Schema = TDataShard::Schema; + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadLockChangeRecords" + << " at tablet: " << TabletID()); + + auto rowset = db.Table<Schema::LockChangeRecords>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + const ui64 lockId = rowset.GetValue<Schema::LockChangeRecords::LockId>(); + const ui64 lockOffset = rowset.GetValue<Schema::LockChangeRecords::LockOffset>(); + const ui64 bodySize = rowset.GetValue<Schema::LockChangeRecords::BodySize>(); + const ui64 schemaVersion = rowset.GetValue<Schema::LockChangeRecords::SchemaVersion>(); + const auto pathId = TPathId( + rowset.GetValue<Schema::LockChangeRecords::PathOwnerId>(), + rowset.GetValue<Schema::LockChangeRecords::LocalPathId>() + ); + const auto tableId = TPathId( + rowset.GetValue<Schema::LockChangeRecords::TableOwnerId>(), + rowset.GetValue<Schema::LockChangeRecords::TablePathId>() + ); + + LockChangeRecords[lockId].push_back(NMiniKQL::IChangeCollector::TChange{ + .Order = Max<ui64>(), + .Group = 0, + .Step = 0, + .TxId = 0, + .PathId = pathId, + .BodySize = bodySize, + .TableId = tableId, + .SchemaVersion = schemaVersion, + .LockId = lockId, + .LockOffset = lockOffset, + }); + if (!rowset.Next()) { return false; } @@ -747,6 +935,100 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChang return true; } +bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records) { + using Schema = TDataShard::Schema; + + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadChangeRecordCommits" + << " at tablet: " << TabletID()); + + bool needSort = false; + + auto rowset = db.Table<Schema::ChangeRecordCommits>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + while (!rowset.EndOfSet()) { + const ui64 order = rowset.GetValue<Schema::ChangeRecordCommits::Order>(); + const ui64 lockId = rowset.GetValue<Schema::ChangeRecordCommits::LockId>(); + const ui64 group = rowset.GetValue<Schema::ChangeRecordCommits::Group>(); + const ui64 step = rowset.GetValue<Schema::ChangeRecordCommits::PlanStep>(); + const ui64 txId = rowset.GetValue<Schema::ChangeRecordCommits::TxId>(); + + auto& entry = CommittedLockChangeRecords[lockId]; + entry.Order = order; + entry.Group = group; + entry.Step = step; + entry.TxId = txId; + + for (auto& record : LockChangeRecords[lockId]) { + records.push_back(NMiniKQL::IChangeCollector::TChange{ + .Order = order + record.LockOffset, + .Group = group, + .Step = step, + .TxId = txId, + .PathId = record.PathId, + .BodySize = record.BodySize, + .TableId = record.TableId, + .SchemaVersion = record.SchemaVersion, + .LockId = record.LockId, + .LockOffset = record.LockOffset, + }); + entry.Count++; + needSort = true; + } + + LockChangeRecords.erase(lockId); + + if (!rowset.Next()) { + return false; + } + } + + if (needSort) { + std::sort(records.begin(), records.end(), [](const auto& a, const auto& b) -> bool { + return a.Order < b.Order; + }); + } + + return true; +} + +void TDataShard::ScheduleRemoveLockChanges(ui64 lockId) { + if (LockChangeRecords.contains(lockId) && !CommittedLockChangeRecords.contains(lockId)) { + bool wasEmpty = PendingLockChangeRecordsToRemove.empty(); + PendingLockChangeRecordsToRemove.push_back(lockId); + if (wasEmpty) { + Send(SelfId(), new TEvPrivate::TEvRemoveLockChangeRecords); + } + } +} + +void TDataShard::ScheduleRemoveAbandonedLockChanges() { + bool wasEmpty = PendingLockChangeRecordsToRemove.empty(); + + for (const auto& pr : LockChangeRecords) { + ui64 lockId = pr.first; + + if (CommittedLockChangeRecords.contains(lockId)) { + // Skip committed lock changes + continue; + } + + auto lock = SysLocksTable().GetRawLock(lockId); + if (lock && lock->IsPersistent()) { + // Skip lock changes attached to persistent locks + continue; + } + + PendingLockChangeRecordsToRemove.push_back(lockId); + } + + if (wasEmpty && !PendingLockChangeRecordsToRemove.empty()) { + Send(SelfId(), new TEvPrivate::TEvRemoveLockChangeRecords); + } +} + void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) { db.Table<Schema::SchemaOperations>().Key(op.TxId).Update( NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success), diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index dd99d11cee..939a9bbcab 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -1,5 +1,6 @@ #include "change_collector.h" #include "datashard_impl.h" +#include "datashard_user_db.h" #include "datashard__engine_host.h" #include "sys_tables.h" @@ -191,7 +192,7 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) { } /// -class TDataShardEngineHost : public TEngineHost { +class TDataShardEngineHost : public TEngineHost, public IDataShardUserDb { public: TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now) : TEngineHost(db, counters, @@ -205,7 +206,24 @@ public: , LockTxId(lockTxId) , LockNodeId(lockNodeId) , Now(now) - {} + { + } + + NTable::EReady SelectRow( + const TTableId& tableId, + TArrayRef<const TRawTypeValue> key, + TArrayRef<const NTable::TTag> tags, + NTable::TRowState& row) override + { + auto tid = LocalTableId(tableId); + + return DB.Select( + tid, key, tags, row, + /* readFlags */ 0, + ReadVersion, + GetReadTxMap(tableId), + GetReadTxObserver(tableId)); + } void SetWriteVersion(TRowVersion writeVersion) { WriteVersion = writeVersion; @@ -235,7 +253,7 @@ public: IsRepeatableSnapshot = true; } - IChangeCollector* GetChangeCollector(const TTableId& tableId) const override { + IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override { auto it = ChangeCollectors.find(tableId); if (it != ChangeCollectors.end()) { return it->second.Get(); @@ -246,10 +264,33 @@ public: return it->second.Get(); } - it->second.Reset(CreateChangeCollector(*Self, DB, tableId.PathId.LocalPathId, IsImmediateTx)); + it->second.Reset(CreateChangeCollector(*Self, *const_cast<TDataShardEngineHost*>(this), DB, tableId.PathId.LocalPathId, IsImmediateTx)); return it->second.Get(); } + void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) { + auto localTid = Self->GetLocalTableId(tableId); + Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << Self->TabletID()); + + if (!DB.HasOpenTx(localTid, lockId)) { + return; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, + "Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID()); + DB.CommitTx(localTid, lockId, writeVersion); + + if (!CommittedLockChanges.contains(lockId)) { + if (const auto& lockChanges = Self->GetLockChangeRecords(lockId)) { + if (auto* collector = GetChangeCollector(tableId)) { + collector->SetWriteVersion(WriteVersion); + collector->CommitLockChanges(lockId, lockChanges, txc); + CommittedLockChanges.insert(lockId); + } + } + } + } + TVector<IChangeCollector::TChange> GetCollectedChanges() const { TVector<IChangeCollector::TChange> total; @@ -636,7 +677,8 @@ private: TInstant Now; TRowVersion WriteVersion = TRowVersion::Max(); TRowVersion ReadVersion = TRowVersion::Min(); - mutable THashMap<TTableId, THolder<IChangeCollector>> ChangeCollectors; + THashSet<ui64> CommittedLockChanges; + mutable THashMap<TTableId, THolder<IDataShardChangeCollector>> ChangeCollectors; mutable THashMap<TTableId, NTable::ITransactionMapPtr> TxMaps; mutable THashMap<TTableId, NTable::ITransactionObserverPtr> TxObservers; }; @@ -823,6 +865,13 @@ void TEngineBay::SetIsRepeatableSnapshot() { host->SetIsRepeatableSnapshot(); } +void TEngineBay::CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + host->CommitChanges(tableId, lockId, writeVersion, txc); +} + TVector<IChangeCollector::TChange> TEngineBay::GetCollectedChanges() const { Y_VERIFY(EngineHost); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 588d7d940c..9cb91f7f7a 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -100,6 +100,8 @@ public: void SetIsImmediateTx(); void SetIsRepeatableSnapshot(); + void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc); + TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const; void ResetCollectedChanges(); diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp index 3f2d579d6a..626b9afa5b 100644 --- a/ydb/core/tx/datashard/datashard__init.cpp +++ b/ydb/core/tx/datashard/datashard__init.cpp @@ -33,6 +33,8 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext& Self->KillChangeSender(ctx); Self->ChangesQueue.clear(); + Self->LockChangeRecords.clear(); + Self->CommittedLockChangeRecords.clear(); ChangeRecords.clear(); bool done = ReadEverything(txc); @@ -418,6 +420,18 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { } } + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::LockChangeRecords::TableId)) { + if (!Self->LoadLockChangeRecords(db)) { + return false; + } + } + + if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecordCommits::TableId)) { + if (!Self->LoadChangeRecordCommits(db, ChangeRecords)) { + return false; + } + } + Self->ReplicatedTables.clear(); if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ReplicationSources::TableId)) { auto rowset = db.Table<Schema::ReplicationSources>().Select(); @@ -502,6 +516,8 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) { Self->SubscribeNewLocks(); + Self->ScheduleRemoveAbandonedLockChanges(); + return true; } diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index f2b882e785..ce629bb4b2 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -180,6 +180,10 @@ public: void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); } void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); } + void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) { + EngineBay.CommitChanges(tableId, lockId, writeVersion, txc); + } + TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); } void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); } diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 7e532420ba..aeac9bc5a6 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -16,14 +16,24 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { for (const auto& [_, records] : Self->ChangeRecordsRequested) { for (const auto& record : records) { + auto itQueue = Self->ChangesQueue.find(record.Order); + if (itQueue == Self->ChangesQueue.end()) { + continue; + } + if (bodiesSize && (bodiesSize + record.BodySize) > MemLimit) { break; } bodiesSize += record.BodySize; - ok = ok && db.Table<Schema::ChangeRecords>().Key(record.Order).Precharge(); - ok = ok && db.Table<Schema::ChangeRecordDetails>().Key(record.Order).Precharge(); + if (itQueue->second.LockId) { + ok = ok && db.Table<Schema::LockChangeRecords>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Precharge(); + ok = ok && db.Table<Schema::LockChangeRecordDetails>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Precharge(); + } else { + ok = ok && db.Table<Schema::ChangeRecords>().Key(record.Order).Precharge(); + ok = ok && db.Table<Schema::ChangeRecordDetails>().Key(record.Order).Precharge(); + } } } @@ -38,59 +48,130 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { auto it = records.begin(); while (it != records.end()) { + auto itQueue = Self->ChangesQueue.find(it->Order); + if (itQueue == Self->ChangesQueue.end()) { + RecordsToForget[recipient].emplace_back(it->Order); + it = records.erase(it); + continue; + } + if (MemUsage && (MemUsage + it->BodySize) > MemLimit) { break; } - auto basic = db.Table<Schema::ChangeRecords>().Key(it->Order).Select(); - auto details = db.Table<Schema::ChangeRecordDetails>().Key(it->Order).Select(); + if (itQueue->second.LockId) { + auto itCommit = Self->CommittedLockChangeRecords.find(itQueue->second.LockId); + if (itCommit == Self->CommittedLockChangeRecords.end()) { + Y_VERIFY_DEBUG_S(false, "Unexpected change record " << it->Order << " from an uncommitted lock " << itQueue->second.LockId); + RecordsToForget[recipient].emplace_back(it->Order); + it = records.erase(it); + continue; + } - if (!basic.IsReady() || !details.IsReady()) { - return false; - } + auto basic = db.Table<Schema::LockChangeRecords>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Select(); + auto details = db.Table<Schema::LockChangeRecordDetails>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Select(); - if (!basic.IsValid() && !details.IsValid()) { - RecordsToForget[recipient].emplace_back(it->Order); - it = records.erase(it); - continue; - } + if (!basic.IsReady() || !details.IsReady()) { + return false; + } + + if (!basic.IsValid() && !details.IsValid()) { + RecordsToForget[recipient].emplace_back(it->Order); + it = records.erase(it); + continue; + } + + Y_VERIFY_S(basic.IsValid() && details.IsValid(), "Inconsistent basic and details" + << ", basic.IsValid: " << basic.IsValid() + << ", details.IsValid: " << details.IsValid() + << ", recipient: " << recipient + << ", records.size: " << records.size() + << ", it->Order: " << it->Order + << ", it->BodySize: " << it->BodySize + << ", LockId: " << itQueue->second.LockId + << ", LockOffset: " << itQueue->second.LockOffset); + + const auto schemaVersion = basic.GetValue<Schema::LockChangeRecords::SchemaVersion>(); + const auto tableId = TPathId( + basic.GetValue<Schema::LockChangeRecords::TableOwnerId>(), + basic.GetValue<Schema::LockChangeRecords::TablePathId>() + ); + + TUserTable::TCPtr schema; + if (schemaVersion) { + const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion); + if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) { + schema = snapshot->Schema; + } + } + + RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::LockChangeRecordDetails::Kind>()) + .WithOrder(it->Order) + .WithGroup(itCommit->second.Group) + .WithStep(itCommit->second.Step) + .WithTxId(itCommit->second.TxId) + .WithPathId(TPathId( + basic.GetValue<Schema::LockChangeRecords::PathOwnerId>(), + basic.GetValue<Schema::LockChangeRecords::LocalPathId>() + )) + .WithTableId(tableId) + .WithSchemaVersion(schemaVersion) + .WithSchema(schema) + .WithBody(details.GetValue<Schema::LockChangeRecordDetails::Body>()) + .WithLockId(itQueue->second.LockId) + .WithLockOffset(itQueue->second.LockOffset) + .Build()); + } else { + auto basic = db.Table<Schema::ChangeRecords>().Key(it->Order).Select(); + auto details = db.Table<Schema::ChangeRecordDetails>().Key(it->Order).Select(); + + if (!basic.IsReady() || !details.IsReady()) { + return false; + } + + if (!basic.IsValid() && !details.IsValid()) { + RecordsToForget[recipient].emplace_back(it->Order); + it = records.erase(it); + continue; + } - Y_VERIFY_S(basic.IsValid() && details.IsValid(), "Inconsistent basic and details" - << ", basic.IsValid: " << basic.IsValid() - << ", details.IsValid: " << details.IsValid() - << ", recipient: " << recipient - << ", records.size: " << records.size() - << ", it->Order: " << it->Order - << ", it->BodySize: " << it->BodySize); - - const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>(); - const auto tableId = TPathId( - basic.GetValue<Schema::ChangeRecords::TableOwnerId>(), - basic.GetValue<Schema::ChangeRecords::TablePathId>() - ); - - TUserTable::TCPtr schema; - if (schemaVersion) { - const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion); - if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) { - schema = snapshot->Schema; + Y_VERIFY_S(basic.IsValid() && details.IsValid(), "Inconsistent basic and details" + << ", basic.IsValid: " << basic.IsValid() + << ", details.IsValid: " << details.IsValid() + << ", recipient: " << recipient + << ", records.size: " << records.size() + << ", it->Order: " << it->Order + << ", it->BodySize: " << it->BodySize); + + const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>(); + const auto tableId = TPathId( + basic.GetValue<Schema::ChangeRecords::TableOwnerId>(), + basic.GetValue<Schema::ChangeRecords::TablePathId>() + ); + + TUserTable::TCPtr schema; + if (schemaVersion) { + const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion); + if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) { + schema = snapshot->Schema; + } } + + RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>()) + .WithOrder(it->Order) + .WithGroup(basic.GetValue<Schema::ChangeRecords::Group>()) + .WithStep(basic.GetValue<Schema::ChangeRecords::PlanStep>()) + .WithTxId(basic.GetValue<Schema::ChangeRecords::TxId>()) + .WithPathId(TPathId( + basic.GetValue<Schema::ChangeRecords::PathOwnerId>(), + basic.GetValue<Schema::ChangeRecords::LocalPathId>() + )) + .WithTableId(tableId) + .WithSchemaVersion(schemaVersion) + .WithSchema(schema) + .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()) + .Build()); } - - RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>()) - .WithOrder(it->Order) - .WithGroup(basic.GetValue<Schema::ChangeRecords::Group>()) - .WithStep(basic.GetValue<Schema::ChangeRecords::PlanStep>()) - .WithTxId(basic.GetValue<Schema::ChangeRecords::TxId>()) - .WithPathId(TPathId( - basic.GetValue<Schema::ChangeRecords::PathOwnerId>(), - basic.GetValue<Schema::ChangeRecords::LocalPathId>() - )) - .WithTableId(tableId) - .WithSchemaVersion(schemaVersion) - .WithSchema(schema) - .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()) - .Build()); MemUsage += it->BodySize; it = records.erase(it); diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp index 3501f8d7cb..1e2dbf816c 100644 --- a/ydb/core/tx/datashard/datashard_common_upload.cpp +++ b/ydb/core/tx/datashard/datashard_common_upload.cpp @@ -1,5 +1,6 @@ #include "change_collector.h" #include "datashard_common_upload.h" +#include "datashard_user_db.h" namespace NKikimr { namespace NDataShard { @@ -59,8 +60,10 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId); + TDataShardUserDb userDb(*self, txc.DB, readVersion); + if (CollectChanges) { - ChangeCollector.Reset(CreateChangeCollector(*self, txc.DB, tableInfo, true)); + ChangeCollector.Reset(CreateChangeCollector(*self, userDb, txc.DB, tableInfo, true)); } if (ChangeCollector) { diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp index 2014d71bc2..1862e7d81e 100644 --- a/ydb/core/tx/datashard/datashard_direct_erase.cpp +++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp @@ -1,5 +1,6 @@ #include "change_collector.h" #include "datashard_direct_erase.h" +#include "datashard_user_db.h" #include "erase_rows_condition.h" #include <ydb/core/base/appdata.h> @@ -58,6 +59,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( } } + std::optional<TDataShardUserDb> userDb; + THolder<IEraseRowsCondition> condition; if (params) { condition.Reset(CreateEraseRowsCondition(request)); @@ -65,7 +68,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute( condition->Prepare(params.Txc->DB.GetRowScheme(localTableId), 0); } - params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, params.Txc->DB, tableInfo, true)); + userDb.emplace(*self, params.Txc->DB, params.ReadVersion); + params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, params.Txc->DB, tableInfo, true)); } if (auto collector = params.GetChangeCollector()) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 01333c30e6..f0ec337de9 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -214,6 +214,7 @@ class TDataShard class TTxPersistFullCompactionTs; class TTxRemoveLock; class TTxGetOpenTxs; + class TTxRemoveLockChangeRecords; template <typename T> friend class TTxDirectBase; class TTxUploadRows; @@ -317,6 +318,7 @@ class TDataShard EvRemoveChangeRecords, EvReplicationSourceOffsets, EvMediatorRestoreBackup, + EvRemoveLockChangeRecords, EvEnd }; @@ -458,6 +460,8 @@ class TDataShard }; struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {}; + + struct TEvRemoveLockChangeRecords : public TEventLocal<TEvRemoveLockChangeRecords, EvRemoveLockChangeRecords> {}; }; struct Schema : NIceDb::Schema { @@ -816,12 +820,65 @@ class TDataShard using TColumns = TableColumns<LockId, ConflictId>; }; + struct LockChangeRecords : Table<101> { + struct LockId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct LockOffset : Column<2, NScheme::NTypeIds::Uint64> {}; + struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {}; + struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {}; + struct BodySize : Column<5, NScheme::NTypeIds::Uint64> {}; + struct SchemaVersion : Column<6, NScheme::NTypeIds::Uint64> {}; + struct TableOwnerId : Column<7, NScheme::NTypeIds::Uint64> {}; + struct TablePathId : Column<8, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<LockId, LockOffset>; + using TColumns = TableColumns< + LockId, + LockOffset, + PathOwnerId, + LocalPathId, + BodySize, + SchemaVersion, + TableOwnerId, + TablePathId + >; + }; + + struct LockChangeRecordDetails : Table<102> { + struct LockId : Column<1, NScheme::NTypeIds::Uint64> {}; + struct LockOffset : Column<2, NScheme::NTypeIds::Uint64> {}; + struct Kind : Column<3, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::EKind; }; + struct Body : Column<4, NScheme::NTypeIds::String> { using Type = TString; }; + + using TKey = TableKey<LockId, LockOffset>; + using TColumns = TableColumns<LockId, LockOffset, Kind, Body>; + }; + + // Maps [Order ... Order+N-1] change records in the shard order + // to [0 ... N-1] change records from LockId + struct ChangeRecordCommits : Table<103> { + struct Order : Column<1, NScheme::NTypeIds::Uint64> {}; + struct LockId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct Group : Column<3, NScheme::NTypeIds::Uint64> {}; + struct PlanStep : Column<4, NScheme::NTypeIds::Uint64> {}; + struct TxId : Column<5, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<Order>; + using TColumns = TableColumns< + Order, + LockId, + Group, + PlanStep, + TxId + >; + }; + using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue, DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress, Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts, SrcChangeSenderActivations, DstChangeSenderActivations, ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived, - UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts>; + UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts, + LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits>; // These settings are persisted on each Init. So we use empty settings in order not to overwrite what // was changed by the user @@ -1076,6 +1133,8 @@ class TDataShard void Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRemoveLockChangeRecords::TPtr& ev, const TActorContext& ctx); + void HandleByReplicationSourceOffsetsServer(STATEFN_SIG); void DoPeriodicTasks(const TActorContext &ctx); @@ -1517,18 +1576,27 @@ public: void PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid); - ui64 AllocateChangeRecordOrder(NIceDb::TNiceDb& db); + ui64 AllocateChangeRecordOrder(NIceDb::TNiceDb& db, ui64 count = 1); ui64 AllocateChangeRecordGroup(NIceDb::TNiceDb& db); + ui64 GetNextChangeRecordLockOffset(ui64 lockId); void PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record); + void PersistCommitLockChangeRecords(TTransactionContext& txc, ui64 order, ui64 lockId, ui64 group, const TRowVersion& rowVersion); void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId); + void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); void EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChange>&& records); + void AddLockChangeRecords(ui64 lockId, TVector<NMiniKQL::IChangeCollector::TChange>&& records); + const TVector<NMiniKQL::IChangeCollector::TChange>& GetLockChangeRecords(ui64 lockId) const; void CreateChangeSender(const TActorContext& ctx); void KillChangeSender(const TActorContext& ctx); void MaybeActivateChangeSender(const TActorContext& ctx); void SuspendChangeSender(const TActorContext& ctx); const TActorId& GetChangeSender() const { return OutChangeSender; } bool LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records); + bool LoadLockChangeRecords(NIceDb::TNiceDb& db); + bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records); + void ScheduleRemoveLockChanges(ui64 lockId); + void ScheduleRemoveAbandonedLockChanges(); static void PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation& op); @@ -2281,17 +2349,23 @@ private: TPathId TableId; ui64 SchemaVersion; bool SchemaSnapshotAcquired; + ui64 LockId; + ui64 LockOffset; - explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion) + explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, + ui64 schemaVersion, ui64 lockId = 0, ui64 lockOffset = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) , SchemaSnapshotAcquired(false) + , LockId(lockId) + , LockOffset(lockOffset) { } explicit TEnqueuedRecord(const NMiniKQL::IChangeCollector::TChange& record) - : TEnqueuedRecord(record.BodySize(), record.TableId(), record.SchemaVersion()) + : TEnqueuedRecord(record.BodySize, record.TableId, + record.SchemaVersion, record.LockId, record.LockOffset) { } }; @@ -2313,6 +2387,20 @@ private: TActorId OutChangeSender; bool OutChangeSenderSuspended = false; + struct TCommittedLockChangeRecords { + ui64 Order = Max<ui64>(); + ui64 Group; + ui64 Step; + ui64 TxId; + + // The number of records that are not deleted yet + size_t Count = 0; + }; + + THashMap<ui64, TVector<NMiniKQL::IChangeCollector::TChange>> LockChangeRecords; // ui64 is lock id + THashMap<ui64, TCommittedLockChangeRecords> CommittedLockChangeRecords; // ui64 is lock id + TVector<ui64> PendingLockChangeRecordsToRemove; + // in THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id @@ -2355,6 +2443,35 @@ private: NTable::ITransactionObserverPtr BreakWriteConflictsTxObserver; +public: + auto& GetLockChangeRecords() { + return LockChangeRecords; + } + + auto TakeLockChangeRecords() { + auto result = std::move(LockChangeRecords); + LockChangeRecords.clear(); + return result; + } + + void SetLockChangeRecords(THashMap<ui64, TVector<NMiniKQL::IChangeCollector::TChange>>&& lockChangeRecords) { + LockChangeRecords = std::move(lockChangeRecords); + } + + auto& GetCommittedLockChangeRecords() { + return CommittedLockChangeRecords; + } + + auto TakeCommittedLockChangeRecords() { + auto result = std::move(CommittedLockChangeRecords); + CommittedLockChangeRecords.clear(); + return result; + } + + void SetCommittedLockChangeRecords(THashMap<ui64, TCommittedLockChangeRecords>&& committedLockChangeRecords) { + CommittedLockChangeRecords = std::move(committedLockChangeRecords); + } + protected: // Redundant init state required by flat executor implementation void StateInit(TAutoPtr<NActors::IEventHandle> &ev, const NActors::TActorContext &ctx) { @@ -2379,6 +2496,7 @@ protected: HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle); HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle); HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle); + HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); HFuncTraced(TEvents::TEvPoisonPill, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { @@ -2494,6 +2612,7 @@ protected: fFunc(TEvDataShard::EvReplicationSourceOffsetsCancel, HandleByReplicationSourceOffsetsServer); HFunc(TEvLongTxService::TEvLockStatus, Handle); HFunc(TEvDataShard::TEvGetOpenTxs, Handle); + HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle); default: if (!HandleDefaultEvents(ev, ctx)) { LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index fa574570ae..fbde3b04fb 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -709,16 +709,9 @@ void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writ sysLocks.CommitLock(lockKey); TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId()); - auto localTid = dataShard.GetLocalTableId(tableId); - Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << origin); - auto txId = lockProto.GetLockId(); - if (!txc.DB.HasOpenTx(localTid, txId)) { - continue; - } - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLocks: commit txId# " << txId << " in localTid# " << localTid); - txc.DB.CommitTx(localTid, txId, writeVersion); + tx->GetDataTx()->CommitChanges(tableId, txId, writeVersion, txc); } } else { KqpEraseLocks(origin, tx, sysLocks); diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp index 3c74047d8b..bb2a1ad6ef 100644 --- a/ydb/core/tx/datashard/datashard_locks_db.cpp +++ b/ydb/core/tx/datashard/datashard_locks_db.cpp @@ -122,6 +122,8 @@ void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) { NIceDb::TNiceDb db(DB); db.Table<Schema::Locks>().Key(lockId).Delete(); HasChanges_ = true; + + Self.ScheduleRemoveLockChanges(lockId); } void TDataShardLocksDb::PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags, const TString& data) { diff --git a/ydb/core/tx/datashard/datashard_user_db.cpp b/ydb/core/tx/datashard/datashard_user_db.cpp new file mode 100644 index 0000000000..fccc69d2e7 --- /dev/null +++ b/ydb/core/tx/datashard/datashard_user_db.cpp @@ -0,0 +1,17 @@ +#include "datashard_user_db.h" + +namespace NKikimr::NDataShard { + +NTable::EReady TDataShardUserDb::SelectRow( + const TTableId& tableId, + TArrayRef<const TRawTypeValue> key, + TArrayRef<const NTable::TTag> tags, + NTable::TRowState& row) +{ + auto tid = Self.GetLocalTableId(tableId); + Y_VERIFY(tid != 0, "Unexpected SelectRow for an unknown table"); + + return Db.Select(tid, key, tags, row, /* readFlags */ 0, ReadVersion); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_user_db.h b/ydb/core/tx/datashard/datashard_user_db.h new file mode 100644 index 0000000000..1bace953ee --- /dev/null +++ b/ydb/core/tx/datashard/datashard_user_db.h @@ -0,0 +1,38 @@ +#pragma once +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +class IDataShardUserDb { +protected: + ~IDataShardUserDb() = default; + +public: + virtual NTable::EReady SelectRow( + const TTableId& tableId, + TArrayRef<const TRawTypeValue> key, + TArrayRef<const NTable::TTag> tags, + NTable::TRowState& row) = 0; +}; + +class TDataShardUserDb final : public IDataShardUserDb { +public: + TDataShardUserDb(TDataShard& self, NTable::TDatabase& db, const TRowVersion& readVersion = TRowVersion::Min()) + : Self(self) + , Db(db) + , ReadVersion(readVersion) + { } + + NTable::EReady SelectRow( + const TTableId& tableId, + TArrayRef<const TRawTypeValue> key, + TArrayRef<const NTable::TTag> tags, + NTable::TRowState& row) override; + +private: + TDataShard& Self; + NTable::TDatabase& Db; + TRowVersion ReadVersion; +}; + +} // namespace NKikimr::NDataShard diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h index d398647ff9..b74aa31369 100644 --- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h +++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h @@ -161,6 +161,20 @@ namespace NKqpHelpers { return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); } + inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query) { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeSimpleStaleRoRequest(query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + if (response.GetResponse().GetResults().size() == 0) { + return "<empty>"; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + } + inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) { auto reqSender = runtime.AllocateEdgeActor(); sessionId = CreateSession(runtime, reqSender); diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 88a7364530..5e5a1ea915 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1620,6 +1620,14 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { } break; } + case TEvChangeExchange::TEvApplyRecords::EventType: { + if (BlockApplyRecords) { + Cerr << "... blocked ApplyRecords" << Endl; + BlockedApplyRecords.push_back(THolder(ev.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + break; + } } return PrevObserver(Runtime, ev); } @@ -1643,6 +1651,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { bool InjectClearTasks = false; bool BlockReadSets = false; TVector<THolder<IEventHandle>> BlockedReadSets; + bool BlockApplyRecords = false; + TVector<THolder<IEventHandle>> BlockedApplyRecords; }; Y_UNIT_TEST(MvccSnapshotLockedWrites) { @@ -3453,6 +3463,152 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "} Struct { Bool: false }"); } + Y_UNIT_TEST_TWIN(LockedWriteWithAsyncIndex, WithRestart) { + constexpr bool UseNewEngine = true; + + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + WaitTxNotification(server, sender, + AsyncAlterAddIndex(server, "/Root", "/Root/table-1", + TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync})); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + TInjectLockSnapshotObserver observer(runtime); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleBegin(runtime, sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We will reuse this snapshot + auto snapshot = observer.Last.MvccSnapshot; + + using NLongTxService::TLockHandle; + TLockHandle lock1handle(123, runtime.GetActorSystem(0)); + TLockHandle lock2handle(234, runtime.GetActorSystem(0)); + + // Write uncommitted changes to keys 1 and 2 using tx 123 + observer.Inject.LockId = 123; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 21) + )")), + "<empty>"); + auto locks1 = observer.LastLocks; + observer.Inject = {}; + + // Write uncommitted changes to keys 1 and 2 using tx 234 + observer.Inject.LockId = 234; + observer.Inject.LockNodeId = runtime.GetNodeId(0); + observer.Inject.MvccSnapshot = snapshot; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 12), (2, 22) + )")), + "<empty>"); + auto locks2 = observer.LastLocks; + observer.Inject = {}; + + SimulateSleep(server, TDuration::Seconds(1)); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` VIEW by_value + WHERE value in (1, 11, 21, 12, 22) + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + if (WithRestart) { + observer.BlockApplyRecords = true; + } + + // Commit changes in tx 123 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks1; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "<empty>"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + + SimulateSleep(server, TDuration::Seconds(1)); + + if (WithRestart) { + UNIT_ASSERT(!observer.BlockedApplyRecords.empty()); + observer.BlockedApplyRecords.clear(); + observer.BlockApplyRecords = false; + + auto shards = GetTableShards(server, sender, "/Root/table-1"); + RebootTablet(runtime, shards.at(0), sender); + + SimulateSleep(server, TDuration::Seconds(1)); + } + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleStaleRoExec(runtime, Q_(R"( + SELECT key, value + FROM `/Root/table-1` VIEW by_value + WHERE value in (1, 11, 21, 12, 22) + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 11 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } " + "} Struct { Bool: false }"); + + // Commit changes in tx 234 + observer.InjectClearTasks = true; + observer.InjectLocks.emplace().Locks = locks2; + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleExec(runtime, Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0) + )")), + "ERROR: ABORTED"); + observer.InjectClearTasks = false; + observer.InjectLocks.reset(); + } + } } // namespace NKikimr diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp index d02c82aae7..033d2516f8 100644 --- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp @@ -3,6 +3,7 @@ #include "datashard_distributed_erase.h" #include "datashard_impl.h" #include "datashard_pipeline.h" +#include "datashard_user_db.h" #include "execution_unit_ctors.h" #include "setup_sys_locks.h" #include "datashard_locks_db.h" @@ -44,7 +45,8 @@ public: auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(op.Get()); if (eraseTx->HasDependents()) { - THolder<IChangeCollector> changeCollector{CreateChangeCollector(DataShard, txc.DB, request.GetTableId(), false)}; + TDataShardUserDb userDb(DataShard, txc.DB, readVersion); + THolder<IChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, txc.DB, request.GetTableId(), false)}; if (changeCollector) { changeCollector->SetWriteVersion(writeVersion); diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 2a4c014a41..91c3956452 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -274,7 +274,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio AddLocksToResult(op, ctx); - op->ChangeRecords() = std::move(dataTx->GetCollectedChanges()); + auto changes = std::move(dataTx->GetCollectedChanges()); + if (guardLocks.LockTxId) { + if (changes) { + DataShard.AddLockChangeRecords(guardLocks.LockTxId, std::move(changes)); + } + } else { + // FIXME: handle lock changes commit + op->ChangeRecords() = std::move(changes); + } KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters()); auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode(); diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp index e3d9506335..8750fcc4a7 100644 --- a/ydb/core/tx/datashard/move_index_unit.cpp +++ b/ydb/core/tx/datashard/move_index_unit.cpp @@ -22,9 +22,22 @@ public: const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId()); for (auto& record: changeRecords) { - if (record.PathId() == remapPrevId) { - record.SetPathId(remapNewId); - DataShard.MoveChangeRecord(db, record.Order(), record.PathId()); + if (record.PathId == remapPrevId) { + record.PathId = remapNewId; + if (record.LockId) { + DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId); + } else { + DataShard.MoveChangeRecord(db, record.Order, record.PathId); + } + } + } + + for (auto& pr : DataShard.GetLockChangeRecords()) { + for (auto& record : pr.second) { + if (record.PathId == remapPrevId) { + record.PathId = remapNewId; + DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId); + } } } } @@ -51,6 +64,21 @@ public: return EExecutionStatus::Restart; } + auto lockChangeRecords = DataShard.TakeLockChangeRecords(); + auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + + if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "TMoveIndexUnit Execute" << ": schemeTx# " << schemeTx.DebugString() << ": changeRecords size# " << ChangeRecords.size() diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp index e0f21eb6af..dae1331ff7 100644 --- a/ydb/core/tx/datashard/move_table_unit.cpp +++ b/ydb/core/tx/datashard/move_table_unit.cpp @@ -21,9 +21,23 @@ public: const THashMap<TPathId, TPathId> remap = DataShard.GetRemapIndexes(move); for (auto& record: changeRecords) { - if (remap.contains(record.PathId())) { // here could be the records for already deleted indexes, so skip them - record.SetPathId(remap.at(record.PathId())); - DataShard.MoveChangeRecord(db, record.Order(), record.PathId()); + // We skip records for deleted indexes + if (remap.contains(record.PathId)) { + record.PathId = remap.at(record.PathId); + if (record.LockId) { + DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId); + } else { + DataShard.MoveChangeRecord(db, record.Order, record.PathId); + } + } + } + + for (auto& pr : DataShard.GetLockChangeRecords()) { + for (auto& record : pr.second) { + if (remap.contains(record.PathId)) { + record.PathId = remap.at(record.PathId); + DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId); + } } } } @@ -50,6 +64,21 @@ public: return EExecutionStatus::Restart; } + auto lockChangeRecords = DataShard.TakeLockChangeRecords(); + auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords(); + + if (!DataShard.LoadLockChangeRecords(db)) { + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + + if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) { + DataShard.SetLockChangeRecords(std::move(lockChangeRecords)); + DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords)); + return EExecutionStatus::Restart; + } + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "TMoveTableUnit Execute" << ": schemeTx# " << schemeTx.DebugString() << ": changeRecords size# " << ChangeRecords.size() diff --git a/ydb/core/tx/datashard/remove_lock_change_records.cpp b/ydb/core/tx/datashard/remove_lock_change_records.cpp new file mode 100644 index 0000000000..e5b09bcd3a --- /dev/null +++ b/ydb/core/tx/datashard/remove_lock_change_records.cpp @@ -0,0 +1,71 @@ +#include "datashard_impl.h" + +namespace NKikimr::NDataShard { + +class TDataShard::TTxRemoveLockChangeRecords + : public NTabletFlatExecutor::TTransactionBase<TDataShard> +{ +public: + TTxRemoveLockChangeRecords(TDataShard* self) + : TBase(self) + { } + + TTxType GetTxType() const override { return TXTYPE_REMOVE_LOCK_CHANGE_RECORDS; } + + static constexpr size_t MaxRecordsToRemove = 1000; + + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + NIceDb::TNiceDb db(txc.DB); + + size_t removed = 0; + + while (!Self->PendingLockChangeRecordsToRemove.empty() && removed < MaxRecordsToRemove) { + ui64 lockId = Self->PendingLockChangeRecordsToRemove.back(); + + auto it = Self->LockChangeRecords.find(lockId); + if (it == Self->LockChangeRecords.end()) { + // Nothing to remove, just skip it + Self->PendingLockChangeRecordsToRemove.pop_back(); + continue; + } + + if (Self->CommittedLockChangeRecords.contains(lockId)) { + // Don't remove records that are committed + Self->PendingLockChangeRecordsToRemove.pop_back(); + continue; + } + + while (!it->second.empty() && removed < MaxRecordsToRemove) { + auto& record = it->second.back(); + db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete(); + db.Table<Schema::LockChangeRecordDetails>().Key(record.LockId, record.LockOffset).Delete(); + it->second.pop_back(); + ++removed; + } + + if (!it->second.empty()) { + // We couldn't remove everything, continue in the next transaction + break; + } + + Self->LockChangeRecords.erase(it); + Self->PendingLockChangeRecordsToRemove.pop_back(); + } + + if (!Self->PendingLockChangeRecordsToRemove.empty()) { + ctx.Send(ctx.SelfID, new TEvPrivate::TEvRemoveLockChangeRecords()); + } + + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } +}; + +void TDataShard::Handle(TEvPrivate::TEvRemoveLockChangeRecords::TPtr&, const TActorContext& ctx) { + Execute(new TTxRemoveLockChangeRecords(this), ctx); +} + +} // namespace NKikimr::NDataShard diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 9db6a1eaef..ceb0578368 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -1723,5 +1723,196 @@ "Blobs": 1 } } + }, + { + "TableId": 101, + "TableName": "LockChangeRecords", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "LockOffset", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "PathOwnerId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "LocalPathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "BodySize", + "ColumnType": "Uint64" + }, + { + "ColumnId": 6, + "ColumnName": "SchemaVersion", + "ColumnType": "Uint64" + }, + { + "ColumnId": 7, + "ColumnName": "TableOwnerId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 8, + "ColumnName": "TablePathId", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { + "TableId": 102, + "TableName": "LockChangeRecordDetails", + "TableKey": [ + 1, + 2 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "LockOffset", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "Kind", + "ColumnType": "Byte" + }, + { + "ColumnId": 4, + "ColumnName": "Body", + "ColumnType": "String" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } + }, + { + "TableId": 103, + "TableName": "ChangeRecordCommits", + "TableKey": [ + 1 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "Order", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "LockId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "Group", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "PlanStep", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "TxId", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } } ]
\ No newline at end of file |