diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-17 23:53:09 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-17 23:53:09 +0300 |
commit | e1b0b0aa2748c0ceac0d114f04157da125032865 (patch) | |
tree | a70bb237471e80dd4cff1ebf578b5e99193f8b91 | |
parent | b7e25ac4eef0181a0b58bd86deb2d36553c7f67e (diff) | |
download | ydb-e1b0b0aa2748c0ceac0d114f04157da125032865.tar.gz |
Enhance error reply from TSessionActor KIKIMR-11938
ref:cf84e7253d41417f192e43dd401f66db7bfe242a
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 77 |
1 files changed, 47 insertions, 30 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 96353152f9..3452262989 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -209,11 +209,9 @@ public: const auto& txId = txControl.tx_id(); auto txCtx = FindTransaction(txId); if (!txCtx) { - google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; - auto issue = YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() - << "Transaction not found: " << QueryState->TxId); - IssueToMessage(issue, message.Add()); - ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", &message); + std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, + TStringBuilder() << "Transaction not found: " << QueryState->TxId)}; + ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues)); } else { QueryState->TxCtx = txCtx; txCtx->Invalidate(); @@ -540,11 +538,7 @@ public: auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery); if (!success) { YQL_ENSURE(!issues.Empty()); - google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; - for (const auto& i : issues) { - IssueToMessage(i, message.Add()); - } - ReplyProcessError(requestInfo, *GetYdbStatus(issues.back()), "", &message); + ReplyQueryError(requestInfo, *GetYdbStatus(issues.back()), "", MessageFromIssues(issues)); return; } @@ -726,7 +720,7 @@ public: auto& txCtx = *QueryState->TxCtx; auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { - ReplyProcessError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect"); + ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect"); return; } @@ -739,7 +733,7 @@ public: while (tx->GetHasEffects()) { if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { - ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, + ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "Failed to mix queries with old- and new- engines"); return; } @@ -848,8 +842,10 @@ public: LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { + LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx + << " response->Status: " << response->GetStatus()); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyProcessError(requestInfo, response->GetStatus(), "", response->MutableIssues()); + ReplyQueryError(requestInfo, response->GetStatus(), "", *response->MutableIssues()); return; } @@ -876,11 +872,8 @@ public: auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), txCtx); if (!success) { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; - for (const auto& i : issues) { - IssueToMessage(i, message.Add()); - } - ReplyProcessError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", &message); + ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", + MessageFromIssues(issues)); return; } } @@ -893,6 +886,15 @@ public: } } + void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { + auto& msg = ev->Get()->Record; + + const auto& issues = ev->Get()->GetIssues(); + LOG_I("Got TEvAbortExecution, status: " << msg.GetStatusCode()); + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + ReplyQueryError(requestInfo, msg.GetStatusCode(), "Got AbortExecution", MessageFromIssues(issues)); + } + void FillStats(NKikimrKqp::TQueryResponse* response) { // TODO // Compile status @@ -1019,19 +1021,23 @@ public: } void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { + auto& event = ev->Get()->Record; auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId()); - //ui64 proxyRequestId = ev->Cookie; - //if (!CheckRequest(requestInfo, ev->Sender, proxyRequestId, ctx)) { - // return; - //} + ui64 proxyRequestId = ev->Cookie; auto busyStatus = Settings.Service.GetUseSessionBusyStatus() ? Ydb::StatusIds::SESSION_BUSY : Ydb::StatusIds::PRECONDITION_FAILED; - ReplyProcessError(requestInfo, busyStatus, "Pending previous query completion"); + TString message = "Pending previous query completion"; + LOG_W(requestInfo << " " << message); + + auto response = TEvKqp::TEvProcessResponse::Error(busyStatus, message); + + //AddTrailingInfo(response->Record); + Send(ev->Sender, response.Release(), 0, proxyRequestId); } bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) { @@ -1113,9 +1119,9 @@ public: void Handle(TEvKqp::TEvCloseSessionRequest::TPtr&) { if (QueryState) { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - ReplyProcessError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED, + ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED, "Request cancelled due to explicit session close request"); - // TODO Remove cleanup from ReplyProcessError since now it is possible + // TODO Remove cleanup from ReplyQueryError since now it is possible // for TxCtx in ExplicitTransactions to leak } if (CleanupCtx) { @@ -1243,10 +1249,20 @@ public: } } - bool ReplyProcessError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, - const TString& message, google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> *issues = {}) + template<class T> + static google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> MessageFromIssues(const T& issues) { + google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issueMessage; + for (const auto& i : issues) { + IssueToMessage(i, issueMessage.Add()); + } + + return issueMessage; + } + + bool ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, + const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {}) { - LOG_W(requestInfo << message); + LOG_W("Reply error on query request: " << requestInfo << " msg: " << message); auto ev = std::make_unique<TEvKqp::TEvQueryResponse>(); ev->Record.GetRef().SetYdbStatus(ydbStatus); @@ -1256,7 +1272,7 @@ public: auto *queryIssue = response->AddQueryIssues(); if (issues) { - queryIssue->Mutableissues()->Swap(issues); + queryIssue->Mutableissues()->Swap(&*issues); } IssueToMessage(TIssue{message}, queryIssue); @@ -1314,6 +1330,7 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleExecute); hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute); hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, Handle); @@ -1353,7 +1370,7 @@ private: LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message); if (QueryState) { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - bool canContinue = ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message); + bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message); if (!canContinue) { PassAway(); } |