summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <[email protected]>2022-04-14 14:27:30 +0300
committerVladislav Kuznetsov <[email protected]>2022-04-14 14:27:30 +0300
commit302ca93ca3295abf8e1255cd27798a3e41d281ee (patch)
tree29b0aa972a1072dbf3e4d5a3c9aa3fb46e88b58b
parent18d517202ec73242ce07163c192267453dd96bfb (diff)
Enhance reply issue and message for MVCC errors KIKIMR-14532
ref:8f036e9293c3800527e8305d1f8a3f86a1741170
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp62
1 files changed, 46 insertions, 16 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index bb9a72dfbde..1892a9751bb 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -610,7 +610,7 @@ public:
if (it == ExplicitTransactions.End()) {
std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
TStringBuilder() << "Transaction not found: " << QueryState->TxId)};
- ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues));
+ ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
return false;
}
QueryState->TxCtx = *it;
@@ -844,12 +844,25 @@ public:
return paramsMap;
}
+ bool CheckTransacionLocks() {
+ auto& txCtx = *QueryState->TxCtx;
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
+ std::vector<TIssue> issues{
+ YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated.")
+ };
+ ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "tx has deferred effects, but lock is broken",
+ MessageFromIssues(issues));
+ return false;
+ }
+ return true;
+ }
void ExecuteOrDefer() {
auto& txCtx = *QueryState->TxCtx;
+
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
- ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect");
+ if (!CheckTransacionLocks()) {
return;
}
@@ -859,7 +872,6 @@ public:
auto tx = std::shared_ptr<const NKqpProto::TKqpPhyTx>(QueryState->PreparedQuery,
&phyQuery.GetTransactions(QueryState->CurrentTx));
-
while (tx->GetHasEffects()) {
if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) {
ythrow TBadRequestFail(requestInfo) << "Failed to mix queries with old- and new- engines";
@@ -877,8 +889,10 @@ public:
bool commit = QueryState->Commit && QueryState->CurrentTx == phyQuery.TransactionsSize() - 1;
if (tx || commit) {
- ExecutePhyTx(&phyQuery, std::move(tx), commit);
- ++QueryState->CurrentTx;
+ bool replied = ExecutePhyTx(&phyQuery, std::move(tx), commit);
+ if (!replied) {
+ ++QueryState->CurrentTx;
+ }
} else {
ReplySuccess();
}
@@ -892,6 +906,10 @@ public:
LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit
<< " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size());
+ if (!CheckTransacionLocks()) {
+ return true;
+ }
+
// TODO Handle timeouts -- request.Timeout, request.CancelAfter
if (tx) {
@@ -914,8 +932,6 @@ public:
}
if (commit) {
- YQL_ENSURE(txCtx.DeferredEffects.Empty() || !txCtx.Locks.Broken());
-
for (const auto& effect : txCtx.DeferredEffects) {
YQL_ENSURE(!effect.Node);
YQL_ENSURE(effect.PhysicalTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_DATA);
@@ -973,11 +989,19 @@ public:
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
ExecuterId = TActorId{};
+ auto& txCtx = QueryState->TxCtx;
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx
- << " response->Status: " << response->GetStatus());
+ << " response->DebugString(): " << response->DebugString());
+
+ txCtx->Invalidate();
+ AbortedTransactions.emplace_back(txCtx);
+ RemoveTransaction(QueryState->TxId);
+
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, response->GetStatus(), "", *response->MutableIssues());
+ TIssues issues;
+ IssuesFromMessage(response->GetIssues(), issues);
+ ReplyQueryError(requestInfo, GetYdbStatus(issues), "", *response->MutableIssues());
return;
}
@@ -995,14 +1019,20 @@ public:
// locks merge
if (txResult.HasLocks()) {
- auto& txCtx = *QueryState->TxCtx;
const auto& locks = txResult.GetLocks();
- auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), txCtx);
+ auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx);
if (!success) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge",
- MessageFromIssues(issues));
- return;
+ if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) {
+
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge",
+ MessageFromIssues(issues));
+ return;
+ }
+
+ if (txCtx->GetSnapshot().IsValid()) {
+ txCtx->Locks.MarkBroken(issues.back());
+ }
}
}