aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-03-10 18:34:52 +0300
committergvit <gvit@ydb.tech>2023-03-10 18:34:52 +0300
commit0fa703d28c587570b7799aab9bc4a6b196dcaf52 (patch)
tree6c381e3fe7f159b7f97d4056fb7dc212bd666751
parentcf8309626872ddec592055a28efe204363e1d229 (diff)
downloadydb-0fa703d28c587570b7799aab9bc4a6b196dcaf52.tar.gz
remove executer progress message handling & sending
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp6
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp18
-rw-r--r--ydb/core/kqp/common/kqp.h9
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp9
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h6
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp18
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp8
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
-rw-r--r--ydb/core/viewer/json_query.h5
12 files changed, 25 insertions, 85 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 20d46acb8a1..50a87e92700 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -94,7 +94,6 @@ private:
switch (ev->GetTypeRewrite()) {
HFunc(TEvents::TEvWakeup, Handle);
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
- HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
default:
@@ -167,12 +166,11 @@ private:
}
}
- void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
- ExecuterActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "ExecuterActorId: " << ExecuterActorId_);
- }
-
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
+ if (!ExecuterActorId_) {
+ ExecuterActorId_ = ev->Sender;
+ }
+
Ydb::Query::ExecuteQueryResponsePart response;
response.set_status(Ydb::StatusIds::SUCCESS);
response.set_result_set_index(0);
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 16b916aab49..b87e7e1934c 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -128,13 +128,13 @@ public:
}
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
+ queryAction,
+ queryType,
+ SelfId(),
Request_,
req->session_id(),
- SelfId(),
std::move(yqlText),
std::move(queryId),
- queryAction,
- queryType,
&req->tx_control(),
&req->parameters(),
req->collect_stats(),
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 1f5de54a17a..b3c458d3b78 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -201,7 +201,6 @@ private:
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle);
- HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
default: {
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, TStringBuilder()
<< "Unexpected event received in TStreamExecuteScanQueryRPC::StateWork: " << ev->GetTypeRewrite());
@@ -344,6 +343,10 @@ private:
}
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
+ if (!ExecuterActorId_) {
+ ExecuterActorId_ = ev->Sender;
+ }
+
Ydb::Table::ExecuteScanQueryPartialResponse response;
response.set_status(StatusIds::SUCCESS);
response.mutable_result()->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
@@ -389,11 +392,6 @@ private:
ExecutionProfiles_.emplace_back(std::move(profile));
}
- void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
- ExecuterActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " ExecuterActorId: " << ExecuterActorId_);
- }
-
private:
void SetTimeoutTimer(TDuration timeout, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Set stream timeout timer for " << timeout);
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index b66818fbb9e..f5d0032161e 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -129,7 +129,6 @@ private:
HFunc(NKqp::TEvKqp::TEvDataQueryStreamPart, Handle);
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
default: {
return ReplyFinishStream(TStringBuilder()
@@ -227,22 +226,11 @@ private:
}
// From TKqpScanQueryStreamRequestHandler
- void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
- GatewayRequestHandlerActorId_ = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
- ProcessingScanQuery_ = false;
- LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " GatewayRequestHandlerActorId_: " << GatewayRequestHandlerActorId_);
- }
-
- // From TKqpScanQueryStreamRequestHandler
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
- if (!GatewayRequestHandlerActorId_) {
- return ReplyFinishStream("Received StreamData event from unknown executer", ctx);
- }
- if (!ProcessingScanQuery_) {
- // First data part from this scan query
+ if (GatewayRequestHandlerActorId_ != ev->Sender) {
++ResultsReceived_;
+ GatewayRequestHandlerActorId_ = ev->Sender;
}
- ProcessingScanQuery_ = true;
Ydb::Scripting::ExecuteYqlPartialResponse response;
response.set_status(StatusIds::SUCCESS);
@@ -481,8 +469,6 @@ private:
TActorId GatewayRequestHandlerActorId_;
ui64 ResultsReceived_ = 0;
- bool ProcessingScanQuery_ = false;
-
// DataQuery
THolder<TDataQueryStreamContext> DataQueryStreamContext;
};
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 6791ea79fa6..1f03ca80f14 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -257,13 +257,13 @@ struct TEvKqp {
struct TEvQueryRequest : public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
public:
TEvQueryRequest(
+ NKikimrKqp::EQueryAction queryAction,
+ NKikimrKqp::EQueryType queryType,
+ TActorId requestActorId,
const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx,
const TString& sessionId,
- TActorId actorId,
TString&& yqlText,
TString&& queryId,
- NKikimrKqp::EQueryAction queryAction,
- NKikimrKqp::EQueryType queryType,
const ::Ydb::Table::TransactionControl* txControl,
const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
@@ -374,7 +374,7 @@ struct TEvKqp {
}
TActorId GetRequestActorId() const {
- return ActorIdFromProto(Record.GetRequestActorId());
+ return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId());
}
const TString& GetTraceId() const {
@@ -496,6 +496,7 @@ struct TEvKqp {
mutable TString TraceId;
mutable TString RequestType;
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
+ TActorId RequestActorId;
TString Database;
TString SessionId;
TString YqlText;
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index 6a2dd868cc4..66842a25577 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -6,13 +6,13 @@
namespace NKikimr::NKqp {
TEvKqp::TEvQueryRequest::TEvQueryRequest(
+ NKikimrKqp::EQueryAction queryAction,
+ NKikimrKqp::EQueryType queryType,
+ TActorId requestActorId,
const std::shared_ptr<NGRpcService::IRequestCtxMtSafe>& ctx,
const TString& sessionId,
- TActorId actorId,
TString&& yqlText,
TString&& queryId,
- NKikimrKqp::EQueryAction queryAction,
- NKikimrKqp::EQueryType queryType,
const ::Ydb::Table::TransactionControl* txControl,
const ::google::protobuf::Map<TProtoStringType, ::Ydb::TypedValue>* ydbParameters,
const ::Ydb::Table::QueryStatsCollection::Mode collectStats,
@@ -20,6 +20,7 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
const ::Ydb::Operations::OperationParams* operationParams,
bool keepSession)
: RequestCtx(ctx)
+ , RequestActorId(requestActorId)
, Database(CanonizePath(ctx->GetDatabaseName().GetOrElse("")))
, SessionId(sessionId)
, YqlText(std::move(yqlText))
@@ -37,7 +38,6 @@ TEvKqp::TEvQueryRequest::TEvQueryRequest(
OperationTimeout = GetDuration(OperationParams->operation_timeout());
CancelAfter = GetDuration(OperationParams->cancel_after());
}
- ActorIdToProto(actorId, Record.MutableCancelationActor());
}
void TEvKqp::TEvQueryRequest::PrepareRemote() const {
@@ -47,6 +47,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
}
Record.MutableRequest()->SetDatabase(Database);
+ ActorIdToProto(RequestActorId, Record.MutableCancelationActor());
if (auto traceId = RequestCtx->GetTraceId()) {
Record.SetTraceId(traceId.GetRef());
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 3057d482360..2d2a5748d4d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -61,9 +61,6 @@ struct TEvKqpExecuter {
struct TEvStreamProfile : public TEventPB<TEvStreamProfile, NKikimrKqp::TEvExecuterStreamProfile,
TKqpExecuterEvents::EvStreamProfile> {};
- struct TEvExecuterProgress : public TEventPB<TEvExecuterProgress, NKikimrKqp::TEvExecuterProgress,
- TKqpExecuterEvents::EvProgress> {};
-
struct TEvTableResolveStatus : public TEventLocal<TEvTableResolveStatus,
TKqpExecuterEvents::EvTableResolveStatus>
{
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 173c166561a..c6703410bc0 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -314,12 +314,6 @@ protected:
Target = ActorIdFromProto(ev->Get()->Record.GetTarget());
LWTRACK(KqpBaseExecuterHandleReady, ResponseEv->Orbit, TxId);
-
- LOG_D("Report self actorId " << this->SelfId() << " to " << Target);
- auto progressEv = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
- ActorIdToProto(this->SelfId(), progressEv->Record.MutableExecuterActorId());
- this->Send(Target, progressEv.Release());
-
if (IsDebugLogEnabled()) {
for (auto& tx : Request.Transactions) {
LOG_D("Executing physical tx, type: " << (ui32) tx.Body->GetType() << ", stages: " << tx.Body->StagesSize());
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 1219b2d3788..64a0717ce6c 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -178,6 +178,7 @@ public:
}
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
+ ExecuterActorId = ev->Sender;
auto& record = ev->Get()->Record;
if (!HasMeta) {
@@ -215,12 +216,6 @@ public:
Executions.push_back(std::move(*ev->Get()->Record.MutableProfile()));
}
- void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
- ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
- << "Received executer progress for scan query, id: " << ExecuterActorId);
- }
-
void Handle(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
const auto& kqpResponse = ev->Get()->Record;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
@@ -257,7 +252,6 @@ public:
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle);
- HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
HFunc(TResponse, HandleResponse);
default:
@@ -403,6 +397,7 @@ public:
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);
+ ExecuterActorId = ev->Sender;
TlsActivationContext->Send(ev->Forward(TargetActorId));
}
@@ -411,14 +406,6 @@ public:
TlsActivationContext->Send(ev->Forward(ExecuterActorId));
}
- void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev, const TActorContext& ctx) {
- ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
- ActorIdToProto(SelfId(), ev->Get()->Record.MutableExecuterActorId());
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
- << "Received executer progress for scan query, id: " << ExecuterActorId);
- TlsActivationContext->Send(ev->Forward(TargetActorId));
- }
-
void Handle(NKqp::TEvKqpExecuter::TEvStreamProfile::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);
Executions.push_back(std::move(*ev->Get()->Record.MutableProfile()));
@@ -461,7 +448,6 @@ public:
HFunc(NKqp::TEvKqp::TEvAbortExecution, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, Handle);
- HFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
HFunc(TResponse, HandleResponse);
default:
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 12fd7253466..ba3b1f842d8 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -50,17 +50,11 @@ public:
private:
STRICT_STFUNC(StateFunc,
- hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
hFunc(NKqp::TEvKqp::TEvFetchScriptResultsRequest, Handle);
)
- void Handle(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
- ExecuterActorId = ActorIdFromProto(ev->Get()->Record.GetExecuterActorId());
- LOG_D("ExecuterActorId: " << ExecuterActorId);
- }
-
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
@@ -155,8 +149,6 @@ private:
private:
const NKikimrKqp::TEvQueryRequest Request;
- NActors::TActorId ExecuterActorId;
-
// Result
NYql::TIssues Issues;
Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index d504d390be4..04d7ff110eb 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1375,12 +1375,6 @@ public:
TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId));
}
- void HandleExecute(TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
- YQL_ENSURE(QueryState);
- // note: RequestActorId may be TActorId{};
- TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId));
- }
-
void HandleExecute(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) {
TlsActivationContext->Send(ev->Forward(ExecuterId));
}
@@ -2034,7 +2028,6 @@ public:
hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
- hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleExecute);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, Handle);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
@@ -2064,7 +2057,6 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqp::TEvQueryRequest, HandleCleanup);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleCleanup);
- hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleCleanup);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h
index 6412b5714de..12d39f5694c 100644
--- a/ydb/core/viewer/json_query.h
+++ b/ydb/core/viewer/json_query.h
@@ -53,7 +53,6 @@ public:
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleReply);
hFunc(NKqp::TEvKqpExecuter::TEvStreamData, HandleReply);
hFunc(NKqp::TEvKqpExecuter::TEvStreamProfile, HandleReply);
- hFunc(NKqp::TEvKqpExecuter::TEvExecuterProgress, HandleReply);
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
@@ -245,10 +244,6 @@ private:
Y_UNUSED(ev);
}
- void HandleReply(NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
- Y_UNUSED(ev);
- }
-
void HandleReply(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
const NKikimrKqp::TEvExecuterStreamData& data(ev->Get()->Record);