diff options
author | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-14 12:29:08 +0300 |
---|---|---|
committer | Aleksei Borzenkov <snaury@gmail.com> | 2022-06-14 12:29:08 +0300 |
commit | 3472d1ac59ed9de4b1d8091b507660d3bdc55f9a (patch) | |
tree | c492b686d72cc0367504cba1b94c887ec6c28519 | |
parent | cc2baf0174ac4c2fb565a2bd31c662e8339e894a (diff) | |
download | ydb-3472d1ac59ed9de4b1d8091b507660d3bdc55f9a.tar.gz |
Commit changes together with locks, KIKIMR-14732
ref:059951a5793b3f97c4685690a59672b1f54657e3
-rw-r--r-- | ydb/core/kqp/executer/kqp_data_executer.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 82 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_snapshot.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 3 |
6 files changed, 107 insertions, 12 deletions
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp index f5c08aad14..fdafe5cadc 100644 --- a/ydb/core/kqp/executer/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer/kqp_data_executer.cpp @@ -1570,10 +1570,10 @@ private: if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty()) { YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks); auto locksOp = Request.ValidateLocks && Request.EraseLocks - ? NKikimrTxDataShard::TKqpLocks::ValidateAndErase + ? NKikimrTxDataShard::TKqpLocks::Commit : (Request.ValidateLocks ? NKikimrTxDataShard::TKqpLocks::Validate - : NKikimrTxDataShard::TKqpLocks::Erase); + : NKikimrTxDataShard::TKqpLocks::Rollback); TSet<ui64> taskShardIds; if (Request.ValidateLocks) { diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index 4c4179fef3..229038ceae 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -114,14 +114,14 @@ message TReadTableTransaction { message TKqpLocks { repeated TLock Locks = 1; - repeated uint64 SendingShards = 2; // empty at Erase - repeated uint64 ReceivingShards = 3; // empty at Erase + repeated uint64 SendingShards = 2; // empty on Rollback + repeated uint64 ReceivingShards = 3; // empty on Rollback enum ELocksOp { Unspecified = 0; Validate = 1; - ValidateAndErase = 2; - Erase = 3; + Commit = 2; // Validate locks, commit buffered changes and erase locks + Rollback = 3; // Rollback buffered changes and erase locks } optional ELocksOp Op = 4; } diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 4bf6b7010a..32f7c49023 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -183,11 +183,11 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId, bool NeedValidateLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { switch (op) { - case NKikimrTxDataShard::TKqpLocks::ValidateAndErase: case NKikimrTxDataShard::TKqpLocks::Validate: + case NKikimrTxDataShard::TKqpLocks::Commit: return true; - case NKikimrTxDataShard::TKqpLocks::Erase: + case NKikimrTxDataShard::TKqpLocks::Rollback: case NKikimrTxDataShard::TKqpLocks::Unspecified: return false; } @@ -195,8 +195,8 @@ bool NeedValidateLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { bool NeedEraseLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { switch (op) { - case NKikimrTxDataShard::TKqpLocks::ValidateAndErase: - case NKikimrTxDataShard::TKqpLocks::Erase: + case NKikimrTxDataShard::TKqpLocks::Commit: + case NKikimrTxDataShard::TKqpLocks::Rollback: return true; case NKikimrTxDataShard::TKqpLocks::Validate: @@ -205,6 +205,18 @@ bool NeedEraseLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { } } +bool NeedCommitLockChanges(NKikimrTxDataShard::TKqpLocks_ELocksOp op) { + switch (op) { + case NKikimrTxDataShard::TKqpLocks::Commit: + return true; + + case NKikimrTxDataShard::TKqpLocks::Validate: + case NKikimrTxDataShard::TKqpLocks::Rollback: + case NKikimrTxDataShard::TKqpLocks::Unspecified: + return false; + } +} + TVector<TCell> MakeLockKey(const NKikimrTxDataShard::TLock& lockProto) { auto lockId = lockProto.GetLockId(); auto lockDatashard = lockProto.GetDataShard(); @@ -612,6 +624,68 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) { } } +void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) { + auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); + + if (!kqpTx.HasLocks()) { + return; + } + + if (NeedCommitLockChanges(kqpTx.GetLocks().GetOp())) { + // We assume locks have been validated earlier + for (auto& lockProto : kqpTx.GetLocks().GetLocks()) { + if (lockProto.GetDataShard() != origin) { + continue; + } + + TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId()); + auto localTid = dataShard.GetLocalTableId(tableId); + Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << origin); + + auto txId = lockProto.GetLockId(); + if (!txc.DB.HasOpenTx(localTid, txId)) { + continue; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLockChanges: committing txId# " << txId << " in localTid# " << localTid); + txc.DB.CommitTx(localTid, txId); + } + } else { + KqpRollbackLockChanges(origin, tx, dataShard, txc); + } +} + +void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) { + auto& kqpTx = tx->GetDataTx()->GetKqpTransaction(); + + if (!kqpTx.HasLocks()) { + return; + } + + if (NeedEraseLocks(kqpTx.GetLocks().GetOp())) { + for (auto& lockProto : kqpTx.GetLocks().GetLocks()) { + if (lockProto.GetDataShard() != origin) { + continue; + } + + TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId()); + auto localTid = dataShard.GetLocalTableId(tableId); + if (!localTid) { + // It may have been dropped already + continue; + } + + auto txId = lockProto.GetLockId(); + if (!txc.DB.HasOpenTx(localTid, txId)) { + continue; + } + + LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpRollbackLockChanges: removing txId# " << txId << " from localTid# " << localTid); + txc.DB.RemoveTx(localTid, txId); + } + } +} + void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId) { diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h index 228f71fd37..452fc92d33 100644 --- a/ydb/core/tx/datashard/datashard_kqp.h +++ b/ydb/core/tx/datashard/datashard_kqp.h @@ -37,6 +37,8 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets, bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks); +void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); +void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc); void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters); diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp index d5f60e825a..ab4092496a 100644 --- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp +++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp @@ -1664,13 +1664,29 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) { "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " "} Struct { Bool: false }"); - // Now commit with additional changes + // Now commit with additional changes (temporarily needed to trigger lock commits) UNIT_ASSERT_VALUES_EQUAL( commitSnapshotRequest(sessionId, txId, Q_(R"( UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3) - --SELECT 1 )")), ""); + + if (UseNewEngine) { + // Verify new snapshots observe all committed changes + // This is only possible with new engine at this time + TString sessionId3, txId3; + UNIT_ASSERT_VALUES_EQUAL( + beginSnapshotRequest(sessionId3, txId3, Q_(R"( + SELECT key, value FROM `/Root/table-1` + WHERE key >= 1 AND key <= 3 + ORDER BY key + )")), + "Struct { " + "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } " + "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } " + "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } " + "} Struct { Bool: false }"); + } } } diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index dda3a8d858..d1e02434d0 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -134,6 +134,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio } if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) { + KqpRollbackLockChanges(tabletId, tx, DataShard, txc); KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable()); DataShard.SysLocksTable().ApplyLocks(); return EExecutionStatus::Executed; @@ -161,6 +162,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->SetReadVersion(readVersion); dataTx->SetWriteVersion(writeVersion); + KqpCommitLockChanges(tabletId, tx, DataShard, txc); + auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx(); auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(), |