diff options
author | gvit <gvit@ydb.tech> | 2023-11-01 10:56:37 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-11-01 11:52:11 +0300 |
commit | 28b7d19f293b1c4b5e43e69fd34700c1b07026fe (patch) | |
tree | a80105a55ce806ba0c1a19694b5a4cb0567231b9 | |
parent | 967a51911964e64bd631f640b95cdfa5012809e8 (diff) | |
download | ydb-28b7d19f293b1c4b5e43e69fd34700c1b07026fe.tar.gz |
remove ev process response from ydb KIKIMR-19227
28 files changed, 77 insertions, 270 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 5fd43ef3515..93875328fae 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -270,13 +270,6 @@ private: } } - void HandleQueryResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, "failed to list topics: " << record); - - Reset(ctx); - } - void HandleCheckVersionResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record.GetRef(); @@ -747,7 +740,6 @@ public: HFunc(NActors::TEvents::TEvWakeup, HandleWakeup) HFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, HandleClustersUpdate) HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse); - HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleQueryResponse); HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion) HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) diff --git a/ydb/core/grpc_services/query/rpc_attach_session.cpp b/ydb/core/grpc_services/query/rpc_attach_session.cpp index 13dff91caef..5f695e7b3e2 100644 --- a/ydb/core/grpc_services/query/rpc_attach_session.cpp +++ b/ydb/core/grpc_services/query/rpc_attach_session.cpp @@ -30,7 +30,6 @@ public: try { switch (ev->GetTypeRewrite()) { hFunc(NKqp::TEvKqp::TEvPingSessionResponse, HandleAttaching); - hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleAttachin); default: UnexpectedEvent(__func__, ev); } @@ -114,6 +113,14 @@ private: return ReplyFinishStream(Ydb::StatusIds::NOT_FOUND); } + if (record.GetStatus() != Ydb::StatusIds::SUCCESS) { + if (record.GetIssues().size() > 0) { + Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, record.GetIssues().at(0).message())); + } + + return ReplyFinishStream(record.GetStatus()); + } + if (record.GetResponse().GetSessionStatus() != Ydb::Table::KeepAliveResult::SESSION_STATUS_READY) { return ReplyFinishStream(Ydb::StatusIds::SESSION_BUSY); } @@ -153,17 +160,6 @@ private: ev->GetTypeName() << Sprintf("(0x%08" PRIx32 ")", ev->GetTypeRewrite())); } - void HandleAttachin(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) { - const auto& record = ev->Get()->Record; - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - // KQP should not send TEvProcessResponse with SUCCESS for CreateSession rpc. - // We expect TEvKqp::TEvPingSessionResponse instead. - InternalError("Unexpected TEvProcessResponse with success status for PingSession request"); - } else { - return ReplyResponseError(record); - } - } - void ReplyFinishStream(Ydb::StatusIds::StatusCode status) { Request->ReplyWithYdbStatus(status); Request->FinishStream(status); diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index e5a3dc14a4a..2b07aae0a79 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -210,7 +210,6 @@ private: HFunc(TRpcServices::TEvGrpcNextReply, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); default: UnexpectedEvent(__func__, ev); } @@ -374,17 +373,6 @@ private: ReplyFinishStream(record.GetYdbStatus(), issues); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext&) { - auto& record = ev->Get()->Record; - - NYql::TIssues issues; - if (record.HasError()) { - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, record.GetError())); - } - - ReplyFinishStream(record.GetYdbStatus(), issues); - } - private: void HandleClientLost(const TActorContext& ctx) { LOG_WARN_S(ctx, NKikimrServices::RPC_REQUEST, "Client lost"); diff --git a/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp b/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp index 6e492cdec93..102f5a7c1eb 100644 --- a/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp +++ b/ydb/core/grpc_services/rpc_common/rpc_common_kqp_session.cpp @@ -83,7 +83,6 @@ private: void StateWork(TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); - hFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); hFunc(TEvents::TEvWakeup, Handle); } } @@ -145,19 +144,6 @@ private: return Reply(kqpResponse.GetYdbStatus()); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) { - const auto& record = ev->Get()->Record; - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - // KQP should not send TEvProcessResponse with SUCCESS for CreateSession rpc. - // We expect TEvKqp::TEvCreateSessionResponse instead. - static const TString err = "Unexpected TEvProcessResponse with success status for CreateSession request"; - Request->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, err)); - Reply(Ydb::StatusIds::INTERNAL_ERROR); - } else { - return ReplyResponseError(record); - } - } - void Reply(Ydb::StatusIds::StatusCode status) { Request->ReplyWithYdbStatus(status); this->PassAway(); diff --git a/ydb/core/grpc_services/rpc_keep_alive.cpp b/ydb/core/grpc_services/rpc_keep_alive.cpp index ff2f73df202..75e0a07b78a 100644 --- a/ydb/core/grpc_services/rpc_keep_alive.cpp +++ b/ydb/core/grpc_services/rpc_keep_alive.cpp @@ -36,7 +36,6 @@ public: private: void StateWork(TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvPingSessionResponse, Handle); default: TBase::StateWork(ev); } @@ -62,16 +61,6 @@ private: ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - Ydb::Table::KeepAliveResult result; - ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); - } else { - return OnProcessError(record, ctx); - } - } - void Handle(NKqp::TEvKqp::TEvPingSessionResponse::TPtr& ev, const TActorContext& ctx) { const auto& record = ev->Get()->Record; diff --git a/ydb/core/grpc_services/rpc_kqp_base.h b/ydb/core/grpc_services/rpc_kqp_base.h index 159dd8e61d7..698576763a7 100644 --- a/ydb/core/grpc_services/rpc_kqp_base.h +++ b/ydb/core/grpc_services/rpc_kqp_base.h @@ -94,7 +94,6 @@ public: protected: void StateWork(TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); default: TBase::StateFuncBase(ev); } } @@ -147,26 +146,6 @@ protected: this->Die(ctx); } - void OnProcessError(const NKikimrKqp::TEvProcessResponse& kqpResponse, const TActorContext& ctx) { - if (kqpResponse.HasError()) { - NYql::TIssues issues; - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, kqpResponse.GetError())); - return this->Reply(kqpResponse.GetYdbStatus(), issues, ctx); - } else { - return this->Reply(kqpResponse.GetYdbStatus(), ctx); - } - } - -private: - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; - NYql::TIssues issues; - if (record.HasError()) { - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, record.GetError())); - } - return this->Reply(record.GetYdbStatus(), issues, ctx); - } - private: template<typename TKqpResponse> void RaiseIssuesFromKqp(const TKqpResponse& kqpResponse) { diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 0f98cbb9489..e854fd5394d 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -187,7 +187,6 @@ private: HFunc(TEvents::TEvWakeup, Handle); HFunc(TRpcServices::TEvGrpcNextReply, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle); @@ -321,16 +320,6 @@ private: ReplyFinishStream(record.GetYdbStatus(), issues); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext&) { - const auto& kqpResponse = ev->Get()->Record; - NYql::TIssues issues; - if (kqpResponse.HasError()) { - issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, kqpResponse.GetError())); - } - - ReplyFinishStream(kqpResponse.GetYdbStatus(), issues); - } - void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; NYql::TIssues issues = ev->Get()->GetIssues(); diff --git a/ydb/core/kqp/common/events/process_response.cpp b/ydb/core/kqp/common/events/process_response.cpp index fe71fdcfbb6..1f2f87f1319 100644 --- a/ydb/core/kqp/common/events/process_response.cpp +++ b/ydb/core/kqp/common/events/process_response.cpp @@ -1,18 +1,4 @@ #include "process_response.h" namespace NKikimr::NKqp::NPrivateEvents { - -THolder<NKikimr::NKqp::NPrivateEvents::TEvProcessResponse> TEvProcessResponse::Error(Ydb::StatusIds::StatusCode ydbStatus, const TString& error) { - auto ev = MakeHolder<TEvProcessResponse>(); - ev->Record.SetYdbStatus(ydbStatus); - ev->Record.SetError(error); - return ev; -} - -THolder<NKikimr::NKqp::NPrivateEvents::TEvProcessResponse> TEvProcessResponse::Success() { - auto ev = MakeHolder<TEvProcessResponse>(); - ev->Record.SetYdbStatus(Ydb::StatusIds::SUCCESS); - return ev; -} - } // namespace NKikimr::NKqp::NPrivateEvents diff --git a/ydb/core/kqp/common/events/process_response.h b/ydb/core/kqp/common/events/process_response.h index 3c0adea39ce..c5ed88717f9 100644 --- a/ydb/core/kqp/common/events/process_response.h +++ b/ydb/core/kqp/common/events/process_response.h @@ -7,9 +7,8 @@ namespace NKikimr::NKqp::NPrivateEvents { struct TEvProcessResponse: public TEventPB<TEvProcessResponse, NKikimrKqp::TEvProcessResponse, - TKqpEvents::EvProcessResponse> { - static THolder<TEvProcessResponse> Error(Ydb::StatusIds::StatusCode ydbStatus, const TString& error); - static THolder<TEvProcessResponse> Success(); + TKqpEvents::EvProcessResponse> +{ }; } // namespace NKikimr::NKqp::NPrivateEvents diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index f03d37119c0..79adead6f3b 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -126,24 +126,14 @@ public: void Bootstrap(const TActorContext& ctx) { TActorId kqpProxy = MakeKqpProxyID(ctx.SelfID.NodeId()); ctx.Send(kqpProxy, this->Request.Release()); - this->Become(&TKqpRequestHandler::AwaitState); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& kqpResponse = ev->Get()->Record; - LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, ctx.SelfID - << "Received process error for kqp query: " << kqpResponse.GetError()); - - TBase::HandleError(kqpResponse.GetError(), ctx); - } - using TBase::Handle; using TBase::HandleResponse; STFUNC(AwaitState) { switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(TResponse, HandleResponse); default: @@ -219,14 +209,6 @@ public: Executions.push_back(std::move(*ev->Get()->Record.MutableProfile())); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& kqpResponse = ev->Get()->Record; - LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() - << "Received process error for scan query: " << kqpResponse.GetError()); - - TBase::HandleError(kqpResponse.GetError(), ctx); - } - void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { const TString msg = ev->Get()->GetIssues().ToOneLineString(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() @@ -251,7 +233,6 @@ public: STFUNC(AwaitState) { switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle); @@ -294,14 +275,6 @@ public: this->Become(&TKqpStreamRequestHandler::AwaitState); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& kqpResponse = ev->Get()->Record; - LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, ctx.SelfID - << "Received process error for kqp data query: " << kqpResponse.GetError()); - - TBase::HandleError(kqpResponse.GetError(), ctx); - } - using TBase::Promise; using TBase::Callback; @@ -355,7 +328,6 @@ public: STFUNC(AwaitState) { switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(TResponse, HandleResponse); HFunc(NKqp::TEvKqp::TEvDataQueryStreamPartAck, Handle); HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); @@ -414,14 +386,6 @@ public: Executions.push_back(std::move(*ev->Get()->Record.MutableProfile())); } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& kqpResponse = ev->Get()->Record; - LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() - << "Received process error for scan query: " << kqpResponse.GetError()); - - TBase::HandleError(kqpResponse.GetError(), ctx); - } - void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { const TString msg = ev->Get()->GetIssues().ToOneLineString(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() @@ -447,7 +411,6 @@ public: STFUNC(AwaitState) { switch (ev->GetTypeRewrite()) { - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 1d55ec71944..7965a46058c 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -23,6 +23,7 @@ #include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> #include <ydb/core/mon/mon.h> +#include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/library/yql/utils/actor_log/log.h> #include <ydb/library/yql/core/services/mounts/yql_mounts.h> @@ -1238,7 +1239,7 @@ public: hFunc(TEvKqp::TEvScriptRequest, Handle); hFunc(TEvKqp::TEvCloseSessionRequest, Handle); hFunc(TEvKqp::TEvQueryResponse, ForwardEvent); - hFunc(TEvKqp::TEvProcessResponse, ForwardEvent); + hFunc(TEvKqp::TEvProcessResponse, Handle); hFunc(TEvKqp::TEvCreateSessionRequest, Handle); hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvCancelQueryRequest, Handle); @@ -1289,10 +1290,38 @@ private: Counters->ReportResponseStatus(dbCounters, event.ByteSize(), event.GetStatus()); } + + void Handle(TEvKqp::TEvProcessResponse::TPtr&ev) { + ReplyProcessError(ev->Get()->Record.GetYdbStatus(), ev->Get()->Record.GetError(), ev->Cookie); + } + bool ReplyProcessError(Ydb::StatusIds::StatusCode ydbStatus, const TString& message, ui64 requestId) { - auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message); - return Send(SelfId(), response.Release(), 0, requestId); + auto issue = NKikimr::MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message); + NYql::TIssues issues; + issues.AddIssue(issue); + const auto request = PendingRequests.FindPtr(requestId); + if (!request) { + return true; + } + + if (request->EventType == TKqpEvents::EvPingSessionRequest) { + auto response = std::make_unique<TEvKqp::TEvPingSessionResponse>(); + response->Record.SetStatus(ydbStatus); + NYql::IssuesToMessage(issues, response->Record.MutableIssues()); + return Send(SelfId(), response.release(), 0, requestId); + } else if (request->EventType == TKqpEvents::EvCreateSessionRequest) { + auto response = std::make_unique<TEvKqp::TEvCreateSessionResponse>(); + response->Record.SetYdbStatus(ydbStatus); + response->Record.SetError(message); + return Send(SelfId(), response.release(), 0, requestId); + } + + auto response = std::make_unique<TEvKqp::TEvQueryResponse>(); + response->Record.GetRef().SetYdbStatus(ydbStatus); + + NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); + return Send(SelfId(), response.release(), 0, requestId); } bool CheckRequestDeadline(const TKqpRequestInfo& requestInfo, const TInstant deadline, TProcessResult<TKqpSessionInfo*>& result) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index 271c38680b7..e7074e6bf9b 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -110,8 +110,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) { runtime->Send(new IEventHandle(kqpProxy, sender, ev.Release())); TAutoPtr<IEventHandle> handle; - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvProcessResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST); }; SendBadRequestToSession("ydb://session/1?id=ZjY5NWRlM2EtYWMyYjA5YWEtNzQ0MTVlYTMtM2Q4ZDgzOWQ=&node_id=1234&node_id=12345"); @@ -152,9 +152,9 @@ Y_UNIT_TEST_SUITE(KqpProxy) { runtime->Send(new IEventHandle(kqpProxy, sender, ev.Release())); TAutoPtr<IEventHandle> handle; - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvProcessResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetError(), "<main>: Error: SomeUniqTextForUt\n"); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::BAD_REQUEST); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetResponse().GetQueryIssues().at(0).message(), "<main>: Error: SomeUniqTextForUt\n"); } Y_UNIT_TEST(LoadedMetadataAfterCompilationTimeout) { @@ -289,11 +289,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) { runtime->Send(new IEventHandle(kqpProxy1, sender, ev.Release())); TAutoPtr<IEventHandle> handle; - auto reply = runtime->GrabEdgeEventsRethrow<TEvKqp::TEvQueryResponse, TEvKqp::TEvProcessResponse>(handle); - - TEvKqp::TEvQueryResponse* queryResponse = std::get<TEvKqp::TEvQueryResponse*>(reply); - Y_ABORT_UNLESS(queryResponse); - UNIT_ASSERT_VALUES_EQUAL(queryResponse->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(handle); + UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); } } @@ -366,18 +363,14 @@ Y_UNIT_TEST_SUITE(KqpProxy) { runtime->Send(new IEventHandle(kqpProxy1, sender, ev.Release())); TAutoPtr<IEventHandle> handle; - auto reply = runtime->GrabEdgeEventsRethrow<TEvKqp::TEvQueryResponse, TEvKqp::TEvProcessResponse>(handle); + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(handle); + auto status = reply->Record.GetRef().GetYdbStatus(); + UNIT_ASSERT(status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::TIMEOUT); - TEvKqp::TEvQueryResponse* queryResponse = std::get<TEvKqp::TEvQueryResponse*>(reply); - if (queryResponse) { + if (status == Ydb::StatusIds::SUCCESS) { ++SuccessStories; - UNIT_ASSERT_VALUES_EQUAL(queryResponse->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); - } - - TEvKqp::TEvProcessResponse* processResponse = std::get<TEvKqp::TEvProcessResponse*>(reply); - if (processResponse) { + } else if (status == Ydb::StatusIds::TIMEOUT) { ++NegativeStories; - UNIT_ASSERT_VALUES_EQUAL(processResponse->Record.GetYdbStatus(), Ydb::StatusIds::TIMEOUT); } } @@ -389,18 +382,13 @@ Y_UNIT_TEST_SUITE(KqpProxy) { runtime->Send(new IEventHandle(kqpProxy1, sender, ev.Release())); TAutoPtr<IEventHandle> handle; - auto reply = runtime->GrabEdgeEventsRethrow<TEvKqp::TEvPingSessionResponse, TEvKqp::TEvProcessResponse>(handle); - - TEvKqp::TEvPingSessionResponse* pingResponse = std::get<TEvKqp::TEvPingSessionResponse*>(reply); - if (pingResponse) { + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvPingSessionResponse>(handle); + auto status = reply->Record.GetStatus(); + UNIT_ASSERT(status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::TIMEOUT); + if (status == Ydb::StatusIds::SUCCESS) { ++SuccessStories; - UNIT_ASSERT_VALUES_EQUAL(pingResponse->Record.GetStatus(), Ydb::StatusIds::SUCCESS); - } - - TEvKqp::TEvProcessResponse* processResponse = std::get<TEvKqp::TEvProcessResponse*>(reply); - if (processResponse) { + } else if (status == Ydb::StatusIds::TIMEOUT) { ++NegativeStories; - UNIT_ASSERT_VALUES_EQUAL(processResponse->Record.GetYdbStatus(), Ydb::StatusIds::TIMEOUT); } } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c5d3197cb4e..53cf5e1915a 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -238,19 +238,6 @@ public: Cleanup(); } - void ForwardResponse(TEvKqp::TEvProcessResponse::TPtr& ev) { - QueryResponse = std::make_unique<TEvKqp::TEvQueryResponse>(); - auto& record = QueryResponse->Record.GetRef(); - record.SetYdbStatus(ev->Get()->Record.GetYdbStatus()); - - if (ev->Get()->Record.HasError()) { - auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, ev->Get()->Record.GetError()); - IssueToMessage(issue, record.MutableResponse()->AddQueryIssues()); - } - - Cleanup(); - } - void ReplyTransactionNotFound(const TString& txId) { std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, TStringBuilder() << "Transaction not found: " << txId)}; @@ -1543,12 +1530,15 @@ public: void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus, const TString& message) { - LOG_W("Reply process error, msg: " << message << " proxyRequestId: " << proxyRequestId); - - auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message); - - AddTrailingInfo(response->Record); - Send(sender, response.Release(), 0, proxyRequestId); + LOG_W("Reply query error, msg: " << message << " proxyRequestId: " << proxyRequestId); + auto response = std::make_unique<TEvKqp::TEvQueryResponse>(); + response->Record.GetRef().SetYdbStatus(ydbStatus); + auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message); + NYql::TIssues issues; + issues.AddIssue(issue); + NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); + AddTrailingInfo(response->Record.GetRef()); + Send(sender, response.release(), 0, proxyRequestId); } void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -2001,7 +1991,6 @@ public: // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); - hFunc(TEvKqp::TEvProcessResponse, ForwardResponse); default: UnexpectedEvent("ExecuteState", ev); } diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 3fb8fb73a12..d113737683b 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -11,6 +11,7 @@ #include <ydb/core/kqp/host/kqp_host.h> #include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/library/aclib/aclib.h> +#include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/library/yql/utils/actor_log/log.h> @@ -864,11 +865,14 @@ private: Ydb::StatusIds::StatusCode ydbStatus, const TString& message) { LOG_W(message); - - auto response = TEvKqp::TEvProcessResponse::Error(ydbStatus, message); - - AddTrailingInfo(response->Record); - return Send(sender, response.Release(), 0, proxyRequestId); + auto response = std::make_unique<TEvKqp::TEvQueryResponse>(); + response->Record.GetRef().SetYdbStatus(ydbStatus); + auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, message); + NYql::TIssues issues; + issues.AddIssue(issue); + NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues()); + AddTrailingInfo(response->Record.GetRef()); + return Send(sender, response.release(), 0, proxyRequestId); } bool CheckRequest(const TString& eventSessionId, const TActorId& sender, ui64 proxyRequestId, const TActorContext&) diff --git a/ydb/core/persqueue/cluster_tracker.cpp b/ydb/core/persqueue/cluster_tracker.cpp index c5f51205215..2107b4c0df5 100644 --- a/ydb/core/persqueue/cluster_tracker.cpp +++ b/ydb/core/persqueue/cluster_tracker.cpp @@ -89,7 +89,6 @@ private: hFunc(TEvClusterTracker::TEvSubscribe, HandleWhileWorking); hFunc(TEvents::TEvWakeup, HandleWhileWorking); hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleWhileWorking); - hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleWhileWorking); } } @@ -164,15 +163,6 @@ private: } } - void HandleWhileWorking(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) { - const auto& record = ev->Get()->Record; - LOG_ERROR_S(Ctx(), NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, "failed to list clusters: " << record); - - ClustersList = nullptr; - - Schedule(TDuration::Seconds(Cfg().GetClustersUpdateTimeoutOnErrorSec()), new TEvents::TEvWakeup); - } - template<typename TProtoRecord> void UpdateClustersList(const TProtoRecord& record) { auto clustersList = MakeIntrusive<TClustersList>(); diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h index 422e5a7916b..9afa37e795f 100644 --- a/ydb/core/viewer/json_query.h +++ b/ydb/core/viewer/json_query.h @@ -143,7 +143,6 @@ public: hFunc(TEvInterconnect::TEvNodeDisconnected, Disconnected); hFunc(TEvViewer::TEvViewerResponse, HandleReply); hFunc(NKqp::TEvKqp::TEvQueryResponse, HandleReply); - hFunc(NKqp::TEvKqp::TEvProcessResponse, HandleReply); hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply); hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply); hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply); @@ -405,10 +404,6 @@ private: Handle(*(ev.Get()->Get()->Record.MutableQueryResponse())); } - void HandleReply(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev) { - Y_UNUSED(ev); - } - void HandleReply(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev) { Y_UNUSED(ev); } diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp index bf5002f5c95..70cb640525a 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.cpp +++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp @@ -166,10 +166,6 @@ namespace NKikimr::NSQS { } } } - - void TCleanupQueueDataActor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - HandleError(ev->Get()->Record.DebugString(), ctx); - } void TCleanupQueueDataActor::HandleError(const TString& error, const TActorContext& ctx) { MonitoringCounters->CleanupRemovedQueuesErrors->Inc(); diff --git a/ydb/core/ymq/actor/cleanup_queue_data.h b/ydb/core/ymq/actor/cleanup_queue_data.h index b6168d6f022..0ac6fb71ff0 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.h +++ b/ydb/core/ymq/actor/cleanup_queue_data.h @@ -31,13 +31,11 @@ public: STRICT_STFUNC(StateFunc,
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse);
IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse);
)
void RunGetQueuesQuery(EState state, TDuration sendAfter, const TActorContext& ctx);
void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
- void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx);
void HandleError(const TString& error, const TActorContext& ctx);
void LockQueueToRemove(TDuration runAfter, const TActorContext& ctx);
diff --git a/ydb/core/ymq/actor/index_events_processor.cpp b/ydb/core/ymq/actor/index_events_processor.cpp index 67206be2fbf..04f105ac38e 100644 --- a/ydb/core/ymq/actor/index_events_processor.cpp +++ b/ydb/core/ymq/actor/index_events_processor.cpp @@ -97,12 +97,6 @@ void TSearchEventsProcessor::HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse: } } -void TSearchEventsProcessor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record; - LOG_ERROR_S(ctx, NKikimrServices::SQS, "YC Search events processor: failed to list ymq events: " << record); - HandleFailure(ctx); -} - void TSearchEventsProcessor::HandleFailure(const TActorContext& ctx) { StopSession(ctx); switch (State) { diff --git a/ydb/core/ymq/actor/index_events_processor.h b/ydb/core/ymq/actor/index_events_processor.h index 29767c6c15c..162522e154e 100644 --- a/ydb/core/ymq/actor/index_events_processor.h +++ b/ydb/core/ymq/actor/index_events_processor.h @@ -74,13 +74,11 @@ private: STRICT_STFUNC(StateFunc, HFunc(NActors::TEvents::TEvWakeup, HandleWakeup); HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse); - HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse); HFunc(TEvents::TEvPoisonPill, HandlePoisonPill); IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse); ) void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx); - void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx); void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/ymq/actor/monitoring.cpp b/ydb/core/ymq/actor/monitoring.cpp index 9999e5279c5..50b18335c2e 100644 --- a/ydb/core/ymq/actor/monitoring.cpp +++ b/ydb/core/ymq/actor/monitoring.cpp @@ -63,8 +63,4 @@ namespace NKikimr::NSQS { RequestMetrics(RetryPeriod, ctx);
}
- void TMonitoringActor::HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
- HandleError(ev->Get()->Record.DebugString(), ctx);
- }
-
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/monitoring.h b/ydb/core/ymq/actor/monitoring.h index 3aad685f0ff..fd882eef0a3 100644 --- a/ydb/core/ymq/actor/monitoring.h +++ b/ydb/core/ymq/actor/monitoring.h @@ -28,7 +28,6 @@ public: STRICT_STFUNC(StateFunc,
HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
- HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleProcessResponse);
IgnoreFunc(NKqp::TEvKqp::TEvCloseSessionResponse);
)
@@ -37,9 +36,6 @@ public: void HandleError(const TString& error, const TActorContext& ctx);
void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx);
- void HandleProcessResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx);
-
-
private:
TIntrusivePtr<TMonitoringCounters> Counters;
diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h index 1801779b26e..196f7f5c938 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h @@ -400,7 +400,6 @@ private: HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); @@ -410,7 +409,6 @@ private: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx); - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx); void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx); void TryCloseSession(const TActorContext& ctx); diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp index a88437fdbec..adafac71490 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.cpp @@ -74,13 +74,5 @@ void TClustersUpdater::Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TA } -void TClustersUpdater::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) { - auto& record = ev->Get()->Record; - - LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "can't update clusters " << record); - ctx.Schedule(TDuration::Seconds(CLUSTERS_UPDATER_TIMEOUT_ON_ERROR), new TEvPQClustersUpdater::TEvUpdateClusters()); -} - - } } diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h index efa13549631..b3fddca037e 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_clusters_updater_actor.h @@ -61,14 +61,12 @@ private: switch (ev->GetTypeRewrite()) { HFunc(TEvPQClustersUpdater::TEvUpdateClusters, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate, Handle); } } void Handle(TEvPQClustersUpdater::TEvUpdateClusters::TPtr &ev, const TActorContext &ctx); void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx); - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx); void Handle(NNetClassifier::TEvNetClassifier::TEvClassifierUpdate::TPtr& ev, const TActorContext& ctx); }; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index deac184e2fe..0e45c8e4a5b 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -775,15 +775,6 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet return ev; } - -void TWriteSessionActor::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) { - auto& record = ev->Get()->Record; - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " sourceID " - << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition error - " << record); - CloseSession("Internal error on discovering partition", NPersQueue::NErrorCode::ERROR, ctx); -} - - void TWriteSessionActor::ProceedPartition(const ui32 partition, const TActorContext& ctx) { Partition = partition; auto it = PartitionToTablet.find(Partition); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 875090da21d..089583362c2 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -107,7 +107,6 @@ private: HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - HFunc(NKqp::TEvKqp::TEvProcessResponse, Handle); HFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); HFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); @@ -125,7 +124,6 @@ private: void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr &ev, const TActorContext &ctx); - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx); void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr &ev, const NActors::TActorContext& ctx); void TryCloseSession(const TActorContext& ctx); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 380ed18155a..f9a39eeb93e 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1002,16 +1002,6 @@ void TWriteSessionActor<UseMigrationProtocol>::SendUpdateSrcIdsRequests(const TA } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr &ev, const TActorContext &ctx) { - auto& record = ev->Get()->Record; - - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " sourceID " - << SourceId << " escaped " << EncodedSourceId.EscapedSourceId << " discover partition error - " << record); - - CloseSession("Internal error on discovering partition", PersQueue::ErrorCode::ERROR, ctx); -} - -template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 partition, const TActorContext& ctx) { Partition = partition; auto it = PartitionToTablet.find(Partition); |