aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2022-10-31 13:47:56 +0300
committersnaury <snaury@ydb.tech>2022-10-31 13:47:56 +0300
commit1a8010a735a218f84480e8ef1e6ffd5a45033c94 (patch)
treebf27ab00c48d9141d35c3748545f602654dd3cba
parent18dbfee5bb16564165534f8008728df7cc8d595f (diff)
downloadydb-1a8010a735a218f84480e8ef1e6ffd5a45033c94.tar.gz
Support uncommitted changes in cdc
-rw-r--r--ydb/core/engine/minikql/change_collector_iface.h32
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.cpp46
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tablet_flat/flat_executor.cpp6
-rw-r--r--ydb/core/tablet_flat/tablet_flat_executor.h10
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt3
-rw-r--r--ydb/core/tx/datashard/change_collector.cpp76
-rw-r--r--ydb/core/tx/datashard/change_collector.h13
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.cpp27
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.h8
-rw-r--r--ydb/core/tx/datashard/change_collector_base.cpp60
-rw-r--r--ydb/core/tx/datashard/change_collector_base.h14
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.cpp27
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.h6
-rw-r--r--ydb/core/tx/datashard/change_collector_helpers.cpp148
-rw-r--r--ydb/core/tx/datashard/change_collector_helpers.h38
-rw-r--r--ydb/core/tx/datashard/change_record.cpp12
-rw-r--r--ydb/core/tx/datashard/change_record.h15
-rw-r--r--ydb/core/tx/datashard/datashard.cpp332
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp59
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard__init.cpp16
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h4
-rw-r--r--ydb/core/tx/datashard/datashard_change_sending.cpp173
-rw-r--r--ydb/core/tx/datashard/datashard_common_upload.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_direct_erase.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h127
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp9
-rw-r--r--ydb/core/tx/datashard/datashard_locks_db.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_user_db.cpp17
-rw-r--r--ydb/core/tx/datashard/datashard_user_db.h38
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common_kqp.h14
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp156
-rw-r--r--ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp10
-rw-r--r--ydb/core/tx/datashard/move_index_unit.cpp34
-rw-r--r--ydb/core/tx/datashard/move_table_unit.cpp35
-rw-r--r--ydb/core/tx/datashard/remove_lock_change_records.cpp71
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema191
39 files changed, 1441 insertions, 406 deletions
diff --git a/ydb/core/engine/minikql/change_collector_iface.h b/ydb/core/engine/minikql/change_collector_iface.h
index c96ffaa3ce..10d167bc97 100644
--- a/ydb/core/engine/minikql/change_collector_iface.h
+++ b/ydb/core/engine/minikql/change_collector_iface.h
@@ -9,16 +9,17 @@ namespace NMiniKQL {
class IChangeCollector {
public:
// basic change record's info
- struct TChange: public std::tuple<ui64, TPathId, ui64, TPathId, ui64> {
- using std::tuple<ui64, TPathId, ui64, TPathId, ui64>::tuple;
-
- ui64 Order() const { return std::get<0>(*this); }
- const TPathId& PathId() const { return std::get<1>(*this); }
- ui64 BodySize() const { return std::get<2>(*this); }
- const TPathId& TableId() const { return std::get<3>(*this); }
- ui64 SchemaVersion() const { return std::get<4>(*this); }
-
- void SetPathId(const TPathId& value) { std::get<1>(*this) = value; }
+ struct TChange {
+ ui64 Order;
+ ui64 Group;
+ ui64 Step;
+ ui64 TxId;
+ TPathId PathId;
+ ui64 BodySize;
+ TPathId TableId;
+ ui64 SchemaVersion;
+ ui64 LockId = 0;
+ ui64 LockOffset = 0;
};
public:
@@ -27,6 +28,7 @@ public:
virtual bool NeedToReadKeys() const = 0;
virtual void SetReadVersion(const TRowVersion& readVersion) = 0;
virtual void SetWriteVersion(const TRowVersion& writeVersion) = 0;
+ virtual void SetWriteTxId(ui64 txId) = 0;
virtual bool Collect(const TTableId& tableId, NTable::ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TUpdateOp> updates) = 0;
@@ -41,10 +43,10 @@ public:
Y_DECLARE_OUT_SPEC(inline, NKikimr::NMiniKQL::IChangeCollector::TChange, o, x) {
o << "{"
- << " Order: " << x.Order()
- << " PathId: " << x.PathId()
- << " BodySize: " << x.BodySize()
- << " TableId: " << x.TableId()
- << " SchemaVersion: " << x.SchemaVersion()
+ << " Order: " << x.Order
+ << " PathId: " << x.PathId
+ << " BodySize: " << x.BodySize
+ << " TableId: " << x.TableId
+ << " SchemaVersion: " << x.SchemaVersion
<< " }";
}
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp
index 84cf7fa3b2..4e5a4688c3 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.cpp
+++ b/ydb/core/engine/minikql/minikql_engine_host.cpp
@@ -897,22 +897,25 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell
const ui64 writeTxId = GetWriteTxId(tableId);
- if (writeTxId == 0) {
- if (auto collector = GetChangeCollector(tableId)) {
+ if (auto collector = GetChangeCollector(tableId)) {
+ if (writeTxId == 0) {
collector->SetWriteVersion(GetWriteVersion(tableId));
- if (collector->NeedToReadKeys()) {
- collector->SetReadVersion(GetReadVersion(tableId));
- }
+ } else {
+ collector->SetWriteTxId(writeTxId);
+ }
+ if (collector->NeedToReadKeys()) {
+ collector->SetReadVersion(GetReadVersion(tableId));
+ }
- if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) {
- collector->Reset();
- throw TNotReadyTabletException();
- }
+ if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) {
+ collector->Reset();
+ throw TNotReadyTabletException();
}
+ }
+ if (writeTxId == 0) {
Db.Update(localTid, NTable::ERowOp::Upsert, key, ops, GetWriteVersion(tableId));
} else {
- // TODO: integrate with change collector somehow
Db.UpdateTx(localTid, NTable::ERowOp::Upsert, key, ops, writeTxId);
}
@@ -932,22 +935,25 @@ void TEngineHost::EraseRow(const TTableId& tableId, const TArrayRef<const TCell>
const ui64 writeTxId = GetWriteTxId(tableId);
- if (writeTxId == 0) {
- if (auto collector = GetChangeCollector(tableId)) {
+ if (auto collector = GetChangeCollector(tableId)) {
+ if (writeTxId == 0) {
collector->SetWriteVersion(GetWriteVersion(tableId));
- if (collector->NeedToReadKeys()) {
- collector->SetReadVersion(GetReadVersion(tableId));
- }
+ } else {
+ collector->SetWriteTxId(writeTxId);
+ }
+ if (collector->NeedToReadKeys()) {
+ collector->SetReadVersion(GetReadVersion(tableId));
+ }
- if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) {
- collector->Reset();
- throw TNotReadyTabletException();
- }
+ if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) {
+ collector->Reset();
+ throw TNotReadyTabletException();
}
+ }
+ if (writeTxId == 0) {
Db.Update(localTid, NTable::ERowOp::Erase, key, { }, GetWriteVersion(tableId));
} else {
- // TODO: integrate with change collector somehow
Db.UpdateTx(localTid, NTable::ERowOp::Erase, key, { }, writeTxId);
}
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 790319ef41..de095f9a4b 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -415,4 +415,5 @@ enum ETxTypes {
TXTYPE_READ = 69 [(TxTypeOpts) = {Name: "TxRead"}];
TXTYPE_SPLIT_REPLICATION_SOURCE_OFFSETS = 70 [(TxTypeOpts) = {Name: "TxSplitReplicationSourceOffsets"}];
TXTYPE_REMOVE_LOCK = 71 [(TxTypeOpts) = {Name: "TxRemoveLock"}];
+ TXTYPE_REMOVE_LOCK_CHANGE_RECORDS = 72 [(TxTypeOpts) = {Name: "TxRemoveLockChangeRecords"}];
}
diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp
index 8e88e6ccd1..d73ebbb707 100644
--- a/ydb/core/tablet_flat/flat_executor.cpp
+++ b/ydb/core/tablet_flat/flat_executor.cpp
@@ -1641,6 +1641,9 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
} else if (done) {
Y_VERIFY(!txc.IsRescheduled());
Y_VERIFY(!seat->RequestedMemory);
+ for (auto it = txc.OnCommit_.begin(); it != txc.OnCommit_.end(); ++it) {
+ (*it)();
+ }
seat->OnCommitted = std::move(txc.OnCommitted_);
CommitTransactionLog(seat, env, prod.Change, cpuTimer, ctx);
} else {
@@ -1649,6 +1652,9 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
Y_Fail(NFmt::Do(*this) << " " << NFmt::Do(*seat) << " type "
<< NFmt::Do(*seat->Self) << " postoned w/o demands");
}
+ for (auto it = txc.OnRollback_.rbegin(); it != txc.OnRollback_.rend(); ++it) {
+ (*it)();
+ }
PostponeTransaction(seat, env, prod.Change, cpuTimer, ctx);
}
diff --git a/ydb/core/tablet_flat/tablet_flat_executor.h b/ydb/core/tablet_flat/tablet_flat_executor.h
index e3a145dcbc..35d584e38f 100644
--- a/ydb/core/tablet_flat/tablet_flat_executor.h
+++ b/ydb/core/tablet_flat/tablet_flat_executor.h
@@ -211,6 +211,14 @@ public:
~TTransactionContext() {}
+ void OnCommit(std::function<void()> callback) {
+ OnCommit_.emplace_back(std::move(callback));
+ }
+
+ void OnRollback(std::function<void()> callback) {
+ OnRollback_.emplace_back(std::move(callback));
+ }
+
void OnCommitted(std::function<void()> callback) {
OnCommitted_.emplace_back(std::move(callback));
}
@@ -236,6 +244,8 @@ public:
NTable::TDatabase &DB;
private:
+ TVector<std::function<void()>> OnCommit_;
+ TVector<std::function<void()>> OnRollback_;
TVector<std::function<void()>> OnCommitted_;
bool Rescheduled_ = false;
};
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index f7f36a6afb..1ec1742b8a 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -92,7 +92,6 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_async_index.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_collector.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/change_exchange_split.cpp
@@ -170,6 +169,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_schema_snapshots.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_snapshots.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_unsafe_upload.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_user_db.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_user_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_kqp_compute.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/datashard_kqp_effects.cpp
@@ -222,6 +222,7 @@ target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/read_table_scan_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_cleanup_unit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/receive_snapshot_unit.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_lock_change_records.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/remove_locks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_avl_tree.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/range_ops.cpp
diff --git a/ydb/core/tx/datashard/change_collector.cpp b/ydb/core/tx/datashard/change_collector.cpp
index 216d0876a9..939f1476be 100644
--- a/ydb/core/tx/datashard/change_collector.cpp
+++ b/ydb/core/tx/datashard/change_collector.cpp
@@ -2,6 +2,7 @@
#include "change_collector_async_index.h"
#include "change_collector_cdc_stream.h"
#include "datashard_impl.h"
+#include "datashard_user_db.h"
#include <util/generic/vector.h>
@@ -10,9 +11,17 @@ namespace NDataShard {
using namespace NMiniKQL;
-class TChangeCollectorProxy: public IChangeCollector {
+class TChangeCollectorProxy: public IDataShardChangeCollector {
public:
- void AddUnderlying(THolder<IChangeCollector> collector) {
+ TChangeCollectorProxy(TDataShard& dataShard, bool isImmediateTx)
+ : DataShard(dataShard)
+ {
+ if (!isImmediateTx) {
+ Group = 0;
+ }
+ }
+
+ void AddUnderlying(THolder<IBaseChangeCollector> collector) {
Underlying.emplace_back(std::move(collector));
}
@@ -33,11 +42,18 @@ public:
}
void SetWriteVersion(const TRowVersion& writeVersion) override {
+ WriteVersion = writeVersion;
for (auto& collector : Underlying) {
collector->SetWriteVersion(writeVersion);
}
}
+ void SetWriteTxId(ui64 txId) override {
+ for (auto& collector : Underlying) {
+ collector->SetWriteTxId(txId);
+ }
+ }
+
bool Collect(const TTableId& tableId, NTable::ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TUpdateOp> updates) override
{
@@ -53,6 +69,10 @@ public:
const TVector<TChange>& GetCollected() const override {
CollectedBuf.clear();
+ if (!LockChanges.empty()) {
+ std::copy(LockChanges.begin(), LockChanges.end(), std::back_inserter(CollectedBuf));
+ }
+
for (const auto& collector : Underlying) {
const auto& collected = collector->GetCollected();
std::copy(collected.begin(), collected.end(), std::back_inserter(CollectedBuf));
@@ -64,6 +84,10 @@ public:
TVector<TChange>&& GetCollected() override {
CollectedBuf.clear();
+ if (!LockChanges.empty()) {
+ std::move(LockChanges.begin(), LockChanges.end(), std::back_inserter(CollectedBuf));
+ }
+
for (auto& collector : Underlying) {
auto collected = std::move(collector->GetCollected());
std::move(collected.begin(), collected.end(), std::back_inserter(CollectedBuf));
@@ -80,13 +104,47 @@ public:
CollectedBuf.clear();
}
+ void CommitLockChanges(ui64 lockId, const TVector<TChange>& changes, TTransactionContext& txc) override {
+ if (changes.empty()) {
+ return;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ ui64 count = changes.back().LockOffset + 1;
+ ui64 order = DataShard.AllocateChangeRecordOrder(db, count);
+
+ if (!Group) {
+ Group = DataShard.AllocateChangeRecordGroup(db);
+ for (auto& collector : Underlying) {
+ collector->SetGroup(*Group);
+ }
+ }
+
+ LockChanges.reserve(LockChanges.size() + changes.size());
+ for (const auto& change : changes) {
+ TChange fixed = change;
+ fixed.Order = order + change.LockOffset;
+ fixed.Group = *Group;
+ fixed.Step = WriteVersion.Step;
+ fixed.TxId = WriteVersion.TxId;
+ LockChanges.push_back(fixed);
+ }
+
+ DataShard.PersistCommitLockChangeRecords(txc, order, lockId, *Group, WriteVersion);
+ }
+
private:
- TVector<THolder<IChangeCollector>> Underlying;
+ TDataShard& DataShard;
+ TMaybe<ui64> Group;
+ TRowVersion WriteVersion = TRowVersion::Min();
+ TVector<THolder<IBaseChangeCollector>> Underlying;
+ TVector<TChange> LockChanges;
mutable TVector<TChange> CollectedBuf;
}; // TChangeCollectorProxy
-IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) {
+IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx) {
const bool hasAsyncIndexes = table.HasAsyncIndexes();
const bool hasCdcStreams = table.HasCdcStreams();
@@ -94,23 +152,23 @@ IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase
return nullptr;
}
- auto proxy = MakeHolder<TChangeCollectorProxy>();
+ auto proxy = MakeHolder<TChangeCollectorProxy>(dataShard, isImmediateTx);
if (hasAsyncIndexes) {
- proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, db, isImmediateTx));
+ proxy->AddUnderlying(MakeHolder<TAsyncIndexChangeCollector>(&dataShard, userDb, db, isImmediateTx));
}
if (hasCdcStreams) {
- proxy->AddUnderlying(MakeHolder<TCdcStreamChangeCollector>(&dataShard, db, isImmediateTx));
+ proxy->AddUnderlying(MakeHolder<TCdcStreamChangeCollector>(&dataShard, userDb, db, isImmediateTx));
}
return proxy.Release();
}
-IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx) {
+IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx) {
Y_VERIFY(dataShard.GetUserTables().contains(tableId));
const TUserTable& tableInfo = *dataShard.GetUserTables().at(tableId);
- return CreateChangeCollector(dataShard, db, tableInfo, isImmediateTx);
+ return CreateChangeCollector(dataShard, userDb, db, tableInfo, isImmediateTx);
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h
index 7b24a8d46b..cad92e8109 100644
--- a/ydb/core/tx/datashard/change_collector.h
+++ b/ydb/core/tx/datashard/change_collector.h
@@ -1,7 +1,7 @@
#pragma once
#include <ydb/core/engine/minikql/change_collector_iface.h>
-#include <ydb/core/tablet_flat/flat_database.h>
+#include <ydb/core/tablet_flat/tablet_flat_executor.h>
namespace NKikimr {
namespace NDataShard {
@@ -9,8 +9,15 @@ namespace NDataShard {
class TDataShard;
struct TUserTable;
-NMiniKQL::IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx);
-NMiniKQL::IChangeCollector* CreateChangeCollector(TDataShard& dataShard, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx);
+class IDataShardUserDb;
+
+class IDataShardChangeCollector : public NMiniKQL::IChangeCollector {
+public:
+ virtual void CommitLockChanges(ui64 lockId, const TVector<TChange>& changes, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+};
+
+IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, const TUserTable& table, bool isImmediateTx);
+IDataShardChangeCollector* CreateChangeCollector(TDataShard& dataShard, IDataShardUserDb& userDb, NTable::TDatabase& db, ui64 tableId, bool isImmediateTx);
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp
index d2f5400366..a5828f9852 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.cpp
+++ b/ydb/core/tx/datashard/change_collector_async_index.cpp
@@ -1,5 +1,6 @@
#include "change_collector_async_index.h"
#include "datashard_impl.h"
+#include "datashard_user_db.h"
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -13,12 +14,6 @@ class TCachedTagsBuilder {
using TCachedTags = TAsyncIndexChangeCollector::TCachedTags;
public:
- void MakeKeyTagToPos(const TVector<TTag>& keyTags) {
- for (TPos pos = 0; pos < keyTags.size(); ++pos) {
- KeyTagToPos.emplace(keyTags.at(pos), pos);
- }
- }
-
void AddIndexTags(const TVector<TTag>& tags) {
IndexTags.insert(tags.begin(), tags.end());
}
@@ -40,13 +35,11 @@ public:
Y_VERIFY(!tags.empty());
Y_VERIFY(!IndexTags.empty());
- Y_VERIFY(!KeyTagToPos.empty());
- return TCachedTags(std::move(KeyTagToPos), std::move(tags), std::make_pair(0, IndexTags.size() - 1));
+ return TCachedTags(std::move(tags), std::make_pair(0, IndexTags.size() - 1));
}
private:
- THashMap<TTag, TPos> KeyTagToPos;
THashSet<TTag> IndexTags;
THashSet<TTag> DataTags;
};
@@ -76,7 +69,6 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates)
{
Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId);
- const auto localTableId = Self->GetLocalTableId(tableId);
auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
Y_VERIFY_S(key.size() == userTable->KeyColumnIds.size(), "Count doesn't match"
@@ -92,11 +84,10 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop));
}
- const auto& keyTagToPos = GetKeyTagToPos(tableId);
const auto tagsToSelect = GetTagsToSelect(tableId, rop);
TRowState row;
- const auto ready = RowsCache.SelectRow(Db, localTableId, key, keyTagToPos, tagsToSelect, row, ReadVersion);
+ const auto ready = UserDb.SelectRow(tableId, key, tagsToSelect, row);
if (ready == EReady::Page) {
return false;
@@ -183,13 +174,11 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
}
- RowsCache.UpdateCachedRow(localTableId, rop, key, updates);
return true;
}
void TAsyncIndexChangeCollector::Reset() {
TBaseChangeCollector::Reset();
- RowsCache.Reset();
}
auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const {
@@ -197,7 +186,6 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const {
auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
TCachedTagsBuilder builder;
- builder.MakeKeyTagToPos(userTable->KeyColumnIds);
for (const auto& [_, index] : userTable->Indexes) {
if (index.Type != TUserTable::TTableIndex::EIndexType::EIndexTypeGlobalAsync) {
@@ -211,15 +199,6 @@ auto TAsyncIndexChangeCollector::CacheTags(const TTableId& tableId) const {
return CachedTags.emplace(tableId, builder.Build()).first;
}
-const THashMap<TTag, TPos>& TAsyncIndexChangeCollector::GetKeyTagToPos(const TTableId& tableId) const {
- auto it = CachedTags.find(tableId);
- if (it == CachedTags.end()) {
- it = CacheTags(tableId);
- }
-
- return it->second.KeyTagToPos;
-}
-
TArrayRef<TTag> TAsyncIndexChangeCollector::GetTagsToSelect(const TTableId& tableId, ERowOp rop) const {
auto it = CachedTags.find(tableId);
if (it == CachedTags.end()) {
diff --git a/ydb/core/tx/datashard/change_collector_async_index.h b/ydb/core/tx/datashard/change_collector_async_index.h
index 273e231601..614cb0334c 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.h
+++ b/ydb/core/tx/datashard/change_collector_async_index.h
@@ -1,7 +1,6 @@
#pragma once
#include "change_collector_base.h"
-#include "change_collector_helpers.h"
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
@@ -17,22 +16,18 @@ class TAsyncIndexChangeCollector: public TBaseChangeCollector {
struct TCachedTags {
explicit TCachedTags(
- THashMap<NTable::TTag, NTable::TPos>&& keyTagToPos,
TVector<NTable::TTag>&& columns,
const std::pair<ui32, ui32>& indexRange)
- : KeyTagToPos(std::move(keyTagToPos))
- , Columns(std::move(columns))
+ : Columns(std::move(columns))
, IndexColumns(&Columns.at(indexRange.first), indexRange.second + 1)
{
}
- THashMap<NTable::TTag, NTable::TPos> KeyTagToPos;
TVector<NTable::TTag> Columns; // Index + Data
TArrayRef<NTable::TTag> IndexColumns;
};
auto CacheTags(const TTableId& tableId) const;
- const THashMap<NTable::TTag, NTable::TPos>& GetKeyTagToPos(const TTableId& tableId) const;
TArrayRef<NTable::TTag> GetTagsToSelect(const TTableId& tableId, NTable::ERowOp rop) const;
void FillKeyFromRowState(NTable::TTag tag, NTable::TPos pos, const NTable::TRowState& rowState, NScheme::TTypeInfo type);
@@ -63,7 +58,6 @@ private:
TRowVersion ReadVersion;
mutable THashMap<TTableId, TCachedTags> CachedTags;
- TRowsCache RowsCache;
// reused between Collect() calls, cleared after every Clear() call
THashSet<NTable::TTag> TagsSeen;
diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp
index 29fca25676..2bbe8daf6e 100644
--- a/ydb/core/tx/datashard/change_collector_base.cpp
+++ b/ydb/core/tx/datashard/change_collector_base.cpp
@@ -1,5 +1,6 @@
#include "change_collector_base.h"
#include "datashard_impl.h"
+#include "datashard_user_db.h"
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/util/yverify_stream.h>
@@ -10,8 +11,9 @@ namespace NDataShard {
using namespace NMiniKQL;
using namespace NTable;
-TBaseChangeCollector::TBaseChangeCollector(TDataShard* self, TDatabase& db, bool isImmediateTx)
+TBaseChangeCollector::TBaseChangeCollector(TDataShard* self, IDataShardUserDb& userDb, TDatabase& db, bool isImmediateTx)
: Self(self)
+ , UserDb(userDb)
, Db(db)
{
if (!isImmediateTx) {
@@ -31,6 +33,16 @@ void TBaseChangeCollector::SetWriteVersion(const TRowVersion& writeVersion) {
WriteVersion = writeVersion;
}
+void TBaseChangeCollector::SetWriteTxId(ui64 txId) {
+ WriteTxId = txId;
+}
+
+void TBaseChangeCollector::SetGroup(ui64 group) {
+ if (!Group) {
+ Group = group;
+ }
+}
+
const TVector<IChangeCollector::TChange>& TBaseChangeCollector::GetCollected() const {
return Collected;
}
@@ -129,19 +141,28 @@ void TBaseChangeCollector::Persist(
{
NIceDb::TNiceDb db(Db);
- if (!Group) {
- Group = Self->AllocateChangeRecordGroup(db);
- }
-
Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId);
auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
Y_VERIFY(userTable->GetTableSchemaVersion());
- auto record = TChangeRecordBuilder(kind)
- .WithOrder(Self->AllocateChangeRecordOrder(db))
- .WithGroup(*Group)
- .WithStep(WriteVersion.Step)
- .WithTxId(WriteVersion.TxId)
+ TChangeRecordBuilder builder(kind);
+ if (!WriteTxId) {
+ if (!Group) {
+ Group = Self->AllocateChangeRecordGroup(db);
+ }
+ builder
+ .WithOrder(Self->AllocateChangeRecordOrder(db))
+ .WithGroup(*Group)
+ .WithStep(WriteVersion.Step)
+ .WithTxId(WriteVersion.TxId);
+ } else {
+ ui64 lockOffset = Self->GetNextChangeRecordLockOffset(WriteTxId) + Collected.size();
+ builder
+ .WithLockId(WriteTxId)
+ .WithLockOffset(lockOffset);
+ }
+
+ auto record = builder
.WithPathId(pathId)
.WithTableId(tableId.PathId)
.WithSchemaVersion(userTable->GetTableSchemaVersion())
@@ -149,13 +170,18 @@ void TBaseChangeCollector::Persist(
.Build();
Self->PersistChangeRecord(db, record);
- Collected.emplace_back(
- record.GetOrder(),
- record.GetPathId(),
- record.GetBody().size(),
- record.GetTableId(),
- record.GetSchemaVersion()
- );
+ Collected.push_back(TChange{
+ .Order = record.GetOrder(),
+ .Group = record.GetGroup(),
+ .Step = record.GetStep(),
+ .TxId = record.GetTxId(),
+ .PathId = record.GetPathId(),
+ .BodySize = record.GetBody().size(),
+ .TableId = record.GetTableId(),
+ .SchemaVersion = record.GetSchemaVersion(),
+ .LockId = record.GetLockId(),
+ .LockOffset = record.GetLockOffset(),
+ });
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/change_collector_base.h b/ydb/core/tx/datashard/change_collector_base.h
index 1e582ac74e..82ba562d5d 100644
--- a/ydb/core/tx/datashard/change_collector_base.h
+++ b/ydb/core/tx/datashard/change_collector_base.h
@@ -12,8 +12,14 @@ namespace NKikimr {
namespace NDataShard {
class TDataShard;
+class IDataShardUserDb;
-class TBaseChangeCollector: public NMiniKQL::IChangeCollector {
+class IBaseChangeCollector : public NMiniKQL::IChangeCollector {
+public:
+ virtual void SetGroup(ui64 group) = 0;
+};
+
+class TBaseChangeCollector: public IBaseChangeCollector {
using TDataChange = NKikimrChangeExchange::TChangeRecord::TDataChange;
using TSerializedCells = TDataChange::TSerializedCells;
@@ -31,11 +37,13 @@ protected:
void Persist(const TTableId& tableId, const TPathId& pathId, TChangeRecord::EKind kind, const TDataChange& body);
public:
- explicit TBaseChangeCollector(TDataShard* self, NTable::TDatabase& db, bool isImmediateTx);
+ explicit TBaseChangeCollector(TDataShard* self, IDataShardUserDb& userDb, NTable::TDatabase& db, bool isImmediateTx);
bool NeedToReadKeys() const override;
void SetReadVersion(const TRowVersion& readVersion) override;
void SetWriteVersion(const TRowVersion& writeVersion) override;
+ void SetWriteTxId(ui64 txId) override;
+ void SetGroup(ui64 group) override;
const TVector<TChange>& GetCollected() const override;
TVector<TChange>&& GetCollected() override;
@@ -45,9 +53,11 @@ public:
protected:
TDataShard* Self;
+ IDataShardUserDb& UserDb;
NTable::TDatabase& Db;
TRowVersion WriteVersion;
+ ui64 WriteTxId = 0;
TMaybe<ui64> Group;
TVector<TChange> Collected;
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index 0f4c6de400..c47e343139 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -1,5 +1,6 @@
#include "change_collector_cdc_stream.h"
#include "datashard_impl.h"
+#include "datashard_user_db.h"
#include <ydb/core/tablet_flat/flat_cxx_database.h>
@@ -48,10 +49,11 @@ namespace {
case ERowOp::Upsert:
case ERowOp::Reset:
return state;
+ case ERowOp::Absent:
case ERowOp::Erase:
return nullptr;
default:
- Y_FAIL_S("Unexpected row op: " << static_cast<ui8>(state->GetRowState()));
+ Y_FAIL_S("Unexpected row op: " << static_cast<int>(state->GetRowState()));
}
}
@@ -96,7 +98,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates)
{
Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId);
- const auto localTableId = Self->GetLocalTableId(tableId);
auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
const auto& keyTags = userTable->KeyColumnIds;
@@ -114,8 +115,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
Y_FAIL_S("Unsupported row op: " << static_cast<ui8>(rop));
}
- bool read = false;
-
for (const auto& [pathId, stream] : userTable->CdcStreams) {
if (stream.State == NKikimrSchemeOp::ECdcStreamStateDisabled) {
continue;
@@ -132,7 +131,7 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
case NKikimrSchemeOp::ECdcStreamModeOldImage:
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: {
const auto valueTags = MakeValueTags(userTable->Columns);
- const auto oldState = GetCurrentState(localTableId, key, keyTags, valueTags);
+ const auto oldState = GetCurrentState(tableId, key, valueTags);
if (!oldState) {
return false;
@@ -150,7 +149,6 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
}
- read = true;
break;
}
default:
@@ -158,23 +156,18 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
}
- if (read) {
- RowsCache.UpdateCachedRow(localTableId, rop, key, updates);
- }
-
return true;
}
void TCdcStreamChangeCollector::Reset() {
TBaseChangeCollector::Reset();
- RowsCache.Reset();
}
-TMaybe<TRowState> TCdcStreamChangeCollector::GetCurrentState(ui32 tid, TArrayRef<const TRawTypeValue> key,
- TArrayRef<const TTag> keyTags, TArrayRef<const TTag> valueTags)
+TMaybe<TRowState> TCdcStreamChangeCollector::GetCurrentState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
+ TArrayRef<const TTag> valueTags)
{
TRowState row;
- const auto ready = RowsCache.SelectRow(Db, tid, key, MakeTagToPos(keyTags), valueTags, row, ReadVersion);
+ const auto ready = UserDb.SelectRow(tableId, key, valueTags, row);
if (ready == EReady::Page) {
return Nothing();
@@ -198,9 +191,11 @@ TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowO
auto it = updates.find(tag);
if (it != updates.end()) {
newState.Set(pos, it->second.Op, it->second.AsCell());
- } else if (rop == ERowOp::Upsert && oldState.GetRowState() != ERowOp::Erase) {
- newState.Set(pos, oldState.GetCellOp(pos), oldState.Get(pos));
+ } else if (rop == ERowOp::Upsert) {
+ // Copy value from the old state, this also handles schema default values
+ newState.Set(pos, ECellOp::Set, oldState.Get(pos));
} else {
+ // FIXME: reset fills columns with schema defaults, which are currently always null
newState.Set(pos, ECellOp::Null, TCell());
}
}
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.h b/ydb/core/tx/datashard/change_collector_cdc_stream.h
index 83dd37376c..a307da39d0 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.h
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.h
@@ -1,15 +1,14 @@
#pragma once
#include "change_collector_base.h"
-#include "change_collector_helpers.h"
#include "datashard_user_table.h"
namespace NKikimr {
namespace NDataShard {
class TCdcStreamChangeCollector: public TBaseChangeCollector {
- TMaybe<NTable::TRowState> GetCurrentState(ui32 tid, TArrayRef<const TRawTypeValue> key,
- TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TTag> valueTags);
+ TMaybe<NTable::TRowState> GetCurrentState(const TTableId& tableId, TArrayRef<const TRawTypeValue> key,
+ TArrayRef<const NTable::TTag> valueTags);
static NTable::TRowState PatchState(const NTable::TRowState& oldState, NTable::ERowOp rop,
const THashMap<NTable::TTag, NTable::TPos>& tagToPos, const THashMap<NTable::TTag, NTable::TUpdateOp>& updates);
@@ -34,7 +33,6 @@ private:
TRowVersion ReadVersion;
mutable TMaybe<bool> CachedNeedToReadKeys;
- TRowsCache RowsCache;
}; // TCdcStreamChangeCollector
diff --git a/ydb/core/tx/datashard/change_collector_helpers.cpp b/ydb/core/tx/datashard/change_collector_helpers.cpp
deleted file mode 100644
index 2a13eb57ff..0000000000
--- a/ydb/core/tx/datashard/change_collector_helpers.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-#include "change_collector_helpers.h"
-
-namespace NKikimr {
-namespace NDataShard {
-
-using namespace NTable;
-
-namespace {
-
- TString SerializeKey(TArrayRef<const TRawTypeValue> key) {
- TVector<TCell> cells(Reserve(key.size()));
- for (const auto& k : key) {
- cells.emplace_back(k.AsRef());
- }
-
- return TSerializedCellVec::Serialize(cells);
- }
-
-} // anonymous
-
-EReady TRowsCache::SelectRow(TDatabase& db, ui32 tid,
- TArrayRef<const TRawTypeValue> key, const THashMap<TTag, TPos>& keyTagToPos,
- TArrayRef<const TTag> tags, TRowState& rowState, const TRowVersion& readVersion)
-{
- const auto* row = FindCachedRow(tid, key);
-
- if (!row) {
- TRowState selected;
- const auto ready = db.Select(tid, key, tags, selected, 0, readVersion);
-
- if (ready == EReady::Page) {
- return ready;
- }
-
- row = CacheRow(tid, ready, key, tags, selected);
- }
-
- rowState.Init(tags.size());
- rowState.Touch(row->Rop);
-
- for (TPos pos = 0; pos < tags.size(); ++pos) {
- auto it = keyTagToPos.find(tags.at(pos));
- if (it == keyTagToPos.end()) {
- continue;
- }
-
- Y_VERIFY(it->second < key.size());
- rowState.Set(pos, ECellOp::Set, key.at(it->second).AsRef());
- }
-
- switch (row->Ready) {
- case EReady::Data:
- switch (row->Rop) {
- case ERowOp::Upsert:
- case ERowOp::Reset:
- for (TPos pos = 0; pos < tags.size(); ++pos) {
- const auto tag = tags.at(pos);
- if (keyTagToPos.contains(tag)) {
- continue;
- }
-
- if (const auto* cell = row->Cells.FindPtr(tag)) {
- rowState.Set(pos, ECellOp::Set, TCell(cell->data(), cell->size()));
- } else {
- rowState.Set(pos, ECellOp::Null, TCell());
- }
- }
- break;
- default:
- Y_FAIL("unreachable");
- }
- break;
- case EReady::Gone:
- break;
- default:
- Y_FAIL("unreachable");
- }
-
- Y_VERIFY(rowState.IsFinalized());
- return row->Ready;
-}
-
-void TRowsCache::UpdateCachedRow(ui32 tid, ERowOp rop,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const TUpdateOp> updates)
-{
- auto& row = Rows[tid][SerializeKey(key)];
-
- row.Rop = rop;
- if (rop == ERowOp::Erase) {
- row.Ready = EReady::Gone;
- row.Cells.clear();
- return;
- }
-
- row.Ready = EReady::Data;
- if (rop == ERowOp::Reset) {
- row.Cells.clear();
- }
-
- for (const auto& update : updates) {
- if (update.Value.IsEmpty()) {
- continue;
- }
-
- row.Cells[update.Tag] = update.Value.ToStringBuf();
- }
-}
-
-void TRowsCache::Reset() {
- Rows.clear();
-}
-
-const TRowsCache::TRow* TRowsCache::CacheRow(ui32 tid, EReady ready,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> tags, const TRowState& rowState)
-{
- Y_VERIFY(ready != EReady::Page);
- Y_VERIFY(tags.size() == rowState.Size());
-
- auto& row = Rows[tid][SerializeKey(key)];
-
- row.Ready = ready;
- if (ready == EReady::Gone) {
- row.Rop = ERowOp::Erase;
- return &row;
- }
-
- row.Rop = rowState.GetRowState();
- for (TPos pos = 0; pos < tags.size(); ++pos) {
- const auto& cell = rowState.Get(pos);
- if (cell.IsNull()) {
- continue;
- }
-
- row.Cells[tags.at(pos)] = cell.AsBuf();
- }
-
- return &row;
-}
-
-const TRowsCache::TRow* TRowsCache::FindCachedRow(ui32 tid, TArrayRef<const TRawTypeValue> key) const {
- const auto* cached = Rows.FindPtr(tid);
- return cached
- ? cached->FindPtr(SerializeKey(key))
- : nullptr;
-}
-
-} // NDataShard
-} // NKikimr
diff --git a/ydb/core/tx/datashard/change_collector_helpers.h b/ydb/core/tx/datashard/change_collector_helpers.h
deleted file mode 100644
index 54bba063cd..0000000000
--- a/ydb/core/tx/datashard/change_collector_helpers.h
+++ /dev/null
@@ -1,38 +0,0 @@
-#pragma once
-
-#include <ydb/core/tablet_flat/flat_database.h>
-
-#include <util/generic/hash.h>
-
-namespace NKikimr {
-namespace NDataShard {
-
-class TRowsCache {
- struct TRow {
- NTable::EReady Ready;
- NTable::ERowOp Rop;
- THashMap<NTable::TTag, TString> Cells;
- };
-
- const TRow* CacheRow(ui32 tid, NTable::EReady ready,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> tags, const NTable::TRowState& rowState);
-
- const TRow* FindCachedRow(ui32 tid, TArrayRef<const TRawTypeValue> key) const;
-
-public:
- NTable::EReady SelectRow(NTable::TDatabase& db, ui32 tid,
- TArrayRef<const TRawTypeValue> key, const THashMap<NTable::TTag, NTable::TPos>& keyTagToPos,
- TArrayRef<const NTable::TTag> tags, NTable::TRowState& rowState, const TRowVersion& readVersion);
-
- void UpdateCachedRow(ui32 tid, NTable::ERowOp rop,
- TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TUpdateOp> updates);
-
- void Reset();
-
-private:
- THashMap<ui32, THashMap<TString, TRow>> Rows;
-
-}; // TRowsCache
-
-} // NDataShard
-} // NKikimr
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 622b6fcb4a..ad96a77331 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -268,6 +268,8 @@ void TChangeRecord::Out(IOutputStream& out) const {
<< " Body: " << Body.size() << "b"
<< " TableId: " << TableId
<< " SchemaVersion: " << SchemaVersion
+ << " LockId: " << LockId
+ << " LockOffset: " << LockOffset
<< " }";
}
@@ -275,6 +277,16 @@ TChangeRecordBuilder::TChangeRecordBuilder(EKind kind) {
Record.Kind = kind;
}
+// Stores the given lock id in the record under construction.
+// Returns the builder itself so that calls can be chained.
+TChangeRecordBuilder& TChangeRecordBuilder::WithLockId(ui64 lockId)
+{
+    Record.LockId = lockId;
+    return *this;
+}
+
+// Stores the given lock offset in the record under construction.
+// Returns the builder itself so that calls can be chained.
+TChangeRecordBuilder& TChangeRecordBuilder::WithLockOffset(ui64 lockOffset)
+{
+    Record.LockOffset = lockOffset;
+    return *this;
+}
+
TChangeRecordBuilder& TChangeRecordBuilder::WithOrder(ui64 order) {
Record.Order = order;
return *this;
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index db7d920ddb..5c4f746942 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -33,6 +33,8 @@ public:
ui64 GetGroup() const { return Group; }
ui64 GetStep() const { return Step; }
ui64 GetTxId() const { return TxId; }
+ ui64 GetLockId() const { return LockId; }
+ ui64 GetLockOffset() const { return LockOffset; }
const TPathId& GetPathId() const { return PathId; }
EKind GetKind() const { return Kind; }
const TString& GetBody() const { return Body; }
@@ -51,10 +53,12 @@ public:
void Out(IOutputStream& out) const;
private:
- ui64 Order;
- ui64 Group;
- ui64 Step;
- ui64 TxId;
+ ui64 Order = Max<ui64>();
+ ui64 Group = 0;
+ ui64 Step = 0;
+ ui64 TxId = 0;
+ ui64 LockId = 0;
+ ui64 LockOffset = 0;
TPathId PathId;
EKind Kind;
TString Body;
@@ -74,6 +78,9 @@ class TChangeRecordBuilder {
public:
explicit TChangeRecordBuilder(EKind kind);
+ TChangeRecordBuilder& WithLockId(ui64 lockId);
+ TChangeRecordBuilder& WithLockOffset(ui64 lockOffset);
+
TChangeRecordBuilder& WithOrder(ui64 order);
TChangeRecordBuilder& WithGroup(ui64 group);
TChangeRecordBuilder& WithStep(ui64 step);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a60dc40a2b..eeb299a46b 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -512,8 +512,9 @@ void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, TEvDat
stats.SetCpuTimeUsec(totalCpuTime.MicroSeconds());
}
-ui64 TDataShard::AllocateChangeRecordOrder(NIceDb::TNiceDb& db) {
- const ui64 result = NextChangeRecordOrder++;
+ui64 TDataShard::AllocateChangeRecordOrder(NIceDb::TNiceDb& db, ui64 count) {
+ const ui64 result = NextChangeRecordOrder;
+ NextChangeRecordOrder = result + count;
PersistSys(db, Schema::Sys_NextChangeRecordOrder, NextChangeRecordOrder);
return result;
@@ -529,24 +530,83 @@ ui64 TDataShard::AllocateChangeRecordGroup(NIceDb::TNiceDb& db) {
return result;
}
+// Returns the lock offset to use for the next change record of the given lock:
+// one past the offset of the last in-memory record, or zero when the lock has
+// no records yet.
+ui64 TDataShard::GetNextChangeRecordLockOffset(ui64 lockId) {
+    const auto found = LockChangeRecords.find(lockId);
+    const bool hasRecords = found != LockChangeRecords.end() && !found->second.empty();
+
+    return hasRecords ? found->second.back().LockOffset + 1 : 0;
+}
+
void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PersistChangeRecord"
<< ": record: " << record
<< ", at tablet: " << TabletID());
- db.Table<Schema::ChangeRecords>().Key(record.GetOrder()).Update(
- NIceDb::TUpdate<Schema::ChangeRecords::Group>(record.GetGroup()),
- NIceDb::TUpdate<Schema::ChangeRecords::PlanStep>(record.GetStep()),
- NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()),
- NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
- NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
- NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()),
- NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()),
- NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId),
- NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
- db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update(
- NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
- NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()));
+ if (record.GetLockId() == 0) {
+ db.Table<Schema::ChangeRecords>().Key(record.GetOrder()).Update(
+ NIceDb::TUpdate<Schema::ChangeRecords::Group>(record.GetGroup()),
+ NIceDb::TUpdate<Schema::ChangeRecords::PlanStep>(record.GetStep()),
+ NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()),
+ NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
+ NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
+ NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()),
+ NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()),
+ NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId),
+ NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
+ db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update(
+ NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
+ NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()));
+ } else {
+ db.Table<Schema::LockChangeRecords>().Key(record.GetLockId(), record.GetLockOffset()).Update(
+ NIceDb::TUpdate<Schema::LockChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
+ NIceDb::TUpdate<Schema::LockChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
+ NIceDb::TUpdate<Schema::LockChangeRecords::BodySize>(record.GetBody().size()),
+ NIceDb::TUpdate<Schema::LockChangeRecords::SchemaVersion>(record.GetSchemaVersion()),
+ NIceDb::TUpdate<Schema::LockChangeRecords::TableOwnerId>(record.GetTableId().OwnerId),
+ NIceDb::TUpdate<Schema::LockChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
+ db.Table<Schema::LockChangeRecordDetails>().Key(record.GetLockId(), record.GetLockOffset()).Update(
+ NIceDb::TUpdate<Schema::LockChangeRecordDetails::Kind>(record.GetKind()),
+ NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody()));
+ }
+}
+
+// Marks the pending change records of a lock as committed.
+//
+// Fills the in-memory CommittedLockChangeRecords entry for lockId (order, group,
+// step/txId taken from rowVersion, and the number of pending records) and writes
+// a matching row to the ChangeRecordCommits table keyed by order.
+// Aborts if the lock has no pending records or was already committed.
+void TDataShard::PersistCommitLockChangeRecords(TTransactionContext& txc, ui64 order, ui64 lockId, ui64 group, const TRowVersion& rowVersion) {
+    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PersistCommitLockChangeRecords"
+        << ": order# " << order
+        << ", lockId# " << lockId
+        << ", group# " << group
+        << ", version# " << rowVersion
+        << ", at tablet: " << TabletID());
+
+    const auto pending = LockChangeRecords.find(lockId);
+    Y_VERIFY_S(pending != LockChangeRecords.end() && !pending->second.empty(), "Cannot commit lock " << lockId << " change records: there are no pending change records");
+
+    // A freshly inserted entry still carries the Max<ui64>() order,
+    // any other value means this lock has been committed before.
+    auto& committed = CommittedLockChangeRecords[lockId];
+    Y_VERIFY_S(committed.Order == Max<ui64>(), "Cannot commit lock " << lockId << " change records multiple times");
+    committed.Order = order;
+    committed.Group = group;
+    committed.Step = rowVersion.Step;
+    committed.TxId = rowVersion.TxId;
+    committed.Count = pending->second.size();
+
+    using TCommits = Schema::ChangeRecordCommits;
+
+    NIceDb::TNiceDb db(txc.DB);
+    db.Table<TCommits>().Key(order).Update(
+        NIceDb::TUpdate<TCommits::LockId>(lockId),
+        NIceDb::TUpdate<TCommits::Group>(group),
+        NIceDb::TUpdate<TCommits::PlanStep>(rowVersion.Step),
+        NIceDb::TUpdate<TCommits::TxId>(rowVersion.TxId));
+
+    txc.OnCommit([this, lockId]() {
+        // We expect operation to enqueue transformed change records,
+        // so we no longer need original uncommitted records.
+        LockChangeRecords.erase(lockId);
+    });
+    txc.OnRollback([this, lockId]() {
+        // Undo the in-memory commit entry created above
+        CommittedLockChangeRecords.erase(lockId);
+    });
}
void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId) {
@@ -560,14 +620,23 @@ void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId
NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(pathId.LocalPathId));
}
+// Re-targets an uncommitted (lock-bound) change record: updates the PathOwnerId
+// and LocalPathId columns of the LockChangeRecords row identified by
+// (lockId, lockOffset) with the components of pathId.
+void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId) {
+    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "MoveChangeRecord"
+        << ": lockId: " << lockId
+        << ", lockOffset: " << lockOffset
+        << ": pathId: " << pathId
+        << ", at tablet: " << TabletID());
+
+    using TRecords = Schema::LockChangeRecords;
+
+    auto row = db.Table<TRecords>().Key(lockId, lockOffset);
+    row.Update(
+        NIceDb::TUpdate<TRecords::PathOwnerId>(pathId.OwnerId),
+        NIceDb::TUpdate<TRecords::LocalPathId>(pathId.LocalPathId));
+}
+
void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "RemoveChangeRecord"
<< ": order: " << order
<< ", at tablet: " << TabletID());
- db.Table<Schema::ChangeRecords>().Key(order).Delete();
- db.Table<Schema::ChangeRecordDetails>().Key(order).Delete();
-
auto it = ChangesQueue.find(order);
if (it == ChangesQueue.end()) {
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
@@ -576,6 +645,24 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
const auto& record = it->second;
+ if (record.LockId) {
+ db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete();
+ db.Table<Schema::LockChangeRecordDetails>().Key(record.LockId, record.LockOffset).Delete();
+ // Delete ChangeRecordCommits row when the last record is removed
+ auto it = CommittedLockChangeRecords.find(record.LockId);
+ if (it != CommittedLockChangeRecords.end()) {
+ Y_VERIFY_DEBUG(it->second.Count > 0);
+ if (it->second.Count > 0 && 0 == --it->second.Count) {
+ db.Table<Schema::ChangeRecordCommits>().Key(it->second.Order).Delete();
+ CommittedLockChangeRecords.erase(it);
+ LockChangeRecords.erase(record.LockId);
+ }
+ }
+ } else {
+ db.Table<Schema::ChangeRecords>().Key(order).Delete();
+ db.Table<Schema::ChangeRecordDetails>().Key(order).Delete();
+ }
+
Y_VERIFY(record.BodySize <= ChangesQueueBytes);
ChangesQueueBytes -= record.BodySize;
@@ -625,15 +712,15 @@ void TDataShard::EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChang
TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
- forward.emplace_back(record.Order(), record.PathId(), record.BodySize());
+ forward.emplace_back(record.Order, record.PathId, record.BodySize);
- if (auto res = ChangesQueue.emplace(record.Order(), record); res.second) {
- Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize()));
- ChangesQueueBytes += record.BodySize();
+ if (auto res = ChangesQueue.emplace(record.Order, record); res.second) {
+ Y_VERIFY(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
+ ChangesQueueBytes += record.BodySize;
- if (record.SchemaVersion()) {
+ if (record.SchemaVersion) {
res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
- TSchemaSnapshotKey(record.TableId(), record.SchemaVersion()));
+ TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
}
}
}
@@ -645,6 +732,48 @@ void TDataShard::EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChang
Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}
+// Appends change records produced under the given lock to the in-memory
+// LockChangeRecords list of that lock.
+//
+// The incoming records are expected to be strictly ordered by LockOffset and to
+// start after the last record already stored for the lock; both expectations
+// are checked in debug builds only. An empty input is a no-op.
+void TDataShard::AddLockChangeRecords(ui64 lockId, TVector<NMiniKQL::IChangeCollector::TChange>&& records) {
+    if (!records) {
+        return;
+    }
+
+    // Requires a non-empty container (guaranteed by the check above)
+    auto orderedByLockOffset = [](const auto& records) -> bool {
+        auto it = records.begin();
+        ui64 prevOffset = it->LockOffset;
+        while (++it != records.end()) {
+            const ui64 offset = it->LockOffset;
+            if (!(prevOffset < offset)) {
+                return false;
+            }
+            // Compare each record with its immediate predecessor; without this
+            // update every record would only be compared with the first one.
+            prevOffset = offset;
+        }
+        return true;
+    };
+    Y_VERIFY_DEBUG(orderedByLockOffset(records));
+
+    auto& lockChanges = LockChangeRecords[lockId];
+    if (lockChanges.empty()) {
+        // Nothing stored yet, take over the whole vector
+        lockChanges = std::move(records);
+        return;
+    }
+
+    Y_VERIFY_DEBUG(lockChanges.back().LockOffset < records.front().LockOffset);
+
+    lockChanges.reserve(lockChanges.size() + records.size());
+    for (auto& record : records) {
+        lockChanges.emplace_back(std::move(record));
+    }
+}
+
+// Returns the in-memory change records stored for the given lock,
+// or a reference to a shared empty vector when the lock has none.
+const TVector<NMiniKQL::IChangeCollector::TChange>& TDataShard::GetLockChangeRecords(ui64 lockId) const {
+    const auto found = LockChangeRecords.find(lockId);
+    if (found != LockChangeRecords.end()) {
+        return found->second;
+    }
+
+    static const TVector<NMiniKQL::IChangeCollector::TChange> noRecords;
+    return noRecords;
+}
+
void TDataShard::CreateChangeSender(const TActorContext& ctx) {
Y_VERIFY(!OutChangeSender);
OutChangeSender = Register(NDataShard::CreateChangeSender(this));
@@ -727,6 +856,9 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChang
while (!rowset.EndOfSet()) {
const ui64 order = rowset.GetValue<Schema::ChangeRecords::Order>();
+ const ui64 group = rowset.GetValue<Schema::ChangeRecords::Group>();
+ const ui64 step = rowset.GetValue<Schema::ChangeRecords::PlanStep>();
+ const ui64 txId = rowset.GetValue<Schema::ChangeRecords::TxId>();
const ui64 bodySize = rowset.GetValue<Schema::ChangeRecords::BodySize>();
const ui64 schemaVersion = rowset.GetValue<Schema::ChangeRecords::SchemaVersion>();
const auto pathId = TPathId(
@@ -738,7 +870,63 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChang
rowset.GetValue<Schema::ChangeRecords::TablePathId>()
);
- records.emplace_back(order, pathId, bodySize, tableId, schemaVersion);
+ records.push_back(NMiniKQL::IChangeCollector::TChange{
+ .Order = order,
+ .Group = group,
+ .Step = step,
+ .TxId = txId,
+ .PathId = pathId,
+ .BodySize = bodySize,
+ .TableId = tableId,
+ .SchemaVersion = schemaVersion,
+ });
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+bool TDataShard::LoadLockChangeRecords(NIceDb::TNiceDb& db) {
+ using Schema = TDataShard::Schema;
+
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadLockChangeRecords"
+ << " at tablet: " << TabletID());
+
+ auto rowset = db.Table<Schema::LockChangeRecords>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ while (!rowset.EndOfSet()) {
+ const ui64 lockId = rowset.GetValue<Schema::LockChangeRecords::LockId>();
+ const ui64 lockOffset = rowset.GetValue<Schema::LockChangeRecords::LockOffset>();
+ const ui64 bodySize = rowset.GetValue<Schema::LockChangeRecords::BodySize>();
+ const ui64 schemaVersion = rowset.GetValue<Schema::LockChangeRecords::SchemaVersion>();
+ const auto pathId = TPathId(
+ rowset.GetValue<Schema::LockChangeRecords::PathOwnerId>(),
+ rowset.GetValue<Schema::LockChangeRecords::LocalPathId>()
+ );
+ const auto tableId = TPathId(
+ rowset.GetValue<Schema::LockChangeRecords::TableOwnerId>(),
+ rowset.GetValue<Schema::LockChangeRecords::TablePathId>()
+ );
+
+ LockChangeRecords[lockId].push_back(NMiniKQL::IChangeCollector::TChange{
+ .Order = Max<ui64>(),
+ .Group = 0,
+ .Step = 0,
+ .TxId = 0,
+ .PathId = pathId,
+ .BodySize = bodySize,
+ .TableId = tableId,
+ .SchemaVersion = schemaVersion,
+ .LockId = lockId,
+ .LockOffset = lockOffset,
+ });
+
if (!rowset.Next()) {
return false;
}
@@ -747,6 +935,100 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChang
return true;
}
+// Loads the ChangeRecordCommits table and converts the in-memory records of every
+// committed lock into regular change records appended to `records`.
+//
+// For each commit row the CommittedLockChangeRecords entry is filled, every record
+// of the lock gets Order = commit order + its LockOffset together with the commit's
+// group/step/txId, and the lock is then dropped from LockChangeRecords.
+// If anything was appended, `records` is re-sorted by Order.
+// Returns false when the table data is not ready yet.
+bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records) {
+    using Schema = TDataShard::Schema;
+    using TCommits = Schema::ChangeRecordCommits;
+
+    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadChangeRecordCommits"
+        << " at tablet: " << TabletID());
+
+    const size_t sizeBefore = records.size();
+
+    auto rowset = db.Table<TCommits>().Range().Select();
+    if (!rowset.IsReady()) {
+        return false;
+    }
+
+    while (!rowset.EndOfSet()) {
+        const ui64 order = rowset.GetValue<TCommits::Order>();
+        const ui64 lockId = rowset.GetValue<TCommits::LockId>();
+        const ui64 group = rowset.GetValue<TCommits::Group>();
+        const ui64 step = rowset.GetValue<TCommits::PlanStep>();
+        const ui64 txId = rowset.GetValue<TCommits::TxId>();
+
+        auto& commit = CommittedLockChangeRecords[lockId];
+        commit.Order = order;
+        commit.Group = group;
+        commit.Step = step;
+        commit.TxId = txId;
+
+        // Count tracks how many records of this lock have been turned into
+        // regular change records
+        for (const auto& pending : LockChangeRecords[lockId]) {
+            records.push_back(NMiniKQL::IChangeCollector::TChange{
+                .Order = order + pending.LockOffset,
+                .Group = group,
+                .Step = step,
+                .TxId = txId,
+                .PathId = pending.PathId,
+                .BodySize = pending.BodySize,
+                .TableId = pending.TableId,
+                .SchemaVersion = pending.SchemaVersion,
+                .LockId = pending.LockId,
+                .LockOffset = pending.LockOffset,
+            });
+            ++commit.Count;
+        }
+
+        // The records of this lock are now represented in `records`
+        LockChangeRecords.erase(lockId);
+
+        if (!rowset.Next()) {
+            return false;
+        }
+    }
+
+    // Appended records may interleave with the ones that were already there
+    if (records.size() != sizeBefore) {
+        const auto byOrder = [](const auto& lhs, const auto& rhs) -> bool {
+            return lhs.Order < rhs.Order;
+        };
+        std::sort(records.begin(), records.end(), byOrder);
+    }
+
+    return true;
+}
+
+// Queues the uncommitted change records of the given lock for removal.
+// Does nothing when the lock has no in-memory records or is already committed.
+// TEvRemoveLockChangeRecords is sent to self only when the queue was empty before.
+void TDataShard::ScheduleRemoveLockChanges(ui64 lockId) {
+    if (!LockChangeRecords.contains(lockId) || CommittedLockChangeRecords.contains(lockId)) {
+        return;
+    }
+
+    const bool queueWasEmpty = PendingLockChangeRecordsToRemove.empty();
+    PendingLockChangeRecordsToRemove.push_back(lockId);
+
+    if (queueWasEmpty) {
+        Send(SelfId(), new TEvPrivate::TEvRemoveLockChangeRecords);
+    }
+}
+
+// Scans all in-memory lock change records and queues for removal those that are
+// neither committed nor attached to a persistent lock.
+// TEvRemoveLockChangeRecords is sent to self when the removal queue goes from
+// empty to non-empty.
+void TDataShard::ScheduleRemoveAbandonedLockChanges() {
+    const bool queueWasEmpty = PendingLockChangeRecordsToRemove.empty();
+
+    for (const auto& [lockId, _] : LockChangeRecords) {
+        // Committed lock changes must be kept
+        if (CommittedLockChangeRecords.contains(lockId)) {
+            continue;
+        }
+
+        // Changes attached to persistent locks must be kept as well
+        const auto lock = SysLocksTable().GetRawLock(lockId);
+        const bool persistent = lock && lock->IsPersistent();
+        if (!persistent) {
+            PendingLockChangeRecordsToRemove.push_back(lockId);
+        }
+    }
+
+    if (queueWasEmpty && !PendingLockChangeRecordsToRemove.empty()) {
+        Send(SelfId(), new TEvPrivate::TEvRemoveLockChangeRecords);
+    }
+}
+
void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation &op) {
db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index dd99d11cee..939a9bbcab 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -1,5 +1,6 @@
#include "change_collector.h"
#include "datashard_impl.h"
+#include "datashard_user_db.h"
#include "datashard__engine_host.h"
#include "sys_tables.h"
@@ -191,7 +192,7 @@ TIntrusivePtr<TThrRefBase> InitDataShardSysTables(TDataShard* self) {
}
///
-class TDataShardEngineHost : public TEngineHost {
+class TDataShardEngineHost : public TEngineHost, public IDataShardUserDb {
public:
TDataShardEngineHost(TDataShard* self, TEngineBay& engineBay, NTable::TDatabase& db, TEngineHostCounters& counters, ui64& lockTxId, ui32& lockNodeId, TInstant now)
: TEngineHost(db, counters,
@@ -205,7 +206,24 @@ public:
, LockTxId(lockTxId)
, LockNodeId(lockNodeId)
, Now(now)
- {}
+ {
+ }
+
+    // IDataShardUserDb implementation: resolves tableId to a local table id and
+    // selects the row from DB at ReadVersion, passing the transaction map and
+    // observer obtained for this table.
+    NTable::EReady SelectRow(
+            const TTableId& tableId,
+            TArrayRef<const TRawTypeValue> key,
+            TArrayRef<const NTable::TTag> tags,
+            NTable::TRowState& row) override
+    {
+        const auto localTid = LocalTableId(tableId);
+
+        return DB.Select(localTid, key, tags, row, /* readFlags */ 0, ReadVersion,
+            GetReadTxMap(tableId), GetReadTxObserver(tableId));
+    }
void SetWriteVersion(TRowVersion writeVersion) {
WriteVersion = writeVersion;
@@ -235,7 +253,7 @@ public:
IsRepeatableSnapshot = true;
}
- IChangeCollector* GetChangeCollector(const TTableId& tableId) const override {
+ IDataShardChangeCollector* GetChangeCollector(const TTableId& tableId) const override {
auto it = ChangeCollectors.find(tableId);
if (it != ChangeCollectors.end()) {
return it->second.Get();
@@ -246,10 +264,33 @@ public:
return it->second.Get();
}
- it->second.Reset(CreateChangeCollector(*Self, DB, tableId.PathId.LocalPathId, IsImmediateTx));
+ it->second.Reset(CreateChangeCollector(*Self, *const_cast<TDataShardEngineHost*>(this), DB, tableId.PathId.LocalPathId, IsImmediateTx));
return it->second.Get();
}
+    // Commits the open transaction `lockId` of the given table at `writeVersion`
+    // and, once per lock, hands the lock's pending change records over to the
+    // table's change collector. Does nothing when the table has no such open tx.
+    void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) {
+        const auto localTid = Self->GetLocalTableId(tableId);
+        Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << Self->TabletID());
+
+        if (!DB.HasOpenTx(localTid, lockId)) {
+            return;
+        }
+
+        LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+            "Committing changes lockId# " << lockId << " in localTid# " << localTid << " shard# " << Self->TabletID());
+        DB.CommitTx(localTid, lockId, writeVersion);
+
+        // Change records of a lock are committed at most once per engine host
+        if (CommittedLockChanges.contains(lockId)) {
+            return;
+        }
+
+        const auto& lockChanges = Self->GetLockChangeRecords(lockId);
+        if (!lockChanges) {
+            return;
+        }
+
+        auto* collector = GetChangeCollector(tableId);
+        if (!collector) {
+            return;
+        }
+
+        // NOTE(review): the collector gets the host's WriteVersion member, not the
+        // writeVersion argument used for CommitTx above - confirm this is intended
+        collector->SetWriteVersion(WriteVersion);
+        collector->CommitLockChanges(lockId, lockChanges, txc);
+        CommittedLockChanges.insert(lockId);
+    }
+
TVector<IChangeCollector::TChange> GetCollectedChanges() const {
TVector<IChangeCollector::TChange> total;
@@ -636,7 +677,8 @@ private:
TInstant Now;
TRowVersion WriteVersion = TRowVersion::Max();
TRowVersion ReadVersion = TRowVersion::Min();
- mutable THashMap<TTableId, THolder<IChangeCollector>> ChangeCollectors;
+ THashSet<ui64> CommittedLockChanges;
+ mutable THashMap<TTableId, THolder<IDataShardChangeCollector>> ChangeCollectors;
mutable THashMap<TTableId, NTable::ITransactionMapPtr> TxMaps;
mutable THashMap<TTableId, NTable::ITransactionObserverPtr> TxObservers;
};
@@ -823,6 +865,13 @@ void TEngineBay::SetIsRepeatableSnapshot() {
host->SetIsRepeatableSnapshot();
}
+// Forwards the commit request to the datashard engine host; the host must exist.
+void TEngineBay::CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc) {
+    Y_VERIFY(EngineHost);
+
+    auto& dataShardHost = *static_cast<TDataShardEngineHost*>(EngineHost.Get());
+    dataShardHost.CommitChanges(tableId, lockId, writeVersion, txc);
+}
+
TVector<IChangeCollector::TChange> TEngineBay::GetCollectedChanges() const {
Y_VERIFY(EngineHost);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 588d7d940c..9cb91f7f7a 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -100,6 +100,8 @@ public:
void SetIsImmediateTx();
void SetIsRepeatableSnapshot();
+ void CommitChanges(const TTableId& tableId, ui64 lockId, const TRowVersion& writeVersion, TTransactionContext& txc);
+
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const;
void ResetCollectedChanges();
diff --git a/ydb/core/tx/datashard/datashard__init.cpp b/ydb/core/tx/datashard/datashard__init.cpp
index 3f2d579d6a..626b9afa5b 100644
--- a/ydb/core/tx/datashard/datashard__init.cpp
+++ b/ydb/core/tx/datashard/datashard__init.cpp
@@ -33,6 +33,8 @@ bool TDataShard::TTxInit::Execute(TTransactionContext& txc, const TActorContext&
Self->KillChangeSender(ctx);
Self->ChangesQueue.clear();
+ Self->LockChangeRecords.clear();
+ Self->CommittedLockChangeRecords.clear();
ChangeRecords.clear();
bool done = ReadEverything(txc);
@@ -418,6 +420,18 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
}
}
+ if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::LockChangeRecords::TableId)) {
+ if (!Self->LoadLockChangeRecords(db)) {
+ return false;
+ }
+ }
+
+ if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ChangeRecordCommits::TableId)) {
+ if (!Self->LoadChangeRecordCommits(db, ChangeRecords)) {
+ return false;
+ }
+ }
+
Self->ReplicatedTables.clear();
if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::ReplicationSources::TableId)) {
auto rowset = db.Table<Schema::ReplicationSources>().Select();
@@ -502,6 +516,8 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
Self->SubscribeNewLocks();
+ Self->ScheduleRemoveAbandonedLockChanges();
+
return true;
}
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index f2b882e785..ce629bb4b2 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -180,6 +180,10 @@ public:
void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); }
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
+    // Forwards to EngineBay.CommitChanges with the same arguments.
+    void CommitChanges(
+            const TTableId& tableId,
+            ui64 lockId,
+            const TRowVersion& writeVersion,
+            TTransactionContext& txc)
+    {
+        EngineBay.CommitChanges(tableId, lockId, writeVersion, txc);
+    }
+
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const { return EngineBay.GetCollectedChanges(); }
void ResetCollectedChanges() { EngineBay.ResetCollectedChanges(); }
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index 7e532420ba..aeac9bc5a6 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -16,14 +16,24 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
for (const auto& [_, records] : Self->ChangeRecordsRequested) {
for (const auto& record : records) {
+ auto itQueue = Self->ChangesQueue.find(record.Order);
+ if (itQueue == Self->ChangesQueue.end()) {
+ continue;
+ }
+
if (bodiesSize && (bodiesSize + record.BodySize) > MemLimit) {
break;
}
bodiesSize += record.BodySize;
- ok = ok && db.Table<Schema::ChangeRecords>().Key(record.Order).Precharge();
- ok = ok && db.Table<Schema::ChangeRecordDetails>().Key(record.Order).Precharge();
+ if (itQueue->second.LockId) {
+ ok = ok && db.Table<Schema::LockChangeRecords>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Precharge();
+ ok = ok && db.Table<Schema::LockChangeRecordDetails>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Precharge();
+ } else {
+ ok = ok && db.Table<Schema::ChangeRecords>().Key(record.Order).Precharge();
+ ok = ok && db.Table<Schema::ChangeRecordDetails>().Key(record.Order).Precharge();
+ }
}
}
@@ -38,59 +48,130 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
auto it = records.begin();
while (it != records.end()) {
+ auto itQueue = Self->ChangesQueue.find(it->Order);
+ if (itQueue == Self->ChangesQueue.end()) {
+ RecordsToForget[recipient].emplace_back(it->Order);
+ it = records.erase(it);
+ continue;
+ }
+
if (MemUsage && (MemUsage + it->BodySize) > MemLimit) {
break;
}
- auto basic = db.Table<Schema::ChangeRecords>().Key(it->Order).Select();
- auto details = db.Table<Schema::ChangeRecordDetails>().Key(it->Order).Select();
+ if (itQueue->second.LockId) {
+ auto itCommit = Self->CommittedLockChangeRecords.find(itQueue->second.LockId);
+ if (itCommit == Self->CommittedLockChangeRecords.end()) {
+ Y_VERIFY_DEBUG_S(false, "Unexpected change record " << it->Order << " from an uncommitted lock " << itQueue->second.LockId);
+ RecordsToForget[recipient].emplace_back(it->Order);
+ it = records.erase(it);
+ continue;
+ }
- if (!basic.IsReady() || !details.IsReady()) {
- return false;
- }
+ auto basic = db.Table<Schema::LockChangeRecords>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Select();
+ auto details = db.Table<Schema::LockChangeRecordDetails>().Key(itQueue->second.LockId, itQueue->second.LockOffset).Select();
- if (!basic.IsValid() && !details.IsValid()) {
- RecordsToForget[recipient].emplace_back(it->Order);
- it = records.erase(it);
- continue;
- }
+ if (!basic.IsReady() || !details.IsReady()) {
+ return false;
+ }
+
+ if (!basic.IsValid() && !details.IsValid()) {
+ RecordsToForget[recipient].emplace_back(it->Order);
+ it = records.erase(it);
+ continue;
+ }
+
+ Y_VERIFY_S(basic.IsValid() && details.IsValid(), "Inconsistent basic and details"
+ << ", basic.IsValid: " << basic.IsValid()
+ << ", details.IsValid: " << details.IsValid()
+ << ", recipient: " << recipient
+ << ", records.size: " << records.size()
+ << ", it->Order: " << it->Order
+ << ", it->BodySize: " << it->BodySize
+ << ", LockId: " << itQueue->second.LockId
+ << ", LockOffset: " << itQueue->second.LockOffset);
+
+ const auto schemaVersion = basic.GetValue<Schema::LockChangeRecords::SchemaVersion>();
+ const auto tableId = TPathId(
+ basic.GetValue<Schema::LockChangeRecords::TableOwnerId>(),
+ basic.GetValue<Schema::LockChangeRecords::TablePathId>()
+ );
+
+ TUserTable::TCPtr schema;
+ if (schemaVersion) {
+ const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion);
+ if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) {
+ schema = snapshot->Schema;
+ }
+ }
+
+ RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::LockChangeRecordDetails::Kind>())
+ .WithOrder(it->Order)
+ .WithGroup(itCommit->second.Group)
+ .WithStep(itCommit->second.Step)
+ .WithTxId(itCommit->second.TxId)
+ .WithPathId(TPathId(
+ basic.GetValue<Schema::LockChangeRecords::PathOwnerId>(),
+ basic.GetValue<Schema::LockChangeRecords::LocalPathId>()
+ ))
+ .WithTableId(tableId)
+ .WithSchemaVersion(schemaVersion)
+ .WithSchema(schema)
+ .WithBody(details.GetValue<Schema::LockChangeRecordDetails::Body>())
+ .WithLockId(itQueue->second.LockId)
+ .WithLockOffset(itQueue->second.LockOffset)
+ .Build());
+ } else {
+ auto basic = db.Table<Schema::ChangeRecords>().Key(it->Order).Select();
+ auto details = db.Table<Schema::ChangeRecordDetails>().Key(it->Order).Select();
+
+ if (!basic.IsReady() || !details.IsReady()) {
+ return false;
+ }
+
+ if (!basic.IsValid() && !details.IsValid()) {
+ RecordsToForget[recipient].emplace_back(it->Order);
+ it = records.erase(it);
+ continue;
+ }
- Y_VERIFY_S(basic.IsValid() && details.IsValid(), "Inconsistent basic and details"
- << ", basic.IsValid: " << basic.IsValid()
- << ", details.IsValid: " << details.IsValid()
- << ", recipient: " << recipient
- << ", records.size: " << records.size()
- << ", it->Order: " << it->Order
- << ", it->BodySize: " << it->BodySize);
-
- const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>();
- const auto tableId = TPathId(
- basic.GetValue<Schema::ChangeRecords::TableOwnerId>(),
- basic.GetValue<Schema::ChangeRecords::TablePathId>()
- );
-
- TUserTable::TCPtr schema;
- if (schemaVersion) {
- const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion);
- if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) {
- schema = snapshot->Schema;
+ Y_VERIFY_S(basic.IsValid() && details.IsValid(), "Inconsistent basic and details"
+ << ", basic.IsValid: " << basic.IsValid()
+ << ", details.IsValid: " << details.IsValid()
+ << ", recipient: " << recipient
+ << ", records.size: " << records.size()
+ << ", it->Order: " << it->Order
+ << ", it->BodySize: " << it->BodySize);
+
+ const auto schemaVersion = basic.GetValue<Schema::ChangeRecords::SchemaVersion>();
+ const auto tableId = TPathId(
+ basic.GetValue<Schema::ChangeRecords::TableOwnerId>(),
+ basic.GetValue<Schema::ChangeRecords::TablePathId>()
+ );
+
+ TUserTable::TCPtr schema;
+ if (schemaVersion) {
+ const auto snapshotKey = TSchemaSnapshotKey(tableId, schemaVersion);
+ if (const auto* snapshot = Self->GetSchemaSnapshotManager().FindSnapshot(snapshotKey)) {
+ schema = snapshot->Schema;
+ }
}
+
+ RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>())
+ .WithOrder(it->Order)
+ .WithGroup(basic.GetValue<Schema::ChangeRecords::Group>())
+ .WithStep(basic.GetValue<Schema::ChangeRecords::PlanStep>())
+ .WithTxId(basic.GetValue<Schema::ChangeRecords::TxId>())
+ .WithPathId(TPathId(
+ basic.GetValue<Schema::ChangeRecords::PathOwnerId>(),
+ basic.GetValue<Schema::ChangeRecords::LocalPathId>()
+ ))
+ .WithTableId(tableId)
+ .WithSchemaVersion(schemaVersion)
+ .WithSchema(schema)
+ .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>())
+ .Build());
}
-
- RecordsToSend[recipient].emplace_back(TChangeRecordBuilder(details.GetValue<Schema::ChangeRecordDetails::Kind>())
- .WithOrder(it->Order)
- .WithGroup(basic.GetValue<Schema::ChangeRecords::Group>())
- .WithStep(basic.GetValue<Schema::ChangeRecords::PlanStep>())
- .WithTxId(basic.GetValue<Schema::ChangeRecords::TxId>())
- .WithPathId(TPathId(
- basic.GetValue<Schema::ChangeRecords::PathOwnerId>(),
- basic.GetValue<Schema::ChangeRecords::LocalPathId>()
- ))
- .WithTableId(tableId)
- .WithSchemaVersion(schemaVersion)
- .WithSchema(schema)
- .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>())
- .Build());
MemUsage += it->BodySize;
it = records.erase(it);
diff --git a/ydb/core/tx/datashard/datashard_common_upload.cpp b/ydb/core/tx/datashard/datashard_common_upload.cpp
index 3501f8d7cb..1e2dbf816c 100644
--- a/ydb/core/tx/datashard/datashard_common_upload.cpp
+++ b/ydb/core/tx/datashard/datashard_common_upload.cpp
@@ -1,5 +1,6 @@
#include "change_collector.h"
#include "datashard_common_upload.h"
+#include "datashard_user_db.h"
namespace NKikimr {
namespace NDataShard {
@@ -59,8 +60,10 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
const bool breakWriteConflicts = BreakLocks && self->SysLocksTable().HasWriteLocks(fullTableId);
+ TDataShardUserDb userDb(*self, txc.DB, readVersion);
+
if (CollectChanges) {
- ChangeCollector.Reset(CreateChangeCollector(*self, txc.DB, tableInfo, true));
+ ChangeCollector.Reset(CreateChangeCollector(*self, userDb, txc.DB, tableInfo, true));
}
if (ChangeCollector) {
diff --git a/ydb/core/tx/datashard/datashard_direct_erase.cpp b/ydb/core/tx/datashard/datashard_direct_erase.cpp
index 2014d71bc2..1862e7d81e 100644
--- a/ydb/core/tx/datashard/datashard_direct_erase.cpp
+++ b/ydb/core/tx/datashard/datashard_direct_erase.cpp
@@ -1,5 +1,6 @@
#include "change_collector.h"
#include "datashard_direct_erase.h"
+#include "datashard_user_db.h"
#include "erase_rows_condition.h"
#include <ydb/core/base/appdata.h>
@@ -58,6 +59,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
}
}
+ std::optional<TDataShardUserDb> userDb;
+
THolder<IEraseRowsCondition> condition;
if (params) {
condition.Reset(CreateEraseRowsCondition(request));
@@ -65,7 +68,8 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
condition->Prepare(params.Txc->DB.GetRowScheme(localTableId), 0);
}
- params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, params.Txc->DB, tableInfo, true));
+ userDb.emplace(*self, params.Txc->DB, params.ReadVersion);
+ params.Tx->ChangeCollector.Reset(CreateChangeCollector(*self, *userDb, params.Txc->DB, tableInfo, true));
}
if (auto collector = params.GetChangeCollector()) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 01333c30e6..f0ec337de9 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -214,6 +214,7 @@ class TDataShard
class TTxPersistFullCompactionTs;
class TTxRemoveLock;
class TTxGetOpenTxs;
+ class TTxRemoveLockChangeRecords;
template <typename T> friend class TTxDirectBase;
class TTxUploadRows;
@@ -317,6 +318,7 @@ class TDataShard
EvRemoveChangeRecords,
EvReplicationSourceOffsets,
EvMediatorRestoreBackup,
+ EvRemoveLockChangeRecords,
EvEnd
};
@@ -458,6 +460,8 @@ class TDataShard
};
struct TEvMediatorRestoreBackup : public TEventLocal<TEvMediatorRestoreBackup, EvMediatorRestoreBackup> {};
+
+ // Local event without any payload. TDataShard declares a Handle() overload
+ // for it and dispatches it through HFuncTraced in its state functions.
+ struct TEvRemoveLockChangeRecords : public TEventLocal<TEvRemoveLockChangeRecords, EvRemoveLockChangeRecords> {};
};
struct Schema : NIceDb::Schema {
@@ -816,12 +820,65 @@ class TDataShard
using TColumns = TableColumns<LockId, ConflictId>;
};
+ // Basic (metadata) part of a change record that is addressed by lock rather
+ // than by shard-wide order: rows are keyed by (LockId, LockOffset).
+ // The change-sending code reads PathOwnerId/LocalPathId as the record's
+ // path id, TableOwnerId/TablePathId as its table id, and SchemaVersion to
+ // look up the matching schema snapshot.
+ struct LockChangeRecords : Table<101> {
+ struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct LockOffset : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct BodySize : Column<5, NScheme::NTypeIds::Uint64> {};
+ struct SchemaVersion : Column<6, NScheme::NTypeIds::Uint64> {};
+ struct TableOwnerId : Column<7, NScheme::NTypeIds::Uint64> {};
+ struct TablePathId : Column<8, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<LockId, LockOffset>;
+ using TColumns = TableColumns<
+ LockId,
+ LockOffset,
+ PathOwnerId,
+ LocalPathId,
+ BodySize,
+ SchemaVersion,
+ TableOwnerId,
+ TablePathId
+ >;
+ };
+
+ // Payload part of a lock-addressed change record, keyed by the same
+ // (LockId, LockOffset) pair as LockChangeRecords.
+ // Kind is stored as Uint8 and exposed as TChangeRecord::EKind;
+ // Body is stored as String and exposed as TString.
+ struct LockChangeRecordDetails : Table<102> {
+ struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct LockOffset : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct Kind : Column<3, NScheme::NTypeIds::Uint8> { using Type = TChangeRecord::EKind; };
+ struct Body : Column<4, NScheme::NTypeIds::String> { using Type = TString; };
+
+ using TKey = TableKey<LockId, LockOffset>;
+ using TColumns = TableColumns<LockId, LockOffset, Kind, Body>;
+ };
+
+ // Maps [Order ... Order+N-1] change records in the shard order
+ // to [0 ... N-1] change records from LockId
+ //
+ // Rows are keyed by Order (the first order of the mapped range).
+ // NOTE(review): Group/PlanStep/TxId presumably carry the commit metadata
+ // that is later applied to every record of the lock - confirm against
+ // PersistCommitLockChangeRecords / LoadChangeRecordCommits.
+ struct ChangeRecordCommits : Table<103> {
+ struct Order : Column<1, NScheme::NTypeIds::Uint64> {};
+ struct LockId : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct Group : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct PlanStep : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct TxId : Column<5, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<Order>;
+ using TColumns = TableColumns<
+ Order,
+ LockId,
+ Group,
+ PlanStep,
+ TxId
+ >;
+ };
+
using TTables = SchemaTables<Sys, UserTables, TxMain, TxDetails, InReadSets, OutReadSets, PlanQueue,
DeadlineQueue, SchemaOperations, SplitSrcSnapshots, SplitDstReceivedSnapshots, TxArtifacts, ScanProgress,
Snapshots, S3Uploads, S3Downloads, ChangeRecords, ChangeRecordDetails, ChangeSenders, S3UploadedParts,
SrcChangeSenderActivations, DstChangeSenderActivations,
ReplicationSourceOffsets, ReplicationSources, DstReplicationSourceOffsetsReceived,
- UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts>;
+ UserTablesStats, SchemaSnapshots, Locks, LockRanges, LockConflicts,
+ LockChangeRecords, LockChangeRecordDetails, ChangeRecordCommits>;
// These settings are persisted on each Init. So we use empty settings in order not to overwrite what
// was changed by the user
@@ -1076,6 +1133,8 @@ class TDataShard
void Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvRemoveLockChangeRecords::TPtr& ev, const TActorContext& ctx);
+
void HandleByReplicationSourceOffsetsServer(STATEFN_SIG);
void DoPeriodicTasks(const TActorContext &ctx);
@@ -1517,18 +1576,27 @@ public:
void PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid);
- ui64 AllocateChangeRecordOrder(NIceDb::TNiceDb& db);
+ ui64 AllocateChangeRecordOrder(NIceDb::TNiceDb& db, ui64 count = 1);
ui64 AllocateChangeRecordGroup(NIceDb::TNiceDb& db);
+ ui64 GetNextChangeRecordLockOffset(ui64 lockId);
void PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record);
+ void PersistCommitLockChangeRecords(TTransactionContext& txc, ui64 order, ui64 lockId, ui64 group, const TRowVersion& rowVersion);
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
+ void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
void EnqueueChangeRecords(TVector<NMiniKQL::IChangeCollector::TChange>&& records);
+ void AddLockChangeRecords(ui64 lockId, TVector<NMiniKQL::IChangeCollector::TChange>&& records);
+ const TVector<NMiniKQL::IChangeCollector::TChange>& GetLockChangeRecords(ui64 lockId) const;
void CreateChangeSender(const TActorContext& ctx);
void KillChangeSender(const TActorContext& ctx);
void MaybeActivateChangeSender(const TActorContext& ctx);
void SuspendChangeSender(const TActorContext& ctx);
const TActorId& GetChangeSender() const { return OutChangeSender; }
bool LoadChangeRecords(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records);
+ bool LoadLockChangeRecords(NIceDb::TNiceDb& db);
+ bool LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<NMiniKQL::IChangeCollector::TChange>& records);
+ void ScheduleRemoveLockChanges(ui64 lockId);
+ void ScheduleRemoveAbandonedLockChanges();
static void PersistSchemeTxResult(NIceDb::TNiceDb &db, const TSchemaOperation& op);
@@ -2281,17 +2349,23 @@ private:
TPathId TableId;
ui64 SchemaVersion;
bool SchemaSnapshotAcquired;
+ ui64 LockId;
+ ui64 LockOffset;
- explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, ui64 schemaVersion)
+ explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
+ ui64 schemaVersion, ui64 lockId = 0, ui64 lockOffset = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
, SchemaSnapshotAcquired(false)
+ , LockId(lockId)
+ , LockOffset(lockOffset)
{
}
explicit TEnqueuedRecord(const NMiniKQL::IChangeCollector::TChange& record)
- : TEnqueuedRecord(record.BodySize(), record.TableId(), record.SchemaVersion())
+ : TEnqueuedRecord(record.BodySize, record.TableId,
+ record.SchemaVersion, record.LockId, record.LockOffset)
{
}
};
@@ -2313,6 +2387,20 @@ private:
TActorId OutChangeSender;
bool OutChangeSenderSuspended = false;
+ // In-memory commit info for a lock whose change records have been committed.
+ // The change-sending code looks the lock up here and stamps Group, Step and
+ // TxId onto every record it builds for that lock.
+ struct TCommittedLockChangeRecords {
+     ui64 Order = Max<ui64>();
+     // Explicitly zero-initialized (like Order and Count have defaults) so a
+     // default-initialized instance never exposes indeterminate values.
+     ui64 Group = 0;
+     ui64 Step = 0;
+     ui64 TxId = 0;
+
+     // The number of records that are not deleted yet
+     size_t Count = 0;
+ };
+
+ THashMap<ui64, TVector<NMiniKQL::IChangeCollector::TChange>> LockChangeRecords; // ui64 is lock id
+ THashMap<ui64, TCommittedLockChangeRecords> CommittedLockChangeRecords; // ui64 is lock id
+ TVector<ui64> PendingLockChangeRecordsToRemove;
+
// in
THashMap<ui64, TInChangeSender> InChangeSenders; // ui64 is shard id
@@ -2355,6 +2443,35 @@ private:
NTable::ITransactionObserverPtr BreakWriteConflictsTxObserver;
+public:
+ // Returns a mutable reference to the per-lock change record map (keyed by
+ // lock id), e.g. so move-table/move-index units can rewrite PathId in place.
+ auto& GetLockChangeRecords() {
+ return LockChangeRecords;
+ }
+
+ // Hands the whole per-lock change record map over to the caller and leaves
+ // the member empty.
+ auto TakeLockChangeRecords() {
+     return std::exchange(LockChangeRecords, {});
+ }
+
+ // Replaces the per-lock change record map with the given one (moved in).
+ // The move units use it to put back a previously taken map when loading
+ // from the database has to be restarted.
+ void SetLockChangeRecords(THashMap<ui64, TVector<NMiniKQL::IChangeCollector::TChange>>&& lockChangeRecords) {
+ LockChangeRecords = std::move(lockChangeRecords);
+ }
+
+ // Returns a mutable reference to the committed-lock map (keyed by lock id).
+ auto& GetCommittedLockChangeRecords() {
+ return CommittedLockChangeRecords;
+ }
+
+ // Hands the whole committed-lock map over to the caller and leaves the
+ // member empty.
+ auto TakeCommittedLockChangeRecords() {
+     return std::exchange(CommittedLockChangeRecords, {});
+ }
+
+ // Replaces the committed-lock map with the given one (moved in).
+ // The move units use it to put back a previously taken map when loading
+ // from the database has to be restarted.
+ void SetCommittedLockChangeRecords(THashMap<ui64, TCommittedLockChangeRecords>&& committedLockChangeRecords) {
+ CommittedLockChangeRecords = std::move(committedLockChangeRecords);
+ }
+
protected:
// Redundant init state required by flat executor implementation
void StateInit(TAutoPtr<NActors::IEventHandle> &ev, const NActors::TActorContext &ctx) {
@@ -2379,6 +2496,7 @@ protected:
HFuncTraced(TEvMediatorTimecast::TEvSubscribeReadStepResult, Handle);
HFuncTraced(TEvMediatorTimecast::TEvNotifyPlanStep, Handle);
HFuncTraced(TEvPrivate::TEvMediatorRestoreBackup, Handle);
+ HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
HFuncTraced(TEvents::TEvPoisonPill, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
@@ -2494,6 +2612,7 @@ protected:
fFunc(TEvDataShard::EvReplicationSourceOffsetsCancel, HandleByReplicationSourceOffsetsServer);
HFunc(TEvLongTxService::TEvLockStatus, Handle);
HFunc(TEvDataShard::TEvGetOpenTxs, Handle);
+ HFuncTraced(TEvPrivate::TEvRemoveLockChangeRecords, Handle);
default:
if (!HandleDefaultEvents(ev, ctx)) {
LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index fa574570ae..fbde3b04fb 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -709,16 +709,9 @@ void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writ
sysLocks.CommitLock(lockKey);
TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId());
- auto localTid = dataShard.GetLocalTableId(tableId);
- Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << origin);
-
auto txId = lockProto.GetLockId();
- if (!txc.DB.HasOpenTx(localTid, txId)) {
- continue;
- }
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLocks: commit txId# " << txId << " in localTid# " << localTid);
- txc.DB.CommitTx(localTid, txId, writeVersion);
+ tx->GetDataTx()->CommitChanges(tableId, txId, writeVersion, txc);
}
} else {
KqpEraseLocks(origin, tx, sysLocks);
diff --git a/ydb/core/tx/datashard/datashard_locks_db.cpp b/ydb/core/tx/datashard/datashard_locks_db.cpp
index 3c74047d8b..bb2a1ad6ef 100644
--- a/ydb/core/tx/datashard/datashard_locks_db.cpp
+++ b/ydb/core/tx/datashard/datashard_locks_db.cpp
@@ -122,6 +122,8 @@ void TDataShardLocksDb::PersistRemoveLock(ui64 lockId) {
NIceDb::TNiceDb db(DB);
db.Table<Schema::Locks>().Key(lockId).Delete();
HasChanges_ = true;
+
+ Self.ScheduleRemoveLockChanges(lockId);
}
void TDataShardLocksDb::PersistAddRange(ui64 lockId, ui64 rangeId, const TPathId& tableId, ui64 flags, const TString& data) {
diff --git a/ydb/core/tx/datashard/datashard_user_db.cpp b/ydb/core/tx/datashard/datashard_user_db.cpp
new file mode 100644
index 0000000000..fccc69d2e7
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_user_db.cpp
@@ -0,0 +1,17 @@
+#include "datashard_user_db.h"
+
+namespace NKikimr::NDataShard {
+
+// Reads one row of a user table through the local database.
+// Resolves tableId to the shard-local table id (Y_VERIFY fails if it resolves
+// to 0) and forwards key, tags and row to Db.Select with zero read flags and
+// the ReadVersion stored in this object; Db.Select's result is returned as is.
+NTable::EReady TDataShardUserDb::SelectRow(
+ const TTableId& tableId,
+ TArrayRef<const TRawTypeValue> key,
+ TArrayRef<const NTable::TTag> tags,
+ NTable::TRowState& row)
+{
+ auto tid = Self.GetLocalTableId(tableId);
+ Y_VERIFY(tid != 0, "Unexpected SelectRow for an unknown table");
+
+ return Db.Select(tid, key, tags, row, /* readFlags */ 0, ReadVersion);
+}
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_user_db.h b/ydb/core/tx/datashard/datashard_user_db.h
new file mode 100644
index 0000000000..1bace953ee
--- /dev/null
+++ b/ydb/core/tx/datashard/datashard_user_db.h
@@ -0,0 +1,38 @@
+#pragma once
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+// Row-lookup interface over datashard user tables.
+// The destructor is protected and non-virtual, so an object cannot be
+// destroyed through an IDataShardUserDb pointer or reference.
+class IDataShardUserDb {
+protected:
+ ~IDataShardUserDb() = default;
+
+public:
+ // Looks up the row identified by `key` in table `tableId`, requesting the
+ // columns listed in `tags`, fills `row` and returns the readiness status.
+ virtual NTable::EReady SelectRow(
+ const TTableId& tableId,
+ TArrayRef<const TRawTypeValue> key,
+ TArrayRef<const NTable::TTag> tags,
+ NTable::TRowState& row) = 0;
+};
+
+// IDataShardUserDb implementation backed by a datashard and a local database.
+// Both are held by reference, so the object must not outlive them; the read
+// version is stored by value.
+class TDataShardUserDb final : public IDataShardUserDb {
+public:
+ // readVersion defaults to TRowVersion::Min() when the caller omits it.
+ TDataShardUserDb(TDataShard& self, NTable::TDatabase& db, const TRowVersion& readVersion = TRowVersion::Min())
+ : Self(self)
+ , Db(db)
+ , ReadVersion(readVersion)
+ { }
+
+ NTable::EReady SelectRow(
+ const TTableId& tableId,
+ TArrayRef<const TRawTypeValue> key,
+ TArrayRef<const NTable::TTag> tags,
+ NTable::TRowState& row) override;
+
+private:
+ TDataShard& Self;
+ NTable::TDatabase& Db;
+ TRowVersion ReadVersion;
+};
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/datashard_ut_common_kqp.h b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
index d398647ff9..b74aa31369 100644
--- a/ydb/core/tx/datashard/datashard_ut_common_kqp.h
+++ b/ydb/core/tx/datashard/datashard_ut_common_kqp.h
@@ -161,6 +161,20 @@ namespace NKqpHelpers {
return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
}
+ // Executes `query` using the request built by MakeSimpleStaleRoRequest and
+ // renders the outcome as text:
+ //   "ERROR: <status>" when the YDB status is not SUCCESS,
+ //   "<empty>"         when the response has no result sets,
+ //   otherwise the short debug string of the single result set
+ //   (asserting that there is exactly one).
+ inline TString KqpSimpleStaleRoExec(TTestActorRuntime& runtime, const TString& query) {
+     auto edge = runtime.AllocateEdgeActor();
+     auto reply = ExecRequest(runtime, edge, MakeSimpleStaleRoRequest(query));
+     auto& response = reply->Get()->Record.GetRef();
+
+     const auto status = response.GetYdbStatus();
+     if (status != Ydb::StatusIds::SUCCESS) {
+         return TStringBuilder() << "ERROR: " << status;
+     }
+
+     if (response.GetResponse().GetResults().empty()) {
+         return "<empty>";
+     }
+
+     UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+     return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ }
+
inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
auto reqSender = runtime.AllocateEdgeActor();
sessionId = CreateSession(runtime, reqSender);
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 88a7364530..5e5a1ea915 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1620,6 +1620,14 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
}
break;
}
+ case TEvChangeExchange::TEvApplyRecords::EventType: {
+ if (BlockApplyRecords) {
+ Cerr << "... blocked ApplyRecords" << Endl;
+ BlockedApplyRecords.push_back(THolder(ev.Release()));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
}
return PrevObserver(Runtime, ev);
}
@@ -1643,6 +1651,8 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
bool InjectClearTasks = false;
bool BlockReadSets = false;
TVector<THolder<IEventHandle>> BlockedReadSets;
+ bool BlockApplyRecords = false;
+ TVector<THolder<IEventHandle>> BlockedApplyRecords;
};
Y_UNIT_TEST(MvccSnapshotLockedWrites) {
@@ -3453,6 +3463,152 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"} Struct { Bool: false }");
}
+ // Scenario: writes made under a lock on a table with a global async index.
+ //  1. Create table-1 with the async index by_value and one committed row.
+ //  2. Write uncommitted changes to keys 1 and 2 under locks 123 and 234,
+ //     both injected with the same MVCC snapshot.
+ //  3. Check that the index still shows only the committed row.
+ //  4. Commit lock 123 and check that its rows appear in the index; in the
+ //     WithRestart variant the ApplyRecords events are dropped and the shard
+ //     is rebooted before the check.
+ //  5. Committing lock 234 afterwards is expected to fail with ABORTED.
+ Y_UNIT_TEST_TWIN(LockedWriteWithAsyncIndex, WithRestart) {
+ constexpr bool UseNewEngine = true;
+
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+ // Add a global async index "by_value" over the "value" column
+ WaitTxNotification(server, sender,
+ AsyncAlterAddIndex(server, "/Root", "/Root/table-1",
+ TShardedTableOptions::TIndex{"by_value", {"value"}, {}, NKikimrSchemeOp::EIndexTypeGlobalAsync}));
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ TInjectLockSnapshotObserver observer(runtime);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleBegin(runtime, sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We will reuse this snapshot
+ auto snapshot = observer.Last.MvccSnapshot;
+
+ using NLongTxService::TLockHandle;
+ TLockHandle lock1handle(123, runtime.GetActorSystem(0));
+ TLockHandle lock2handle(234, runtime.GetActorSystem(0));
+
+ // Write uncommitted changes to keys 1 and 2 using tx 123
+ observer.Inject.LockId = 123;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 21)
+ )")),
+ "<empty>");
+ auto locks1 = observer.LastLocks;
+ observer.Inject = {};
+
+ // Write uncommitted changes to keys 1 and 2 using tx 234
+ observer.Inject.LockId = 234;
+ observer.Inject.LockNodeId = runtime.GetNodeId(0);
+ observer.Inject.MvccSnapshot = snapshot;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 12), (2, 22)
+ )")),
+ "<empty>");
+ auto locks2 = observer.LastLocks;
+ observer.Inject = {};
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ // Nothing is committed yet: the index must still show only row (1, 1)
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1` VIEW by_value
+ WHERE value in (1, 11, 21, 12, 22)
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // With restart: make the observer capture and drop ApplyRecords events
+ if (WithRestart) {
+ observer.BlockApplyRecords = true;
+ }
+
+ // Commit changes in tx 123
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks1;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "<empty>");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ // With restart: at least one ApplyRecords event must have been captured;
+ // discard them, stop blocking and reboot the first shard of the table
+ if (WithRestart) {
+ UNIT_ASSERT(!observer.BlockedApplyRecords.empty());
+ observer.BlockedApplyRecords.clear();
+ observer.BlockApplyRecords = false;
+
+ auto shards = GetTableShards(server, sender, "/Root/table-1");
+ RebootTablet(runtime, shards.at(0), sender);
+
+ SimulateSleep(server, TDuration::Seconds(1));
+ }
+
+ // The index must now contain the rows written by tx 123
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleStaleRoExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1` VIEW by_value
+ WHERE value in (1, 11, 21, 12, 22)
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 11 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 21 } } } "
+ "} Struct { Bool: false }");
+
+ // Commit changes in tx 234
+ // (expected to be rejected with ABORTED)
+ observer.InjectClearTasks = true;
+ observer.InjectLocks.emplace().Locks = locks2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (0, 0)
+ )")),
+ "ERROR: ABORTED");
+ observer.InjectClearTasks = false;
+ observer.InjectLocks.reset();
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
index d02c82aae7..033d2516f8 100644
--- a/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_distributed_erase_tx_unit.cpp
@@ -3,6 +3,7 @@
#include "datashard_distributed_erase.h"
#include "datashard_impl.h"
#include "datashard_pipeline.h"
+#include "datashard_user_db.h"
#include "execution_unit_ctors.h"
#include "setup_sys_locks.h"
#include "datashard_locks_db.h"
@@ -44,7 +45,8 @@ public:
auto [readVersion, writeVersion] = DataShard.GetReadWriteVersions(op.Get());
if (eraseTx->HasDependents()) {
- THolder<IChangeCollector> changeCollector{CreateChangeCollector(DataShard, txc.DB, request.GetTableId(), false)};
+ TDataShardUserDb userDb(DataShard, txc.DB, readVersion);
+ THolder<IChangeCollector> changeCollector{CreateChangeCollector(DataShard, userDb, txc.DB, request.GetTableId(), false)};
if (changeCollector) {
changeCollector->SetWriteVersion(writeVersion);
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index 2a4c014a41..91c3956452 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -274,7 +274,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
AddLocksToResult(op, ctx);
- op->ChangeRecords() = std::move(dataTx->GetCollectedChanges());
+ auto changes = std::move(dataTx->GetCollectedChanges());
+ if (guardLocks.LockTxId) {
+ if (changes) {
+ DataShard.AddLockChangeRecords(guardLocks.LockTxId, std::move(changes));
+ }
+ } else {
+ // FIXME: handle lock changes commit
+ op->ChangeRecords() = std::move(changes);
+ }
KqpUpdateDataShardStatCounters(DataShard, dataTx->GetCounters());
auto statsMode = kqpTx.GetRuntimeSettings().GetStatsMode();
diff --git a/ydb/core/tx/datashard/move_index_unit.cpp b/ydb/core/tx/datashard/move_index_unit.cpp
index e3d9506335..8750fcc4a7 100644
--- a/ydb/core/tx/datashard/move_index_unit.cpp
+++ b/ydb/core/tx/datashard/move_index_unit.cpp
@@ -22,9 +22,22 @@ public:
const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId());
for (auto& record: changeRecords) {
- if (record.PathId() == remapPrevId) {
- record.SetPathId(remapNewId);
- DataShard.MoveChangeRecord(db, record.Order(), record.PathId());
+ if (record.PathId == remapPrevId) {
+ record.PathId = remapNewId;
+ if (record.LockId) {
+ DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId);
+ } else {
+ DataShard.MoveChangeRecord(db, record.Order, record.PathId);
+ }
+ }
+ }
+
+ for (auto& pr : DataShard.GetLockChangeRecords()) {
+ for (auto& record : pr.second) {
+ if (record.PathId == remapPrevId) {
+ record.PathId = remapNewId;
+ DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId);
+ }
}
}
}
@@ -51,6 +64,21 @@ public:
return EExecutionStatus::Restart;
}
+ auto lockChangeRecords = DataShard.TakeLockChangeRecords();
+ auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords();
+
+ if (!DataShard.LoadLockChangeRecords(db)) {
+ DataShard.SetLockChangeRecords(std::move(lockChangeRecords));
+ DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords));
+ return EExecutionStatus::Restart;
+ }
+
+ if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) {
+ DataShard.SetLockChangeRecords(std::move(lockChangeRecords));
+ DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords));
+ return EExecutionStatus::Restart;
+ }
+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "TMoveIndexUnit Execute"
<< ": schemeTx# " << schemeTx.DebugString()
<< ": changeRecords size# " << ChangeRecords.size()
diff --git a/ydb/core/tx/datashard/move_table_unit.cpp b/ydb/core/tx/datashard/move_table_unit.cpp
index e0f21eb6af..dae1331ff7 100644
--- a/ydb/core/tx/datashard/move_table_unit.cpp
+++ b/ydb/core/tx/datashard/move_table_unit.cpp
@@ -21,9 +21,23 @@ public:
const THashMap<TPathId, TPathId> remap = DataShard.GetRemapIndexes(move);
for (auto& record: changeRecords) {
- if (remap.contains(record.PathId())) { // here could be the records for already deleted indexes, so skip them
- record.SetPathId(remap.at(record.PathId()));
- DataShard.MoveChangeRecord(db, record.Order(), record.PathId());
+ // We skip records for deleted indexes
+ if (remap.contains(record.PathId)) {
+ record.PathId = remap.at(record.PathId);
+ if (record.LockId) {
+ DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId);
+ } else {
+ DataShard.MoveChangeRecord(db, record.Order, record.PathId);
+ }
+ }
+ }
+
+ for (auto& pr : DataShard.GetLockChangeRecords()) {
+ for (auto& record : pr.second) {
+ if (remap.contains(record.PathId)) {
+ record.PathId = remap.at(record.PathId);
+ DataShard.MoveChangeRecord(db, record.LockId, record.LockOffset, record.PathId);
+ }
}
}
}
@@ -50,6 +64,21 @@ public:
return EExecutionStatus::Restart;
}
+ auto lockChangeRecords = DataShard.TakeLockChangeRecords();
+ auto committedLockChangeRecords = DataShard.TakeCommittedLockChangeRecords();
+
+ if (!DataShard.LoadLockChangeRecords(db)) {
+ DataShard.SetLockChangeRecords(std::move(lockChangeRecords));
+ DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords));
+ return EExecutionStatus::Restart;
+ }
+
+ if (!DataShard.LoadChangeRecordCommits(db, ChangeRecords)) {
+ DataShard.SetLockChangeRecords(std::move(lockChangeRecords));
+ DataShard.SetCommittedLockChangeRecords(std::move(committedLockChangeRecords));
+ return EExecutionStatus::Restart;
+ }
+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "TMoveTableUnit Execute"
<< ": schemeTx# " << schemeTx.DebugString()
<< ": changeRecords size# " << ChangeRecords.size()
diff --git a/ydb/core/tx/datashard/remove_lock_change_records.cpp b/ydb/core/tx/datashard/remove_lock_change_records.cpp
new file mode 100644
index 0000000000..e5b09bcd3a
--- /dev/null
+++ b/ydb/core/tx/datashard/remove_lock_change_records.cpp
@@ -0,0 +1,71 @@
+#include "datashard_impl.h"
+
+namespace NKikimr::NDataShard {
+
+class TDataShard::TTxRemoveLockChangeRecords // Deletes change records of lock ids queued in PendingLockChangeRecordsToRemove, in bounded batches
+ : public NTabletFlatExecutor::TTransactionBase<TDataShard>
+{
+public:
+ TTxRemoveLockChangeRecords(TDataShard* self)
+ : TBase(self)
+ { }
+
+ TTxType GetTxType() const override { return TXTYPE_REMOVE_LOCK_CHANGE_RECORDS; }
+
+ static constexpr size_t MaxRecordsToRemove = 1000; // Upper bound on records deleted by a single Execute call
+ // Drains the pending queue from its back; always returns true, re-sending the event to self when work remains.
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
+ NIceDb::TNiceDb db(txc.DB);
+
+ size_t removed = 0; // Records deleted so far in this call (one per record, although two table rows are deleted for it)
+
+ while (!Self->PendingLockChangeRecordsToRemove.empty() && removed < MaxRecordsToRemove) {
+ ui64 lockId = Self->PendingLockChangeRecordsToRemove.back();
+
+ auto it = Self->LockChangeRecords.find(lockId);
+ if (it == Self->LockChangeRecords.end()) {
+ // No in-memory records are tracked for this lock id, so drop it from the queue
+ Self->PendingLockChangeRecordsToRemove.pop_back();
+ continue;
+ }
+
+ if (Self->CommittedLockChangeRecords.contains(lockId)) {
+ // Records of a committed lock must be kept: drop the lock id from the queue without deleting anything
+ Self->PendingLockChangeRecordsToRemove.pop_back();
+ continue;
+ }
+ // Delete this lock's records from the back, keeping the in-memory list and both tables in step
+ while (!it->second.empty() && removed < MaxRecordsToRemove) {
+ auto& record = it->second.back();
+ db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete();
+ db.Table<Schema::LockChangeRecordDetails>().Key(record.LockId, record.LockOffset).Delete();
+ it->second.pop_back();
+ ++removed;
+ }
+
+ if (!it->second.empty()) {
+ // The batch limit was hit mid-lock: leave the lock id queued so the next transaction resumes here
+ break;
+ }
+ // All records of this lock are gone: forget the lock and dequeue its id
+ Self->LockChangeRecords.erase(it);
+ Self->PendingLockChangeRecordsToRemove.pop_back();
+ }
+
+ if (!Self->PendingLockChangeRecordsToRemove.empty()) {
+ ctx.Send(ctx.SelfID, new TEvPrivate::TEvRemoveLockChangeRecords()); // Ask ourselves to run another batch
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // Nothing to do here: the follow-up event, if needed, is already sent from Execute
+ }
+};
+
+void TDataShard::Handle(TEvPrivate::TEvRemoveLockChangeRecords::TPtr&, const TActorContext& ctx) {
+ Execute(new TTxRemoveLockChangeRecords(this), ctx); // Each event starts one removal transaction; the event payload is not used
+}
+
+} // namespace NKikimr::NDataShard
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 9db6a1eaef..ceb0578368 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -1723,5 +1723,196 @@
"Blobs": 1
}
}
+ },
+ {
+ "TableId": 101,
+ "TableName": "LockChangeRecords",
+ "TableKey": [
+ 1,
+ 2
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "LockOffset",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "PathOwnerId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "LocalPathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "BodySize",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 6,
+ "ColumnName": "SchemaVersion",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 7,
+ "ColumnName": "TableOwnerId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 8,
+ "ColumnName": "TablePathId",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ 6,
+ 7,
+ 8
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
+ "TableId": 102,
+ "TableName": "LockChangeRecordDetails",
+ "TableKey": [
+ 1,
+ 2
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "LockOffset",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "Kind",
+ "ColumnType": "Byte"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "Body",
+ "ColumnType": "String"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
+ },
+ {
+ "TableId": 103,
+ "TableName": "ChangeRecordCommits",
+ "TableKey": [
+ 1
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "Order",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "LockId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "Group",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "PlanStep",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "TxId",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
}
] \ No newline at end of file