diff options
| author | Vladislav Kuznetsov <[email protected]> | 2022-04-14 14:27:30 +0300 |
|---|---|---|
| committer | Vladislav Kuznetsov <[email protected]> | 2022-04-14 14:27:30 +0300 |
| commit | 302ca93ca3295abf8e1255cd27798a3e41d281ee (patch) | |
| tree | 29b0aa972a1072dbf3e4d5a3c9aa3fb46e88b58b | |
| parent | 18d517202ec73242ce07163c192267453dd96bfb (diff) | |
Enhance reply issue and message for MVCC errors KIKIMR-14532
ref:8f036e9293c3800527e8305d1f8a3f86a1741170
| -rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 62 |
1 files changed, 46 insertions, 16 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index bb9a72dfbde..1892a9751bb 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -610,7 +610,7 @@ public: if (it == ExplicitTransactions.End()) { std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << QueryState->TxId)}; - ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues)); + ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); return false; } QueryState->TxCtx = *it; @@ -844,12 +844,25 @@ public: return paramsMap; } + bool CheckTransacionLocks() { + auto& txCtx = *QueryState->TxCtx; + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { + std::vector<TIssue> issues{ + YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.") + }; + ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "tx has deferred effects, but lock is broken", + MessageFromIssues(issues)); + return false; + } + return true; + } void ExecuteOrDefer() { auto& txCtx = *QueryState->TxCtx; + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { - ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect"); + if (!CheckTransacionLocks()) { return; } @@ -859,7 +872,6 @@ public: auto tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery, &phyQuery.GetTransactions(QueryState->CurrentTx)); - while (tx->GetHasEffects()) { if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { ythrow TBadRequestFail(requestInfo) << "Failed to mix queries with old- and new- engines"; @@ -877,8 +889,10 @@ public: bool commit = QueryState->Commit && QueryState->CurrentTx == phyQuery.TransactionsSize() - 1; if (tx || commit) { - ExecutePhyTx(&phyQuery, std::move(tx), commit); - ++QueryState->CurrentTx; + bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit); + if (!replied) { + ++QueryState->CurrentTx; + } } else { ReplySuccess(); } @@ -892,6 +906,10 @@ public: LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit << " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size()); + if (!CheckTransacionLocks()) { + return true; + } + // TODO Handle timeouts -- request.Timeout, request.CancelAfter if (tx) { @@ -914,8 +932,6 @@ public: } if (commit) { - YQL_ENSURE(txCtx.DeferredEffects.Empty() || !txCtx.Locks.Broken()); - for (const auto& effect : txCtx.DeferredEffects) { YQL_ENSURE(!effect.Node); YQL_ENSURE(effect.PhysicalTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA); @@ -973,11 +989,19 @@ public: LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); ExecuterId = TActorId{}; + auto& txCtx = QueryState->TxCtx; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx - << " response->Status: " << response->GetStatus()); + << " response->DebugString(): " << response->DebugString()); + + txCtx->Invalidate(); + AbortedTransactions.emplace_back(txCtx); + RemoveTransaction(QueryState->TxId); + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, response->GetStatus(), "", *response->MutableIssues()); + TIssues issues; + IssuesFromMessage(response->GetIssues(), issues); + ReplyQueryError(requestInfo, GetYdbStatus(issues), "", *response->MutableIssues()); return; } @@ -995,14 +1019,20 @@ public: // locks merge if (txResult.HasLocks()) { - auto& txCtx = *QueryState->TxCtx; const auto& locks = txResult.GetLocks(); - auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), txCtx); + auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx); if (!success) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", - MessageFromIssues(issues)); - return; + if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) { + + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", + MessageFromIssues(issues)); + return; + } + + if (txCtx->GetSnapshot().IsValid()) { + txCtx->Locks.MarkBroken(issues.back()); + } } } |
