diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-14 17:26:47 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-03-14 17:26:47 +0300 |
commit | 05b2a699630e0d2e0a05f680c2d923b49e4d8ad0 (patch) | |
tree | 25899ada5e8619aa6d2f95c58ee20dc320d20fce | |
parent | d5c1d40525bfa7ced9bb9becda8a54427459744b (diff) | |
download | ydb-05b2a699630e0d2e0a05f680c2d923b49e4d8ad0.tar.gz |
Reply Busy to inappropriate TEvQueryRequests, KIKIMR-11938
ref:4379dfe801e8ef0c314dee9ff0c09b3b1af8942e
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index de7ce965f1..cdf90a39f1 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -353,6 +353,10 @@ public: Become(&TKqpSessionActor::CompileState); } + void HandleCompile(TEvKqp::TEvQueryRequest::TPtr& ev) { + ReplyBusy(ev); + } + void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) { auto compileResult = ev->Get()->CompileResult; @@ -777,6 +781,10 @@ public: void HandleNoop(TEvKqpExecuter::TEvExecuterProgress::TPtr& /*ev*/) { } + void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) { + ReplyBusy(ev); + } + void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { auto* response = ev->Get()->Record.MutableResponse(); LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); @@ -965,6 +973,22 @@ public: return Reply(std::move(responseEv)); } + void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { + auto& event = ev->Get()->Record; + auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId()); + + //ui64 proxyRequestId = ev->Cookie; + //if (!CheckRequest(requestInfo, ev->Sender, proxyRequestId, ctx)) { + // return; + //} + + auto busyStatus = Settings.Service.GetUseSessionBusyStatus() + ? Ydb::StatusIds::SESSION_BUSY + : Ydb::StatusIds::PRECONDITION_FAILED; + + ReplyProcessError(requestInfo, busyStatus, "Pending previous query completion"); + } + bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) { YQL_ENSURE(QueryState); @@ -1225,6 +1249,7 @@ public: STATEFN(CompileState) { try { switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvQueryRequest, HandleCompile); hFunc(TEvKqp::TEvCompileResponse, HandleCompile); hFunc(TEvKqp::TEvPingSessionRequest, Handle); @@ -1241,6 +1266,7 @@ public: STATEFN(ExecuteState) { try { switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvQueryRequest, HandleExecute); hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute); hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); |