aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@gmail.com>2022-06-14 12:29:08 +0300
committerAleksei Borzenkov <snaury@gmail.com>2022-06-14 12:29:08 +0300
commit3472d1ac59ed9de4b1d8091b507660d3bdc55f9a (patch)
treec492b686d72cc0367504cba1b94c887ec6c28519
parentcc2baf0174ac4c2fb565a2bd31c662e8339e894a (diff)
downloadydb-3472d1ac59ed9de4b1d8091b507660d3bdc55f9a.tar.gz
Commit changes together with locks, KIKIMR-14732
ref:059951a5793b3f97c4685690a59672b1f54657e3
-rw-r--r--ydb/core/kqp/executer/kqp_data_executer.cpp4
-rw-r--r--ydb/core/protos/tx_datashard.proto8
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp82
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp20
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp3
6 files changed, 107 insertions, 12 deletions
diff --git a/ydb/core/kqp/executer/kqp_data_executer.cpp b/ydb/core/kqp/executer/kqp_data_executer.cpp
index f5c08aad14..fdafe5cadc 100644
--- a/ydb/core/kqp/executer/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_data_executer.cpp
@@ -1570,10 +1570,10 @@ private:
if (auto locksMap = ExtractLocks(Request.Locks); !locksMap.empty()) {
YQL_ENSURE(Request.ValidateLocks || Request.EraseLocks);
auto locksOp = Request.ValidateLocks && Request.EraseLocks
- ? NKikimrTxDataShard::TKqpLocks::ValidateAndErase
+ ? NKikimrTxDataShard::TKqpLocks::Commit
: (Request.ValidateLocks
? NKikimrTxDataShard::TKqpLocks::Validate
- : NKikimrTxDataShard::TKqpLocks::Erase);
+ : NKikimrTxDataShard::TKqpLocks::Rollback);
TSet<ui64> taskShardIds;
if (Request.ValidateLocks) {
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 4c4179fef3..229038ceae 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -114,14 +114,14 @@ message TReadTableTransaction {
message TKqpLocks {
repeated TLock Locks = 1;
- repeated uint64 SendingShards = 2; // empty at Erase
- repeated uint64 ReceivingShards = 3; // empty at Erase
+ repeated uint64 SendingShards = 2; // empty on Rollback
+ repeated uint64 ReceivingShards = 3; // empty on Rollback
enum ELocksOp {
Unspecified = 0;
Validate = 1;
- ValidateAndErase = 2;
- Erase = 3;
+ Commit = 2; // Validate locks, commit buffered changes and erase locks
+ Rollback = 3; // Rollback buffered changes and erase locks
}
optional ELocksOp Op = 4;
}
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 4bf6b7010a..32f7c49023 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -183,11 +183,11 @@ NDq::ERunStatus RunKqpTransactionInternal(const TActorContext& ctx, ui64 txId,
bool NeedValidateLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
switch (op) {
- case NKikimrTxDataShard::TKqpLocks::ValidateAndErase:
case NKikimrTxDataShard::TKqpLocks::Validate:
+ case NKikimrTxDataShard::TKqpLocks::Commit:
return true;
- case NKikimrTxDataShard::TKqpLocks::Erase:
+ case NKikimrTxDataShard::TKqpLocks::Rollback:
case NKikimrTxDataShard::TKqpLocks::Unspecified:
return false;
}
@@ -195,8 +195,8 @@ bool NeedValidateLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
bool NeedEraseLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
switch (op) {
- case NKikimrTxDataShard::TKqpLocks::ValidateAndErase:
- case NKikimrTxDataShard::TKqpLocks::Erase:
+ case NKikimrTxDataShard::TKqpLocks::Commit:
+ case NKikimrTxDataShard::TKqpLocks::Rollback:
return true;
case NKikimrTxDataShard::TKqpLocks::Validate:
@@ -205,6 +205,18 @@ bool NeedEraseLocks(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
}
}
+bool NeedCommitLockChanges(NKikimrTxDataShard::TKqpLocks_ELocksOp op) {
+ switch (op) {
+ case NKikimrTxDataShard::TKqpLocks::Commit:
+ return true;
+
+ case NKikimrTxDataShard::TKqpLocks::Validate:
+ case NKikimrTxDataShard::TKqpLocks::Rollback:
+ case NKikimrTxDataShard::TKqpLocks::Unspecified:
+ return false;
+ }
+}
+
TVector<TCell> MakeLockKey(const NKikimrTxDataShard::TLock& lockProto) {
auto lockId = lockProto.GetLockId();
auto lockDatashard = lockProto.GetDataShard();
@@ -612,6 +624,68 @@ void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
}
}
+void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) {
+ auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
+
+ if (!kqpTx.HasLocks()) {
+ return;
+ }
+
+ if (NeedCommitLockChanges(kqpTx.GetLocks().GetOp())) {
+ // We assume locks have been validated earlier
+ for (auto& lockProto : kqpTx.GetLocks().GetLocks()) {
+ if (lockProto.GetDataShard() != origin) {
+ continue;
+ }
+
+ TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId());
+ auto localTid = dataShard.GetLocalTableId(tableId);
+ Y_VERIFY_S(localTid, "Unexpected failure to find table " << tableId << " in datashard " << origin);
+
+ auto txId = lockProto.GetLockId();
+ if (!txc.DB.HasOpenTx(localTid, txId)) {
+ continue;
+ }
+
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpCommitLockChanges: committing txId# " << txId << " in localTid# " << localTid);
+ txc.DB.CommitTx(localTid, txId);
+ }
+ } else {
+ KqpRollbackLockChanges(origin, tx, dataShard, txc);
+ }
+}
+
+void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc) {
+ auto& kqpTx = tx->GetDataTx()->GetKqpTransaction();
+
+ if (!kqpTx.HasLocks()) {
+ return;
+ }
+
+ if (NeedEraseLocks(kqpTx.GetLocks().GetOp())) {
+ for (auto& lockProto : kqpTx.GetLocks().GetLocks()) {
+ if (lockProto.GetDataShard() != origin) {
+ continue;
+ }
+
+ TTableId tableId(lockProto.GetSchemeShard(), lockProto.GetPathId());
+ auto localTid = dataShard.GetLocalTableId(tableId);
+ if (!localTid) {
+ // It may have been dropped already
+ continue;
+ }
+
+ auto txId = lockProto.GetLockId();
+ if (!txc.DB.HasOpenTx(localTid, txId)) {
+ continue;
+ }
+
+ LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "KqpRollbackLockChanges: removing txId# " << txId << " from localTid# " << localTid);
+ txc.DB.RemoveTx(localTid, txId);
+ }
+ }
+}
+
void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
const NKikimrTxDataShard::TKqpTransaction& kqpTx, ui64 tabletId)
{
diff --git a/ydb/core/tx/datashard/datashard_kqp.h b/ydb/core/tx/datashard/datashard_kqp.h
index 228f71fd37..452fc92d33 100644
--- a/ydb/core/tx/datashard/datashard_kqp.h
+++ b/ydb/core/tx/datashard/datashard_kqp.h
@@ -37,6 +37,8 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
+void KqpCommitLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
+void KqpRollbackLockChanges(ui64 origin, TActiveTransaction* tx, TDataShard& dataShard, TTransactionContext& txc);
void KqpUpdateDataShardStatCounters(TDataShard& dataShard, const NMiniKQL::TEngineHostCounters& counters);
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index d5f60e825a..ab4092496a 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -1664,13 +1664,29 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
"} Struct { Bool: false }");
- // Now commit with additional changes
+ // Now commit with additional changes (temporarily needed to trigger lock commits)
UNIT_ASSERT_VALUES_EQUAL(
commitSnapshotRequest(sessionId, txId, Q_(R"(
UPSERT INTO `Root/table-1` (key, value) VALUES (3, 3)
- --SELECT 1
)")),
"");
+
+ if (UseNewEngine) {
+ // Verify new snapshots observe all committed changes
+ // This is only possible with new engine at this time
+ TString sessionId3, txId3;
+ UNIT_ASSERT_VALUES_EQUAL(
+ beginSnapshotRequest(sessionId3, txId3, Q_(R"(
+ SELECT key, value FROM `/Root/table-1`
+ WHERE key >= 1 AND key <= 3
+ ORDER BY key
+ )")),
+ "Struct { "
+ "List { Struct { Optional { Uint32: 1 } } Struct { Optional { Uint32: 1 } } } "
+ "List { Struct { Optional { Uint32: 2 } } Struct { Optional { Uint32: 2 } } } "
+ "List { Struct { Optional { Uint32: 3 } } Struct { Optional { Uint32: 3 } } } "
+ "} Struct { Bool: false }");
+ }
}
}
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index dda3a8d858..d1e02434d0 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -134,6 +134,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
}
if (!KqpValidateLocks(tabletId, tx, DataShard.SysLocksTable())) {
+ KqpRollbackLockChanges(tabletId, tx, DataShard, txc);
KqpEraseLocks(tabletId, tx, DataShard.SysLocksTable());
DataShard.SysLocksTable().ApplyLocks();
return EExecutionStatus::Executed;
@@ -161,6 +162,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
dataTx->SetReadVersion(readVersion);
dataTx->SetWriteVersion(writeVersion);
+ KqpCommitLockChanges(tabletId, tx, DataShard, txc);
+
auto& computeCtx = tx->GetDataTx()->GetKqpComputeCtx();
auto result = KqpCompleteTransaction(ctx, tabletId, op->GetTxId(),