diff options
| author | Vladislav Kuznetsov <[email protected]> | 2022-03-17 23:53:09 +0300 | 
|---|---|---|
| committer | Vladislav Kuznetsov <[email protected]> | 2022-03-17 23:53:09 +0300 | 
| commit | e1b0b0aa2748c0ceac0d114f04157da125032865 (patch) | |
| tree | a70bb237471e80dd4cff1ebf578b5e99193f8b91 | |
| parent | b7e25ac4eef0181a0b58bd86deb2d36553c7f67e (diff) | |
Enhance error reply from TSessionActor KIKIMR-11938
ref:cf84e7253d41417f192e43dd401f66db7bfe242a
| -rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 77 | 
1 files changed, 47 insertions, 30 deletions
| diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 96353152f9c..3452262989e 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -209,11 +209,9 @@ public:          const auto& txId = txControl.tx_id();          auto txCtx = FindTransaction(txId);          if (!txCtx) { -            google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; -            auto issue = YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() -                    << "Transaction not found: " << QueryState->TxId); -            IssueToMessage(issue, message.Add()); -            ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", &message); +            std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, +                TStringBuilder() << "Transaction not found: " << QueryState->TxId)}; +            ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues));          } else {              QueryState->TxCtx = txCtx;              txCtx->Invalidate(); @@ -540,11 +538,7 @@ public:          auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);          if (!success) {              YQL_ENSURE(!issues.Empty()); -            google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; -            for (const auto& i : issues) { -                IssueToMessage(i, message.Add()); -            } -            ReplyProcessError(requestInfo, *GetYdbStatus(issues.back()), "", &message); +            ReplyQueryError(requestInfo, *GetYdbStatus(issues.back()), "", MessageFromIssues(issues));              return;          } @@ -726,7 +720,7 @@ public:          auto& txCtx = *QueryState->TxCtx;          auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);          if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) { -            ReplyProcessError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect"); +            ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect");              return;          } @@ -739,7 +733,7 @@ public:          while (tx->GetHasEffects()) {              if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) { -                ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, +                ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST,                          "Failed to mix queries with old- and new- engines");                  return;              } @@ -848,8 +842,10 @@ public:          LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());          if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { +            LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx +                    << " response->Status: " << response->GetStatus());              auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); -            ReplyProcessError(requestInfo, response->GetStatus(), "", response->MutableIssues()); +            ReplyQueryError(requestInfo, response->GetStatus(), "", *response->MutableIssues());              return;          } @@ -876,11 +872,8 @@ public:              auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), txCtx);              if (!success) {                  auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); -                google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message; -                for (const auto& i : issues) { -                    IssueToMessage(i, message.Add()); -                } -                ReplyProcessError(requestInfo, Ydb::StatusIds::ABORTED,  "Error while locks merge", &message); +                ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED,  "Error while locks merge", +                    MessageFromIssues(issues));                  return;              }          } @@ -893,6 +886,15 @@ public:          }      } +    void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { +        auto& msg = ev->Get()->Record; + +        const auto& issues = ev->Get()->GetIssues(); +        LOG_I("Got TEvAbortExecution, status: " << msg.GetStatusCode()); +        auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); +        ReplyQueryError(requestInfo, msg.GetStatusCode(), "Got AbortExecution", MessageFromIssues(issues)); +    } +      void FillStats(NKikimrKqp::TQueryResponse* response) {          // TODO          // Compile status @@ -1019,19 +1021,23 @@ public:      }      void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { +          auto& event = ev->Get()->Record;          auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId()); -        //ui64 proxyRequestId = ev->Cookie; -        //if (!CheckRequest(requestInfo, ev->Sender, proxyRequestId, ctx)) { -        //    return; -        //} +        ui64 proxyRequestId = ev->Cookie;          auto busyStatus = Settings.Service.GetUseSessionBusyStatus()              ? Ydb::StatusIds::SESSION_BUSY              : Ydb::StatusIds::PRECONDITION_FAILED; -        ReplyProcessError(requestInfo, busyStatus, "Pending previous query completion"); +        TString message = "Pending previous query completion"; +        LOG_W(requestInfo << " " << message); + +        auto response = TEvKqp::TEvProcessResponse::Error(busyStatus, message); + +        //AddTrailingInfo(response->Record); +        Send(ev->Sender, response.Release(), 0, proxyRequestId);      }      bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) { @@ -1113,9 +1119,9 @@ public:      void Handle(TEvKqp::TEvCloseSessionRequest::TPtr&) {          if (QueryState) {              auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); -            ReplyProcessError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED, +            ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED,                      "Request cancelled due to explicit session close request"); -            // TODO Remove cleanup from ReplyProcessError since now it is possible +            // TODO Remove cleanup from ReplyQueryError since now it is possible              // for TxCtx in ExplicitTransactions to leak          }          if (CleanupCtx) { @@ -1243,10 +1249,20 @@ public:          }      } -    bool ReplyProcessError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, -            const TString& message, google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> *issues = {}) +    template<class T> +    static google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> MessageFromIssues(const T& issues) { +        google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issueMessage; +        for (const auto& i : issues) { +            IssueToMessage(i, issueMessage.Add()); +        } + +        return issueMessage; +    } + +    bool ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus, +        const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {})      { -        LOG_W(requestInfo << message); +        LOG_W("Reply error on query request: " << requestInfo << " msg: " << message);          auto ev = std::make_unique<TEvKqp::TEvQueryResponse>();          ev->Record.GetRef().SetYdbStatus(ydbStatus); @@ -1256,7 +1272,7 @@ public:          auto *queryIssue = response->AddQueryIssues();          if (issues) { -            queryIssue->Mutableissues()->Swap(issues); +            queryIssue->Mutableissues()->Swap(&*issues);          }          IssueToMessage(TIssue{message}, queryIssue); @@ -1314,6 +1330,7 @@ public:                  hFunc(TEvKqp::TEvQueryRequest, HandleExecute);                  hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);                  hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); +                hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);                  hFunc(TEvKqp::TEvPingSessionRequest, Handle);                  hFunc(TEvKqp::TEvCloseSessionRequest, Handle); @@ -1353,7 +1370,7 @@ private:          LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message);          if (QueryState) {              auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); -            bool canContinue = ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message); +            bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message);              if (!canContinue) {                  PassAway();              } | 
