diff options
author | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-06 20:47:37 +0300 |
---|---|---|
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-06 20:47:37 +0300 |
commit | 4971fc8fbeb2bb63643b3f1ff5d978f377bdddb2 (patch) | |
tree | 75a7afe4981d68316e3418221d06c330dd0f8b73 | |
parent | 7c7b101f38550c6875dd9574076534ca019f913e (diff) | |
download | ydb-4971fc8fbeb2bb63643b3f1ff5d978f377bdddb2.tar.gz |
Uncommitted locked writes in datashard, KIKIMR-14732
ref:750761cec90cab8a75decb92a18858eab65fe310
-rw-r--r-- | ydb/core/engine/minikql/flat_local_minikql_host.h | 15 | ||||
-rw-r--r-- | ydb/core/engine/minikql/minikql_engine_host.cpp | 84 | ||||
-rw-r--r-- | ydb/core/engine/minikql/minikql_engine_host.h | 22 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 6 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_database.h | 5 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tablet_flat/flat_table.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 71 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 42 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_dep_tracker.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 24 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_pipeline.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 204 |
18 files changed, 456 insertions, 75 deletions
diff --git a/ydb/core/engine/minikql/flat_local_minikql_host.h b/ydb/core/engine/minikql/flat_local_minikql_host.h index edfd182286..e51a86e1a2 100644 --- a/ydb/core/engine/minikql/flat_local_minikql_host.h +++ b/ydb/core/engine/minikql/flat_local_minikql_host.h @@ -39,6 +39,21 @@ private: return Factory->GetChangeCollector(tableId); } + ui64 GetWriteTxId(const TTableId&) const override + { + return 0; + } + + NTable::ITransactionMapPtr GetReadTxMap(const TTableId&) const override + { + return nullptr; + } + + NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId&) const override + { + return nullptr; + } + private: const TMiniKQLFactory* const Factory; }; diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp index 7c0ad77aa0..6966fb700f 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.cpp +++ b/ydb/core/engine/minikql/minikql_engine_host.cpp @@ -297,7 +297,8 @@ NUdf::TUnboxedValue TEngineHost::SelectRow(const TTableId& tableId, const TArray NTable::TSelectStats stats; ui64 flags = Settings.DisableByKeyFilter ? (ui64)NTable::NoByKey : 0; - const auto ready = Db.Select(localTid, key, tags, dbRow, stats, flags, GetReadVersion(tableId)); + const auto ready = Db.Select(localTid, key, tags, dbRow, stats, flags, GetReadVersion(tableId), + GetReadTxMap(tableId), GetReadTxObserver(tableId)); Counters.InvisibleRowSkips += stats.InvisibleRowSkips; @@ -627,7 +628,8 @@ public: TSelectRangeLazyRowsList(NTable::TDatabase& db, const TScheme& scheme, const THolderFactory& holderFactory, const TTableId& tableId, ui64 localTid, const TSmallVec<NTable::TTag>& tags, const TSmallVec<bool>& skipNullKeys, const TTableRange& range, ui64 itemsLimit, ui64 bytesLimit, bool reverse, TEngineHost& engineHost - , const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler) + , const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler, + NTable::ITransactionMapPtr&& txMap, NTable::ITransactionObserverPtr&& txObserver) : TCustomListValue(&holderFactory.GetMemInfo()) , Db(db) , Scheme(scheme) @@ -644,6 +646,8 @@ public: , Reverse(reverse) , EngineHost(engineHost) , KeyAccessSampler(keyAccessSampler) + , TxMap(std::move(txMap)) + , TxObserver(std::move(txObserver)) {} NUdf::TUnboxedValue GetListIterator() const override { @@ -661,13 +665,13 @@ public: keyRange.MinInclusive = tableRange.InclusiveFrom; keyRange.MaxInclusive = tableRange.InclusiveTo; if (Reverse) { - auto read = Db.IterateRangeReverse(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId)); + auto read = Db.IterateRangeReverse(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId), TxMap, TxObserver); return NUdf::TUnboxedValuePod( new TIterator<NTable::TTableReverseIt>(GetMemInfo(), *this, std::move(read), SystemColumnTags, ShardId) ); } else { - auto read = Db.IterateRange(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId)); + auto read = Db.IterateRange(LocalTid, keyRange, Tags, EngineHost.GetReadVersion(TableId), TxMap, TxObserver); return NUdf::TUnboxedValuePod( new TIterator<NTable::TTableIt>(GetMemInfo(), *this, std::move(read), SystemColumnTags, ShardId) @@ -724,6 +728,9 @@ private: mutable TMaybe<ui64> SizeBytes; TEngineHost& EngineHost; IKeyAccessSampler::TPtr KeyAccessSampler; + + NTable::ITransactionMapPtr TxMap; + NTable::ITransactionObserverPtr TxObserver; }; class TSelectRangeResult : public TComputationValue<TSelectRangeResult> { @@ -731,10 +738,12 @@ public: TSelectRangeResult(NTable::TDatabase& db, const TScheme& scheme, const THolderFactory& holderFactory, const TTableId& tableId, ui64 localTid, const TSmallVec<NTable::TTag>& tags, const TSmallVec<bool>& skipNullKeys, const TTableRange& range, ui64 itemsLimit, ui64 bytesLimit, bool reverse, TEngineHost& engineHost, - const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler) + const TSmallVec<NTable::TTag>& systemColumnTags, ui64 shardId, IKeyAccessSampler::TPtr keyAccessSampler, + NTable::ITransactionMapPtr&& txMap, NTable::ITransactionObserverPtr&& txObserver) : TComputationValue(&holderFactory.GetMemInfo()) , List(NUdf::TUnboxedValuePod(new TSelectRangeLazyRowsList(db, scheme, holderFactory, tableId, localTid, tags, - skipNullKeys, range, itemsLimit, bytesLimit, reverse, engineHost, systemColumnTags, shardId, keyAccessSampler))) {} + skipNullKeys, range, itemsLimit, bytesLimit, reverse, engineHost, systemColumnTags, shardId, keyAccessSampler, + std::move(txMap), std::move(txObserver)))) {} private: NUdf::TUnboxedValue GetElement(ui32 index) const override { @@ -846,7 +855,7 @@ NUdf::TUnboxedValue TEngineHost::SelectRange(const TTableId& tableId, const TTab return NUdf::TUnboxedValuePod(new TSelectRangeResult(Db, Scheme, holderFactory, tableId, localTid, tags, skipNullKeysFlags, range, itemsLimit, bytesLimit, reverse, *this, systemColumnTags, GetShardId(), - Settings.KeyAccessSampler)); + Settings.KeyAccessSampler, GetReadTxMap(tableId), GetReadTxObserver(tableId))); } // Updates the single row. Column in commands must be unique. @@ -869,19 +878,26 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell valueBytes += upd.Value.IsNull() ? 1 : upd.Value.Size(); } - if (auto collector = GetChangeCollector(tableId)) { - collector->SetWriteVersion(GetWriteVersion(tableId)); - if (collector->NeedToReadKeys()) { - collector->SetReadVersion(GetReadVersion(tableId)); - } + const ui64 writeTxId = GetWriteTxId(tableId); - if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) { - collector->Reset(); - throw TNotReadyTabletException(); + if (writeTxId == 0) { + if (auto collector = GetChangeCollector(tableId)) { + collector->SetWriteVersion(GetWriteVersion(tableId)); + if (collector->NeedToReadKeys()) { + collector->SetReadVersion(GetReadVersion(tableId)); + } + + if (!collector->Collect(tableId, NTable::ERowOp::Upsert, key, ops)) { + collector->Reset(); + throw TNotReadyTabletException(); + } } - } - Db.Update(localTid, NTable::ERowOp::Upsert, key, ops, GetWriteVersion(tableId)); + Db.Update(localTid, NTable::ERowOp::Upsert, key, ops, GetWriteVersion(tableId)); + } else { + // TODO: integrate with change collector somehow + Db.UpdateTx(localTid, NTable::ERowOp::Upsert, key, ops, writeTxId); + } Settings.KeyAccessSampler->AddSample(tableId, row); Counters.NUpdateRow++; @@ -897,25 +913,39 @@ void TEngineHost::EraseRow(const TTableId& tableId, const TArrayRef<const TCell> ui64 keyBytes = 0; ConvertTableKeys(Scheme, tableInfo, row, key, &keyBytes); - if (auto collector = GetChangeCollector(tableId)) { - collector->SetWriteVersion(GetWriteVersion(tableId)); - if (collector->NeedToReadKeys()) { - collector->SetReadVersion(GetReadVersion(tableId)); - } + const ui64 writeTxId = GetWriteTxId(tableId); - if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) { - collector->Reset(); - throw TNotReadyTabletException(); + if (writeTxId == 0) { + if (auto collector = GetChangeCollector(tableId)) { + collector->SetWriteVersion(GetWriteVersion(tableId)); + if (collector->NeedToReadKeys()) { + collector->SetReadVersion(GetReadVersion(tableId)); + } + + if (!collector->Collect(tableId, NTable::ERowOp::Erase, key, {})) { + collector->Reset(); + throw TNotReadyTabletException(); + } } - } - Db.Update(localTid, NTable::ERowOp::Erase, key, { }, GetWriteVersion(tableId)); + Db.Update(localTid, NTable::ERowOp::Erase, key, { }, GetWriteVersion(tableId)); + } else { + // TODO: integrate with change collector somehow + Db.UpdateTx(localTid, NTable::ERowOp::Erase, key, { }, writeTxId); + } Settings.KeyAccessSampler->AddSample(tableId, row); Counters.NEraseRow++; Counters.EraseRowBytes += keyBytes + 8; } +void TEngineHost::CommitWriteTxId(const TTableId& tableId, ui64 writeTxId) { + ui64 localTid = LocalTableId(tableId); + Y_VERIFY(localTid, "table does not exist"); + + Db.CommitTx(localTid, writeTxId); +} + // Check that table is erased bool TEngineHost::IsPathErased(const TTableId& tableId) const { ui64 localTid = LocalTableId(tableId); diff --git a/ydb/core/engine/minikql/minikql_engine_host.h b/ydb/core/engine/minikql/minikql_engine_host.h index 9c6e39a364..7ae05b7523 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.h +++ b/ydb/core/engine/minikql/minikql_engine_host.h @@ -130,6 +130,16 @@ public: virtual IChangeCollector* GetChangeCollector(const TTableId& tableId) const = 0; + // Non-zero WriteTxId will force engine to work using a given persistent tx + virtual ui64 GetWriteTxId(const TTableId& tableId) const = 0; + + // Commits a given persistent tx + virtual void CommitWriteTxId(const TTableId& tableId, ui64 writeTxId); + + // Used to control reads in the presense of uncommitted transactions + virtual NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const = 0; + virtual NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const = 0; + protected: virtual ui64 LocalTableId(const TTableId& tableId) const; void ConvertKeys(const TScheme::TTableInfo* tableInfo, const TArrayRef<const TCell>& row, @@ -162,6 +172,18 @@ public: Y_UNUSED(tableId); return nullptr; } + + ui64 GetWriteTxId(const TTableId&) const override { + return 0; + } + + NTable::ITransactionMapPtr GetReadTxMap(const TTableId&) const override { + return nullptr; + } + + NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId&) const override { + return nullptr; + } }; void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags); diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 568165eaf7..c324d24be1 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1233,6 +1233,12 @@ message TImmediateControlsConfig { MinValue: 0, MaxValue: 134217728, DefaultValue: 0 }]; + + optional uint64 EnableLockedWrites = 14 [(ControlOptions) = { + Description: "Enables experimental persistent locked writes", + MinValue: 0, + MaxValue: 1, + DefaultValue: 0 }]; } message TTxLimitControls { diff --git a/ydb/core/tablet_flat/flat_database.cpp b/ydb/core/tablet_flat/flat_database.cpp index 2090ee3a15..965463954c 100644 --- a/ydb/core/tablet_flat/flat_database.cpp +++ b/ydb/core/tablet_flat/flat_database.cpp @@ -254,6 +254,11 @@ void TDatabase::CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion) Redo->EvCommitTx(table, txId, rowVersion); } +bool TDatabase::HasOpenTx(ui32 table, ui64 txId) const +{ + return Require(table)->HasOpenTx(txId); +} + void TDatabase::RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper) { if (Y_LIKELY(lower < upper)) { diff --git a/ydb/core/tablet_flat/flat_database.h b/ydb/core/tablet_flat/flat_database.h index 2cea6d756a..18fdd12d84 100644 --- a/ydb/core/tablet_flat/flat_database.h +++ b/ydb/core/tablet_flat/flat_database.h @@ -115,6 +115,11 @@ public: void CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion = TRowVersion::Min()); /** + * Returns true when table has an open transaction that is not committed or removed yet + */ + bool HasOpenTx(ui32 table, ui64 txId) const; + + /** * Remove row versions [lower, upper) from the given table * * Once committed this cannot be undone. This is a hint to the underlying diff --git a/ydb/core/tablet_flat/flat_table.cpp b/ydb/core/tablet_flat/flat_table.cpp index 7ce3b7adb5..d7cc19e635 100644 --- a/ydb/core/tablet_flat/flat_table.cpp +++ b/ydb/core/tablet_flat/flat_table.cpp @@ -691,6 +691,15 @@ void TTable::RemoveTx(ui64 txId) } } +bool TTable::HasOpenTx(ui64 txId) const +{ + if (OpenTransactions.contains(txId)) { + return !CommittedTransactions.Find(txId) && !RemovedTransactions.Contains(txId); + } + + return false; +} + TMemTable& TTable::MemTable() { return diff --git a/ydb/core/tablet_flat/flat_table.h b/ydb/core/tablet_flat/flat_table.h index cb5095a185..36493d77d9 100644 --- a/ydb/core/tablet_flat/flat_table.h +++ b/ydb/core/tablet_flat/flat_table.h @@ -154,6 +154,11 @@ public: void CommitTx(ui64 txId, TRowVersion rowVersion); void RemoveTx(ui64 txId); + /** + * Returns true when table has an open transaction that is not committed or removed yet + */ + bool HasOpenTx(ui64 txId) const; + TPartView GetPartView(const TLogoBlobID &bundle) const { auto *partView = Flatten.FindPtr(bundle); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index f75f4b3585..a4f722adc3 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -144,6 +144,7 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , TtlReadAheadHi(0, 0, 128*1024*1024) , EnablePrioritizedMvccSnapshotReads(1, 0, 1) , EnableUnprotectedMvccSnapshotReads(1, 0, 1) + , EnableLockedWrites(0, 0, 1) , EnableLeaderLeases(1, 0, 1) , MinLeaderLeaseDurationUs(250000, 1000, 5000000) , DataShardSysTables(InitDataShardSysTables(this)) @@ -314,6 +315,7 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads"); appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads"); + appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites"); appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases"); appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs"); diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 7d0061c126..8c2955a68c 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -316,6 +316,10 @@ public: IsImmediateTx = true; } + void SetIsRepeatableSnapshot() { + IsRepeatableSnapshot = true; + } + IChangeCollector* GetChangeCollector(const TTableId& tableId) const override { auto it = ChangeCollectors.find(tableId); if (it != ChangeCollectors.end()) { @@ -350,11 +354,22 @@ public: if (TSysTables::IsSystemTable(key.TableId)) return DataShardSysTable(key.TableId).IsValidKey(key); - // prevent updates/erases with LockTxId set - if (LockTxId && key.RowOperation != TKeyDesc::ERowOperation::Read) { - key.Status = TKeyDesc::EStatus::OperationNotSupported; - return false; + if (LockTxId) { + // Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs + if (key.RowOperation != TKeyDesc::ERowOperation::Read && + (!Self->GetEnableLockedWrites() || !IsImmediateTx || !IsRepeatableSnapshot)) + { + key.Status = TKeyDesc::EStatus::OperationNotSupported; + return false; + } + } else if (IsRepeatableSnapshot) { + // Prevent updates/erases in repeatable mvcc txs + if (key.RowOperation != TKeyDesc::ERowOperation::Read) { + key.Status = TKeyDesc::EStatus::OperationNotSupported; + return false; + } } + return TEngineHost::IsValidKey(key, maxSnapshotTime); } @@ -392,7 +407,10 @@ public: return; } - Self->SysLocksTable().BreakLock(tableId, row); + // TODO: handle presistent tx locks + if (!LockTxId) { + Self->SysLocksTable().BreakLock(tableId, row); + } Self->SetTableUpdateTime(tableId, Now); // apply special columns if declared @@ -444,7 +462,10 @@ public: return; } - Self->SysLocksTable().BreakLock(tableId, row); + // TODO: handle persistent tx locks + if (!LockTxId) { + Self->SysLocksTable().BreakLock(tableId, row); + } Self->SetTableUpdateTime(tableId, Now); TEngineHost::EraseRow(tableId, row); @@ -491,6 +512,36 @@ public: } } + ui64 GetWriteTxId(const TTableId& tableId) const override { + if (TSysTables::IsSystemTable(tableId)) + return 0; + + return LockTxId; + } + + NTable::ITransactionMapPtr GetReadTxMap(const TTableId& tableId) const override { + if (TSysTables::IsSystemTable(tableId) || !LockTxId) + return nullptr; + + // Don't use tx map when we know there's no open tx with the given txId + if (!DB.HasOpenTx(LocalTableId(tableId), LockTxId)) { + return nullptr; + } + + // Uncommitted changes are visible in all possible snapshots + // TODO: we need to guarantee no other changes committed between snapshot read and our local changes + return new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + } + + NTable::ITransactionObserverPtr GetReadTxObserver(const TTableId& tableId) const override { + if (TSysTables::IsSystemTable(tableId)) + return nullptr; + + // TODO: use observer to detect conflicts with other uncommitted transactions + + return nullptr; + } + private: const TDataShardSysTable& DataShardSysTable(const TTableId& tableId) const { return static_cast<const TDataShardSysTables *>(Self->GetDataShardSysTables())->Get(tableId); @@ -500,6 +551,7 @@ private: NTable::TDatabase& DB; const ui64& LockTxId; bool IsImmediateTx = false; + bool IsRepeatableSnapshot = false; TInstant Now; TRowVersion WriteVersion = TRowVersion::Max(); TRowVersion ReadVersion = TRowVersion::Min(); @@ -666,6 +718,13 @@ void TEngineBay::SetIsImmediateTx() { host->SetIsImmediateTx(); } +void TEngineBay::SetIsRepeatableSnapshot() { + Y_VERIFY(EngineHost); + + auto* host = static_cast<TDataShardEngineHost*>(EngineHost.Get()); + host->SetIsRepeatableSnapshot(); +} + TVector<IChangeCollector::TChange> TEngineBay::GetCollectedChanges() const { Y_VERIFY(EngineHost); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 12651fdcf9..2a1374cac1 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -92,6 +92,7 @@ public: void SetWriteVersion(TRowVersion writeVersion); void SetReadVersion(TRowVersion readVersion); void SetIsImmediateTx(); + void SetIsRepeatableSnapshot(); TVector<NMiniKQL::IChangeCollector::TChange> GetCollectedChanges() const; diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index b387addbba..72b36561e9 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -17,7 +17,8 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, const TActorContext &ctx, const TStepOrder &stepTxId, TInstant receivedAt, - const TString &txBody) + const TString &txBody, + bool usesMvccSnapshot) : StepTxId_(stepTxId) , TabletId_(self->TabletID()) , TxBody(txBody) @@ -50,6 +51,9 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, if (Tx.GetImmediate()) EngineBay.SetIsImmediateTx(); + if (usesMvccSnapshot) + EngineBay.SetIsRepeatableSnapshot(); + if (Tx.HasReadTableTransaction()) { auto &tx = Tx.GetReadTableTransaction(); if (self->TableInfos.contains(tx.GetTableId().GetTableId())) { @@ -244,38 +248,6 @@ bool TValidatedDataTx::ReValidateKeys() return true; } -ETxOrder TValidatedDataTx::CheckOrder(const TSysLocks& sysLocks, const TValidatedDataTx& dataTx) const { - Y_VERIFY(TxInfo().Loaded); - Y_VERIFY(dataTx.TxInfo().Loaded); - - if (KeysCount() > MaxReorderTxKeys()) - return ETxOrder::Unknown; - - if (HasLockedWrites()) { - if (sysLocks.IsBroken(LockTxId())) - return ETxOrder::Any; - - // TODO: dataTx.DynRW && dataTx.RW && dataTx.LockedRW - if (dataTx.HasWrites()) - return ETxOrder::Unknown; - - } else if (dataTx.HasLockedWrites()) { - if (sysLocks.IsBroken(dataTx.LockTxId())) - return ETxOrder::Any; - - if (LockTxId() == dataTx.LockTxId()) - return ETxOrder::Unknown; - - // TODO: DynRW && RW - if (HasWrites()) - return ETxOrder::Unknown; - } - - if (HasKeyConflict(TxInfo(), dataTx.TxInfo())) - return StepTxId_.CheckOrder(dataTx.StepTxId()); - return ETxOrder::Any; -} - bool TValidatedDataTx::CanCancel() { if (!IsTxReadOnly()) { return false; @@ -427,7 +399,7 @@ TValidatedDataTx::TPtr TActiveTransaction::BuildDataTx(TDataShard *self, if (!DataTx) { Y_VERIFY(TxBody); DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(), - GetReceivedAt(), TxBody); + GetReceivedAt(), TxBody, MvccSnapshotRepeatable); if (DataTx->HasStreamResponse()) SetStreamSink(DataTx->GetSink()); } @@ -648,7 +620,7 @@ ERestoreDataStatus TActiveTransaction::RestoreTxData( bool extractKeys = DataTx->IsTxInfoLoaded(); DataTx = std::make_shared<TValidatedDataTx>(self, txc, ctx, GetStepOrder(), - GetReceivedAt(), TxBody); + GetReceivedAt(), TxBody, MvccSnapshotRepeatable); if (DataTx->Ready() && extractKeys) { DataTx->ExtractKeys(true); } diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 43abd525e1..b2289b055d 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -120,7 +120,8 @@ public: const TActorContext &ctx, const TStepOrder &stepTxId, TInstant receivedAt, - const TString &txBody); + const TString &txBody, + bool usesMvccSnapshot); ~TValidatedDataTx(); @@ -206,7 +207,6 @@ public: ui32 ExtractKeys(bool allowErrors); bool ReValidateKeys(); - ETxOrder CheckOrder(const TSysLocks& sysLocks, const TValidatedDataTx& dataTx) const; ui64 GetTxSize() const { return TxSize; } ui32 KeysCount() const { return TxInfo().ReadsCount + TxInfo().WritesCount; } diff --git a/ydb/core/tx/datashard/datashard_dep_tracker.cpp b/ydb/core/tx/datashard/datashard_dep_tracker.cpp index b7c3706937..2c099436a6 100644 --- a/ydb/core/tx/datashard/datashard_dep_tracker.cpp +++ b/ydb/core/tx/datashard/datashard_dep_tracker.cpp @@ -723,6 +723,12 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera op->SetMvccSnapshot(snapshot, /* repeatable */ false); } + if (snapshotRepeatable) { + // Repeatable snapshot writes are uncommitted, not externally visible, and don't conflict with anything + isGlobalWriter = false; + haveWrites = false; + } + auto onImmediateConflict = [&](TOperation& conflict) { Y_VERIFY(!conflict.IsImmediate()); if (snapshot.IsMax()) { diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 88f8b35bf9..2377567430 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1283,6 +1283,11 @@ public: return value != 0; } + bool GetEnableLockedWrites() const { + ui64 value = EnableLockedWrites; + return value != 0; + } + template <typename T> void ReleaseCache(T& tx) { ReleaseTxCache(tx.GetTxCacheUsage()); @@ -2109,6 +2114,7 @@ private: TControlWrapper EnablePrioritizedMvccSnapshotReads; TControlWrapper EnableUnprotectedMvccSnapshotReads; + TControlWrapper EnableLockedWrites; TControlWrapper EnableLeaderLeases; TControlWrapper MinLeaderLeaseDurationUs; diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index 979553b140..ef92e632c5 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -399,10 +399,16 @@ bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<con TouchTablePoint(tableId, key); Shard->GetKeyAccessSampler()->AddSample(tableId, key); + NTable::ITransactionMapPtr txMap; + if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) { + txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + } + // TODO: tx observer + NTable::TRowState dbRow; NTable::TSelectStats stats; ui64 flags = EngineHost.GetSettings().DisableByKeyFilter ? (ui64) NTable::NoByKey : 0; - auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion()); + auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion(), txMap); kqpStats.NSelectRow = 1; kqpStats.InvisibleRowSkips = stats.InvisibleRowSkips; @@ -454,8 +460,14 @@ TAutoPtr<NTable::TTableIt> TKqpDatashardComputeContext::CreateIterator(const TTa keyRange.MinInclusive = range.InclusiveFrom; keyRange.MaxInclusive = range.InclusiveTo; + NTable::ITransactionMapPtr txMap; + if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) { + txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + } + // TODO: tx observer + TouchTableRange(tableId, range); - return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion()); + return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion(), txMap); } TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId, @@ -475,8 +487,14 @@ TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIter keyRange.MinInclusive = range.InclusiveFrom; keyRange.MaxInclusive = range.InclusiveTo; + NTable::ITransactionMapPtr txMap; + if (LockTxId && Database->HasOpenTx(localTid, LockTxId)) { + txMap = new NTable::TSingleTransactionMap(LockTxId, TRowVersion::Min()); + } + // TODO: tx observer + TouchTableRange(tableId, range); - return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion()); + return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion(), txMap); } template <typename TReadTableIterator> diff --git a/ydb/core/tx/datashard/datashard_pipeline.cpp b/ydb/core/tx/datashard/datashard_pipeline.cpp index 1f5796d7b4..1a0ba76320 100644 --- a/ydb/core/tx/datashard/datashard_pipeline.cpp +++ b/ydb/core/tx/datashard/datashard_pipeline.cpp @@ -1314,9 +1314,25 @@ TOperation::TPtr TPipeline::BuildOperation(TEvDataShard::TEvProposeTransaction:: return tx; } - if(tx->IsMvccSnapshotRead() && (!tx->IsImmediate() || !tx->IsReadOnly())) { + auto allowSnapshot = [&]() -> bool { + // must be immediate + if (!tx->IsImmediate()) { + return false; + } + // always ok for readonly + if (tx->IsReadOnly()) { + return true; + } + // ok for locked writes + if (dataTx->LockTxId()) { + return true; + } + return false; + }; + + if(tx->IsMvccSnapshotRead() && !allowSnapshot()) { tx->SetAbortedFlag(); - TString err = "Snapshot read must be an immediate read only transaction"; + TString err = "Snapshot read must be an immediate read only or locked write transaction"; tx->Result().Reset(new TEvDataShard::TEvProposeTransactionResult(rec.GetTxKind(), Self->TabletID(), tx->GetTxId(), diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index 7b6b2e93f8..d5f60e825a 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -7,6 +7,8 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> // Y_UNIT_TEST_(TWIN|QUAD) +#include <ydb/library/yql/minikql/mkql_node_printer.h> + namespace NKikimr { using namespace NKikimr::NDataShard; @@ -1469,6 +1471,208 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "} Struct { Bool: false }"); } + Y_UNIT_TEST_TWIN(MvccSnapshotLockedWrites, UseNewEngine) { + TPortManager pm; + TServerSettings::TControls controls; + controls.MutableDataShardControls()->SetPrioritizedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetUnprotectedMvccSnapshotReads(1); + controls.MutableDataShardControls()->SetEnableLockedWrites(1); + + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMvcc(true) + .SetEnableMvccSnapshotReads(true) + .SetEnableKqpSessionActor(UseNewEngine) + .SetControls(controls); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto &runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG); + + InitRoot(server, sender); + + TDisableDataShardLogBatching disableDataShardLogBatching; + CreateShardedTable(server, sender, "/Root", "table-1", 1); + + ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1)")); + + SimulateSleep(server, TDuration::Seconds(1)); + + auto execSimpleRequest = [&](const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeSimpleRequest(query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + if (response.GetResponse().GetResults().size() == 0) { + return ""; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto beginSnapshotRequest = [&](TString& sessionId, TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + sessionId = CreateSession(runtime, reqSender); + auto ev = ExecRequest(runtime, reqSender, MakeBeginRequest(sessionId, query)); + auto& response = ev->Get()->Record.GetRef(); + UNIT_ASSERT_VALUES_EQUAL(response.GetYdbStatus(), Ydb::StatusIds::SUCCESS); + txId = response.GetResponse().GetTxMeta().id(); + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto continueSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeContinueRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + if (response.GetResponse().GetResults().size() == 0) { + return ""; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + auto commitSnapshotRequest = [&](const TString& sessionId, const TString& txId, const TString& query) -> TString { + auto reqSender = runtime.AllocateEdgeActor(); + auto ev = ExecRequest(runtime, reqSender, MakeCommitRequest(sessionId, txId, query)); + auto& response = ev->Get()->Record.GetRef(); + if (response.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { + return TStringBuilder() << "ERROR: " << response.GetYdbStatus(); + } + if (response.GetResponse().GetResults().size() == 0) { + return ""; + } + UNIT_ASSERT_VALUES_EQUAL(response.GetResponse().GetResults().size(), 1u); + return response.GetResponse().GetResults()[0].GetValue().ShortDebugString(); + }; + + ui64 lastLockTxId = 0; + TRowVersion lastMvccSnapshot = TRowVersion::Min(); + ui64 injectLockTxId = 0; + TRowVersion injectMvccSnapshot = TRowVersion::Min(); + auto capturePropose = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case TEvDataShard::TEvProposeTransaction::EventType: { + auto& record = ev->Get<TEvDataShard::TEvProposeTransaction>()->Record; + Cerr << "TEvProposeTransaction:" << Endl; + Cerr << record.DebugString() << Endl; + if (record.GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) { + NKikimrTxDataShard::TDataTransaction tx; + Y_VERIFY(tx.ParseFromString(record.GetTxBody())); + Cerr << "TxBody:" << Endl; + Cerr << tx.DebugString() << Endl; + if (tx.HasMiniKQL()) { + using namespace NKikimr::NMiniKQL; + TScopedAlloc alloc; + TTypeEnvironment typeEnv(alloc); + auto node = DeserializeRuntimeNode(tx.GetMiniKQL(), typeEnv); + Cerr << "MiniKQL:" << Endl; + Cerr << PrintNode(node.GetNode()) << Endl; + } + if (tx.HasKqpTransaction()) { + for (const auto& task : tx.GetKqpTransaction().GetTasks()) { + if (task.HasProgram() && task.GetProgram().GetRaw()) { + using namespace NKikimr::NMiniKQL; + TScopedAlloc alloc; + TTypeEnvironment typeEnv(alloc); + auto node = DeserializeRuntimeNode(task.GetProgram().GetRaw(), typeEnv); + Cerr << "Task program:" << Endl; + Cerr << PrintNode(node.GetNode()) << Endl; + } + } + } + if (tx.GetLockTxId()) { + lastLockTxId = tx.GetLockTxId(); + } else if (injectLockTxId) { + tx.SetLockTxId(injectLockTxId); + TString txBody; + Y_VERIFY(tx.SerializeToString(&txBody)); + record.SetTxBody(txBody); + } + if (record.HasMvccSnapshot()) { + lastMvccSnapshot.Step = record.GetMvccSnapshot().GetStep(); + lastMvccSnapshot.TxId = record.GetMvccSnapshot().GetTxId(); + } else if (injectMvccSnapshot) { + record.MutableMvccSnapshot()->SetStep(injectMvccSnapshot.Step); + record.MutableMvccSnapshot()->SetTxId(injectMvccSnapshot.TxId); + } + } + break; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + auto prevObserverFunc = runtime.SetObserverFunc(capturePropose); + + // Start a snapshot read transaction + TString sessionId, txId; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // We should have been acquiring locks + Y_VERIFY(lastLockTxId != 0); + ui64 snapshotLockTxId = lastLockTxId; + Y_VERIFY(lastMvccSnapshot); + auto snapshotVersion = lastMvccSnapshot; + + // Perform an immediate write, pretending it happens as part of the above snapshot tx + injectLockTxId = snapshotLockTxId; + injectMvccSnapshot = snapshotVersion; + UNIT_ASSERT_VALUES_EQUAL( + execSimpleRequest(Q_(R"( + UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 2) + )")), + ""); + injectLockTxId = 0; + injectMvccSnapshot = TRowVersion::Min(); + + // Start another snapshot read, it should not see above write (it's uncommitted) + TString sessionId2, txId2; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(sessionId2, txId2, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "} Struct { Bool: false }"); + + // Perform another read using the first snapshot tx, it must see its own writes + UNIT_ASSERT_VALUES_EQUAL( + continueSnapshotRequest(sessionId, txId, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "} Struct { Bool: false }"); + + // Now commit with additional changes + UNIT_ASSERT_VALUES_EQUAL( + commitSnapshotRequest(sessionId, txId, Q_(R"( + UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3) + --SELECT 1 + )")), + ""); + } + } } // namespace NKikimr |