aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-14 17:26:47 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-03-14 17:26:47 +0300
commit05b2a699630e0d2e0a05f680c2d923b49e4d8ad0 (patch)
tree25899ada5e8619aa6d2f95c58ee20dc320d20fce
parentd5c1d40525bfa7ced9bb9becda8a54427459744b (diff)
downloadydb-05b2a699630e0d2e0a05f680c2d923b49e4d8ad0.tar.gz
Reply Busy to inappropriate TEvQueryRequests, KIKIMR-11938
ref:4379dfe801e8ef0c314dee9ff0c09b3b1af8942e
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp26
1 files changed, 26 insertions, 0 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index de7ce965f1..cdf90a39f1 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -353,6 +353,10 @@ public:
Become(&TKqpSessionActor::CompileState);
}
+ void HandleCompile(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ ReplyBusy(ev);
+ }
+
void HandleCompile(TEvKqp::TEvCompileResponse::TPtr& ev) {
auto compileResult = ev->Get()->CompileResult;
@@ -777,6 +781,10 @@ public:
void HandleNoop(TEvKqpExecuter::TEvExecuterProgress::TPtr& /*ev*/) {
}
+ void HandleExecute(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ ReplyBusy(ev);
+ }
+
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
auto* response = ev->Get()->Record.MutableResponse();
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
@@ -965,6 +973,22 @@ public:
return Reply(std::move(responseEv));
}
+ void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ auto& event = ev->Get()->Record;
+ auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId());
+
+ //ui64 proxyRequestId = ev->Cookie;
+ //if (!CheckRequest(requestInfo, ev->Sender, proxyRequestId, ctx)) {
+ // return;
+ //}
+
+ auto busyStatus = Settings.Service.GetUseSessionBusyStatus()
+ ? Ydb::StatusIds::SESSION_BUSY
+ : Ydb::StatusIds::PRECONDITION_FAILED;
+
+ ReplyProcessError(requestInfo, busyStatus, "Pending previous query completion");
+ }
+
bool Reply(std::unique_ptr<TEvKqp::TEvQueryResponse> responseEv) {
YQL_ENSURE(QueryState);
@@ -1225,6 +1249,7 @@ public:
STATEFN(CompileState) {
try {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqp::TEvQueryRequest, HandleCompile);
hFunc(TEvKqp::TEvCompileResponse, HandleCompile);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
@@ -1241,6 +1266,7 @@ public:
STATEFN(ExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqp::TEvQueryRequest, HandleExecute);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);