aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIvan <5627721+abyss7@users.noreply.github.com>2024-08-29 18:09:32 +0300
committerGitHub <noreply@github.com>2024-08-29 18:09:32 +0300
commit93089eecc92947f748b20c17c2fa0b3fa7d2b18a (patch)
tree70cef038a518826bace90f23f2d02095f4c4b7fa
parent0e39c53a25f06037e9c94b5ee5cbfa00d65b5ee6 (diff)
downloadydb-93089eecc92947f748b20c17c2fa0b3fa7d2b18a.tar.gz
Apply the last stats received from terminated CAs (#8356)
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp18
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h57
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp2
5 files changed, 51 insertions, 34 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 6f365c44e2..32353e34cd 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -533,7 +533,7 @@ private:
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
CancelProposal(0);
}
- HandleComputeStats(ev);
+ HandleComputeState(ev);
}
void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -1015,7 +1015,7 @@ private:
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
- hFunc(TEvDqCompute::TEvState, HandleComputeStats);
+ hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
@@ -2646,6 +2646,7 @@ private:
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
+ // TODO(ilezhankin): the CA awaiting timeout should be configurable.
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
@@ -2681,17 +2682,10 @@ private:
}
void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
- if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
- YQL_ENSURE(Planner);
-
- TActorId actor = ev->Sender;
- ui64 taskId = ev->Get()->Record.GetTaskId();
-
- Planner->CompletedCA(taskId, actor);
+ HandleComputeStats(ev);
- if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
- PassAway();
- }
+ if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
+ PassAway();
}
}
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 2915775ea3..297e247e14 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -380,7 +380,7 @@ protected:
this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
}
- void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
+ bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
TActorId computeActor = ev->Sender;
auto& state = ev->Get()->Record;
ui64 taskId = state.GetTaskId();
@@ -409,7 +409,39 @@ protected:
}
YQL_ENSURE(Planner);
- bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state);
+ bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state);
+
+ switch (state.GetState()) {
+ case NYql::NDqProto::COMPUTE_STATE_FAILURE:
+ case NYql::NDqProto::COMPUTE_STATE_FINISHED:
+ // Don't finalize stats twice.
+ if (Planner->CompletedCA(taskId, computeActor)) {
+ ExtraData[computeActor].Swap(state.MutableExtraData());
+
+ if (Stats) {
+ Stats->AddComputeActorStats(
+ computeActor.NodeId(),
+ std::move(*state.MutableStats()),
+ TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
+ );
+ }
+
+ LastTaskId = taskId;
+ LastComputeActorId = computeActor.ToString();
+ }
+ default:
+ ; // ignore all other states.
+ }
+
+ return ack;
+ }
+
+ void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) {
+ TActorId computeActor = ev->Sender;
+ auto& state = ev->Get()->Record;
+ ui64 taskId = state.GetTaskId();
+
+ bool populateChannels = HandleComputeStats(ev);
switch (state.GetState()) {
case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
@@ -427,22 +459,8 @@ protected:
break;
}
- case NYql::NDqProto::COMPUTE_STATE_FAILURE:
- case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
- ExtraData[computeActor].Swap(state.MutableExtraData());
- if (Stats) {
- Stats->AddComputeActorStats(
- computeActor.NodeId(),
- std::move(*state.MutableStats()),
- TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
- );
- }
-
- LastTaskId = taskId;
- LastComputeActorId = computeActor.ToString();
- YQL_ENSURE(Planner);
- Planner->CompletedCA(taskId, computeActor);
- }
+ default:
+ ; // ignore all other states.
}
if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
@@ -1854,6 +1872,9 @@ protected:
void PassAway() override {
YQL_ENSURE(AlreadyReplied && ResponseEv);
+
+ // Actualize stats with the last stats from terminated CAs, but keep the status.
+ FillResponseStats(ResponseEv->Record.GetResponse().GetStatus());
this->Send(Target, ResponseEv.release());
for (auto channelPair: ResultChannelProxies) {
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 44243f0882..997964a60f 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -592,11 +592,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::
return false;
}
-void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
+bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
auto& task = TasksGraph.GetTask(taskId);
if (task.Meta.Completed) {
YQL_ENSURE(!PendingComputeActors.contains(computeActor));
- return;
+ return false;
}
task.Meta.Completed = true;
@@ -606,6 +606,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
PendingComputeActors.erase(it);
LOG_I("Compute actor has finished execution: " << computeActor.ToString());
+
+ return true;
}
void TKqpPlanner::TaskNotStarted(ui64 taskId) {
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index df3c3fdd39..29facb855f 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -75,7 +75,7 @@ public:
std::unique_ptr<IEventHandle> PlanExecution();
std::unique_ptr<IEventHandle> AssignTasksToNodes();
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
- void CompletedCA(ui64 taskId, TActorId computeActor);
+ bool CompletedCA(ui64 taskId, TActorId computeActor);
void TaskNotStarted(ui64 taskId);
TProgressStat::TEntry CalculateConsumptionUpdate();
void ShiftConsumption();
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index c04d5e7573..f7357744e1 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -110,7 +110,7 @@ private:
STATEFN(ExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvDqCompute::TEvState, HandleComputeStats);
+ hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);