aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-14 17:40:17 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-14 17:40:17 +0300
commit443255e68d604b2dcbb97c3bf6cf89db75fbbb4d (patch)
tree3b6b0b2ba566fa4a68f084a93edbcf9c40e9603f
parent05b2a699630e0d2e0a05f680c2d923b49e4d8ad0 (diff)
downloadydb-443255e68d604b2dcbb97c3bf6cf89db75fbbb4d.tar.gz
Implement BEGIN_TX and ROLLBACK_TX actions in session_actor, KIKIMR-11938
ref:6e801487f25b7bf128a94edb6fd6d36d4c1af076
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp62
1 files changed, 50 insertions, 12 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index cdf90a39f1..8da1054b85 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -67,7 +67,7 @@ struct TKqpQueryState {
NLWTrace::TOrbit Orbit;
TString TxId; // User tx
- bool Commit = true;
+ bool Commit = false;
};
struct TKqpCleanupCtx {
@@ -200,6 +200,31 @@ public:
}
}
+ void RollbackTx(const TKqpRequestInfo& requestInfo) {
+ auto& queryRequest = QueryState->Request;
+ YQL_ENSURE(queryRequest.HasTxControl(),
+ "Can't perform ROLLBACK_TX: TxControl isn't set in TQueryRequest");
+ const auto& txControl = queryRequest.GetTxControl();
+ QueryState->Commit = txControl.commit_tx();
+ const auto& txId = txControl.tx_id();
+ auto txCtx = FindTransaction(txId);
+ if (!txCtx) {
+ google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message;
+ auto issue = YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder()
+ << "Transaction not found: " << QueryState->TxId);
+ IssueToMessage(issue, message.Add());
+ ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", &message);
+ } else {
+ QueryState->TxCtx = txCtx;
+ txCtx->Invalidate();
+ AbortedTransactions.emplace_back(txCtx);
+ RemoveTransaction(txId);
+
+ SendRollbackRequest(txCtx.Get());
+ Become(&TKqpSessionActor::ExecuteState);
+ }
+ }
+
void CommitTx() {
auto& queryRequest = QueryState->Request;
@@ -292,11 +317,19 @@ public:
}
break;
}
-
- case NKikimrKqp::QUERY_ACTION_BEGIN_TX:
- case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX:
- YQL_ENSURE(false, "BEGIN_TX and ROLLBACK_TX is not supported yet");
+ case NKikimrKqp::QUERY_ACTION_BEGIN_TX: {
+ YQL_ENSURE(queryRequest.HasTxControl(),
+ "Can't perform BEGIN_TX: TxControl isn't set in TQueryRequest");
+ auto& txControl = queryRequest.GetTxControl();
+ QueryState->Commit = txControl.commit_tx();
+ BeginTx(txControl.begin_tx());
+ ReplySuccess();
return;
+ }
+ case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX: {
+ RollbackTx(requestInfo);
+ return;
+ }
case NKikimrKqp::QUERY_ACTION_COMMIT_TX:
CommitTx();
return;
@@ -452,6 +485,13 @@ public:
YQL_ENSURE(success);
}
+ void BeginTx(const Ydb::Table::TransactionSettings& settings) {
+ QueryState->TxId = CreateGuidAsString();
+ QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false);
+ SetIsolationLevel(settings);
+ CreateNewTx();
+ }
+
void PrepareQueryContext() {
YQL_ENSURE(QueryState);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -473,10 +513,7 @@ public:
break;
}
case Ydb::Table::TransactionControl::kBeginTx: {
- QueryState->TxId = CreateGuidAsString();
- QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false);
- SetIsolationLevel(txControl.begin_tx());
- CreateNewTx();
+ BeginTx(txControl.begin_tx());
break;
}
case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
@@ -959,6 +996,7 @@ public:
}
resEv->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS);
+ LOG_D("Reply for action: " << queryRequest.GetAction() << " with SUCCESS status");
Reply(std::move(resEv));
LWTRACK(KqpQueryReplySuccess, QueryState->Orbit, arena->SpaceUsed());
@@ -1110,7 +1148,7 @@ public:
}
}
- void RollbackTx(TIntrusivePtr<TKqpTransactionContext> txCtx) {
+ void SendRollbackRequest(TKqpTransactionContext* txCtx) {
auto request = PreparePhysicalRequest(nullptr);
request.EraseLocks = true;
@@ -1186,7 +1224,7 @@ public:
CleanupCtx->TransactionsToBeAborted = AbortedTransactions.size();
// TODO Rollback one-by-one to avoid burst
for (auto& txCtx : AbortedTransactions) {
- RollbackTx(txCtx);
+ SendRollbackRequest(txCtx.Get());
}
}
@@ -1210,10 +1248,10 @@ public:
auto* response = ev->Record.GetRef().MutableResponse();
auto *queryIssue = response->AddQueryIssues();
- IssueToMessage(TIssue{message}, queryIssue);
if (issues) {
queryIssue->Mutableissues()->Swap(issues);
}
+ IssueToMessage(TIssue{message}, queryIssue);
if (QueryState) {
if (QueryState->TxCtx) {