diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-18 17:31:35 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-18 17:31:35 +0300 |
commit | 2f7de7ff19d8807f43deca8d87ef55ec1aa3df32 (patch) | |
tree | fab753a44623b82896ebcac529e1fdfcf095a233 | |
parent | d641d83a816185d66b5a72d2f8a98401530c3bac (diff) | |
download | ydb-2f7de7ff19d8807f43deca8d87ef55ec1aa3df32.tar.gz |
Refactor SessionActor KIKIMR-11938
ref:1ce52eba0cf1a314dd8ae377dfb5dd80582f4443
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 85 |
1 files changed, 46 insertions, 39 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 1b29261f606..aa54c02f18f 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -259,7 +259,6 @@ public: YQL_ENSURE(txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId, "Can't commit transaction - " << " there is no TxId in Query's TxControl, queryRequest: " << queryRequest.DebugString()); - QueryState->Commit = txControl.commit_tx(); const auto& txId = txControl.tx_id(); @@ -652,17 +651,16 @@ public: } auto action = queryRequest.GetAction(); - auto queryType = queryRequest.GetType(); + auto type = queryRequest.GetType(); - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && queryType == NKikimrKqp::QUERY_TYPE_SQL_DML) { - queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML; + if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_DML) { + type = NKikimrKqp::QUERY_TYPE_PREPARED_DML; action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED; } - YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED, - "Unexpected query action, expected: QUERY_ACTION_EXECUTE_PREPARED, got: " << action); - YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML || queryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN, - "Unexpected query type, expected: QUERY_TYPE_PREPARED_DML, got: " << queryType); + YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_SCAN + || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED && type == NKikimrKqp::QUERY_TYPE_PREPARED_DML, + "Unexpected query action: " << action << " and type: " << type); ParseParameters(std::move(*QueryState->Request.MutableParameters()), queryCtx->Parameters); return true; @@ -763,8 +761,8 @@ public: } if (!IsSameType(parameter->GetType(), type)) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name << " type mismatch, expected: " << type - << ", actual: " << parameter->GetType(); + ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name + << " type mismatch, expected: " << type << ", actual: " << parameter->GetType(); } return parameter; @@ -778,7 +776,6 @@ public: TKqpParamsMap paramsMap(QueryState->QueryCtx); for (const auto& paramBinding : tx.GetParamBindings()) { - try { auto& qCtx = QueryState->QueryCtx; auto it = paramsMap.Values.emplace(paramBinding.GetName(), @@ -789,7 +786,6 @@ public: ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << ex.what(); } } - return paramsMap; } @@ -883,7 +879,8 @@ public: while (tx->GetHasEffects()) { if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { - ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Failed to mix queries with old- and new- engines"; + ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + << "Failed to mix queries with old- and new- engines"; } if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) { LWTRACK(KqpPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx); @@ -993,16 +990,47 @@ public: ReplyBusy(ev); } + TVector<NKikimrMiniKQL::TResult> ExtractTxResults(NKikimrKqp::TExecuterTxResult& result) { + TVector<NKikimrMiniKQL::TResult> txResults; + txResults.resize(result.ResultsSize()); + for (ui32 i = 0; i < result.ResultsSize(); ++i) { + txResults[i].Swap(result.MutableResults(i)); + } + return txResults; + } + + bool MergeLocksWithTxResult(const NKikimrKqp::TExecuterTxResult& result) { + if (result.HasLocks()) { + auto& txCtx = QueryState->TxCtx; + const auto& locks = result.GetLocks(); + auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx); + if (!success) { + if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) { + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", + MessageFromIssues(issues)); + return false; + } + + if (txCtx->GetSnapshot().IsValid()) { + txCtx->Locks.MarkBroken(issues.back()); + } + } + } + + return true; + } + void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { auto* response = ev->Get()->Record.MutableResponse(); LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); ExecuterId = TActorId{}; - auto& txCtx = QueryState->TxCtx; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx << " response->DebugString(): " << response->DebugString()); + auto& txCtx = QueryState->TxCtx; txCtx->Invalidate(); AbortedTransactions.emplace_back(txCtx); RemoveTransaction(QueryState->TxId); @@ -1016,33 +1044,12 @@ public: YQL_ENSURE(QueryState); LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize()); - // save tx results - auto& txResult = *response->MutableResult(); - TVector<NKikimrMiniKQL::TResult> txResults; - txResults.resize(txResult.ResultsSize()); - for (ui32 i = 0; i < txResult.ResultsSize(); ++i) { - txResults[i].Swap(txResult.MutableResults(i)); - } - - QueryState->QueryCtx->TxResults.emplace_back(std::move(txResults)); - - // locks merge - if (txResult.HasLocks()) { - const auto& locks = txResult.GetLocks(); - auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx); - if (!success) { - if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) { - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", - MessageFromIssues(issues)); - return; - } + auto& txResult = *response->MutableResult(); + QueryState->QueryCtx->TxResults.emplace_back(ExtractTxResults(txResult)); - if (txCtx->GetSnapshot().IsValid()) { - txCtx->Locks.MarkBroken(issues.back()); - } - } + if (!MergeLocksWithTxResult(txResult)) { + return; } bool scan = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; |