aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-18 17:31:35 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-18 17:31:35 +0300
commit2f7de7ff19d8807f43deca8d87ef55ec1aa3df32 (patch)
treefab753a44623b82896ebcac529e1fdfcf095a233
parentd641d83a816185d66b5a72d2f8a98401530c3bac (diff)
downloadydb-2f7de7ff19d8807f43deca8d87ef55ec1aa3df32.tar.gz
Refactor SessionActor KIKIMR-11938
ref:1ce52eba0cf1a314dd8ae377dfb5dd80582f4443
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp85
1 files changed, 46 insertions, 39 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 1b29261f606..aa54c02f18f 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -259,7 +259,6 @@ public:
YQL_ENSURE(txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId, "Can't commit transaction - "
<< " there is no TxId in Query's TxControl, queryRequest: " << queryRequest.DebugString());
-
QueryState->Commit = txControl.commit_tx();
const auto& txId = txControl.tx_id();
@@ -652,17 +651,16 @@ public:
}
auto action = queryRequest.GetAction();
- auto queryType = queryRequest.GetType();
+ auto type = queryRequest.GetType();
- if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && queryType == NKikimrKqp::QUERY_TYPE_SQL_DML) {
- queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML;
+ if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_DML) {
+ type = NKikimrKqp::QUERY_TYPE_PREPARED_DML;
action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED;
}
- YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED,
- "Unexpected query action, expected: QUERY_ACTION_EXECUTE_PREPARED, got: " << action);
- YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML || queryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN,
- "Unexpected query type, expected: QUERY_TYPE_PREPARED_DML, got: " << queryType);
+ YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE && type == NKikimrKqp::QUERY_TYPE_SQL_SCAN
+ || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED && type == NKikimrKqp::QUERY_TYPE_PREPARED_DML,
+ "Unexpected query action: " << action << " and type: " << type);
ParseParameters(std::move(*QueryState->Request.MutableParameters()), queryCtx->Parameters);
return true;
@@ -763,8 +761,8 @@ public:
}
if (!IsSameType(parameter->GetType(), type)) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name << " type mismatch, expected: " << type
- << ", actual: " << parameter->GetType();
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Parameter " << name
+ << " type mismatch, expected: " << type << ", actual: " << parameter->GetType();
}
return parameter;
@@ -778,7 +776,6 @@ public:
TKqpParamsMap paramsMap(QueryState->QueryCtx);
for (const auto& paramBinding : tx.GetParamBindings()) {
-
try {
auto& qCtx = QueryState->QueryCtx;
auto it = paramsMap.Values.emplace(paramBinding.GetName(),
@@ -789,7 +786,6 @@ public:
ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << ex.what();
}
}
-
return paramsMap;
}
@@ -883,7 +879,8 @@ public:
while (tx->GetHasEffects()) {
if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) {
- ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << "Failed to mix queries with old- and new- engines";
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST)
+ << "Failed to mix queries with old- and new- engines";
}
if (QueryState->CurrentTx + 1 < phyQuery.TransactionsSize()) {
LWTRACK(KqpPhyQueryDefer, QueryState->Orbit, QueryState->CurrentTx);
@@ -993,16 +990,47 @@ public:
ReplyBusy(ev);
}
+ TVector<NKikimrMiniKQL::TResult> ExtractTxResults(NKikimrKqp::TExecuterTxResult& result) {
+ TVector<NKikimrMiniKQL::TResult> txResults;
+ txResults.resize(result.ResultsSize());
+ for (ui32 i = 0; i < result.ResultsSize(); ++i) {
+ txResults[i].Swap(result.MutableResults(i));
+ }
+ return txResults;
+ }
+
+ bool MergeLocksWithTxResult(const NKikimrKqp::TExecuterTxResult& result) {
+ if (result.HasLocks()) {
+ auto& txCtx = QueryState->TxCtx;
+ const auto& locks = result.GetLocks();
+ auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx);
+ if (!success) {
+ if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) {
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge",
+ MessageFromIssues(issues));
+ return false;
+ }
+
+ if (txCtx->GetSnapshot().IsValid()) {
+ txCtx->Locks.MarkBroken(issues.back());
+ }
+ }
+ }
+
+ return true;
+ }
+
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
auto* response = ev->Get()->Record.MutableResponse();
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
ExecuterId = TActorId{};
- auto& txCtx = QueryState->TxCtx;
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx
<< " response->DebugString(): " << response->DebugString());
+ auto& txCtx = QueryState->TxCtx;
txCtx->Invalidate();
AbortedTransactions.emplace_back(txCtx);
RemoveTransaction(QueryState->TxId);
@@ -1016,33 +1044,12 @@ public:
YQL_ENSURE(QueryState);
LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize());
- // save tx results
- auto& txResult = *response->MutableResult();
- TVector<NKikimrMiniKQL::TResult> txResults;
- txResults.resize(txResult.ResultsSize());
- for (ui32 i = 0; i < txResult.ResultsSize(); ++i) {
- txResults[i].Swap(txResult.MutableResults(i));
- }
-
- QueryState->QueryCtx->TxResults.emplace_back(std::move(txResults));
-
- // locks merge
- if (txResult.HasLocks()) {
- const auto& locks = txResult.GetLocks();
- auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), *txCtx);
- if (!success) {
- if (!txCtx->GetSnapshot().IsValid() || !txCtx->DeferredEffects.Empty()) {
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge",
- MessageFromIssues(issues));
- return;
- }
+ auto& txResult = *response->MutableResult();
+ QueryState->QueryCtx->TxResults.emplace_back(ExtractTxResults(txResult));
- if (txCtx->GetSnapshot().IsValid()) {
- txCtx->Locks.MarkBroken(issues.back());
- }
- }
+ if (!MergeLocksWithTxResult(txResult)) {
+ return;
}
bool scan = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;