aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-17 23:53:09 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-17 23:53:09 +0300
commite1b0b0aa2748c0ceac0d114f04157da125032865 (patch)
treea70bb237471e80dd4cff1ebf578b5e99193f8b91
parentb7e25ac4eef0181a0b58bd86deb2d36553c7f67e (diff)
downloadydb-e1b0b0aa2748c0ceac0d114f04157da125032865.tar.gz
Enhance error reply from TSessionActor KIKIMR-11938
ref:cf84e7253d41417f192e43dd401f66db7bfe242a
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp77
1 files changed, 47 insertions, 30 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 96353152f9..3452262989 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -209,11 +209,9 @@ public:
const auto& txId = txControl.tx_id();
auto txCtx = FindTransaction(txId);
if (!txCtx) {
- google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message;
- auto issue = YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder()
- << "Transaction not found: " << QueryState->TxId);
- IssueToMessage(issue, message.Add());
- ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", &message);
+ std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
+ TStringBuilder() << "Transaction not found: " << QueryState->TxId)};
+ ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues));
} else {
QueryState->TxCtx = txCtx;
txCtx->Invalidate();
@@ -540,11 +538,7 @@ public:
auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);
if (!success) {
YQL_ENSURE(!issues.Empty());
- google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message;
- for (const auto& i : issues) {
- IssueToMessage(i, message.Add());
- }
- ReplyProcessError(requestInfo, *GetYdbStatus(issues.back()), "", &message);
+ ReplyQueryError(requestInfo, *GetYdbStatus(issues.back()), "", MessageFromIssues(issues));
return;
}
@@ -726,7 +720,7 @@ public:
auto& txCtx = *QueryState->TxCtx;
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
if (!txCtx.DeferredEffects.Empty() && txCtx.Locks.Broken()) {
- ReplyProcessError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect");
+ ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while AddDeferredEffect");
return;
}
@@ -739,7 +733,7 @@ public:
while (tx->GetHasEffects()) {
if (!txCtx.AddDeferredEffect(tx, CreateKqpValueMap(*tx))) {
- ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST,
+ ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST,
"Failed to mix queries with old- and new- engines");
return;
}
@@ -848,8 +842,10 @@ public:
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
+ LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx
+ << " response->Status: " << response->GetStatus());
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyProcessError(requestInfo, response->GetStatus(), "", response->MutableIssues());
+ ReplyQueryError(requestInfo, response->GetStatus(), "", *response->MutableIssues());
return;
}
@@ -876,11 +872,8 @@ public:
auto [success, issues] = MergeLocks(locks.GetType(), locks.GetValue(), txCtx);
if (!success) {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> message;
- for (const auto& i : issues) {
- IssueToMessage(i, message.Add());
- }
- ReplyProcessError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge", &message);
+ ReplyQueryError(requestInfo, Ydb::StatusIds::ABORTED, "Error while locks merge",
+ MessageFromIssues(issues));
return;
}
}
@@ -893,6 +886,15 @@ public:
}
}
+ void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
+ auto& msg = ev->Get()->Record;
+
+ const auto& issues = ev->Get()->GetIssues();
+ LOG_I("Got TEvAbortExecution, status: " << msg.GetStatusCode());
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ ReplyQueryError(requestInfo, msg.GetStatusCode(), "Got AbortExecution", MessageFromIssues(issues));
+ }
+
void FillStats(NKikimrKqp::TQueryResponse* response) {
// TODO
// Compile status
@@ -1019,19 +1021,23 @@ public:
}
void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
+
auto& event = ev->Get()->Record;
auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId());
- //ui64 proxyRequestId = ev->Cookie;
- //if (!CheckRequest(requestInfo, ev->Sender, proxyRequestId, ctx)) {
- // return;
- //}
+ ui64 proxyRequestId = ev->Cookie;
auto busyStatus = Settings.Service.GetUseSessionBusyStatus()
? Ydb::StatusIds::SESSION_BUSY
: Ydb::StatusIds::PRECONDITION_FAILED;
- ReplyProcessError(requestInfo, busyStatus, "Pending previous query completion");
+ TString message = "Pending previous query completion";
+ LOG_W(requestInfo << " " << message);
+
+ auto response = TEvKqp::TEvProcessResponse::Error(busyStatus, message);
+
+ //AddTrailingInfo(response->Record);
+ Send(ev->Sender, response.Release(), 0, proxyRequestId);
}
bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) {
@@ -1113,9 +1119,9 @@ public:
void Handle(TEvKqp::TEvCloseSessionRequest::TPtr&) {
if (QueryState) {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- ReplyProcessError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED,
+ ReplyQueryError(requestInfo, Ydb::StatusIds::SESSION_EXPIRED,
"Request cancelled due to explicit session close request");
- // TODO Remove cleanup from ReplyProcessError since now it is possible
+ // TODO Remove cleanup from ReplyQueryError since now it is possible
// for TxCtx in ExplicitTransactions to leak
}
if (CleanupCtx) {
@@ -1243,10 +1249,20 @@ public:
}
}
- bool ReplyProcessError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus,
- const TString& message, google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> *issues = {})
+ template<class T>
+ static google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> MessageFromIssues(const T& issues) {
+ google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issueMessage;
+ for (const auto& i : issues) {
+ IssueToMessage(i, issueMessage.Add());
+ }
+
+ return issueMessage;
+ }
+
+ bool ReplyQueryError(const TKqpRequestInfo& requestInfo, Ydb::StatusIds::StatusCode ydbStatus,
+ const TString& message, std::optional<google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>> issues = {})
{
- LOG_W(requestInfo << message);
+ LOG_W("Reply error on query request: " << requestInfo << " msg: " << message);
auto ev = std::make_unique<TEvKqp::TEvQueryResponse>();
ev->Record.GetRef().SetYdbStatus(ydbStatus);
@@ -1256,7 +1272,7 @@ public:
auto *queryIssue = response->AddQueryIssues();
if (issues) {
- queryIssue->Mutableissues()->Swap(issues);
+ queryIssue->Mutableissues()->Swap(&*issues);
}
IssueToMessage(TIssue{message}, queryIssue);
@@ -1314,6 +1330,7 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleExecute);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
@@ -1353,7 +1370,7 @@ private:
LOG_E("Internal error, SelfId: " << SelfId() << ", message: " << message);
if (QueryState) {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- bool canContinue = ReplyProcessError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message);
+ bool canContinue = ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, message);
if (!canContinue) {
PassAway();
}