diff options
author | gvit <gvit@ydb.tech> | 2023-03-10 18:34:52 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-03-10 18:34:52 +0300 |
commit | 0fa703d28c587570b7799aab9bc4a6b196dcaf52 (patch) | |
tree | 6c381e3fe7f159b7f97d4056fb7dc212bd666751 | |
parent | cf8309626872ddec592055a28efe204363e1d229 (diff) | |
download | ydb-0fa703d28c587570b7799aab9bc4a6b196dcaf52.tar.gz |
remove executer progress message handling & sending
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 10 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_data_query.cpp | 6 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp | 10 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 9 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp_event_impl.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/viewer/json_query.h | 5 |
12 files changed, 25 insertions, 85 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 20d46acb8a1..50a87e92700 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -94,7 +94,6 @@ private: switch (ev->GetTypeRewrite()) { HFunc(TEvents::TEvWakeup, Handle); HFunc(TRpcServices::TEvGrpcNextReply, Handle); - HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); default: @@ -167,12 +166,11 @@ private: } } - void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { - ExecuterActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "ExecuterActorId: " << ExecuterActorId_); - } - void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { + if (!ExecuterActorId_) { + ExecuterActorId_ = ev->Sender; + } + Ydb::Query::ExecuteQueryResponsePart response; response.set_status(Ydb::StatusIds::SUCCESS); response.set_result_set_index(0); diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 16b916aab49..b87e7e1934c 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -128,13 +128,13 @@ public: } auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>( + queryAction, + queryType, + SelfId(), Request_, req->session_id(), - SelfId(), std::move(yqlText), std::move(queryId), - queryAction, - queryType, &req->tx_control(), &req->parameters(), req->collect_stats(), diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 1f5de54a17a..b3c458d3b78 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -201,7 +201,6 @@ private: HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle); - HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); default: { auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event received in TStreamExecuteScanQueryRPC::StateWork: " << ev->GetTypeRewrite()); @@ -344,6 +343,10 @@ private: } void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { + if (!ExecuterActorId_) { + ExecuterActorId_ = ev->Sender; + } + Ydb::Table::ExecuteScanQueryPartialResponse response; response.set_status(StatusIds::SUCCESS); response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet()); @@ -389,11 +392,6 @@ private: ExecutionProfiles_.emplace_back(std::move(profile)); } - void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { - ExecuterActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " ExecuterActorId: " << ExecuterActorId_); - } - private: void SetTimeoutTimer(TDuration timeout, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Set stream timeout timer for " << timeout); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index b66818fbb9e..f5d0032161e 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -129,7 +129,6 @@ private: HFunc(NKqp::TEvKqp::TEvDataQueryStreamPart, Handle); HFunc(TRpcServices::TEvGrpcNextReply, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); default: { return ReplyFinishStream(TStringBuilder() @@ -227,22 +226,11 @@ private: } // From TKqpScanQueryStreamRequestHandler - void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { - GatewayRequestHandlerActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); - ProcessingScanQuery_ = false; - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " GatewayRequestHandlerActorId_: " << GatewayRequestHandlerActorId_); - } - - // From TKqpScanQueryStreamRequestHandler void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { - if (!GatewayRequestHandlerActorId_) { - return ReplyFinishStream("Received StreamData event from unknown executer", ctx); - } - if (!ProcessingScanQuery_) { - // First data part from this scan query + if (GatewayRequestHandlerActorId_ != ev->Sender) { ++ResultsReceived_; + GatewayRequestHandlerActorId_ = ev->Sender; } - ProcessingScanQuery_ = true; Ydb::Scripting::ExecuteYqlPartialResponse response; response.set_status(StatusIds::SUCCESS); @@ -481,8 +469,6 @@ private: TActorId GatewayRequestHandlerActorId_; ui64 ResultsReceived_ = 0; - bool ProcessingScanQuery_ = false; - // DataQuery THolder<TDataQueryStreamContext> DataQueryStreamContext; }; diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 6791ea79fa6..1f03ca80f14 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -257,13 +257,13 @@ struct TEvKqp { struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> { public: TEvQueryRequest( + NKikimrKqp::EQueryAction queryAction, + NKikimrKqp::EQueryType queryType, + TActorId requestActorId, const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx, const TString& sessionId, - TActorId actorId, TString&& yqlText, TString&& queryId, - NKikimrKqp::EQueryAction queryAction, - NKikimrKqp::EQueryType queryType, const ::Ydb::Table::TransactionControl* txControl, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, const ::Ydb::Table::QueryStatsCollection::Mode collectStats, @@ -374,7 +374,7 @@ struct TEvKqp { } TActorId GetRequestActorId() const { - return ActorIdFromProto(Record.GetRequestActorId()); + return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId()); } const TString& GetTraceId() const { @@ -496,6 +496,7 @@ struct TEvKqp { mutable TString TraceId; mutable TString RequestType; mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_; + TActorId RequestActorId; TString Database; TString SessionId; TString YqlText; diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index 6a2dd868cc4..66842a25577 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -6,13 +6,13 @@ namespace NKikimr::NKqp { TEvKqp::TEvQueryRequest::TEvQueryRequest( + NKikimrKqp::EQueryAction queryAction, + NKikimrKqp::EQueryType queryType, + TActorId requestActorId, const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx, const TString& sessionId, - TActorId actorId, TString&& yqlText, TString&& queryId, - NKikimrKqp::EQueryAction queryAction, - NKikimrKqp::EQueryType queryType, const ::Ydb::Table::TransactionControl* txControl, const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters, const ::Ydb::Table::QueryStatsCollection::Mode collectStats, @@ -20,6 +20,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( const ::Ydb::Operations::OperationParams* operationParams, bool keepSession) : RequestCtx(ctx) + , RequestActorId(requestActorId) , Database(CanonizePath(ctx->GetDatabaseName().GetOrElse(""))) , SessionId(sessionId) , YqlText(std::move(yqlText)) @@ -37,7 +38,6 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest( OperationTimeout = GetDuration(OperationParams->operation_timeout()); CancelAfter = GetDuration(OperationParams->cancel_after()); } - ActorIdToProto(actorId, Record.MutableCancelationActor()); } void TEvKqp::TEvQueryRequest::PrepareRemote() const { @@ -47,6 +47,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { } Record.MutableRequest()->SetDatabase(Database); + ActorIdToProto(RequestActorId, Record.MutableCancelationActor()); if (auto traceId = RequestCtx->GetTraceId()) { Record.SetTraceId(traceId.GetRef()); diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 3057d482360..2d2a5748d4d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -61,9 +61,6 @@ struct TEvKqpExecuter { struct TEvStreamProfile : public TEventPB<TEvStreamProfile, NKikimrKqp::TEvExecuterStreamProfile, TKqpExecuterEvents::EvStreamProfile> {}; - struct TEvExecuterProgress : public TEventPB<TEvExecuterProgress, NKikimrKqp::TEvExecuterProgress, - TKqpExecuterEvents::EvProgress> {}; - struct TEvTableResolveStatus : public TEventLocal<TEvTableResolveStatus, TKqpExecuterEvents::EvTableResolveStatus> { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 173c166561a..c6703410bc0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -314,12 +314,6 @@ protected: Target = ActorIdFromProto(ev->Get()->Record.GetTarget()); LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId); - - LOG_D("Report self actorId " << this->SelfId() << " to " << Target); - auto progressEv = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>(); - ActorIdToProto(this->SelfId(), progressEv->Record.MutableExecuterActorId()); - this->Send(Target, progressEv.Release()); - if (IsDebugLogEnabled()) { for (auto& tx : Request.Transactions) { LOG_D("Executing physical tx, type: " << (ui32) tx.Body->GetType() << ", stages: " << tx.Body->StagesSize()); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 1219b2d3788..64a0717ce6c 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -178,6 +178,7 @@ public: } void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { + ExecuterActorId = ev->Sender; auto& record = ev->Get()->Record; if (!HasMeta) { @@ -215,12 +216,6 @@ public: Executions.push_back(std::move(*ev->Get()->Record.MutableProfile())); } - void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { - ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() - << "Received executer progress for scan query, id: " << ExecuterActorId); - } - void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { const auto& kqpResponse = ev->Get()->Record; LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() @@ -257,7 +252,6 @@ public: HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle); - HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); HFunc(TResponse, HandleResponse); default: @@ -403,6 +397,7 @@ public: void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ctx); + ExecuterActorId = ev->Sender; TlsActivationContext->Send(ev->Forward(TargetActorId)); } @@ -411,14 +406,6 @@ public: TlsActivationContext->Send(ev->Forward(ExecuterActorId)); } - void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) { - ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); - ActorIdToProto(SelfId(), ev->Get()->Record.MutableExecuterActorId()); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() - << "Received executer progress for scan query, id: " << ExecuterActorId); - TlsActivationContext->Send(ev->Forward(TargetActorId)); - } - void Handle(NKqp::TEvKqpExecuter::TEvStreamProfile::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ctx); Executions.push_back(std::move(*ev->Get()->Record.MutableProfile())); @@ -461,7 +448,6 @@ public: HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle); - HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); HFunc(TResponse, HandleResponse); default: diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 12fd7253466..ba3b1f842d8 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -50,17 +50,11 @@ public: private: STRICT_STFUNC(StateFunc, - hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle); hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); hFunc(NKqp::TEvKqp::TEvFetchScriptResultsRequest, Handle); ) - void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { - ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId()); - LOG_D("ExecuterActorId: " << ExecuterActorId); - } - void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); @@ -155,8 +149,6 @@ private: private: const NKikimrKqp::TEvQueryRequest Request; - NActors::TActorId ExecuterActorId; - // Result NYql::TIssues Issues; Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index d504d390be4..04d7ff110eb 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1375,12 +1375,6 @@ public: TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId)); } - void HandleExecute(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { - YQL_ENSURE(QueryState); - // note: RequestActorId may be TActorId{}; - TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId)); - } - void HandleExecute(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) { TlsActivationContext->Send(ev->Forward(ExecuterId)); } @@ -2034,7 +2028,6 @@ public: hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); - hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleExecute); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); @@ -2064,7 +2057,6 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvKqp::TEvQueryRequest, HandleCleanup); hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup); - hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h index 6412b5714de..12d39f5694c 100644 --- a/ydb/core/viewer/json_query.h +++ b/ydb/core/viewer/json_query.h @@ -53,7 +53,6 @@ public: hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply); hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply); hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply); - hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, HandleReply); cFunc(TEvents::TSystem::Wakeup, HandleTimeout); @@ -245,10 +244,6 @@ private: Y_UNUSED(ev); } - void HandleReply(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) { - Y_UNUSED(ev); - } - void HandleReply(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { const NKikimrKqp::TEvExecuterStreamData& data(ev->Get()->Record); |