aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-11-01 10:56:37 +0300
committergvit <gvit@ydb.tech>2023-11-01 11:52:11 +0300
commit28b7d19f293b1c4b5e43e69fd34700c1b07026fe (patch)
treea80105a55ce806ba0c1a19694b5a4cb0567231b9
parent967a51911964e64bd631f640b95cdfa5012809e8 (diff)
downloadydb-28b7d19f293b1c4b5e43e69fd34700c1b07026fe.tar.gz
remove ev process response from ydb KIKIMR-19227
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp8
-rw-r--r--ydb/core/grpc_services/query/rpc_attach_session.cpp20
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp12
-rw-r--r--ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp14
-rw-r--r--ydb/core/grpc_services/rpc_keep_alive.cpp11
-rw-r--r--ydb/core/grpc_services/rpc_kqp_base.h21
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp11
-rw-r--r--ydb/core/kqp/common/events/process_response.cpp14
-rw-r--r--ydb/core/kqp/common/events/process_response.h5
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp37
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp35
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp46
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp29
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp14
-rw-r--r--ydb/core/persqueue/cluster_tracker.cpp10
-rw-r--r--ydb/core/viewer/json_query.h5
-rw-r--r--ydb/core/ymq/actor/cleanup_queue_data.cpp4
-rw-r--r--ydb/core/ymq/actor/cleanup_queue_data.h2
-rw-r--r--ydb/core/ymq/actor/index_events_processor.cpp6
-rw-r--r--ydb/core/ymq/actor/index_events_processor.h2
-rw-r--r--ydb/core/ymq/actor/monitoring.cpp4
-rw-r--r--ydb/core/ymq/actor/monitoring.h4
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h2
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp8
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h2
-rw-r--r--ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp9
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h2
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp10
28 files changed, 77 insertions, 270 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index 5fd43ef3515..93875328fae 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -270,13 +270,6 @@ private:
}
}
- void HandleQueryResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
- LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, "failed to list topics: " << record);
-
- Reset(ctx);
- }
-
void HandleCheckVersionResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record.GetRef();
@@ -747,7 +740,6 @@ public:
HFunc(NActors::TEvents::TEvWakeup, HandleWakeup)
HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, HandleClustersUpdate)
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleQueryResponse);
HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion)
HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
diff --git a/ydb/core/grpc_services/query/rpc_attach_session.cpp b/ydb/core/grpc_services/query/rpc_attach_session.cpp
index 13dff91caef..5f695e7b3e2 100644
--- a/ydb/core/grpc_services/query/rpc_attach_session.cpp
+++ b/ydb/core/grpc_services/query/rpc_attach_session.cpp
@@ -30,7 +30,6 @@ public:
try {
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqp::TEvPingSessionResponse, HandleAttaching);
- hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleAttachin);
default:
UnexpectedEvent(__func__, ev);
}
@@ -114,6 +113,14 @@ private:
return ReplyFinishStream(Ydb::StatusIds::NOT_FOUND);
}
+ if (record.GetStatus() != Ydb::StatusIds::SUCCESS) {
+ if (record.GetIssues().size() > 0) {
+ Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, record.GetIssues().at(0).message()));
+ }
+
+ return ReplyFinishStream(record.GetStatus());
+ }
+
if (record.GetResponse().GetSessionStatus() != Ydb::Table::KeepAliveResult::SESSION_STATUS_READY) {
return ReplyFinishStream(Ydb::StatusIds::SESSION_BUSY);
}
@@ -153,17 +160,6 @@ private:
ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite()));
}
- void HandleAttachin(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) {
- const auto& record = ev->Get()->Record;
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- // KQP should not send TEvProcessResponse with SUCCESS for CreateSession rpc.
- // We expect TEvKqp::TEvPingSessionResponse instead.
- InternalError("Unexpected TEvProcessResponse with success status for PingSession request");
- } else {
- return ReplyResponseError(record);
- }
- }
-
void ReplyFinishStream(Ydb::StatusIds::StatusCode status) {
Request->ReplyWithYdbStatus(status);
Request->FinishStream(status);
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index e5a3dc14a4a..2b07aae0a79 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -210,7 +210,6 @@ private:
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
default:
UnexpectedEvent(__func__, ev);
}
@@ -374,17 +373,6 @@ private:
ReplyFinishStream(record.GetYdbStatus(), issues);
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext&) {
- auto& record = ev->Get()->Record;
-
- NYql::TIssues issues;
- if (record.HasError()) {
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, record.GetError()));
- }
-
- ReplyFinishStream(record.GetYdbStatus(), issues);
- }
-
private:
void HandleClientLost(const TActorContext& ctx) {
LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost");
diff --git a/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp b/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp
index 6e492cdec93..102f5a7c1eb 100644
--- a/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp
+++ b/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp
@@ -83,7 +83,6 @@ private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
- hFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
hFunc(TEvents::TEvWakeup, Handle);
}
}
@@ -145,19 +144,6 @@ private:
return Reply(kqpResponse.GetYdbStatus());
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) {
- const auto& record = ev->Get()->Record;
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- // KQP should not send TEvProcessResponse with SUCCESS for CreateSession rpc.
- // We expect TEvKqp::TEvCreateSessionResponse instead.
- static const TString err = "Unexpected TEvProcessResponse with success status for CreateSession request";
- Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, err));
- Reply(Ydb::StatusIds::INTERNAL_ERROR);
- } else {
- return ReplyResponseError(record);
- }
- }
-
void Reply(Ydb::StatusIds::StatusCode status) {
Request->ReplyWithYdbStatus(status);
this->PassAway();
diff --git a/ydb/core/grpc_services/rpc_keep_alive.cpp b/ydb/core/grpc_services/rpc_keep_alive.cpp
index ff2f73df202..75e0a07b78a 100644
--- a/ydb/core/grpc_services/rpc_keep_alive.cpp
+++ b/ydb/core/grpc_services/rpc_keep_alive.cpp
@@ -36,7 +36,6 @@ public:
private:
void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvPingSessionResponse, Handle);
default: TBase::StateWork(ev);
}
@@ -62,16 +61,6 @@ private:
ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- Ydb::Table::KeepAliveResult result;
- ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx);
- } else {
- return OnProcessError(record, ctx);
- }
- }
-
void Handle(NKqp::TEvKqp::TEvPingSessionResponse::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
diff --git a/ydb/core/grpc_services/rpc_kqp_base.h b/ydb/core/grpc_services/rpc_kqp_base.h
index 159dd8e61d7..698576763a7 100644
--- a/ydb/core/grpc_services/rpc_kqp_base.h
+++ b/ydb/core/grpc_services/rpc_kqp_base.h
@@ -94,7 +94,6 @@ public:
protected:
void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
default: TBase::StateFuncBase(ev);
}
}
@@ -147,26 +146,6 @@ protected:
this->Die(ctx);
}
- void OnProcessError(const NKikimrKqp::TEvProcessResponse& kqpResponse, const TActorContext& ctx) {
- if (kqpResponse.HasError()) {
- NYql::TIssues issues;
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, kqpResponse.GetError()));
- return this->Reply(kqpResponse.GetYdbStatus(), issues, ctx);
- } else {
- return this->Reply(kqpResponse.GetYdbStatus(), ctx);
- }
- }
-
-private:
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
- NYql::TIssues issues;
- if (record.HasError()) {
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, record.GetError()));
- }
- return this->Reply(record.GetYdbStatus(), issues, ctx);
- }
-
private:
template<typename TKqpResponse>
void RaiseIssuesFromKqp(const TKqpResponse& kqpResponse) {
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 0f98cbb9489..e854fd5394d 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -187,7 +187,6 @@ private:
HFunc(TEvents::TEvWakeup, Handle);
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle);
@@ -321,16 +320,6 @@ private:
ReplyFinishStream(record.GetYdbStatus(), issues);
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext&) {
- const auto& kqpResponse = ev->Get()->Record;
- NYql::TIssues issues;
- if (kqpResponse.HasError()) {
- issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, kqpResponse.GetError()));
- }
-
- ReplyFinishStream(kqpResponse.GetYdbStatus(), issues);
- }
-
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
NYql::TIssues issues = ev->Get()->GetIssues();
diff --git a/ydb/core/kqp/common/events/process_response.cpp b/ydb/core/kqp/common/events/process_response.cpp
index fe71fdcfbb6..1f2f87f1319 100644
--- a/ydb/core/kqp/common/events/process_response.cpp
+++ b/ydb/core/kqp/common/events/process_response.cpp
@@ -1,18 +1,4 @@
#include "process_response.h"
namespace NKikimr::NKqp::NPrivateEvents {
-
-THolder<NKikimr::NKqp::NPrivateEvents::TEvProcessResponse> TEvProcessResponse::Error(Ydb::StatusIds::StatusCode ydbStatus, const TString& error) {
- auto ev = MakeHolder<TEvProcessResponse>();
- ev->Record.SetYdbStatus(ydbStatus);
- ev->Record.SetError(error);
- return ev;
-}
-
-THolder<NKikimr::NKqp::NPrivateEvents::TEvProcessResponse> TEvProcessResponse::Success() {
- auto ev = MakeHolder<TEvProcessResponse>();
- ev->Record.SetYdbStatus(Ydb::StatusIds::SUCCESS);
- return ev;
-}
-
} // namespace NKikimr::NKqp::NPrivateEvents
diff --git a/ydb/core/kqp/common/events/process_response.h b/ydb/core/kqp/common/events/process_response.h
index 3c0adea39ce..c5ed88717f9 100644
--- a/ydb/core/kqp/common/events/process_response.h
+++ b/ydb/core/kqp/common/events/process_response.h
@@ -7,9 +7,8 @@
namespace NKikimr::NKqp::NPrivateEvents {
struct TEvProcessResponse: public TEventPB<TEvProcessResponse, NKikimrKqp::TEvProcessResponse,
- TKqpEvents::EvProcessResponse> {
- static THolder<TEvProcessResponse> Error(Ydb::StatusIds::StatusCode ydbStatus, const TString& error);
- static THolder<TEvProcessResponse> Success();
+ TKqpEvents::EvProcessResponse>
+{
};
} // namespace NKikimr::NKqp::NPrivateEvents
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index f03d37119c0..79adead6f3b 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -126,24 +126,14 @@ public:
void Bootstrap(const TActorContext& ctx) {
TActorId kqpProxy = MakeKqpProxyID(ctx.SelfID.NodeId());
ctx.Send(kqpProxy, this->Request.Release());
-
this->Become(&TKqpRequestHandler::AwaitState);
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& kqpResponse = ev->Get()->Record;
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, ctx.SelfID
- << "Received process error for kqp query: " << kqpResponse.GetError());
-
- TBase::HandleError(kqpResponse.GetError(), ctx);
- }
-
using TBase::Handle;
using TBase::HandleResponse;
STFUNC(AwaitState) {
switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(TResponse, HandleResponse);
default:
@@ -219,14 +209,6 @@ public:
Executions.push_back(std::move(*ev->Get()->Record.MutableProfile()));
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& kqpResponse = ev->Get()->Record;
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
- << "Received process error for scan query: " << kqpResponse.GetError());
-
- TBase::HandleError(kqpResponse.GetError(), ctx);
- }
-
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
const TString msg = ev->Get()->GetIssues().ToOneLineString();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
@@ -251,7 +233,6 @@ public:
STFUNC(AwaitState) {
switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle);
@@ -294,14 +275,6 @@ public:
this->Become(&TKqpStreamRequestHandler::AwaitState);
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& kqpResponse = ev->Get()->Record;
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, ctx.SelfID
- << "Received process error for kqp data query: " << kqpResponse.GetError());
-
- TBase::HandleError(kqpResponse.GetError(), ctx);
- }
-
using TBase::Promise;
using TBase::Callback;
@@ -355,7 +328,6 @@ public:
STFUNC(AwaitState) {
switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(TResponse, HandleResponse);
HFunc(NKqp::TEvKqp::TEvDataQueryStreamPartAck, Handle);
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
@@ -414,14 +386,6 @@ public:
Executions.push_back(std::move(*ev->Get()->Record.MutableProfile()));
}
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& kqpResponse = ev->Get()->Record;
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
- << "Received process error for scan query: " << kqpResponse.GetError());
-
- TBase::HandleError(kqpResponse.GetError(), ctx);
- }
-
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
const TString msg = ev->Get()->GetIssues().ToOneLineString();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
@@ -447,7 +411,6 @@ public:
STFUNC(AwaitState) {
switch (ev->GetTypeRewrite()) {
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 1d55ec71944..7965a46058c 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -23,6 +23,7 @@
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/mon/mon.h>
+#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/library/yql/utils/actor_log/log.h>
#include <ydb/library/yql/core/services/mounts/yql_mounts.h>
@@ -1238,7 +1239,7 @@ public:
hFunc(TEvKqp::TEvScriptRequest, Handle);
hFunc(TEvKqp::TEvCloseSessionRequest, Handle);
hFunc(TEvKqp::TEvQueryResponse, ForwardEvent);
- hFunc(TEvKqp::TEvProcessResponse, ForwardEvent);
+ hFunc(TEvKqp::TEvProcessResponse, Handle);
hFunc(TEvKqp::TEvCreateSessionRequest, Handle);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvCancelQueryRequest, Handle);
@@ -1289,10 +1290,38 @@ private:
Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetStatus());
}
+
+ void Handle(TEvKqp::TEvProcessResponse::TPtr&ev) {
+ ReplyProcessError(ev->Get()->Record.GetYdbStatus(), ev->Get()->Record.GetError(), ev->Cookie);
+ }
+
bool ReplyProcessError(Ydb::StatusIds::StatusCode ydbStatus, const TString& message, ui64 requestId)
{
- auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);
- return Send(SelfId(), response.Release(), 0, requestId);
+ auto issue = NKikimr::MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message);
+ NYql::TIssues issues;
+ issues.AddIssue(issue);
+ const auto request = PendingRequests.FindPtr(requestId);
+ if (!request) {
+ return true;
+ }
+
+ if (request->EventType == TKqpEvents::EvPingSessionRequest) {
+ auto response = std::make_unique<TEvKqp::TEvPingSessionResponse>();
+ response->Record.SetStatus(ydbStatus);
+ NYql::IssuesToMessage(issues, response->Record.MutableIssues());
+ return Send(SelfId(), response.release(), 0, requestId);
+ } else if (request->EventType == TKqpEvents::EvCreateSessionRequest) {
+ auto response = std::make_unique<TEvKqp::TEvCreateSessionResponse>();
+ response->Record.SetYdbStatus(ydbStatus);
+ response->Record.SetError(message);
+ return Send(SelfId(), response.release(), 0, requestId);
+ }
+
+ auto response = std::make_unique<TEvKqp::TEvQueryResponse>();
+ response->Record.GetRef().SetYdbStatus(ydbStatus);
+
+ NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues());
+ return Send(SelfId(), response.release(), 0, requestId);
}
bool CheckRequestDeadline(const TKqpRequestInfo& requestInfo, const TInstant deadline, TProcessResult<TKqpSessionInfo*>& result)
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
index 271c38680b7..e7074e6bf9b 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
@@ -110,8 +110,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
runtime->Send(new IEventHandle(kqpProxy, sender, ev.Release()));
TAutoPtr<IEventHandle> handle;
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvProcessResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST);
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST);
};
SendBadRequestToSession("ydb://session/1?id=ZjY5NWRlM2EtYWMyYjA5YWEtNzQ0MTVlYTMtM2Q4ZDgzOWQ=&node_id=1234&node_id=12345");
@@ -152,9 +152,9 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
runtime->Send(new IEventHandle(kqpProxy, sender, ev.Release()));
TAutoPtr<IEventHandle> handle;
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvProcessResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetError(), "<main>: Error: SomeUniqTextForUt\n");
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetResponse().GetQueryIssues().at(0).message(), "<main>: Error: SomeUniqTextForUt\n");
}
Y_UNIT_TEST(LoadedMetadataAfterCompilationTimeout) {
@@ -289,11 +289,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
runtime->Send(new IEventHandle(kqpProxy1, sender, ev.Release()));
TAutoPtr<IEventHandle> handle;
- auto reply = runtime->GrabEdgeEventsRethrow<TEvKqp::TEvQueryResponse, TEvKqp::TEvProcessResponse>(handle);
-
- TEvKqp::TEvQueryResponse* queryResponse = std::get<TEvKqp::TEvQueryResponse*>(reply);
- Y_ABORT_UNLESS(queryResponse);
- UNIT_ASSERT_VALUES_EQUAL(queryResponse->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
}
}
@@ -366,18 +363,14 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
runtime->Send(new IEventHandle(kqpProxy1, sender, ev.Release()));
TAutoPtr<IEventHandle> handle;
- auto reply = runtime->GrabEdgeEventsRethrow<TEvKqp::TEvQueryResponse, TEvKqp::TEvProcessResponse>(handle);
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(handle);
+ auto status = reply->Record.GetRef().GetYdbStatus();
+ UNIT_ASSERT(status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::TIMEOUT);
- TEvKqp::TEvQueryResponse* queryResponse = std::get<TEvKqp::TEvQueryResponse*>(reply);
- if (queryResponse) {
+ if (status == Ydb::StatusIds::SUCCESS) {
++SuccessStories;
- UNIT_ASSERT_VALUES_EQUAL(queryResponse->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- }
-
- TEvKqp::TEvProcessResponse* processResponse = std::get<TEvKqp::TEvProcessResponse*>(reply);
- if (processResponse) {
+ } else if (status == Ydb::StatusIds::TIMEOUT) {
++NegativeStories;
- UNIT_ASSERT_VALUES_EQUAL(processResponse->Record.GetYdbStatus(), Ydb::StatusIds::TIMEOUT);
}
}
@@ -389,18 +382,13 @@ Y_UNIT_TEST_SUITE(KqpProxy) {
runtime->Send(new IEventHandle(kqpProxy1, sender, ev.Release()));
TAutoPtr<IEventHandle> handle;
- auto reply = runtime->GrabEdgeEventsRethrow<TEvKqp::TEvPingSessionResponse, TEvKqp::TEvProcessResponse>(handle);
-
- TEvKqp::TEvPingSessionResponse* pingResponse = std::get<TEvKqp::TEvPingSessionResponse*>(reply);
- if (pingResponse) {
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvPingSessionResponse>(handle);
+ auto status = reply->Record.GetStatus();
+ UNIT_ASSERT(status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::TIMEOUT);
+ if (status == Ydb::StatusIds::SUCCESS) {
++SuccessStories;
- UNIT_ASSERT_VALUES_EQUAL(pingResponse->Record.GetStatus(), Ydb::StatusIds::SUCCESS);
- }
-
- TEvKqp::TEvProcessResponse* processResponse = std::get<TEvKqp::TEvProcessResponse*>(reply);
- if (processResponse) {
+ } else if (status == Ydb::StatusIds::TIMEOUT) {
++NegativeStories;
- UNIT_ASSERT_VALUES_EQUAL(processResponse->Record.GetYdbStatus(), Ydb::StatusIds::TIMEOUT);
}
}
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c5d3197cb4e..53cf5e1915a 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -238,19 +238,6 @@ public:
Cleanup();
}
- void ForwardResponse(TEvKqp::TEvProcessResponse::TPtr& ev) {
- QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>();
- auto& record = QueryResponse->Record.GetRef();
- record.SetYdbStatus(ev->Get()->Record.GetYdbStatus());
-
- if (ev->Get()->Record.HasError()) {
- auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ev->Get()->Record.GetError());
- IssueToMessage(issue, record.MutableResponse()->AddQueryIssues());
- }
-
- Cleanup();
- }
-
void ReplyTransactionNotFound(const TString& txId) {
std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
TStringBuilder() << "Transaction not found: " << txId)};
@@ -1543,12 +1530,15 @@ public:
void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus,
const TString& message)
{
- LOG_W("Reply process error, msg: " << message << " proxyRequestId: " << proxyRequestId);
-
- auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);
-
- AddTrailingInfo(response->Record);
- Send(sender, response.Release(), 0, proxyRequestId);
+ LOG_W("Reply query error, msg: " << message << " proxyRequestId: " << proxyRequestId);
+ auto response = std::make_unique<TEvKqp::TEvQueryResponse>();
+ response->Record.GetRef().SetYdbStatus(ydbStatus);
+ auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message);
+ NYql::TIssues issues;
+ issues.AddIssue(issue);
+ NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues());
+ AddTrailingInfo(response->Record.GetRef());
+ Send(sender, response.release(), 0, proxyRequestId);
}
void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
@@ -2001,7 +1991,6 @@ public:
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
- hFunc(TEvKqp::TEvProcessResponse, ForwardResponse);
default:
UnexpectedEvent("ExecuteState", ev);
}
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 3fb8fb73a12..d113737683b 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -11,6 +11,7 @@
#include <ydb/core/kqp/host/kqp_host.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/library/aclib/aclib.h>
+#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/library/yql/utils/actor_log/log.h>
@@ -864,11 +865,14 @@ private:
Ydb::StatusIds::StatusCode ydbStatus, const TString& message)
{
LOG_W(message);
-
- auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message);
-
- AddTrailingInfo(response->Record);
- return Send(sender, response.Release(), 0, proxyRequestId);
+ auto response = std::make_unique<TEvKqp::TEvQueryResponse>();
+ response->Record.GetRef().SetYdbStatus(ydbStatus);
+ auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message);
+ NYql::TIssues issues;
+ issues.AddIssue(issue);
+ NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues());
+ AddTrailingInfo(response->Record.GetRef());
+ return Send(sender, response.release(), 0, proxyRequestId);
}
bool CheckRequest(const TString& eventSessionId, const TActorId& sender, ui64 proxyRequestId, const TActorContext&)
diff --git a/ydb/core/persqueue/cluster_tracker.cpp b/ydb/core/persqueue/cluster_tracker.cpp
index c5f51205215..2107b4c0df5 100644
--- a/ydb/core/persqueue/cluster_tracker.cpp
+++ b/ydb/core/persqueue/cluster_tracker.cpp
@@ -89,7 +89,6 @@ private:
hFunc(TEvClusterTracker::TEvSubscribe, HandleWhileWorking);
hFunc(TEvents::TEvWakeup, HandleWhileWorking);
hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleWhileWorking);
- hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleWhileWorking);
}
}
@@ -164,15 +163,6 @@ private:
}
}
- void HandleWhileWorking(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) {
- const auto& record = ev->Get()->Record;
- LOG_ERROR_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "failed to list clusters: " << record);
-
- ClustersList = nullptr;
-
- Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutOnErrorSec()), new TEvents::TEvWakeup);
- }
-
template<typename TProtoRecord>
void UpdateClustersList(const TProtoRecord& record) {
auto clustersList = MakeIntrusive<TClustersList>();
diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h
index 422e5a7916b..9afa37e795f 100644
--- a/ydb/core/viewer/json_query.h
+++ b/ydb/core/viewer/json_query.h
@@ -143,7 +143,6 @@ public:
hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected);
hFunc(TEvViewer::TEvViewerResponse, HandleReply);
hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleReply);
- hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleReply);
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply);
hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply);
hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply);
@@ -405,10 +404,6 @@ private:
Handle(*(ev.Get()->Get()->Record.MutableQueryResponse()));
}
- void HandleReply(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) {
- Y_UNUSED(ev);
- }
-
void HandleReply(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev) {
Y_UNUSED(ev);
}
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp
index bf5002f5c95..70cb640525a 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.cpp
+++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp
@@ -166,10 +166,6 @@ namespace NKikimr::NSQS {
}
}
}
-
- void TCleanupQueueDataActor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- HandleError(ev->Get()->Record.DebugString(), ctx);
- }
void TCleanupQueueDataActor::HandleError(const TString& error, const TActorContext& ctx) {
MonitoringCounters->CleanupRemovedQueuesErrors->Inc();
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.h b/ydb/core/ymq/actor/cleanup_queue_data.h
index b6168d6f022..0ac6fb71ff0 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.h
+++ b/ydb/core/ymq/actor/cleanup_queue_data.h
@@ -31,13 +31,11 @@ public:
STRICT_STFUNC(StateFunc,
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse);
IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse);
)
void RunGetQueuesQuery(EState state, TDuration sendAfter, const TActorContext& ctx);
void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
- void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx);
void HandleError(const TString& error, const TActorContext& ctx);
void LockQueueToRemove(TDuration runAfter, const TActorContext& ctx);
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp
index 67206be2fbf..04f105ac38e 100644
--- a/ydb/core/ymq/actor/index_events_processor.cpp
+++ b/ydb/core/ymq/actor/index_events_processor.cpp
@@ -97,12 +97,6 @@ void TSearchEventsProcessor::HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse:
}
}
-void TSearchEventsProcessor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record;
- LOG_ERROR_S(ctx, NKikimrServices::SQS, "YC Search events processor: failed to list ymq events: " << record);
- HandleFailure(ctx);
-}
-
void TSearchEventsProcessor::HandleFailure(const TActorContext& ctx) {
StopSession(ctx);
switch (State) {
diff --git a/ydb/core/ymq/actor/index_events_processor.h b/ydb/core/ymq/actor/index_events_processor.h
index 29767c6c15c..162522e154e 100644
--- a/ydb/core/ymq/actor/index_events_processor.h
+++ b/ydb/core/ymq/actor/index_events_processor.h
@@ -74,13 +74,11 @@ private:
STRICT_STFUNC(StateFunc,
HFunc(NActors::TEvents::TEvWakeup, HandleWakeup);
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse);
HFunc(TEvents::TEvPoisonPill, HandlePoisonPill);
IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse);
)
void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
- void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx);
void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/ymq/actor/monitoring.cpp b/ydb/core/ymq/actor/monitoring.cpp
index 9999e5279c5..50b18335c2e 100644
--- a/ydb/core/ymq/actor/monitoring.cpp
+++ b/ydb/core/ymq/actor/monitoring.cpp
@@ -63,8 +63,4 @@ namespace NKikimr::NSQS {
RequestMetrics(RetryPeriod, ctx);
}
- void TMonitoringActor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- HandleError(ev->Get()->Record.DebugString(), ctx);
- }
-
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/monitoring.h b/ydb/core/ymq/actor/monitoring.h
index 3aad685f0ff..fd882eef0a3 100644
--- a/ydb/core/ymq/actor/monitoring.h
+++ b/ydb/core/ymq/actor/monitoring.h
@@ -28,7 +28,6 @@ public:
STRICT_STFUNC(StateFunc,
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse);
IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse);
)
@@ -37,9 +36,6 @@ public:
void HandleError(const TString& error, const TActorContext& ctx);
void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
- void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx);
-
-
private:
TIntrusivePtr<TMonitoringCounters> Counters;
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
index 1801779b26e..196f7f5c938 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h
@@ -400,7 +400,6 @@ private:
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
@@ -410,7 +409,6 @@ private:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx);
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx);
void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx);
void TryCloseSession(const TActorContext& ctx);
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp
index a88437fdbec..adafac71490 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp
@@ -74,13 +74,5 @@ void TClustersUpdater::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TA
}
-void TClustersUpdater::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) {
- auto& record = ev->Get()->Record;
-
- LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "can't update clusters " << record);
- ctx.Schedule(TDuration::Seconds(CLUSTERS_UPDATER_TIMEOUT_ON_ERROR), new TEvPQClustersUpdater::TEvUpdateClusters());
-}
-
-
}
}
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h
index efa13549631..b3fddca037e 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h
@@ -61,14 +61,12 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(TEvPQClustersUpdater::TEvUpdateClusters, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle);
}
}
void Handle(TEvPQClustersUpdater::TEvUpdateClusters::TPtr &ev, const TActorContext &ctx);
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx);
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx);
void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx);
};
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
index deac184e2fe..0e45c8e4a5b 100644
--- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
+++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
@@ -775,15 +775,6 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet
return ev;
}
-
-void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) {
- auto& record = ev->Get()->Record;
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " sourceID "
- << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition error - " << record);
- CloseSession("Internal error on discovering partition", NPersQueue::NErrorCode::ERROR, ctx);
-}
-
-
void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorContext& ctx) {
Partition = partition;
auto it = PartitionToTablet.find(Partition);
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 875090da21d..089583362c2 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -107,7 +107,6 @@ private:
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle);
HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
@@ -125,7 +124,6 @@ private:
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx);
- void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx);
void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx);
void TryCloseSession(const TActorContext& ctx);
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 380ed18155a..f9a39eeb93e 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1002,16 +1002,6 @@ void TWriteSessionActor<UseMigrationProtocol>::SendUpdateSrcIdsRequests(const TA
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) {
- auto& record = ev->Get()->Record;
-
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " sourceID "
- << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition error - " << record);
-
- CloseSession("Internal error on discovering partition", PersQueue::ErrorCode::ERROR, ctx);
-}
-
-template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 partition, const TActorContext& ctx) {
Partition = partition;
auto it = PartitionToTablet.find(Partition);