diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-14 17:40:17 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-14 17:40:17 +0300 |
commit | 443255e68d604b2dcbb97c3bf6cf89db75fbbb4d (patch) | |
tree | 3b6b0b2ba566fa4a68f084a93edbcf9c40e9603f | |
parent | 05b2a699630e0d2e0a05f680c2d923b49e4d8ad0 (diff) | |
download | ydb-443255e68d604b2dcbb97c3bf6cf89db75fbbb4d.tar.gz |
Implement BEGIN_TX and ROLLBACK_TX actions in session_actor, KIKIMR-11938
ref:6e801487f25b7bf128a94edb6fd6d36d4c1af076
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 62 |
1 files changed, 50 insertions, 12 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index cdf90a39f1..8da1054b85 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -67,7 +67,7 @@ struct TKqpQueryState { NLWTrace::TOrbit Orbit; TString TxId; // User tx - bool Commit = true; + bool Commit = false; }; struct TKqpCleanupCtx { @@ -200,6 +200,31 @@ public: } } + void RollbackTx(const TKqpRequestInfo& requestInfo) { + auto& queryRequest = QueryState->Request; + YQL_ENSURE(queryRequest.HasTxControl(), + "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest"); + const auto& txControl = queryRequest.GetTxControl(); + QueryState->Commit = txControl.commit_tx(); + const auto& txId = txControl.tx_id(); + auto txCtx = FindTransaction(txId); + if (!txCtx) { + google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; + auto issue = YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() + << "Transaction not found: " << QueryState->TxId); + IssueToMessage(issue, message.Add()); + ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", &message); + } else { + QueryState->TxCtx = txCtx; + txCtx->Invalidate(); + AbortedTransactions.emplace_back(txCtx); + RemoveTransaction(txId); + + SendRollbackRequest(txCtx.Get()); + Become(&TKqpSessionActor::ExecuteState); + } + } + void CommitTx() { auto& queryRequest = QueryState->Request; @@ -292,11 +317,19 @@ public: } break; } - - case NKikimrKqp::QUERY_ACTION_BEGIN_TX: - case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: - YQL_ENSURE(false, "BEGIN_TX and ROLLBACK_TX is not supported yet"); + case NKikimrKqp::QUERY_ACTION_BEGIN_TX: { + YQL_ENSURE(queryRequest.HasTxControl(), + "Can't perform BEGIN_TX: TxControl isn't set in TQueryRequest"); + auto& txControl = queryRequest.GetTxControl(); + QueryState->Commit = txControl.commit_tx(); + BeginTx(txControl.begin_tx()); + ReplySuccess(); return; + } + case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: { + RollbackTx(requestInfo); + return; + } case NKikimrKqp::QUERY_ACTION_COMMIT_TX: CommitTx(); return; @@ -452,6 +485,13 @@ public: YQL_ENSURE(success); } + void BeginTx(const Ydb::Table::TransactionSettings& settings) { + QueryState->TxId = CreateGuidAsString(); + QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false); + SetIsolationLevel(settings); + CreateNewTx(); + } + void PrepareQueryContext() { YQL_ENSURE(QueryState); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -473,10 +513,7 @@ public: break; } case Ydb::Table::TransactionControl::kBeginTx: { - QueryState->TxId = CreateGuidAsString(); - QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false); - SetIsolationLevel(txControl.begin_tx()); - CreateNewTx(); + BeginTx(txControl.begin_tx()); break; } case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: @@ -959,6 +996,7 @@ public: } resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS); + LOG_D("Reply for action: " << queryRequest.GetAction() << " with SUCCESS status"); Reply(std::move(resEv)); LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, arena->SpaceUsed()); @@ -1110,7 +1148,7 @@ public: } } - void RollbackTx(TIntrusivePtr<TKqpTransactionContext> txCtx) { + void SendRollbackRequest(TKqpTransactionContext* txCtx) { auto request = PreparePhysicalRequest(nullptr); request.EraseLocks = true; @@ -1186,7 +1224,7 @@ public: CleanupCtx->TransactionsToBeAborted = AbortedTransactions.size(); // TODO Rollback one-by-one to avoid burst for (auto& txCtx : AbortedTransactions) { - RollbackTx(txCtx); + SendRollbackRequest(txCtx.Get()); } } @@ -1210,10 +1248,10 @@ public: auto* response = ev->Record.GetRef().MutableResponse(); auto *queryIssue = response->AddQueryIssues(); - IssueToMessage(TIssue{message}, queryIssue); if (issues) { queryIssue->Mutableissues()->Swap(issues); } + IssueToMessage(TIssue{message}, queryIssue); if (QueryState) { if (QueryState->TxCtx) { |