aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-16 18:16:11 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-16 18:16:11 +0300
commit8f60d032b1180f55355c5f2b694e47dab3653056 (patch)
treef4a762557e4b93460a07d15e4c8d2578cd5337da
parentb40833909a29351baa2cbd151a23ebb10f9a601f (diff)
downloadydb-8f60d032b1180f55355c5f2b694e47dab3653056.tar.gz
Refactor commit logic in SessionActor
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp50
1 files changed, 28 insertions, 22 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 8ef17235862..ac2ce73af44 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -948,7 +948,6 @@ public:
return request;
}
-
IKqpGateway::TExecPhysicalRequest PrepareLiteralRequest(TKqpQueryState *queryState) {
auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc);
request.NeedTxId = false;
@@ -1107,14 +1106,14 @@ public:
auto tx = QueryState->PreparedQuery->GetPhyTxOrEmpty(QueryState->CurrentTx);
if (!Config->FeatureFlags.GetEnableKqpImmediateEffects()) {
while (tx && tx->GetHasEffects()) {
- YQL_ENSURE(txCtx.AddDeferredEffect(tx, CreateKqpValueMap(tx)));
+ bool success = txCtx.AddDeferredEffect(tx, CreateKqpValueMap(tx));
+ YQL_ENSURE(success);
LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
++QueryState->CurrentTx;
tx = QueryState->PreparedQuery->GetPhyTx(QueryState->CurrentTx);
} else {
tx = nullptr;
- break;
}
}
}
@@ -1123,27 +1122,35 @@ public:
return;
}
- bool commit = false;
- if (QueryState->Commit && Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) {
+ bool commit = ShouldCommitWithCurrentTx(phyQuery, tx);
+ if (tx || commit) {
+ ExecutePhyTx(&phyQuery, tx, commit);
+ } else {
+ ReplySuccess();
+ }
+ }
+
+ bool ShouldCommitWithCurrentTx(const NKqpProto::TKqpPhyQuery& phyQuery, const TKqpPhyTxHolder::TConstPtr& tx) {
+ if (!QueryState->Commit) {
+ return false;
+ }
+
+ if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
+ // commit can only be applied to the last transaction or perform separately at the end
+ return false;
+ }
+
+ if (Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) {
// every phy tx should acquire LockTxId, so commit is sent separately at the end
- commit = QueryState->CurrentTx >= phyQuery.TransactionsSize();
- } else if (QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1) {
- if (!tx) {
- // no physical transactions left, perform commit
- commit = true;
- } else {
- // we can merge commit with last tx only for read-only transactions
- commit = txCtx.DeferredEffects.Empty();
- }
+ return QueryState->CurrentTx >= phyQuery.TransactionsSize();
}
- if (tx || commit) {
- bool replied = ExecutePhyTx(&phyQuery, tx, commit);
- if (!replied) {
- ++QueryState->CurrentTx;
- }
+ if (!tx) {
+ // no physical transactions left, perform commit
+ return true;
} else {
- ReplySuccess();
+ // we can merge commit with last tx only for read-only transactions
+ return QueryState->TxCtx->DeferredEffects.Empty();
}
}
@@ -1172,8 +1179,6 @@ public:
return true;
}
- // TODO Handle timeouts -- request.Timeout, request.CancelAfter
-
if (tx) {
switch (tx->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
@@ -1244,6 +1249,7 @@ public:
SendToExecuter(std::move(request));
+ ++QueryState->CurrentTx;
return false;
}