aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-01-27 23:57:01 +0300
committerhor911 <hor911@ydb.tech>2023-01-27 23:57:01 +0300
commitcf0fcdf102b5d6470d2b5eb79a57578008a02c5b (patch)
treee5402731e836f6774f3a9aa42a1e0e2363f5d401
parent3f35860e4dd84c5e846c2cb89b48620795f16724 (diff)
downloadydb-cf0fcdf102b5d6470d2b5eb79a57578008a02c5b.tar.gz
Checkpoint Coordinator as Task Controller subclass
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp50
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp103
-rw-r--r--ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h72
-rw-r--r--ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp29
-rw-r--r--ydb/library/yql/providers/dq/actors/events.cpp5
-rw-r--r--ydb/library/yql/providers/dq/actors/events.h2
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp7
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller.cpp603
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h638
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto4
10 files changed, 834 insertions, 679 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 0a73bc3aac3..daa370d952d 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -269,6 +269,9 @@ struct TEvaluationGraphInfo {
class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
public:
+ using NActors::TActorBootstrapped<TRunActor>::Register;
+ using NActors::TActorBootstrapped<TRunActor>::Send;
+
explicit TRunActor(
const TActorId& fetcherId
, const ::NYql::NCommon::TServiceCounters& queryCounters
@@ -394,7 +397,6 @@ private:
// Clear finished actors ids
ExecuterId = {};
- CheckpointCoordinatorId = {};
ControlId = {};
}
}
@@ -726,7 +728,7 @@ private:
TopicsForConsumersCreation.size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED);
ProcessQuery();
} else if (ev->Cookie == SetLoadFromCheckpointModeCookie) {
- Send(CheckpointCoordinatorId, new TEvCheckpointCoordinator::TEvRunGraph());
+ Send(ControlId, new TEvCheckpointCoordinator::TEvRunGraph());
}
}
@@ -801,7 +803,7 @@ private:
void Handle(TEvCheckpointCoordinator::TEvZeroCheckpointDone::TPtr&) {
LOG_D("Coordinator saved zero checkpoint");
- Y_VERIFY(CheckpointCoordinatorId);
+ Y_VERIFY(ControlId);
SetLoadFromCheckpointMode();
}
@@ -1250,7 +1252,7 @@ private:
TEvaluationGraphInfo info;
- info.ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false));
+ info.ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false));
if (dqGraphParams.GetResultType()) {
TVector<TString> columns;
@@ -1260,7 +1262,7 @@ private:
NActors::TActorId empty = {};
THashMap<TString, TString> emptySecureParams; // NOT USED in RR
- info.ResultId = NActors::TActivationContext::Register(
+ info.ResultId = Register(
MakeResultReceiver(
columns, info.ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams,
dqGraphParams.GetResultType(), empty, false).Release());
@@ -1270,7 +1272,7 @@ private:
info.ResultId = info.ExecuterId;
}
- info.ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, info.ExecuterId, info.ResultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release());
+ info.ControlId = Register(NYql::MakeTaskController(SessionId, info.ExecuterId, info.ResultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release());
Yql::DqsProto::ExecuteGraphRequest request;
request.SetSourceId(dqGraphParams.GetSourceId());
@@ -1280,7 +1282,7 @@ private:
*request.MutableSecureParams() = dqGraphParams.GetSecureParams();
*request.MutableColumns() = dqGraphParams.GetColumns();
NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
- NActors::TActivationContext::Send(new IEventHandle(info.ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId, TActorId{})));
+ Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId));
LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId);
return info;
}
@@ -1292,7 +1294,7 @@ private:
dqConfiguration->FreezeDefaults();
dqConfiguration->FallbackPolicy = "never";
- ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator));
+ ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator));
NActors::TActorId resultId;
if (dqGraphParams.GetResultType()) {
@@ -1307,7 +1309,7 @@ private:
for (const auto& column : dqGraphParams.GetColumns()) {
columns.emplace_back(column);
}
- resultId = NActors::TActivationContext::Register(
+ resultId = Register(
CreateResultWriter(
ExecuterId, dqGraphParams.GetResultType(),
writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit));
@@ -1316,18 +1318,34 @@ private:
resultId = ExecuterId;
}
- ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release());
if (EnableCheckpointCoordinator) {
- CheckpointCoordinatorId = NActors::TActivationContext::Register(MakeCheckpointCoordinator(
+ ControlId = Register(MakeCheckpointCoordinator(
::NYq::TCoordinatorId(Params.QueryId + "-" + ToString(DqGraphIndex), Params.PreviousQueryRevision),
- ControlId,
NYql::NDq::MakeCheckpointStorageID(),
SelfId(),
Params.CheckpointCoordinatorConfig,
QueryCounters.Counters,
dqGraphParams,
Params.StateLoadMode,
- Params.StreamingDisposition).Release());
+ Params.StreamingDisposition,
+ // vvv TaskController temporary params vvv
+ SessionId,
+ ExecuterId,
+ resultId,
+ dqConfiguration,
+ QueryCounters,
+ TDuration::Seconds(3),
+ TDuration::Seconds(1)
+ ).Release());
+ } else {
+ ControlId = Register(NYql::MakeTaskController(
+ SessionId,
+ ExecuterId,
+ resultId,
+ dqConfiguration,
+ QueryCounters,
+ TDuration::Seconds(3)
+ ).Release());
}
Yql::DqsProto::ExecuteGraphRequest request;
@@ -1341,8 +1359,8 @@ private:
commonTaskParams["fq.job_id"] = Params.JobId;
commonTaskParams["fq.restart_count"] = ToString(Params.RestartCount);
NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
- NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId)));
- LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId << ", CheckPointCoordinatior " << CheckpointCoordinatorId);
+ Send(ExecuterId, new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId));
+ LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId);
}
void SetupDqSettings(NYql::TDqGatewayConfig& dqGatewaysConfig) const {
@@ -1746,7 +1764,6 @@ private:
html << "<td>" << DqGraphIndex << " of " << DqGraphParams.size() << "</td>";
html << "<td> Executer" << ExecuterId << "</td>";
html << "<td> Control" << ControlId << "</td>";
- html << "<td> Coordinator" << CheckpointCoordinatorId << "</td>";
}
html << "</tr>";
html << "</tbody></table>";
@@ -1908,7 +1925,6 @@ private:
ui32 DqEvalIndex = 0;
NActors::TActorId ExecuterId;
NActors::TActorId ControlId;
- NActors::TActorId CheckpointCoordinatorId;
TString SessionId;
::NYql::NCommon::TServiceCounters QueryCounters;
const ::NMonitoring::TDynamicCounters::TCounterPtr QueryUptime;
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
index f1283ec9471..7780443d26f 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -28,16 +28,31 @@
namespace NYq {
TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId,
- const TActorId& taskControllerId,
const TActorId& storageProxy,
const TActorId& runActorId,
const TCheckpointCoordinatorConfig& settings,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NProto::TGraphParams& graphParams,
const YandexQuery::StateLoadMode& stateLoadMode,
- const YandexQuery::StreamingDisposition& streamingDisposition)
- : CoordinatorId(std::move(coordinatorId))
- , TaskControllerId(taskControllerId)
+ const YandexQuery::StreamingDisposition& streamingDisposition,
+ // vvv TaskController temporary params vvv
+ const TString& traceId,
+ const NActors::TActorId& executerId,
+ const NActors::TActorId& resultId,
+ const NYql::TDqConfiguration::TPtr& tcSettings,
+ const NYql::NCommon::TServiceCounters& serviceCounters,
+ const TDuration& pingPeriod,
+ const TDuration& aggrPeriod)
+ : NYql::TTaskControllerImpl<TCheckpointCoordinator>(
+ traceId,
+ executerId,
+ resultId,
+ tcSettings,
+ serviceCounters,
+ pingPeriod,
+ aggrPeriod,
+ &TCheckpointCoordinator::DispatchEvent)
+ , CoordinatorId(std::move(coordinatorId))
, StorageProxy(storageProxy)
, RunActorId(runActorId)
, Settings(settings)
@@ -49,12 +64,11 @@ TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId,
{
}
-void TCheckpointCoordinator::Bootstrap() {
- Become(&TThis::DispatchEvent);
- CC_LOG_D("Bootstrapped with streaming disposition " << StreamingDisposition << " and state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode));
-}
+void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) {
+ NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnReadyState(ev);
+
+ CC_LOG_D("TEvReadyState, streaming disposition " << StreamingDisposition << ", state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode));
-void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) {
const auto& tasks = ev->Get()->Record.GetTask();
const auto& actorIds = ev->Get()->Record.GetActorId();
Y_VERIFY(tasks.size() == actorIds.size());
@@ -63,7 +77,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) {
auto& task = tasks[i];
auto& actorId = TaskIdToActor[task.GetId()];
if (actorId) {
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Duplicate task id: " << task.GetId()));
+ OnInternalError(TStringBuilder() << "Duplicate task id: " << task.GetId());
return;
}
actorId = ActorIdFromProto(actorIds[i]);
@@ -113,7 +127,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord
if (issues) {
CC_LOG_E("Can't register in storage: " + issues.ToOneLineString());
++*Metrics.StorageError;
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't register in storage", issues));
+ OnInternalError("Can't register in storage", issues);
return;
}
@@ -141,7 +155,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord
InitCheckpoint();
ScheduleNextCheckpoint();
} else {
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Unexpected state load mode (" << YandexQuery::StateLoadMode_Name(StateLoadMode) << ") and streaming disposition " << StreamingDisposition));
+ OnInternalError(TStringBuilder() << "Unexpected state load mode (" << YandexQuery::StateLoadMode_Name(StateLoadMode) << ") and streaming disposition " << StreamingDisposition);
}
}
@@ -169,7 +183,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpoint
if (event->Issues) {
++*Metrics.StorageError;
CC_LOG_E("Can't get checkpoints to restore: " + event->Issues.ToOneLineString());
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't get checkpoints to restore", event->Issues));
+ OnInternalError("Can't get checkpoints to restore", event->Issues);
return;
}
@@ -212,7 +226,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe
++*Metrics.StorageError;
const TString message = TStringBuilder() << "Can't get graph params from checkpoint " << checkpoint.CheckpointId;
CC_LOG_I(message);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
+ OnInternalError(message);
return;
}
@@ -230,7 +244,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe
}
if (!result) {
- Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(NYql::NDqProto::StatusIds::BAD_REQUEST, issues));
+ NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, "Can't restore from plan given", issues);
return;
} else { // Report as transient issues
Send(RunActorId, new TEvents::TEvRaiseTransientIssues(std::move(issues)));
@@ -243,7 +257,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe
if (actorIdIt == TaskIdToActor.end()) {
const TString msg = TStringBuilder() << "ActorId for task id " << taskId << " was not found";
CC_LOG_E(msg);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(msg));
+ OnInternalError(msg);
return;
}
const auto transportIt = ActorsToWaitFor.find(actorIdIt->second);
@@ -274,19 +288,19 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
if (!PendingRestoreCheckpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint");
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint"));
+ OnInternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint");
return;
}
if (PendingRestoreCheckpoint->CheckpointId != checkpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult event with unexpected checkpoint: " << checkpoint << ", expected: " << PendingRestoreCheckpoint->CheckpointId);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got unexpected checkpoint"));
+ OnInternalError("Got unexpected checkpoint");
return;
}
if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) {
CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Aborted("Can't restore: " + statusName));
+ NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, "Can't restore: " + statusName, {});
return;
}
@@ -563,14 +577,6 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) {
PassAway();
}
-void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
- TStringStream message;
- message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType;
- CC_LOG_D(message.Str());
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Unavailable(message.Str()));
- PassAway();
-}
-
void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) {
InitingZeroCheckpoint = false;
// TODO: run graph only now, not before zero checkpoint inited
@@ -580,17 +586,52 @@ void TCheckpointCoordinator::PassAway() {
for (const auto& [actorId, transport] : AllActors) {
transport->EventsQueue.Unsubscribe();
}
- TActorBootstrapped<TCheckpointCoordinator>::PassAway();
+ NYql::TTaskControllerImpl<TCheckpointCoordinator>::PassAway();
}
void TCheckpointCoordinator::HandleException(const std::exception& err) {
NYql::TIssues issues;
issues.AddIssue(err.what());
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Internal error in checkpoint coordinator", issues));
+ OnInternalError("Internal error in checkpoint coordinator", issues);
}
-THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const ::NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition) {
- return MakeHolder<TCheckpointCoordinator>(coordinatorId, taskControllerId, storageProxy, runActorId, settings, counters, graphParams, stateLoadMode, streamingDisposition);
+THolder<NActors::IActor> MakeCheckpointCoordinator(
+ TCoordinatorId coordinatorId,
+ const TActorId& storageProxy,
+ const TActorId& runActorId,
+ const TCheckpointCoordinatorConfig& settings,
+ const ::NMonitoring::TDynamicCounterPtr& counters,
+ const NProto::TGraphParams& graphParams,
+ const YandexQuery::StateLoadMode& stateLoadMode /* = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT */,
+ const YandexQuery::StreamingDisposition& streamingDisposition /* = {} */,
+ // vvv TaskController temporary params vvv
+ const TString& traceId,
+ const NActors::TActorId& executerId,
+ const NActors::TActorId& resultId,
+ const NYql::TDqConfiguration::TPtr& tcSettings,
+ const NYql::NCommon::TServiceCounters& serviceCounters,
+ const TDuration& pingPeriod,
+ const TDuration& aggrPeriod
+ )
+{
+ return MakeHolder<TCheckpointCoordinator>(
+ coordinatorId,
+ storageProxy,
+ runActorId,
+ settings,
+ counters,
+ graphParams,
+ stateLoadMode,
+ streamingDisposition,
+ // vvv TaskController temporary params vvv
+ traceId,
+ executerId,
+ resultId,
+ tcSettings,
+ serviceCounters,
+ pingPeriod,
+ aggrPeriod
+ );
}
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
index 38ab67f90e8..7a626bcc7c5 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h
@@ -13,6 +13,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/library/yql/dq/actors/compute/retry_queue.h>
#include <ydb/library/yql/providers/dq/actors/events.h>
+#include <ydb/library/yql/providers/dq/actors/task_controller_impl.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -21,19 +22,29 @@ namespace NYq {
using namespace NActors;
using namespace NYq::NConfig;
-class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> {
+class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoordinator> {
public:
TCheckpointCoordinator(TCoordinatorId coordinatorId,
- const TActorId& taskControllerId,
const TActorId& storageProxy,
const TActorId& runActorId,
const TCheckpointCoordinatorConfig& settings,
const ::NMonitoring::TDynamicCounterPtr& counters,
const NProto::TGraphParams& graphParams,
const YandexQuery::StateLoadMode& stateLoadMode,
- const YandexQuery::StreamingDisposition& streamingDisposition);
-
- void Handle(const NYql::NDqs::TEvReadyState::TPtr&);
+ const YandexQuery::StreamingDisposition& streamingDisposition,
+ // vvv TaskController temporary params vvv
+ const TString& traceId,
+ const NActors::TActorId& executerId,
+ const NActors::TActorId& resultId,
+ const NYql::TDqConfiguration::TPtr& tcSettings,
+ const NYql::NCommon::TServiceCounters& serviceCounters,
+ const TDuration& pingPeriod,
+ const TDuration& aggrPeriod
+ );
+
+ using NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnInternalError;
+
+ void Handle(NYql::NDqs::TEvReadyState::TPtr&);
void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&);
void Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr&);
void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&);
@@ -47,7 +58,6 @@ public:
void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&);
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev);
void Handle(NActors::TEvents::TEvPoison::TPtr&);
- void Handle(NActors::TEvents::TEvUndelivered::TPtr&);
void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev);
void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&);
@@ -56,28 +66,38 @@ public:
STRICT_STFUNC_EXC(DispatchEvent,
hFunc(NYql::NDqs::TEvReadyState, Handle)
+ hFunc(NYql::NDqs::TEvQueryResponse, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnQueryResult)
+ hFunc(NYql::NDqs::TEvDqFailure, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnResultFailure)
+
+ hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle)
+ hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle)
+
hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle)
hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle)
- hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle)
hFunc(TEvCheckpointStorage::TEvCreateCheckpointResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle)
hFunc(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse, Handle)
- hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle)
hFunc(TEvCheckpointStorage::TEvCompleteCheckpointResponse, Handle)
hFunc(TEvCheckpointStorage::TEvAbortCheckpointResponse, Handle)
+
+ hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle)
+ hFunc(NYql::NDq::TEvDqCompute::TEvState, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnComputeActorState)
+ hFunc(NYql::NDq::TEvDq::TEvAbortExecution, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnAbortExecution)
+
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle)
+
hFunc(NActors::TEvents::TEvPoison, Handle)
- hFunc(NActors::TEvents::TEvUndelivered, Handle)
+ hFunc(NActors::TEvents::TEvUndelivered, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnUndelivered)
+ hFunc(NActors::TEvents::TEvWakeup, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnWakeup)
+
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle)
- hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle)
- hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle),
+ hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle),
+
ExceptionFunc(std::exception, HandleException)
)
- void Bootstrap();
-
static constexpr char ActorName[] = "YQ_CHECKPOINT_COORDINATOR";
private:
@@ -146,7 +166,6 @@ private:
};
const TCoordinatorId CoordinatorId;
- const TActorId TaskControllerId;
const TActorId StorageProxy;
const TActorId RunActorId;
std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator;
@@ -177,6 +196,23 @@ private:
YandexQuery::StreamingDisposition StreamingDisposition;
};
-THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& executerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const ::NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT, const YandexQuery::StreamingDisposition& streamingDisposition = {});
+THolder<NActors::IActor> MakeCheckpointCoordinator(
+ TCoordinatorId coordinatorId,
+ const TActorId& storageProxy,
+ const TActorId& runActorId,
+ const TCheckpointCoordinatorConfig& settings,
+ const ::NMonitoring::TDynamicCounterPtr& counters,
+ const NProto::TGraphParams& graphParams,
+ const YandexQuery::StateLoadMode& stateLoadMode /* = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT */,
+ const YandexQuery::StreamingDisposition& streamingDisposition /* = {} */,
+ // vvv TaskController temporary params vvv
+ const TString& traceId,
+ const NActors::TActorId& executerId,
+ const NActors::TActorId& resultId,
+ const NYql::TDqConfiguration::TPtr& tcSettings,
+ const NYql::NCommon::TServiceCounters& serviceCounters,
+ const TDuration& pingPeriod,
+ const TDuration& aggrPeriod
+ );
} // namespace NYq
diff --git a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
index 30affc40524..96e2d697a82 100644
--- a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
+++ b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp
@@ -89,21 +89,34 @@ struct TTestBootstrap : public TTestActorRuntime {
Settings.SetCheckpointingPeriodMillis(TDuration::Hours(1).MilliSeconds());
Settings.SetMaxInflight(1);
+ NYql::TDqConfiguration::TPtr DqSettings = MakeIntrusive<NYql::TDqConfiguration>();
+
SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG);
- CheckpointCoordinator = Register(MakeCheckpointCoordinator(TCoordinatorId("my-graph-id", 42), {}, StorageProxy, RunActor, Settings, Counters, NProto::TGraphParams()).Release());
- WaitForBootstrap();
+ CheckpointCoordinator = Register(MakeCheckpointCoordinator(
+ TCoordinatorId("my-graph-id", 42),
+ StorageProxy,
+ RunActor,
+ Settings,
+ Counters,
+ NProto::TGraphParams(),
+ YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT,
+ {},
+ //
+ "my-graph-id",
+ {} /* ExecuterId */,
+ RunActor,
+ DqSettings,
+ ::NYql::NCommon::TServiceCounters(Counters, nullptr, ""),
+ TDuration::Seconds(3),
+ TDuration::Seconds(1)
+ ).Release());
Send(new IEventHandle(CheckpointCoordinator, {}, new NYql::NDqs::TEvReadyState(std::move(GraphState))));
EnableScheduleForActor(CheckpointCoordinator);
}
-
- void WaitForBootstrap() {
- NActors::TDispatchOptions options;
- options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Bootstrap, 1);
- DispatchEvents(options);
- }
};
+
} // namespace
namespace NYq {
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 3b9ee612411..85863d52604 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -34,14 +34,11 @@ namespace NYql::NDqs {
Record = std::move(queryResult);
}
- TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId)
+ TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId)
{
*Record.MutableRequest() = request;
NActors::ActorIdToProto(controlId, Record.MutableControlId());
NActors::ActorIdToProto(resultId, Record.MutableResultId());
- if (checkPointCoordinatorId) {
- NActors::ActorIdToProto(checkPointCoordinatorId, Record.MutableCheckPointCoordinatorId());
- }
}
TEvReadyState::TEvReadyState(NActors::TActorId sourceId, TString type) {
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index f2269534474..df65b75b322 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -40,7 +40,7 @@ namespace NYql::NDqs {
struct TEvGraphRequest : NActors::TEventPB<TEvGraphRequest, NDqProto::TGraphRequest, TDqExecuterEvents::ES_GRAPH> {
TEvGraphRequest() = default;
- TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId = {});
+ TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId);
};
struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> {
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 753306b961b..32a3c897de8 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -140,11 +140,9 @@ private:
});
ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId());
ResultId = NActors::ActorIdFromProto(ev->Get()->Record.GetResultId());
- CheckPointCoordinatorId = NActors::ActorIdFromProto(ev->Get()->Record.GetCheckPointCoordinatorId());
// These actors will be killed at exit.
AddChild(ControlId);
AddChild(ResultId);
- AddChild(CheckPointCoordinatorId);
int workerCount = ev->Get()->Record.GetRequest().GetTask().size();
const bool enableComputeActor = Settings->EnableComputeActor.Get().GetOrElse(false);
@@ -404,14 +402,10 @@ private:
AllocationHistogram->Collect((ExecutionStart-StartTime).Seconds());
auto readyState1 = res->Record;
- auto readyState2 = res->Record;
Send(ControlId, res.Release());
if (ResultId != SelfId() && ResultId != ControlId) {
Send(ResultId, new TEvReadyState(std::move(readyState1)));
}
- if (CheckPointCoordinatorId) {
- Send(CheckPointCoordinatorId, new TEvReadyState(std::move(readyState2)));
- }
if (Timeout) {
ExecutionTimeoutCookieHolder.Reset(ISchedulerCookie::Make2Way());
@@ -466,7 +460,6 @@ private:
NActors::TActorId ControlId;
NActors::TActorId ResultId;
- NActors::TActorId CheckPointCoordinatorId;
TExprNode::TPtr ExprRoot;
THolder<IDqsExecutionPlanner> ExecutionPlanner;
ui64 ResourceId = 0;
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 02b10d5affe..a9366629f02 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -1,51 +1,13 @@
#include "task_controller.h"
-#include "execution_helpers.h"
-#include "events.h"
-#include "proto_builder.h"
-#include "actor_helpers.h"
-#include "executer_actor.h"
-#include "grouped_issues.h"
-
-#include <ydb/library/yql/providers/dq/counters/counters.h>
-
-#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
-
-#include <ydb/library/yql/public/issue/yql_issue_message.h>
-
-#include <ydb/library/yql/utils/actor_log/log.h>
-#include <ydb/library/yql/utils/log/log.h>
-
-#include <ydb/public/lib/yson_value/ydb_yson_value.h>
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
-
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/event_pb.h>
-#include <library/cpp/actors/core/executor_pool_basic.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/actors/core/scheduler_basic.h>
-#include <library/cpp/threading/future/future.h>
-#include <library/cpp/protobuf/util/pb_io.h>
-
-#include <util/generic/size_literals.h>
-#include <util/generic/ptr.h>
-#include <util/string/split.h>
-#include <util/system/types.h>
+#include "task_controller_impl.h"
namespace NYql {
-using namespace NActors;
-using namespace NDqs;
-
namespace {
-class TTaskController: public TRichActor<TTaskController> {
+class TTaskController: public TTaskControllerImpl<TTaskController> {
public:
- static constexpr ui64 PING_TIMER_TAG = 1;
- static constexpr ui64 AGGR_TIMER_TAG = 2;
-
- static constexpr char ActorName[] = "YQL_DQ_TASK_CONTROLLER";
-
- explicit TTaskController(
+ TTaskController(
const TString& traceId,
const NActors::TActorId& executerId,
const NActors::TActorId& resultId,
@@ -54,557 +16,17 @@ public:
const TDuration& pingPeriod,
const TDuration& aggrPeriod
)
- : TRichActor<TTaskController>(&TTaskController::Handler)
- , ExecuterId(executerId)
- , ResultId(resultId)
- , TraceId(traceId)
- , Settings(settings)
- , ServiceCounters(serviceCounters, "task_controller")
- , PingPeriod(pingPeriod)
- , AggrPeriod(aggrPeriod)
- , Issues(CreateDefaultTimeProvider())
+ : TTaskControllerImpl<TTaskController>(
+ traceId,
+ executerId,
+ resultId,
+ settings,
+ serviceCounters,
+ pingPeriod,
+ aggrPeriod,
+ &TTaskControllerImpl<TTaskController>::Handler)
{
- if (Settings) {
- if (Settings->_AllResultsBytesLimit.Get()) {
- YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get();
- }
- if (Settings->_RowsLimitPerWrite.Get()) {
- YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get();
- }
- }
- }
-
- ~TTaskController() override {
- SetTaskCountMetric(0);
- }
-
-private:
- STRICT_STFUNC(Handler, {
- hFunc(TEvReadyState, OnReadyState);
- hFunc(TEvQueryResponse, OnQueryResult);
- hFunc(TEvDqFailure, OnResultFailure);
- hFunc(NDq::TEvDqCompute::TEvState, OnComputeActorState);
- hFunc(NDq::TEvDq::TEvAbortExecution, OnAbortExecution);
- cFunc(TEvents::TEvPoison::EventType, PassAway);
- hFunc(TEvents::TEvUndelivered, OnUndelivered);
- hFunc(TEvents::TEvWakeup, OnWakeup);
- })
-
- void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
- auto it = TaskIds.find(ev->Sender);
- if (it != TaskIds.end() && FinishedTasks.contains(it->second)) {
- // ignore undelivered from finished CAs
- return;
- }
-
- TStringBuilder message;
- message << "Undelivered Event " << ev->Get()->SourceType
- << " from " << SelfId() << " (Self) to " << ev->Sender
- << " Reason: " << ev->Get()->Reason << " Cookie: " << ev->Cookie;
- OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message);
- }
-
- void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
- YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
- auto statusCode = ev->Get()->Record.GetStatusCode();
- TIssues issues = ev->Get()->GetIssues();
- YQL_CLOG(DEBUG, ProviderDq) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString();
- OnError(statusCode, issues);
- }
-
- void SendNonFatalIssues() {
- auto req = MakeHolder<TEvDqStats>(Issues.ToIssues());
- Send(ExecuterId, req.Release());
- }
-
- void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
- TActorId computeActor = ev->Sender;
- auto& state = ev->Get()->Record;
- ui64 taskId = state.GetTaskId();
- YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
- YQL_CLOG(TRACE, ProviderDq)
- << SelfId()
- << " EvState TaskId: " << taskId
- << " State: " << state.GetState()
- << " PingCookie: " << ev->Cookie
- << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode());
-
- if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) {
- if (ServiceCounters.Counters && !AggrPeriod) {
- ExportStats(TaskStat, taskId);
- }
- } else if (state.HasStats() && state.GetStats().GetTasks().size()) {
- YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId;
- AddStats(state.GetStats());
- if (ServiceCounters.Counters && !AggrPeriod) {
- ExportStats(TaskStat, taskId);
- }
- }
-
-
- TIssues localIssues;
- // TODO: don't convert issues to string
- NYql::IssuesFromMessage(state.GetIssues(), localIssues);
-
- switch (state.GetState()) {
- case NDqProto::COMPUTE_STATE_UNKNOWN: {
- // TODO: use issues
- TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId);
- OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, message);
- break;
- }
- case NDqProto::COMPUTE_STATE_FAILURE: {
- Issues.AddIssues(localIssues);
- OnError(state.GetStatusCode(), Issues.ToIssues());
- break;
- }
- case NDqProto::COMPUTE_STATE_EXECUTING: {
- Issues.AddIssues(localIssues);
- YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId;
- if (!FinishedTasks.contains(taskId)) {
- // may get late/reordered? message
- Executing[taskId] = Now();
- }
- SendNonFatalIssues();
- break;
- }
- case NDqProto::COMPUTE_STATE_FINISHED: {
- Executing.erase(taskId);
- FinishedTasks.insert(taskId);
- YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Finish TaskId: " << taskId << ". Tasks finished: " << FinishedTasks.size() << "/" << Tasks.size();
- break;
- }
- }
-
- MaybeUpdateChannels();
- MaybeFinish();
- }
-
- void OnWakeup(TEvents::TEvWakeup::TPtr& ev) {
- switch (ev->Get()->Tag) {
- case PING_TIMER_TAG:
- if (PingPeriod) {
- auto now = Now();
- for (auto& taskActors: Executing) {
- if (now > taskActors.second + PingPeriod) {
- PingCookie++;
- YQL_CLOG(TRACE, ProviderDq) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie;
- Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie);
- taskActors.second = now;
- }
- }
- Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG));
- }
- break;
- case AGGR_TIMER_TAG:
- if (AggrPeriod) {
- if (ServiceCounters.Counters) {
- ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0);
- }
- Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG));
- }
- break;
- }
- };
-
- ::NMonitoring::TDynamicCounterPtr GroupForExport(const TCounters& stat, const TString& counterName, ui64 taskId, TString& name, std::map<TString, TString>& labels) {
- Y_UNUSED(stat);
- TString prefix;
- if (NCommon::ParseCounterName(&prefix, &labels, &name, counterName)) {
- if (prefix == "TaskRunner" && (taskId == 0 || labels["Task"] == ToString(taskId))) {
- auto group = (taskId == 0) ? ServiceCounters.Counters : ServiceCounters.Counters->GetSubgroup("Stage", ToString(Stages[taskId]));
- for (const auto& [k, v] : labels) {
- group = group->GetSubgroup(k, v);
- }
- return group;
- }
- }
- return nullptr;
}
-
- static bool IsAggregatedStage(const std::map<TString, TString>& labels) {
- const auto it = labels.find("Stage");
- return it != labels.end() && it->second == "Total";
- }
-
- void ExportStats(const TCounters& stat, ui64 taskId) {
- YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary");
- TString name;
- std::map<TString, TString> labels;
- static const TString SourceLabel = "Source";
- static const TString SinkLabel = "Sink";
- for (const auto& [k, v] : stat.Get()) {
- labels.clear();
- if (auto group = GroupForExport(stat, k, taskId, name, labels)) {
- *group->GetCounter(name) = v.Count;
- if (ServiceCounters.PublicCounters && taskId == 0 && IsAggregatedStage(labels)) {
- TString publicCounterName;
- bool isDeriv = false;
- if (name == "MkqlMaxMemoryUsage") {
- publicCounterName = "query.memory_usage_bytes";
- } else if (name == "CpuTimeUs") {
- publicCounterName = "query.cpu_usage_us";
- isDeriv = true;
- } else if (name == "Bytes") {
- if (labels.count(SourceLabel)) publicCounterName = "query.input_bytes";
- else if (labels.count(SinkLabel)) publicCounterName = "query.output_bytes";
- isDeriv = true;
- } else if (name == "RowsIn") {
- if (labels.count(SourceLabel)) publicCounterName = "query.source_input_records";
- else if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; // RowsIn == RowsOut for Sinks
- isDeriv = true;
- } else if (name == "MultiHop_LateThrownEventsCount") {
- publicCounterName = "query.late_events";
- isDeriv = true;
- }
-
- if (publicCounterName) {
- auto& counter = *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv);
- if (name == "MultiHop_LateThrownEventsCount") {
- // the only incremental sensor from TaskRunner
- counter += v.Count;
- } else {
- counter = v.Count;
- }
- }
- }
- }
- }
- for (const auto& [k, v] : stat.GetHistograms()) {
- labels.clear();
- if (auto group = GroupForExport(stat, k, taskId, name, labels)) {
- auto hist = group->GetHistogram(name, NMonitoring::ExponentialHistogram(6, 10, 10));
- hist->Reset();
- for (const auto& [bound, value] : v) {
- hist->Collect(bound, value);
- }
- }
- }
- }
-
- bool TryAddStatsFromExtra(const NDqProto::TDqComputeActorStats& x) {
- NDqProto::TExtraStats extraStats;
- if (x.HasExtra() && x.GetExtra().UnpackTo(&extraStats)) {
- YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats from extra";
- for (const auto& [name, m] : extraStats.GetStats()) {
- NYql::TCounters::TEntry value;
- value.Sum = m.GetSum();
- value.Max = m.GetMax();
- value.Min = m.GetMin();
- //value.Avg = m.GetAvg();
- value.Count = m.GetCnt();
- TaskStat.AddCounter(name, value);
- }
- return true;
- }
- return false;
- }
-
- void AddStats(const NDqProto::TDqComputeActorStats& x) {
- YQL_ENSURE(x.GetTasks().size() == 1);
- auto& s = x.GetTasks(0);
- ui64 taskId = s.GetTaskId();
-
-#define ADD_COUNTER(name) \
- if (stats.Get ## name()) { \
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \
- }
-
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)}
- };
-
- auto& stats = s;
- // basic stats
- ADD_COUNTER(CpuTimeUs)
- ADD_COUNTER(ComputeCpuTimeUs)
- ADD_COUNTER(PendingInputTimeUs)
- ADD_COUNTER(PendingOutputTimeUs)
- ADD_COUNTER(FinishTimeUs)
- ADD_COUNTER(InputRows)
- ADD_COUNTER(InputBytes)
- ADD_COUNTER(OutputRows)
- ADD_COUNTER(OutputBytes)
-
- // profile stats
- ADD_COUNTER(BuildCpuTimeUs)
- ADD_COUNTER(WaitTimeUs)
- ADD_COUNTER(WaitOutputTimeUs)
-
- for (const auto& ingress : s.GetIngress()) {
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Ingress" + ingress.GetName() + "Bytes"), ingress.GetBytes());
- }
-
- for (const auto& egress : s.GetEgress()) {
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Egress" + egress.GetName() + "Bytes"), egress.GetBytes());
- }
-
- if (auto v = x.GetMkqlMaxMemoryUsage()) {
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "MkqlMaxMemoryUsage"), v);
- }
-
- for (const auto& stat : s.GetMkqlStats()) {
- TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, stat.GetName()), stat.GetValue());
- }
-
- if (stats.ComputeCpuTimeByRunSize()) {
- auto& hist = TaskStat.GetHistogram(TaskStat.GetCounterName("TaskRunner", labels, "ComputeTimeByRunMs"));
- for (const auto& bucket : s.GetComputeCpuTimeByRun()) {
- hist[bucket.GetBound()] = bucket.GetValue();
- }
- }
-
- // compilation stats
-// ADD_COUNTER(MkqlTotalNodes)
-// ADD_COUNTER(MkqlCodegenFunctions)
-// ADD_COUNTER(CodeGenTotalInstructions)
-// ADD_COUNTER(CodeGenTotalFunctions)
-//
-// ADD_COUNTER(CodeGenFullTime)
-// ADD_COUNTER(CodeGenFinalizeTime)
-// ADD_COUNTER(CodeGenModulePassTime)
-
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
-
- for (const auto& stats : s.GetInputChannels()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"InputChannel", ToString(stats.GetChannelId())}
- };
-
- ADD_COUNTER(Chunks);
- ADD_COUNTER(Bytes);
- ADD_COUNTER(RowsIn);
- ADD_COUNTER(RowsOut);
- ADD_COUNTER(MaxMemoryUsage);
- ADD_COUNTER(DeserializationTimeUs);
-
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
- }
-
- for (const auto& stats : s.GetOutputChannels()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"OutputChannel", ToString(stats.GetChannelId())}
- };
-
- ADD_COUNTER(Chunks)
- ADD_COUNTER(Bytes);
- ADD_COUNTER(RowsIn);
- ADD_COUNTER(RowsOut);
- ADD_COUNTER(MaxMemoryUsage);
-
- ADD_COUNTER(SerializationTimeUs);
- ADD_COUNTER(BlockedByCapacity);
-
- ADD_COUNTER(SpilledBytes);
- ADD_COUNTER(SpilledRows);
- ADD_COUNTER(SpilledBlobs);
-
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
- }
-
- for (const auto& stats : s.GetSources()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"Source", ToString(stats.GetInputIndex())}
- };
-
- ADD_COUNTER(Chunks);
- ADD_COUNTER(Bytes);
- ADD_COUNTER(IngressBytes)
- ADD_COUNTER(RowsIn);
- ADD_COUNTER(RowsOut);
- ADD_COUNTER(MaxMemoryUsage);
-
- ADD_COUNTER(ErrorsCount);
-
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
- }
-
- for (const auto& stats : s.GetSinks()) {
- std::map<TString, TString> labels = {
- {"Task", ToString(taskId)},
- {"Sink", ToString(stats.GetOutputIndex())}
- };
-
- ADD_COUNTER(Chunks)
- ADD_COUNTER(Bytes);
- ADD_COUNTER(EgressBytes)
- ADD_COUNTER(RowsIn);
- ADD_COUNTER(RowsOut);
- ADD_COUNTER(MaxMemoryUsage);
-
- ADD_COUNTER(ErrorsCount);
-
-// if (stats.GetFinishTs() >= stats.GetStartTs()) {
-// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
-// }
- }
-
-#undef ADD_COUNTER
- }
-
- void MaybeFinish() {
- if (!Finished && !Tasks.empty() && FinishedTasks.size() == Tasks.size()) {
- Finish();
- }
- }
-
- void SetTaskCountMetric(ui64 count) {
- if (!ServiceCounters.Counters) {
- return;
- }
- *ServiceCounters.Counters->GetCounter("TaskCount") = count;
-
- if (!ServiceCounters.PublicCounters) {
- return;
- }
- *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count;
- }
-
- void OnReadyState(TEvReadyState::TPtr& ev) {
- YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
-
- TaskStat.AddCounters(ev->Get()->Record);
-
- const auto& tasks = ev->Get()->Record.GetTask();
- const auto& actorIds = ev->Get()->Record.GetActorId();
- Y_VERIFY(tasks.size() == actorIds.size());
-
- SetTaskCountMetric(tasks.size());
-
- for (int i = 0; i < static_cast<int>(tasks.size()); ++i) {
- auto actorId = ActorIdFromProto(actorIds[i]);
- auto& task = tasks[i];
- Tasks.emplace_back(task, actorId);
- ActorIds.emplace(task.GetId(), actorId);
- TaskIds.emplace(actorId, task.GetId());
- Yql::DqsProto::TTaskMeta taskMeta;
- task.GetMeta().UnpackTo(&taskMeta);
- Stages.emplace(task.GetId(), taskMeta.GetStageId());
- }
-
- YQL_CLOG(DEBUG, ProviderDq) << "Ready State: " << SelfId();
-
- MaybeUpdateChannels();
-
- if (PingPeriod) {
- Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG));
- }
- if (AggrPeriod) {
- Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG));
- }
- }
-
- void MaybeUpdateChannels() {
- if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) {
- return;
- }
-
- YQL_CLOG(DEBUG, ProviderDq) << "Update channels";
- for (const auto& [task, actorId] : Tasks) {
- auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>();
-
- for (const auto& input : task.GetInputs()) {
- for (const auto& channel : input.GetChannels()) {
- *ev->Record.AddUpdate() = channel;
- }
- }
-
- for (const auto& output : task.GetOutputs()) {
- for (const auto& channel : output.GetChannels()) {
- *ev->Record.AddUpdate() = channel;
- }
- }
-
- YQL_CLOG(DEBUG, ProviderDq) << task.GetId() << " " << ev->Record.ShortDebugString();
-
- Send(actorId, ev.Release());
- }
- ChannelsUpdated = true;
- }
-
- void OnResultFailure(TEvDqFailure::TPtr& ev) {
- if (Finished) {
- YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
- YQL_CLOG(WARN, ProviderDq) << "TEvDqFailure IGNORED when Finished from " << ev->Sender;
- } else {
- FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported
- Send(ExecuterId, ev->Release().Release());
- Finished = true;
- }
- }
-
- void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) {
- YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
- YQL_CLOG(DEBUG, ProviderDq) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode);
- if (Finished) {
- YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
- YQL_CLOG(WARN, ProviderDq) << "OnError IGNORED when Finished";
- } else {
- auto req = MakeHolder<TEvDqFailure>(statusCode, issues);
- FinalStat().FlushCounters(req->Record);
- Send(ExecuterId, req.Release());
- Finished = true;
- }
- }
-
- void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) {
- auto issueCode = NCommon::NeedFallback(statusCode)
- ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
- : TIssuesIds::DQ_GATEWAY_ERROR;
- OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}));
- }
-
- void Finish() {
- if (ServiceCounters.Counters && AggrPeriod) {
- ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated
- }
- Send(ExecuterId, new TEvGraphFinished());
- Finished = true;
- }
-
- void OnQueryResult(TEvQueryResponse::TPtr& ev) {
- YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
- FinalStat().FlushCounters(ev->Get()->Record);
- if (!Issues.Empty()) {
- IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues());
- }
- Send(ResultId, ev->Release().Release());
- }
-
- TCounters FinalStat() {
- return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat;
- }
-
-
- bool ChannelsUpdated = false;
- TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks;
- THashSet<ui64> FinishedTasks;
- THashMap<ui64, TInstant> Executing;
- THashMap<ui64, TActorId> ActorIds;
- THashMap<TActorId, ui64> TaskIds;
- THashMap<ui64, ui64> Stages;
- const NActors::TActorId ExecuterId;
- const NActors::TActorId ResultId;
- const TString TraceId;
- TDqConfiguration::TPtr Settings;
- bool Finished = false;
- TCounters TaskStat;
- NYql::NCommon::TServiceCounters ServiceCounters;
- TDuration PingPeriod = TDuration::Zero();
- TDuration AggrPeriod = TDuration::Zero();
- NYql::NDq::GroupedIssues Issues;
- ui64 PingCookie = 0;
};
} /* namespace */
@@ -621,5 +43,4 @@ THolder<NActors::IActor> MakeTaskController(
return MakeHolder<NDq::TLogWrapReceive>(new TTaskController(traceId, executerId, resultId, settings, serviceCounters, pingPeriod, aggrPeriod), traceId);
}
-
} /* namespace NYql */
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
new file mode 100644
index 00000000000..3d59b9c26d8
--- /dev/null
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -0,0 +1,638 @@
+#include "task_controller.h"
+#include "execution_helpers.h"
+#include "events.h"
+#include "proto_builder.h"
+#include "actor_helpers.h"
+#include "executer_actor.h"
+#include "grouped_issues.h"
+
+#include <ydb/library/yql/providers/dq/counters/counters.h>
+
+#include <ydb/library/yql/providers/dq/common/yql_dq_common.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+#include <ydb/library/yql/utils/actor_log/log.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+#include <ydb/public/lib/yson_value/ydb_yson_value.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/protobuf/util/pb_io.h>
+
+#include <util/generic/size_literals.h>
+#include <util/generic/ptr.h>
+#include <util/string/split.h>
+#include <util/system/types.h>
+
+namespace NYql {
+
+using namespace NActors;
+using namespace NDqs;
+
+template<typename TDerived>
+class TTaskControllerImpl: public NActors::TActor<TDerived> {
+public:
+ using NActors::TActor<TDerived>::PassAway;
+ using NActors::TActor<TDerived>::Schedule;
+ using NActors::TActor<TDerived>::SelfId;
+ using NActors::TActor<TDerived>::Send;
+
+ static constexpr ui64 PING_TIMER_TAG = 1;
+ static constexpr ui64 AGGR_TIMER_TAG = 2;
+
+ static constexpr char ActorName[] = "YQL_DQ_TASK_CONTROLLER";
+
+ explicit TTaskControllerImpl(
+ const TString& traceId,
+ const NActors::TActorId& executerId,
+ const NActors::TActorId& resultId,
+ const TDqConfiguration::TPtr& settings,
+ const NYql::NCommon::TServiceCounters& serviceCounters,
+ const TDuration& pingPeriod,
+ const TDuration& aggrPeriod,
+ void (TDerived::*func)(TAutoPtr<NActors::IEventHandle>&, const NActors::TActorContext&)
+ )
+ : NActors::TActor<TDerived>(func)
+ , ExecuterId(executerId)
+ , ResultId(resultId)
+ , TraceId(traceId)
+ , Settings(settings)
+ , ServiceCounters(serviceCounters, "task_controller")
+ , PingPeriod(pingPeriod)
+ , AggrPeriod(aggrPeriod)
+ , Issues(CreateDefaultTimeProvider())
+ {
+ if (Settings) {
+ if (Settings->_AllResultsBytesLimit.Get()) {
+ YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get();
+ }
+ if (Settings->_RowsLimitPerWrite.Get()) {
+ YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get();
+ }
+ }
+ }
+
+ ~TTaskControllerImpl() override {
+ SetTaskCountMetric(0);
+ }
+
+public:
+ STRICT_STFUNC(Handler, {
+ hFunc(TEvReadyState, OnReadyState);
+ hFunc(TEvQueryResponse, OnQueryResult);
+ hFunc(TEvDqFailure, OnResultFailure);
+ hFunc(NDq::TEvDqCompute::TEvState, OnComputeActorState);
+ hFunc(NDq::TEvDq::TEvAbortExecution, OnAbortExecution);
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ hFunc(TEvents::TEvUndelivered, OnUndelivered);
+ hFunc(TEvents::TEvWakeup, OnWakeup);
+ })
+
+ void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) {
+ auto it = TaskIds.find(ev->Sender);
+ if (it != TaskIds.end() && FinishedTasks.contains(it->second)) {
+ // ignore undelivered from finished CAs
+ return;
+ }
+
+ TStringBuilder message;
+ message << "Undelivered Event " << ev->Get()->SourceType
+ << " from " << SelfId() << " (Self) to " << ev->Sender
+ << " Reason: " << ev->Get()->Reason << " Cookie: " << ev->Cookie;
+ OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message);
+ }
+
+ void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ auto statusCode = ev->Get()->Record.GetStatusCode();
+ TIssues issues = ev->Get()->GetIssues();
+ YQL_CLOG(DEBUG, ProviderDq) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString();
+ OnError(statusCode, issues);
+ }
+
+ void OnInternalError(const TString& message, const TIssues& subIssues = {}) {
+ OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message, subIssues);
+ }
+
+private:
+ void SendNonFatalIssues() {
+ auto req = MakeHolder<TEvDqStats>(Issues.ToIssues());
+ Send(ExecuterId, req.Release());
+ }
+
+public:
+ void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
+ TActorId computeActor = ev->Sender;
+ auto& state = ev->Get()->Record;
+ ui64 taskId = state.GetTaskId();
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ YQL_CLOG(TRACE, ProviderDq)
+ << SelfId()
+ << " EvState TaskId: " << taskId
+ << " State: " << state.GetState()
+ << " PingCookie: " << ev->Cookie
+ << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode());
+
+ if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) {
+ if (ServiceCounters.Counters && !AggrPeriod) {
+ ExportStats(TaskStat, taskId);
+ }
+ } else if (state.HasStats() && state.GetStats().GetTasks().size()) {
+ YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId;
+ AddStats(state.GetStats());
+ if (ServiceCounters.Counters && !AggrPeriod) {
+ ExportStats(TaskStat, taskId);
+ }
+ }
+
+
+ TIssues localIssues;
+ // TODO: don't convert issues to string
+ NYql::IssuesFromMessage(state.GetIssues(), localIssues);
+
+ switch (state.GetState()) {
+ case NDqProto::COMPUTE_STATE_UNKNOWN: {
+ // TODO: use issues
+ TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId);
+ OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, message);
+ break;
+ }
+ case NDqProto::COMPUTE_STATE_FAILURE: {
+ Issues.AddIssues(localIssues);
+ OnError(state.GetStatusCode(), Issues.ToIssues());
+ break;
+ }
+ case NDqProto::COMPUTE_STATE_EXECUTING: {
+ Issues.AddIssues(localIssues);
+ YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId;
+ if (!FinishedTasks.contains(taskId)) {
+ // may get late/reordered? message
+ Executing[taskId] = Now();
+ }
+ SendNonFatalIssues();
+ break;
+ }
+ case NDqProto::COMPUTE_STATE_FINISHED: {
+ Executing.erase(taskId);
+ FinishedTasks.insert(taskId);
+ YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Finish TaskId: " << taskId << ". Tasks finished: " << FinishedTasks.size() << "/" << Tasks.size();
+ break;
+ }
+ }
+
+ MaybeUpdateChannels();
+ MaybeFinish();
+ }
+
+ void OnWakeup(TEvents::TEvWakeup::TPtr& ev) {
+ switch (ev->Get()->Tag) {
+ case PING_TIMER_TAG:
+ if (PingPeriod) {
+ auto now = Now();
+ for (auto& taskActors: Executing) {
+ if (now > taskActors.second + PingPeriod) {
+ PingCookie++;
+ YQL_CLOG(TRACE, ProviderDq) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie;
+ Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie);
+ taskActors.second = now;
+ }
+ }
+ Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG));
+ }
+ break;
+ case AGGR_TIMER_TAG:
+ if (AggrPeriod) {
+ if (ServiceCounters.Counters) {
+ ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0);
+ }
+ Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG));
+ }
+ break;
+ }
+ };
+
+ ::NMonitoring::TDynamicCounterPtr GroupForExport(const TCounters& stat, const TString& counterName, ui64 taskId, TString& name, std::map<TString, TString>& labels) {
+ Y_UNUSED(stat);
+ TString prefix;
+ if (NCommon::ParseCounterName(&prefix, &labels, &name, counterName)) {
+ if (prefix == "TaskRunner" && (taskId == 0 || labels["Task"] == ToString(taskId))) {
+ auto group = (taskId == 0) ? ServiceCounters.Counters : ServiceCounters.Counters->GetSubgroup("Stage", ToString(Stages[taskId]));
+ for (const auto& [k, v] : labels) {
+ group = group->GetSubgroup(k, v);
+ }
+ return group;
+ }
+ }
+ return nullptr;
+ }
+
+private:
+ static bool IsAggregatedStage(const std::map<TString, TString>& labels) {
+ const auto it = labels.find("Stage");
+ return it != labels.end() && it->second == "Total";
+ }
+
+ void ExportStats(const TCounters& stat, ui64 taskId) {
+ YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary");
+ TString name;
+ std::map<TString, TString> labels;
+ static const TString SourceLabel = "Source";
+ static const TString SinkLabel = "Sink";
+ for (const auto& [k, v] : stat.Get()) {
+ labels.clear();
+ if (auto group = GroupForExport(stat, k, taskId, name, labels)) {
+ *group->GetCounter(name) = v.Count;
+ if (ServiceCounters.PublicCounters && taskId == 0 && IsAggregatedStage(labels)) {
+ TString publicCounterName;
+ bool isDeriv = false;
+ if (name == "MkqlMaxMemoryUsage") {
+ publicCounterName = "query.memory_usage_bytes";
+ } else if (name == "CpuTimeUs") {
+ publicCounterName = "query.cpu_usage_us";
+ isDeriv = true;
+ } else if (name == "Bytes") {
+ if (labels.count(SourceLabel)) publicCounterName = "query.input_bytes";
+ else if (labels.count(SinkLabel)) publicCounterName = "query.output_bytes";
+ isDeriv = true;
+ } else if (name == "RowsIn") {
+ if (labels.count(SourceLabel)) publicCounterName = "query.source_input_records";
+ else if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; // RowsIn == RowsOut for Sinks
+ isDeriv = true;
+ } else if (name == "MultiHop_LateThrownEventsCount") {
+ publicCounterName = "query.late_events";
+ isDeriv = true;
+ }
+
+ if (publicCounterName) {
+ auto& counter = *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv);
+ if (name == "MultiHop_LateThrownEventsCount") {
+ // the only incremental sensor from TaskRunner
+ counter += v.Count;
+ } else {
+ counter = v.Count;
+ }
+ }
+ }
+ }
+ }
+ for (const auto& [k, v] : stat.GetHistograms()) {
+ labels.clear();
+ if (auto group = GroupForExport(stat, k, taskId, name, labels)) {
+ auto hist = group->GetHistogram(name, NMonitoring::ExponentialHistogram(6, 10, 10));
+ hist->Reset();
+ for (const auto& [bound, value] : v) {
+ hist->Collect(bound, value);
+ }
+ }
+ }
+ }
+
+ bool TryAddStatsFromExtra(const NDqProto::TDqComputeActorStats& x) {
+ NDqProto::TExtraStats extraStats;
+ if (x.HasExtra() && x.GetExtra().UnpackTo(&extraStats)) {
+ YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats from extra";
+ for (const auto& [name, m] : extraStats.GetStats()) {
+ NYql::TCounters::TEntry value;
+ value.Sum = m.GetSum();
+ value.Max = m.GetMax();
+ value.Min = m.GetMin();
+ //value.Avg = m.GetAvg();
+ value.Count = m.GetCnt();
+ TaskStat.AddCounter(name, value);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ void AddStats(const NDqProto::TDqComputeActorStats& x) {
+ YQL_ENSURE(x.GetTasks().size() == 1);
+ auto& s = x.GetTasks(0);
+ ui64 taskId = s.GetTaskId();
+
+#define ADD_COUNTER(name) \
+ if (stats.Get ## name()) { \
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \
+ }
+
+ std::map<TString, TString> labels = {
+ {"Task", ToString(taskId)}
+ };
+
+ auto& stats = s;
+ // basic stats
+ ADD_COUNTER(CpuTimeUs)
+ ADD_COUNTER(ComputeCpuTimeUs)
+ ADD_COUNTER(PendingInputTimeUs)
+ ADD_COUNTER(PendingOutputTimeUs)
+ ADD_COUNTER(FinishTimeUs)
+ ADD_COUNTER(InputRows)
+ ADD_COUNTER(InputBytes)
+ ADD_COUNTER(OutputRows)
+ ADD_COUNTER(OutputBytes)
+
+ // profile stats
+ ADD_COUNTER(BuildCpuTimeUs)
+ ADD_COUNTER(WaitTimeUs)
+ ADD_COUNTER(WaitOutputTimeUs)
+
+ for (const auto& ingress : s.GetIngress()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Ingress" + ingress.GetName() + "Bytes"), ingress.GetBytes());
+ }
+
+ for (const auto& egress : s.GetEgress()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Egress" + egress.GetName() + "Bytes"), egress.GetBytes());
+ }
+
+ if (auto v = x.GetMkqlMaxMemoryUsage()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "MkqlMaxMemoryUsage"), v);
+ }
+
+ for (const auto& stat : s.GetMkqlStats()) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, stat.GetName()), stat.GetValue());
+ }
+
+ if (stats.ComputeCpuTimeByRunSize()) {
+ auto& hist = TaskStat.GetHistogram(TaskStat.GetCounterName("TaskRunner", labels, "ComputeTimeByRunMs"));
+ for (const auto& bucket : s.GetComputeCpuTimeByRun()) {
+ hist[bucket.GetBound()] = bucket.GetValue();
+ }
+ }
+
+ // compilation stats
+// ADD_COUNTER(MkqlTotalNodes)
+// ADD_COUNTER(MkqlCodegenFunctions)
+// ADD_COUNTER(CodeGenTotalInstructions)
+// ADD_COUNTER(CodeGenTotalFunctions)
+//
+// ADD_COUNTER(CodeGenFullTime)
+// ADD_COUNTER(CodeGenFinalizeTime)
+// ADD_COUNTER(CodeGenModulePassTime)
+
+// if (stats.GetFinishTs() >= stats.GetStartTs()) {
+// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
+// }
+
+ for (const auto& stats : s.GetInputChannels()) {
+ std::map<TString, TString> labels = {
+ {"Task", ToString(taskId)},
+ {"InputChannel", ToString(stats.GetChannelId())}
+ };
+
+ ADD_COUNTER(Chunks);
+ ADD_COUNTER(Bytes);
+ ADD_COUNTER(RowsIn);
+ ADD_COUNTER(RowsOut);
+ ADD_COUNTER(MaxMemoryUsage);
+ ADD_COUNTER(DeserializationTimeUs);
+
+// if (stats.GetFinishTs() >= stats.GetStartTs()) {
+// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
+// }
+ }
+
+ for (const auto& stats : s.GetOutputChannels()) {
+ std::map<TString, TString> labels = {
+ {"Task", ToString(taskId)},
+ {"OutputChannel", ToString(stats.GetChannelId())}
+ };
+
+ ADD_COUNTER(Chunks)
+ ADD_COUNTER(Bytes);
+ ADD_COUNTER(RowsIn);
+ ADD_COUNTER(RowsOut);
+ ADD_COUNTER(MaxMemoryUsage);
+
+ ADD_COUNTER(SerializationTimeUs);
+ ADD_COUNTER(BlockedByCapacity);
+
+ ADD_COUNTER(SpilledBytes);
+ ADD_COUNTER(SpilledRows);
+ ADD_COUNTER(SpilledBlobs);
+
+// if (stats.GetFinishTs() >= stats.GetStartTs()) {
+// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
+// }
+ }
+
+ for (const auto& stats : s.GetSources()) {
+ std::map<TString, TString> labels = {
+ {"Task", ToString(taskId)},
+ {"Source", ToString(stats.GetInputIndex())}
+ };
+
+ ADD_COUNTER(Chunks);
+ ADD_COUNTER(Bytes);
+ ADD_COUNTER(IngressBytes)
+ ADD_COUNTER(RowsIn);
+ ADD_COUNTER(RowsOut);
+ ADD_COUNTER(MaxMemoryUsage);
+
+ ADD_COUNTER(ErrorsCount);
+
+// if (stats.GetFinishTs() >= stats.GetStartTs()) {
+// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
+// }
+ }
+
+ for (const auto& stats : s.GetSinks()) {
+ std::map<TString, TString> labels = {
+ {"Task", ToString(taskId)},
+ {"Sink", ToString(stats.GetOutputIndex())}
+ };
+
+ ADD_COUNTER(Chunks)
+ ADD_COUNTER(Bytes);
+ ADD_COUNTER(EgressBytes)
+ ADD_COUNTER(RowsIn);
+ ADD_COUNTER(RowsOut);
+ ADD_COUNTER(MaxMemoryUsage);
+
+ ADD_COUNTER(ErrorsCount);
+
+// if (stats.GetFinishTs() >= stats.GetStartTs()) {
+// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs());
+// }
+ }
+
+#undef ADD_COUNTER
+ }
+
+ void MaybeFinish() {
+ if (!Finished && !Tasks.empty() && FinishedTasks.size() == Tasks.size()) {
+ Finish();
+ }
+ }
+
+ void SetTaskCountMetric(ui64 count) {
+ if (!ServiceCounters.Counters) {
+ return;
+ }
+ *ServiceCounters.Counters->GetCounter("TaskCount") = count;
+
+ if (!ServiceCounters.PublicCounters) {
+ return;
+ }
+ *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count;
+ }
+
+public:
+ void OnReadyState(TEvReadyState::TPtr& ev) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+
+ TaskStat.AddCounters(ev->Get()->Record);
+
+ const auto& tasks = ev->Get()->Record.GetTask();
+ const auto& actorIds = ev->Get()->Record.GetActorId();
+ Y_VERIFY(tasks.size() == actorIds.size());
+
+ SetTaskCountMetric(tasks.size());
+
+ for (int i = 0; i < static_cast<int>(tasks.size()); ++i) {
+ auto actorId = ActorIdFromProto(actorIds[i]);
+ auto& task = tasks[i];
+ Tasks.emplace_back(task, actorId);
+ ActorIds.emplace(task.GetId(), actorId);
+ TaskIds.emplace(actorId, task.GetId());
+ Yql::DqsProto::TTaskMeta taskMeta;
+ task.GetMeta().UnpackTo(&taskMeta);
+ Stages.emplace(task.GetId(), taskMeta.GetStageId());
+ }
+
+ YQL_CLOG(DEBUG, ProviderDq) << "Ready State: " << SelfId();
+
+ MaybeUpdateChannels();
+
+ if (PingPeriod) {
+ Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG));
+ }
+ if (AggrPeriod) {
+ Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG));
+ }
+ }
+
+private:
+ void MaybeUpdateChannels() {
+ if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) {
+ return;
+ }
+
+ YQL_CLOG(DEBUG, ProviderDq) << "Update channels";
+ for (const auto& [task, actorId] : Tasks) {
+ auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>();
+
+ for (const auto& input : task.GetInputs()) {
+ for (const auto& channel : input.GetChannels()) {
+ *ev->Record.AddUpdate() = channel;
+ }
+ }
+
+ for (const auto& output : task.GetOutputs()) {
+ for (const auto& channel : output.GetChannels()) {
+ *ev->Record.AddUpdate() = channel;
+ }
+ }
+
+ YQL_CLOG(DEBUG, ProviderDq) << task.GetId() << " " << ev->Record.ShortDebugString();
+
+ Send(actorId, ev.Release());
+ }
+ ChannelsUpdated = true;
+ }
+
+public:
+ void OnResultFailure(TEvDqFailure::TPtr& ev) {
+ if (Finished) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ YQL_CLOG(WARN, ProviderDq) << "TEvDqFailure IGNORED when Finished from " << ev->Sender;
+ } else {
+ FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported
+ Send(ExecuterId, ev->Release().Release());
+ Finished = true;
+ }
+ }
+
+ void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, const TIssues& subIssues) {
+ TIssue issue(message);
+ for (const TIssue& i : subIssues) {
+ issue.AddSubIssue(MakeIntrusive<TIssue>(i));
+ }
+ TIssues issues;
+ issues.AddIssue(std::move(issue));
+ OnError(statusCode, issues);
+ }
+
+ void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ YQL_CLOG(DEBUG, ProviderDq) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode);
+ if (Finished) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId);
+ YQL_CLOG(WARN, ProviderDq) << "OnError IGNORED when Finished";
+ } else {
+ auto req = MakeHolder<TEvDqFailure>(statusCode, issues);
+ FinalStat().FlushCounters(req->Record);
+ Send(ExecuterId, req.Release());
+ Finished = true;
+ }
+ }
+
+ void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) {
+ auto issueCode = NCommon::NeedFallback(statusCode)
+ ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
+ : TIssuesIds::DQ_GATEWAY_ERROR;
+ OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}));
+ }
+
+private:
+ void Finish() {
+ if (ServiceCounters.Counters && AggrPeriod) {
+ ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated
+ }
+ Send(ExecuterId, new TEvGraphFinished());
+ Finished = true;
+ }
+
+public:
+ void OnQueryResult(TEvQueryResponse::TPtr& ev) {
+ YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty());
+ FinalStat().FlushCounters(ev->Get()->Record);
+ if (!Issues.Empty()) {
+ IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues());
+ }
+ Send(ResultId, ev->Release().Release());
+ }
+
+private:
+ TCounters FinalStat() {
+ return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat;
+ }
+
+
+ bool ChannelsUpdated = false;
+ TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks;
+ THashSet<ui64> FinishedTasks;
+ THashMap<ui64, TInstant> Executing;
+ THashMap<ui64, TActorId> ActorIds;
+ THashMap<TActorId, ui64> TaskIds;
+ THashMap<ui64, ui64> Stages;
+ const NActors::TActorId ExecuterId;
+ const NActors::TActorId ResultId;
+ const TString TraceId;
+ TDqConfiguration::TPtr Settings;
+ bool Finished = false;
+ TCounters TaskStat;
+ NYql::NCommon::TServiceCounters ServiceCounters;
+ TDuration PingPeriod = TDuration::Zero();
+ TDuration AggrPeriod = TDuration::Zero();
+ NYql::NDq::GroupedIssues Issues;
+ ui64 PingCookie = 0;
+};
+
+} /* namespace NYql */
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index e657c03db54..783b1b5b624 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -74,7 +74,7 @@ message TFreeWorkersNotify {
bool IsForwarded = 2;
string TraceId = 6;
- // repeated TWorker.TGuid FailedWorker = 7; // reserved
+ reserved 7;
repeated string FailedWorkerGuid = 8;
}
@@ -196,7 +196,7 @@ message TGraphRequest {
Yql.DqsProto.ExecuteGraphRequest Request = 1;
NActorsProto.TActorId ControlId = 2;
NActorsProto.TActorId ResultId = 3;
- NActorsProto.TActorId CheckPointCoordinatorId = 4; // may be empty
+ reserved 4;
}
message TDqTaskRequest {