diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-16 18:16:11 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-16 18:16:11 +0300 |
commit | 8f60d032b1180f55355c5f2b694e47dab3653056 (patch) | |
tree | f4a762557e4b93460a07d15e4c8d2578cd5337da | |
parent | b40833909a29351baa2cbd151a23ebb10f9a601f (diff) | |
download | ydb-8f60d032b1180f55355c5f2b694e47dab3653056.tar.gz |
Refactor commit logic in SessionActor
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 50 |
1 files changed, 28 insertions, 22 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 8ef17235862..ac2ce73af44 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -948,7 +948,6 @@ public: return request; } - IKqpGateway::TExecPhysicalRequest PrepareLiteralRequest(TKqpQueryState *queryState) { auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc); request.NeedTxId = false; @@ -1107,14 +1106,14 @@ public: auto tx = QueryState->PreparedQuery->GetPhyTxOrEmpty(QueryState->CurrentTx); if (!Config->FeatureFlags.GetEnableKqpImmediateEffects()) { while (tx && tx->GetHasEffects()) { - YQL_ENSURE(txCtx.AddDeferredEffect(tx, CreateKqpValueMap(tx))); + bool success = txCtx.AddDeferredEffect(tx, CreateKqpValueMap(tx)); + YQL_ENSURE(success); LWTRACK(KqpSessionPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { ++QueryState->CurrentTx; tx = QueryState->PreparedQuery->GetPhyTx(QueryState->CurrentTx); } else { tx = nullptr; - break; } } } @@ -1123,27 +1122,35 @@ public: return; } - bool commit = false; - if (QueryState->Commit && Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) { + bool commit = ShouldCommitWithCurrentTx(phyQuery, tx); + if (tx || commit) { + ExecutePhyTx(&phyQuery, tx, commit); + } else { + ReplySuccess(); + } + } + + bool ShouldCommitWithCurrentTx(const NKqpProto::TKqpPhyQuery& phyQuery, const TKqpPhyTxHolder::TConstPtr& tx) { + if (!QueryState->Commit) { + return false; + } + + if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { + // commit can only be applied to the last transaction or perform separately at the end + return false; + } + + if (Config->FeatureFlags.GetEnableKqpImmediateEffects() && phyQuery.GetHasUncommittedChangesRead()) { // every phy tx should acquire LockTxId, so commit is sent separately at the end - commit = QueryState->CurrentTx >= phyQuery.TransactionsSize(); - } else if (QueryState->Commit && QueryState->CurrentTx >= phyQuery.TransactionsSize() - 1) { - if (!tx) { - // no physical transactions left, perform commit - commit = true; - } else { - // we can merge commit with last tx only for read-only transactions - commit = txCtx.DeferredEffects.Empty(); - } + return QueryState->CurrentTx >= phyQuery.TransactionsSize(); } - if (tx || commit) { - bool replied = ExecutePhyTx(&phyQuery, tx, commit); - if (!replied) { - ++QueryState->CurrentTx; - } + if (!tx) { + // no physical transactions left, perform commit + return true; } else { - ReplySuccess(); + // we can merge commit with last tx only for read-only transactions + return QueryState->TxCtx->DeferredEffects.Empty(); } } @@ -1172,8 +1179,6 @@ public: return true; } - // TODO Handle timeouts -- request.Timeout, request.CancelAfter - if (tx) { switch (tx->GetType()) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: @@ -1244,6 +1249,7 @@ public: SendToExecuter(std::move(request)); + ++QueryState->CurrentTx; return false; } |