aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@gmail.com>2022-06-06 20:47:37 +0300
committerAleksei Borzenkov <snaury@gmail.com>2022-06-06 20:47:37 +0300
commit4971fc8fbeb2bb63643b3f1ff5d978f377bdddb2 (patch)
tree75a7afe4981d68316e3418221d06c330dd0f8b73
parent7c7b101f38550c6875dd9574076534ca019f913e (diff)
downloadydb-4971fc8fbeb2bb63643b3f1ff5d978f377bdddb2.tar.gz
Uncommitted locked writes in datashard, KIKIMR-14732
ref:750761cec90cab8a75decb92a18858eab65fe310
-rw-r--r--ydb/core/engine/minikql/flat_local_minikql_host.h15
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.cpp84
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.h22
-rw-r--r--ydb/core/protos/config.proto6
-rw-r--r--ydb/core/tablet_flat/flat_database.cpp5
-rw-r--r--ydb/core/tablet_flat/flat_database.h5
-rw-r--r--ydb/core/tablet_flat/flat_table.cpp9
-rw-r--r--ydb/core/tablet_flat/flat_table.h5
-rw-r--r--ydb/core/tx/datashard/datashard.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp71
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h1
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp42
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.h4
-rw-r--r--ydb/core/tx/datashard/datashard_dep_tracker.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h6
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard_pipeline.cpp20
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp204
18 files changed, 456 insertions, 75 deletions
diff --git a/ydb/core/engine/minikql/flat_local_minikql_host.h b/ydb/core/engine/minikql/flat_local_minikql_host.h
index edfd182286..e51a86e1a2 100644
--- a/ydb/core/engine/minikql/flat_local_minikql_host.h
+++ b/ydb/core/engine/minikql/flat_local_minikql_host.h
@@ -39,6 +39,21 @@ private:
return Factory->GetChangeCollector(tableId);
}
+ ui64 GetWriteTxId(const TTableId&) const override
+ {
+ return 0;
+ }
+
+ NTable::ITransactionMapPtr GetReadTxMap(const TTableId&) const override
+ {
+ return nullptr;
+ }
+
+ NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId&) const override
+ {
+ return nullptr;
+ }
+
private:
const TMiniKQLFactory* const Factory;
};
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp
index 7c0ad77aa0..6966fb700f 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.cpp
+++ b/ydb/core/engine/minikql/minikql_engine_host.cpp
@@ -297,7 +297,8 @@ NUdf::TUnboxedValue TEngineHost::SelectRow(const TTableId& tableId, const TArray
NTable::TSelectStats stats;
ui64 flags = Settings.DisableByKeyFilter ? (ui64)NTable::NoByKey : 0;
- const auto ready = Db.Select(localTid, key, tags, dbRow, stats, flags, GetReadVersion(tableId));
+ const auto ready = Db.Select(localTid, key, tags, dbRow, stats, flags, GetReadVersion(tableId),
+ GetReadTxMap(tableId), GetReadTxObserver(tableId));
Counters.InvisibleRowSkips += stats.InvisibleRowSkips;
@@ -627,7 +628,8 @@ public:
TSelectRangeLazyRowsList(NTable::TDatabase& db, const TScheme& scheme, const THolderFactory& holderFactory,
const TTableId& tableId, ui64 localTid, const TSmallVec<NTable::TTag>& tags, const TSmallVec<bool>& skipNullKeys, const TTableRange& range,
ui64 itemsLimit, ui64 bytesLimit, bool reverse, TEngineHost& engineHost
- , const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler)
+ , const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler,
+ NTable::ITransactionMapPtr&& txMap, NTable::ITransactionObserverPtr&& txObserver)
: TCustomListValue(&holderFactory.GetMemInfo())
, Db(db)
, Scheme(scheme)
@@ -644,6 +646,8 @@ public:
, Reverse(reverse)
, EngineHost(engineHost)
, KeyAccessSampler(keyAccessSampler)
+ , TxMap(std::move(txMap))
+ , TxObserver(std::move(txObserver))
{}
NUdf::TUnboxedValue GetListIterator() const override {
@@ -661,13 +665,13 @@ public:
keyRange.MinInclusive = tableRange.InclusiveFrom;
keyRange.MaxInclusive = tableRange.InclusiveTo;
if (Reverse) {
- auto read = Db.IterateRangeReverse(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId));
+ auto read = Db.IterateRangeReverse(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId), TxMap, TxObserver);
return NUdf::TUnboxedValuePod(
new TIterator<NTable::TTableReverseIt>(GetMemInfo(), *this, std::move(read), SystemColumnTags, ShardId)
);
} else {
- auto read = Db.IterateRange(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId));
+ auto read = Db.IterateRange(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId), TxMap, TxObserver);
return NUdf::TUnboxedValuePod(
new TIterator<NTable::TTableIt>(GetMemInfo(), *this, std::move(read), SystemColumnTags, ShardId)
@@ -724,6 +728,9 @@ private:
mutable TMaybe<ui64> SizeBytes;
TEngineHost& EngineHost;
IKeyAccessSampler::TPtr KeyAccessSampler;
+
+ NTable::ITransactionMapPtr TxMap;
+ NTable::ITransactionObserverPtr TxObserver;
};
class TSelectRangeResult : public TComputationValue<TSelectRangeResult> {
@@ -731,10 +738,12 @@ public:
TSelectRangeResult(NTable::TDatabase& db, const TScheme& scheme, const THolderFactory& holderFactory, const TTableId& tableId, ui64 localTid,
const TSmallVec<NTable::TTag>& tags, const TSmallVec<bool>& skipNullKeys, const TTableRange& range,
ui64 itemsLimit, ui64 bytesLimit, bool reverse, TEngineHost& engineHost,
- const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler)
+ const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler,
+ NTable::ITransactionMapPtr&& txMap, NTable::ITransactionObserverPtr&& txObserver)
: TComputationValue(&holderFactory.GetMemInfo())
, List(NUdf::TUnboxedValuePod(new TSelectRangeLazyRowsList(db, scheme, holderFactory, tableId, localTid, tags,
- skipNullKeys, range, itemsLimit, bytesLimit, reverse, engineHost, systemColumnTags, shardId, keyAccessSampler))) {}
+ skipNullKeys, range, itemsLimit, bytesLimit, reverse, engineHost, systemColumnTags, shardId, keyAccessSampler,
+ std::move(txMap), std::move(txObserver)))) {}
private:
NUdf::TUnboxedValue GetElement(ui32 index) const override {
@@ -846,7 +855,7 @@ NUdf::TUnboxedValue TEngineHost::SelectRange(const TTableId& tableId, const TTab
return NUdf::TUnboxedValuePod(new TSelectRangeResult(Db, Scheme, holderFactory, tableId, localTid, tags,
skipNullKeysFlags, range, itemsLimit, bytesLimit, reverse, *this, systemColumnTags, GetShardId(),
- Settings.KeyAccessSampler));
+ Settings.KeyAccessSampler, GetReadTxMap(tableId), GetReadTxObserver(tableId)));
}
// Updates the single row. Column in commands must be unique.
@@ -869,19 +878,26 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell
valueBytes += upd.Value.IsNull() ? 1 : upd.Value.Size();
}
- if (auto collector = GetChangeCollector(tableId)) {
- collector->SetWriteVersion(GetWriteVersion(tableId));
- if (collector->NeedToReadKeys()) {
- collector->SetReadVersion(GetReadVersion(tableId));
- }
+ const ui64 writeTxId = GetWriteTxId(tableId);
- if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) {
- collector->Reset();
- throw TNotReadyTabletException();
+ if (writeTxId == 0) {
+ if (auto collector = GetChangeCollector(tableId)) {
+ collector->SetWriteVersion(GetWriteVersion(tableId));
+ if (collector->NeedToReadKeys()) {
+ collector->SetReadVersion(GetReadVersion(tableId));
+ }
+
+ if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) {
+ collector->Reset();
+ throw TNotReadyTabletException();
+ }
}
- }
- Db.Update(localTid, NTable::ERowOp::Upsert, key, ops, GetWriteVersion(tableId));
+ Db.Update(localTid, NTable::ERowOp::Upsert, key, ops, GetWriteVersion(tableId));
+ } else {
+ // TODO: integrate with change collector somehow
+ Db.UpdateTx(localTid, NTable::ERowOp::Upsert, key, ops, writeTxId);
+ }
Settings.KeyAccessSampler->AddSample(tableId, row);
Counters.NUpdateRow++;
@@ -897,25 +913,39 @@ void TEngineHost::EraseRow(const TTableId& tableId, const TArrayRef<const TCell>
ui64 keyBytes = 0;
ConvertTableKeys(Scheme, tableInfo, row, key, &keyBytes);
- if (auto collector = GetChangeCollector(tableId)) {
- collector->SetWriteVersion(GetWriteVersion(tableId));
- if (collector->NeedToReadKeys()) {
- collector->SetReadVersion(GetReadVersion(tableId));
- }
+ const ui64 writeTxId = GetWriteTxId(tableId);
- if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) {
- collector->Reset();
- throw TNotReadyTabletException();
+ if (writeTxId == 0) {
+ if (auto collector = GetChangeCollector(tableId)) {
+ collector->SetWriteVersion(GetWriteVersion(tableId));
+ if (collector->NeedToReadKeys()) {
+ collector->SetReadVersion(GetReadVersion(tableId));
+ }
+
+ if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) {
+ collector->Reset();
+ throw TNotReadyTabletException();
+ }
}
- }
- Db.Update(localTid, NTable::ERowOp::Erase, key, { }, GetWriteVersion(tableId));
+ Db.Update(localTid, NTable::ERowOp::Erase, key, { }, GetWriteVersion(tableId));
+ } else {
+ // TODO: integrate with change collector somehow
+ Db.UpdateTx(localTid, NTable::ERowOp::Erase, key, { }, writeTxId);
+ }
Settings.KeyAccessSampler->AddSample(tableId, row);
Counters.NEraseRow++;
Counters.EraseRowBytes += keyBytes + 8;
}
+void TEngineHost::CommitWriteTxId(const TTableId& tableId, ui64 writeTxId) {
+ ui64 localTid = LocalTableId(tableId);
+ Y_VERIFY(localTid, "table does not exist");
+
+ Db.CommitTx(localTid, writeTxId);
+}
+
// Check that table is erased
bool TEngineHost::IsPathErased(const TTableId& tableId) const {
ui64 localTid = LocalTableId(tableId);
diff --git a/ydb/core/engine/minikql/minikql_engine_host.h b/ydb/core/engine/minikql/minikql_engine_host.h
index 9c6e39a364..7ae05b7523 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.h
+++ b/ydb/core/engine/minikql/minikql_engine_host.h
@@ -130,6 +130,16 @@ public:
virtual IChangeCollector* GetChangeCollector(const TTableId& tableId) const = 0;
+ // Non-zero WriteTxId will force engine to work using a given persistent tx
+ virtual ui64 GetWriteTxId(const TTableId& tableId) const = 0;
+
+ // Commits a given persistent tx
+ virtual void CommitWriteTxId(const TTableId& tableId, ui64 writeTxId);
+
+ // Used to control reads in the presense of uncommitted transactions
+ virtual NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const = 0;
+ virtual NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const = 0;
+
protected:
virtual ui64 LocalTableId(const TTableId& tableId) const;
void ConvertKeys(const TScheme::TTableInfo* tableInfo, const TArrayRef<const TCell>& row,
@@ -162,6 +172,18 @@ public:
Y_UNUSED(tableId);
return nullptr;
}
+
+ ui64 GetWriteTxId(const TTableId&) const override {
+ return 0;
+ }
+
+ NTable::ITransactionMapPtr GetReadTxMap(const TTableId&) const override {
+ return nullptr;
+ }
+
+ NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId&) const override {
+ return nullptr;
+ }
};
void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 568165eaf7..c324d24be1 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1233,6 +1233,12 @@ message TImmediateControlsConfig {
MinValue: 0,
MaxValue: 134217728,
DefaultValue: 0 }];
+
+ optional uint64 EnableLockedWrites = 14 [(ControlOptions) = {
+ Description: "Enables experimental persistent locked writes",
+ MinValue: 0,
+ MaxValue: 1,
+ DefaultValue: 0 }];
}
message TTxLimitControls {
diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp
index 2090ee3a15..965463954c 100644
--- a/ydb/core/tablet_flat/flat_database.cpp
+++ b/ydb/core/tablet_flat/flat_database.cpp
@@ -254,6 +254,11 @@ void TDatabase::CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion)
Redo->EvCommitTx(table, txId, rowVersion);
}
+bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const
+{
+ return Require(table)->HasOpenTx(txId);
+}
+
void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper)
{
if (Y_LIKELY(lower < upper)) {
diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h
index 2cea6d756a..18fdd12d84 100644
--- a/ydb/core/tablet_flat/flat_database.h
+++ b/ydb/core/tablet_flat/flat_database.h
@@ -115,6 +115,11 @@ public:
void CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion = TRowVersion::Min());
/**
+ * Returns true when table has an open transaction that is not committed or removed yet
+ */
+ bool HasOpenTx(ui32 table, ui64 txId) const;
+
+ /**
* Remove row versions [lower, upper) from the given table
*
* Once committed this cannot be undone. This is a hint to the underlying
diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp
index 7ce3b7adb5..d7cc19e635 100644
--- a/ydb/core/tablet_flat/flat_table.cpp
+++ b/ydb/core/tablet_flat/flat_table.cpp
@@ -691,6 +691,15 @@ void TTable::RemoveTx(ui64 txId)
}
}
+bool TTable::HasOpenTx(ui64 txId) const
+{
+ if (OpenTransactions.contains(txId)) {
+ return !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId);
+ }
+
+ return false;
+}
+
TMemTable& TTable::MemTable()
{
return
diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h
index cb5095a185..36493d77d9 100644
--- a/ydb/core/tablet_flat/flat_table.h
+++ b/ydb/core/tablet_flat/flat_table.h
@@ -154,6 +154,11 @@ public:
void CommitTx(ui64 txId, TRowVersion rowVersion);
void RemoveTx(ui64 txId);
+ /**
+ * Returns true when table has an open transaction that is not committed or removed yet
+ */
+ bool HasOpenTx(ui64 txId) const;
+
TPartView GetPartView(const TLogoBlobID &bundle) const
{
auto *partView = Flatten.FindPtr(bundle);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index f75f4b3585..a4f722adc3 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -144,6 +144,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, TtlReadAheadHi(0, 0, 128*1024*1024)
, EnablePrioritizedMvccSnapshotReads(1, 0, 1)
, EnableUnprotectedMvccSnapshotReads(1, 0, 1)
+ , EnableLockedWrites(0, 0, 1)
, EnableLeaderLeases(1, 0, 1)
, MinLeaderLeaseDurationUs(250000, 1000, 5000000)
, DataShardSysTables(InitDataShardSysTables(this))
@@ -314,6 +315,7 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
+ appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites");
appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases");
appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs");
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 7d0061c126..8c2955a68c 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -316,6 +316,10 @@ public:
IsImmediateTx = true;
}
+ void SetIsRepeatableSnapshot() {
+ IsRepeatableSnapshot = true;
+ }
+
IChangeCollector* GetChangeCollector(const TTableId& tableId) const override {
auto it = ChangeCollectors.find(tableId);
if (it != ChangeCollectors.end()) {
@@ -350,11 +354,22 @@ public:
if (TSysTables::IsSystemTable(key.TableId))
return DataShardSysTable(key.TableId).IsValidKey(key);
- // prevent updates/erases with LockTxId set
- if (LockTxId && key.RowOperation != TKeyDesc::ERowOperation::Read) {
- key.Status = TKeyDesc::EStatus::OperationNotSupported;
- return false;
+ if (LockTxId) {
+ // Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs
+ if (key.RowOperation != TKeyDesc::ERowOperation::Read &&
+ (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot))
+ {
+ key.Status = TKeyDesc::EStatus::OperationNotSupported;
+ return false;
+ }
+ } else if (IsRepeatableSnapshot) {
+ // Prevent updates/erases in repeatable mvcc txs
+ if (key.RowOperation != TKeyDesc::ERowOperation::Read) {
+ key.Status = TKeyDesc::EStatus::OperationNotSupported;
+ return false;
+ }
}
+
return TEngineHost::IsValidKey(key, maxSnapshotTime);
}
@@ -392,7 +407,10 @@ public:
return;
}
- Self->SysLocksTable().BreakLock(tableId, row);
+ // TODO: handle presistent tx locks
+ if (!LockTxId) {
+ Self->SysLocksTable().BreakLock(tableId, row);
+ }
Self->SetTableUpdateTime(tableId, Now);
// apply special columns if declared
@@ -444,7 +462,10 @@ public:
return;
}
- Self->SysLocksTable().BreakLock(tableId, row);
+ // TODO: handle persistent tx locks
+ if (!LockTxId) {
+ Self->SysLocksTable().BreakLock(tableId, row);
+ }
Self->SetTableUpdateTime(tableId, Now);
TEngineHost::EraseRow(tableId, row);
@@ -491,6 +512,36 @@ public:
}
}
+ ui64 GetWriteTxId(const TTableId& tableId) const override {
+ if (TSysTables::IsSystemTable(tableId))
+ return 0;
+
+ return LockTxId;
+ }
+
+ NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const override {
+ if (TSysTables::IsSystemTable(tableId) || !LockTxId)
+ return nullptr;
+
+ // Don't use tx map when we know there's no open tx with the given txId
+ if (!DB.HasOpenTx(LocalTableId(tableId), LockTxId)) {
+ return nullptr;
+ }
+
+ // Uncommitted changes are visible in all possible snapshots
+ // TODO: we need to guarantee no other changes committed between snapshot read and our local changes
+ return new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ }
+
+ NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override {
+ if (TSysTables::IsSystemTable(tableId))
+ return nullptr;
+
+ // TODO: use observer to detect conflicts with other uncommitted transactions
+
+ return nullptr;
+ }
+
private:
const TDataShardSysTable& DataShardSysTable(const TTableId& tableId) const {
return static_cast<const TDataShardSysTables *>(Self->GetDataShardSysTables())->Get(tableId);
@@ -500,6 +551,7 @@ private:
NTable::TDatabase& DB;
const ui64& LockTxId;
bool IsImmediateTx = false;
+ bool IsRepeatableSnapshot = false;
TInstant Now;
TRowVersion WriteVersion = TRowVersion::Max();
TRowVersion ReadVersion = TRowVersion::Min();
@@ -666,6 +718,13 @@ void TEngineBay::SetIsImmediateTx() {
host->SetIsImmediateTx();
}
+void TEngineBay::SetIsRepeatableSnapshot() {
+ Y_VERIFY(EngineHost);
+
+ auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get());
+ host->SetIsRepeatableSnapshot();
+}
+
TVector<IChangeCollector::TChange> TEngineBay::GetCollectedChanges() const {
Y_VERIFY(EngineHost);
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 12651fdcf9..2a1374cac1 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -92,6 +92,7 @@ public:
void SetWriteVersion(TRowVersion writeVersion);
void SetReadVersion(TRowVersion readVersion);
void SetIsImmediateTx();
+ void SetIsRepeatableSnapshot();
TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const;
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index b387addbba..72b36561e9 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -17,7 +17,8 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
const TActorContext &ctx,
const TStepOrder &stepTxId,
TInstant receivedAt,
- const TString &txBody)
+ const TString &txBody,
+ bool usesMvccSnapshot)
: StepTxId_(stepTxId)
, TabletId_(self->TabletID())
, TxBody(txBody)
@@ -50,6 +51,9 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
if (Tx.GetImmediate())
EngineBay.SetIsImmediateTx();
+ if (usesMvccSnapshot)
+ EngineBay.SetIsRepeatableSnapshot();
+
if (Tx.HasReadTableTransaction()) {
auto &tx = Tx.GetReadTableTransaction();
if (self->TableInfos.contains(tx.GetTableId().GetTableId())) {
@@ -244,38 +248,6 @@ bool TValidatedDataTx::ReValidateKeys()
return true;
}
-ETxOrder TValidatedDataTx::CheckOrder(const TSysLocks& sysLocks, const TValidatedDataTx& dataTx) const {
- Y_VERIFY(TxInfo().Loaded);
- Y_VERIFY(dataTx.TxInfo().Loaded);
-
- if (KeysCount() > MaxReorderTxKeys())
- return ETxOrder::Unknown;
-
- if (HasLockedWrites()) {
- if (sysLocks.IsBroken(LockTxId()))
- return ETxOrder::Any;
-
- // TODO: dataTx.DynRW && dataTx.RW && dataTx.LockedRW
- if (dataTx.HasWrites())
- return ETxOrder::Unknown;
-
- } else if (dataTx.HasLockedWrites()) {
- if (sysLocks.IsBroken(dataTx.LockTxId()))
- return ETxOrder::Any;
-
- if (LockTxId() == dataTx.LockTxId())
- return ETxOrder::Unknown;
-
- // TODO: DynRW && RW
- if (HasWrites())
- return ETxOrder::Unknown;
- }
-
- if (HasKeyConflict(TxInfo(), dataTx.TxInfo()))
- return StepTxId_.CheckOrder(dataTx.StepTxId());
- return ETxOrder::Any;
-}
-
bool TValidatedDataTx::CanCancel() {
if (!IsTxReadOnly()) {
return false;
@@ -427,7 +399,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self,
if (!DataTx) {
Y_VERIFY(TxBody);
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
- GetReceivedAt(), TxBody);
+ GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
if (DataTx->HasStreamResponse())
SetStreamSink(DataTx->GetSink());
}
@@ -648,7 +620,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData(
bool extractKeys = DataTx->IsTxInfoLoaded();
DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(),
- GetReceivedAt(), TxBody);
+ GetReceivedAt(), TxBody, MvccSnapshotRepeatable);
if (DataTx->Ready() && extractKeys) {
DataTx->ExtractKeys(true);
}
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index 43abd525e1..b2289b055d 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -120,7 +120,8 @@ public:
const TActorContext &ctx,
const TStepOrder &stepTxId,
TInstant receivedAt,
- const TString &txBody);
+ const TString &txBody,
+ bool usesMvccSnapshot);
~TValidatedDataTx();
@@ -206,7 +207,6 @@ public:
ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
- ETxOrder CheckOrder(const TSysLocks& sysLocks, const TValidatedDataTx& dataTx) const;
ui64 GetTxSize() const { return TxSize; }
ui32 KeysCount() const { return TxInfo().ReadsCount + TxInfo().WritesCount; }
diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
index b7c3706937..2c099436a6 100644
--- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp
+++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp
@@ -723,6 +723,12 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
op->SetMvccSnapshot(snapshot, /* repeatable */ false);
}
+ if (snapshotRepeatable) {
+ // Repeatable snapshot writes are uncommitted, not externally visible, and don't conflict with anything
+ isGlobalWriter = false;
+ haveWrites = false;
+ }
+
auto onImmediateConflict = [&](TOperation& conflict) {
Y_VERIFY(!conflict.IsImmediate());
if (snapshot.IsMax()) {
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 88f8b35bf9..2377567430 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1283,6 +1283,11 @@ public:
return value != 0;
}
+ bool GetEnableLockedWrites() const {
+ ui64 value = EnableLockedWrites;
+ return value != 0;
+ }
+
template <typename T>
void ReleaseCache(T& tx) {
ReleaseTxCache(tx.GetTxCacheUsage());
@@ -2109,6 +2114,7 @@ private:
TControlWrapper EnablePrioritizedMvccSnapshotReads;
TControlWrapper EnableUnprotectedMvccSnapshotReads;
+ TControlWrapper EnableLockedWrites;
TControlWrapper EnableLeaderLeases;
TControlWrapper MinLeaderLeaseDurationUs;
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index 979553b140..ef92e632c5 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -399,10 +399,16 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con
TouchTablePoint(tableId, key);
Shard->GetKeyAccessSampler()->AddSample(tableId, key);
+ NTable::ITransactionMapPtr txMap;
+ if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) {
+ txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ }
+ // TODO: tx observer
+
NTable::TRowState dbRow;
NTable::TSelectStats stats;
ui64 flags = EngineHost.GetSettings().DisableByKeyFilter ? (ui64) NTable::NoByKey : 0;
- auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion());
+ auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(), txMap);
kqpStats.NSelectRow = 1;
kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips;
@@ -454,8 +460,14 @@ TAutoPtr<NTable::TTableIt> TKqpDatashardComputeContext::CreateIterator(const TTa
keyRange.MinInclusive = range.InclusiveFrom;
keyRange.MaxInclusive = range.InclusiveTo;
+ NTable::ITransactionMapPtr txMap;
+ if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) {
+ txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ }
+ // TODO: tx observer
+
TouchTableRange(tableId, range);
- return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion());
+ return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(), txMap);
}
TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId,
@@ -475,8 +487,14 @@ TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIter
keyRange.MinInclusive = range.InclusiveFrom;
keyRange.MaxInclusive = range.InclusiveTo;
+ NTable::ITransactionMapPtr txMap;
+ if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) {
+ txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min());
+ }
+ // TODO: tx observer
+
TouchTableRange(tableId, range);
- return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion());
+ return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(), txMap);
}
template <typename TReadTableIterator>
diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp
index 1f5796d7b4..1a0ba76320 100644
--- a/ydb/core/tx/datashard/datashard_pipeline.cpp
+++ b/ydb/core/tx/datashard/datashard_pipeline.cpp
@@ -1314,9 +1314,25 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction::
return tx;
}
- if(tx->IsMvccSnapshotRead() && (!tx->IsImmediate() || !tx->IsReadOnly())) {
+ auto allowSnapshot = [&]() -> bool {
+ // must be immediate
+ if (!tx->IsImmediate()) {
+ return false;
+ }
+ // always ok for readonly
+ if (tx->IsReadOnly()) {
+ return true;
+ }
+ // ok for locked writes
+ if (dataTx->LockTxId()) {
+ return true;
+ }
+ return false;
+ };
+
+ if(tx->IsMvccSnapshotRead() && !allowSnapshot()) {
tx->SetAbortedFlag();
- TString err = "Snapshot read must be an immediate read only transaction";
+ TString err = "Snapshot read must be an immediate read only or locked write transaction";
tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(),
Self->TabletID(),
tx->GetTxId(),
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index 7b6b2e93f8..d5f60e825a 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -7,6 +7,8 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD)
+#include <ydb/library/yql/minikql/mkql_node_printer.h>
+
namespace NKikimr {
using namespace NKikimr::NDataShard;
@@ -1469,6 +1471,208 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"} Struct { Bool: false }");
}
+ Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) {
+ TPortManager pm;
+ TServerSettings::TControls controls;
+ controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1);
+ controls.MutableDataShardControls()->SetEnableLockedWrites(1);
+
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMvcc(true)
+ .SetEnableMvccSnapshotReads(true)
+ .SetEnableKqpSessionActor(UseNewEngine)
+ .SetControls(controls);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ CreateShardedTable(server, sender, "/Root", "table-1", 1);
+
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)"));
+
+ SimulateSleep(server, TDuration::Seconds(1));
+
+ auto execSimpleRequest = [&](const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ sessionId = CreateSession(runtime, reqSender);
+ auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ txId = response.GetResponse().GetTxMeta().id();
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ auto commitSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString {
+ auto reqSender = runtime.AllocateEdgeActor();
+ auto ev = ExecRequest(runtime, reqSender, MakeCommitRequest(sessionId, txId, query));
+ auto& response = ev->Get()->Record.GetRef();
+ if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
+ return TStringBuilder() << "ERROR: " << response.GetYdbStatus();
+ }
+ if (response.GetResponse().GetResults().size() == 0) {
+ return "";
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u);
+ return response.GetResponse().GetResults()[0].GetValue().ShortDebugString();
+ };
+
+ ui64 lastLockTxId = 0;
+ TRowVersion lastMvccSnapshot = TRowVersion::Min();
+ ui64 injectLockTxId = 0;
+ TRowVersion injectMvccSnapshot = TRowVersion::Min();
+ auto capturePropose = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvDataShard::TEvProposeTransaction::EventType: {
+ auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record;
+ Cerr << "TEvProposeTransaction:" << Endl;
+ Cerr << record.DebugString() << Endl;
+ if (record.GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) {
+ NKikimrTxDataShard::TDataTransaction tx;
+ Y_VERIFY(tx.ParseFromString(record.GetTxBody()));
+ Cerr << "TxBody:" << Endl;
+ Cerr << tx.DebugString() << Endl;
+ if (tx.HasMiniKQL()) {
+ using namespace NKikimr::NMiniKQL;
+ TScopedAlloc alloc;
+ TTypeEnvironment typeEnv(alloc);
+ auto node = DeserializeRuntimeNode(tx.GetMiniKQL(), typeEnv);
+ Cerr << "MiniKQL:" << Endl;
+ Cerr << PrintNode(node.GetNode()) << Endl;
+ }
+ if (tx.HasKqpTransaction()) {
+ for (const auto& task : tx.GetKqpTransaction().GetTasks()) {
+ if (task.HasProgram() && task.GetProgram().GetRaw()) {
+ using namespace NKikimr::NMiniKQL;
+ TScopedAlloc alloc;
+ TTypeEnvironment typeEnv(alloc);
+ auto node = DeserializeRuntimeNode(task.GetProgram().GetRaw(), typeEnv);
+ Cerr << "Task program:" << Endl;
+ Cerr << PrintNode(node.GetNode()) << Endl;
+ }
+ }
+ }
+ if (tx.GetLockTxId()) {
+ lastLockTxId = tx.GetLockTxId();
+ } else if (injectLockTxId) {
+ tx.SetLockTxId(injectLockTxId);
+ TString txBody;
+ Y_VERIFY(tx.SerializeToString(&txBody));
+ record.SetTxBody(txBody);
+ }
+ if (record.HasMvccSnapshot()) {
+ lastMvccSnapshot.Step = record.GetMvccSnapshot().GetStep();
+ lastMvccSnapshot.TxId = record.GetMvccSnapshot().GetTxId();
+ } else if (injectMvccSnapshot) {
+ record.MutableMvccSnapshot()->SetStep(injectMvccSnapshot.Step);
+ record.MutableMvccSnapshot()->SetTxId(injectMvccSnapshot.TxId);
+ }
+ }
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(capturePropose);
+
+ // Start a snapshot read transaction
+ TString sessionId, txId;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // We should have been acquiring locks
+ Y_VERIFY(lastLockTxId != 0);
+ ui64 snapshotLockTxId = lastLockTxId;
+ Y_VERIFY(lastMvccSnapshot);
+ auto snapshotVersion = lastMvccSnapshot;
+
+ // Perform an immediate write, pretending it happens as part of the above snapshot tx
+ injectLockTxId = snapshotLockTxId;
+ injectMvccSnapshot = snapshotVersion;
+ UNIT_ASSERT_VALUES_EQUAL(
+ execSimpleRequest(Q_(R"(
+ UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2)
+ )")),
+ "");
+ injectLockTxId = 0;
+ injectMvccSnapshot = TRowVersion::Min();
+
+ // Start another snapshot read, it should not see above write (it's uncommitted)
+ TString sessionId2, txId2;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(sessionId2, txId2, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "} Struct { Bool: false }");
+
+ // Perform another read using the first snapshot tx, it must see its own writes
+ UNIT_ASSERT_VALUES_EQUAL(
+ continueSnapshotRequest(sessionId, txId, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "} Struct { Bool: false }");
+
+ // Now commit with additional changes
+ UNIT_ASSERT_VALUES_EQUAL(
+ commitSnapshotRequest(sessionId, txId, Q_(R"(
+ UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3)
+ --SELECT 1
+ )")),
+ "");
+ }
+
}
} // namespace NKikimr