diff options
author | hor911 <hor911@ydb.tech> | 2023-01-27 23:57:01 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-01-27 23:57:01 +0300 |
commit | cf0fcdf102b5d6470d2b5eb79a57578008a02c5b (patch) | |
tree | e5402731e836f6774f3a9aa42a1e0e2363f5d401 | |
parent | 3f35860e4dd84c5e846c2cb89b48620795f16724 (diff) | |
download | ydb-cf0fcdf102b5d6470d2b5eb79a57578008a02c5b.tar.gz |
Checkpoint Coordinator as Task Controller subclass
10 files changed, 834 insertions, 679 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 0a73bc3aac3..daa370d952d 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -269,6 +269,9 @@ struct TEvaluationGraphInfo { class TRunActor : public NActors::TActorBootstrapped<TRunActor> { public: + using NActors::TActorBootstrapped<TRunActor>::Register; + using NActors::TActorBootstrapped<TRunActor>::Send; + explicit TRunActor( const TActorId& fetcherId , const ::NYql::NCommon::TServiceCounters& queryCounters @@ -394,7 +397,6 @@ private: // Clear finished actors ids ExecuterId = {}; - CheckpointCoordinatorId = {}; ControlId = {}; } } @@ -726,7 +728,7 @@ private: TopicsForConsumersCreation.size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); ProcessQuery(); } else if (ev->Cookie == SetLoadFromCheckpointModeCookie) { - Send(CheckpointCoordinatorId, new TEvCheckpointCoordinator::TEvRunGraph()); + Send(ControlId, new TEvCheckpointCoordinator::TEvRunGraph()); } } @@ -801,7 +803,7 @@ private: void Handle(TEvCheckpointCoordinator::TEvZeroCheckpointDone::TPtr&) { LOG_D("Coordinator saved zero checkpoint"); - Y_VERIFY(CheckpointCoordinatorId); + Y_VERIFY(ControlId); SetLoadFromCheckpointMode(); } @@ -1250,7 +1252,7 @@ private: TEvaluationGraphInfo info; - info.ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false)); + info.ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false)); if (dqGraphParams.GetResultType()) { TVector<TString> columns; @@ -1260,7 +1262,7 @@ private: NActors::TActorId empty = {}; THashMap<TString, TString> emptySecureParams; // NOT USED in RR - info.ResultId = NActors::TActivationContext::Register( + info.ResultId = Register( MakeResultReceiver( columns, info.ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams, dqGraphParams.GetResultType(), empty, false).Release()); @@ -1270,7 +1272,7 @@ private: info.ResultId = info.ExecuterId; } - info.ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, info.ExecuterId, info.ResultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); + info.ControlId = Register(NYql::MakeTaskController(SessionId, info.ExecuterId, info.ResultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); Yql::DqsProto::ExecuteGraphRequest request; request.SetSourceId(dqGraphParams.GetSourceId()); @@ -1280,7 +1282,7 @@ private: *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); *request.MutableColumns() = dqGraphParams.GetColumns(); NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); - NActors::TActivationContext::Send(new IEventHandle(info.ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId, TActorId{}))); + Send(info.ExecuterId, new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId)); LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId); return info; } @@ -1292,7 +1294,7 @@ private: dqConfiguration->FreezeDefaults(); dqConfiguration->FallbackPolicy = "never"; - ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator)); + ExecuterId = Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator)); NActors::TActorId resultId; if (dqGraphParams.GetResultType()) { @@ -1307,7 +1309,7 @@ private: for (const auto& column : dqGraphParams.GetColumns()) { columns.emplace_back(column); } - resultId = NActors::TActivationContext::Register( + resultId = Register( CreateResultWriter( ExecuterId, dqGraphParams.GetResultType(), writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ResultBytesLimit)); @@ -1316,18 +1318,34 @@ private: resultId = ExecuterId; } - ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); if (EnableCheckpointCoordinator) { - CheckpointCoordinatorId = NActors::TActivationContext::Register(MakeCheckpointCoordinator( + ControlId = Register(MakeCheckpointCoordinator( ::NYq::TCoordinatorId(Params.QueryId + "-" + ToString(DqGraphIndex), Params.PreviousQueryRevision), - ControlId, NYql::NDq::MakeCheckpointStorageID(), SelfId(), Params.CheckpointCoordinatorConfig, QueryCounters.Counters, dqGraphParams, Params.StateLoadMode, - Params.StreamingDisposition).Release()); + Params.StreamingDisposition, + // vvv TaskController temporary params vvv + SessionId, + ExecuterId, + resultId, + dqConfiguration, + QueryCounters, + TDuration::Seconds(3), + TDuration::Seconds(1) + ).Release()); + } else { + ControlId = Register(NYql::MakeTaskController( + SessionId, + ExecuterId, + resultId, + dqConfiguration, + QueryCounters, + TDuration::Seconds(3) + ).Release()); } Yql::DqsProto::ExecuteGraphRequest request; @@ -1341,8 +1359,8 @@ private: commonTaskParams["fq.job_id"] = Params.JobId; commonTaskParams["fq.restart_count"] = ToString(Params.RestartCount); NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); - NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId))); - LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId << ", CheckPointCoordinatior " << CheckpointCoordinatorId); + Send(ExecuterId, new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId)); + LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId); } void SetupDqSettings(NYql::TDqGatewayConfig& dqGatewaysConfig) const { @@ -1746,7 +1764,6 @@ private: html << "<td>" << DqGraphIndex << " of " << DqGraphParams.size() << "</td>"; html << "<td> Executer" << ExecuterId << "</td>"; html << "<td> Control" << ControlId << "</td>"; - html << "<td> Coordinator" << CheckpointCoordinatorId << "</td>"; } html << "</tr>"; html << "</tbody></table>"; @@ -1908,7 +1925,6 @@ private: ui32 DqEvalIndex = 0; NActors::TActorId ExecuterId; NActors::TActorId ControlId; - NActors::TActorId CheckpointCoordinatorId; TString SessionId; ::NYql::NCommon::TServiceCounters QueryCounters; const ::NMonitoring::TDynamicCounters::TCounterPtr QueryUptime; diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp index f1283ec9471..7780443d26f 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp @@ -28,16 +28,31 @@ namespace NYq { TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId, - const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const ::NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, - const YandexQuery::StreamingDisposition& streamingDisposition) - : CoordinatorId(std::move(coordinatorId)) - , TaskControllerId(taskControllerId) + const YandexQuery::StreamingDisposition& streamingDisposition, + // vvv TaskController temporary params vvv + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const NYql::TDqConfiguration::TPtr& tcSettings, + const NYql::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod) + : NYql::TTaskControllerImpl<TCheckpointCoordinator>( + traceId, + executerId, + resultId, + tcSettings, + serviceCounters, + pingPeriod, + aggrPeriod, + &TCheckpointCoordinator::DispatchEvent) + , CoordinatorId(std::move(coordinatorId)) , StorageProxy(storageProxy) , RunActorId(runActorId) , Settings(settings) @@ -49,12 +64,11 @@ TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId, { } -void TCheckpointCoordinator::Bootstrap() { - Become(&TThis::DispatchEvent); - CC_LOG_D("Bootstrapped with streaming disposition " << StreamingDisposition << " and state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode)); -} +void TCheckpointCoordinator::Handle(NYql::NDqs::TEvReadyState::TPtr& ev) { + NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnReadyState(ev); + + CC_LOG_D("TEvReadyState, streaming disposition " << StreamingDisposition << ", state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode)); -void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) { const auto& tasks = ev->Get()->Record.GetTask(); const auto& actorIds = ev->Get()->Record.GetActorId(); Y_VERIFY(tasks.size() == actorIds.size()); @@ -63,7 +77,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) { auto& task = tasks[i]; auto& actorId = TaskIdToActor[task.GetId()]; if (actorId) { - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Duplicate task id: " << task.GetId())); + OnInternalError(TStringBuilder() << "Duplicate task id: " << task.GetId()); return; } actorId = ActorIdFromProto(actorIds[i]); @@ -113,7 +127,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord if (issues) { CC_LOG_E("Can't register in storage: " + issues.ToOneLineString()); ++*Metrics.StorageError; - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't register in storage", issues)); + OnInternalError("Can't register in storage", issues); return; } @@ -141,7 +155,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord InitCheckpoint(); ScheduleNextCheckpoint(); } else { - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Unexpected state load mode (" << YandexQuery::StateLoadMode_Name(StateLoadMode) << ") and streaming disposition " << StreamingDisposition)); + OnInternalError(TStringBuilder() << "Unexpected state load mode (" << YandexQuery::StateLoadMode_Name(StateLoadMode) << ") and streaming disposition " << StreamingDisposition); } } @@ -169,7 +183,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpoint if (event->Issues) { ++*Metrics.StorageError; CC_LOG_E("Can't get checkpoints to restore: " + event->Issues.ToOneLineString()); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't get checkpoints to restore", event->Issues)); + OnInternalError("Can't get checkpoints to restore", event->Issues); return; } @@ -212,7 +226,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe ++*Metrics.StorageError; const TString message = TStringBuilder() << "Can't get graph params from checkpoint " << checkpoint.CheckpointId; CC_LOG_I(message); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); + OnInternalError(message); return; } @@ -230,7 +244,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe } if (!result) { - Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(NYql::NDqProto::StatusIds::BAD_REQUEST, issues)); + NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, "Can't restore from plan given", issues); return; } else { // Report as transient issues Send(RunActorId, new TEvents::TEvRaiseTransientIssues(std::move(issues))); @@ -243,7 +257,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe if (actorIdIt == TaskIdToActor.end()) { const TString msg = TStringBuilder() << "ActorId for task id " << taskId << " was not found"; CC_LOG_E(msg); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(msg)); + OnInternalError(msg); return; } const auto transportIt = ActorsToWaitFor.find(actorIdIt->second); @@ -274,19 +288,19 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro if (!PendingRestoreCheckpoint) { CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint"); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint")); + OnInternalError("Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint"); return; } if (PendingRestoreCheckpoint->CheckpointId != checkpoint) { CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult event with unexpected checkpoint: " << checkpoint << ", expected: " << PendingRestoreCheckpoint->CheckpointId); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Got unexpected checkpoint")); + OnInternalError("Got unexpected checkpoint"); return; } if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) { CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Aborted("Can't restore: " + statusName)); + NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, "Can't restore: " + statusName, {}); return; } @@ -563,14 +577,6 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) { PassAway(); } -void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { - TStringStream message; - message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType; - CC_LOG_D(message.Str()); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Unavailable(message.Str())); - PassAway(); -} - void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) { InitingZeroCheckpoint = false; // TODO: run graph only now, not before zero checkpoint inited @@ -580,17 +586,52 @@ void TCheckpointCoordinator::PassAway() { for (const auto& [actorId, transport] : AllActors) { transport->EventsQueue.Unsubscribe(); } - TActorBootstrapped<TCheckpointCoordinator>::PassAway(); + NYql::TTaskControllerImpl<TCheckpointCoordinator>::PassAway(); } void TCheckpointCoordinator::HandleException(const std::exception& err) { NYql::TIssues issues; issues.AddIssue(err.what()); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Internal error in checkpoint coordinator", issues)); + OnInternalError("Internal error in checkpoint coordinator", issues); } -THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const ::NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition) { - return MakeHolder<TCheckpointCoordinator>(coordinatorId, taskControllerId, storageProxy, runActorId, settings, counters, graphParams, stateLoadMode, streamingDisposition); +THolder<NActors::IActor> MakeCheckpointCoordinator( + TCoordinatorId coordinatorId, + const TActorId& storageProxy, + const TActorId& runActorId, + const TCheckpointCoordinatorConfig& settings, + const ::NMonitoring::TDynamicCounterPtr& counters, + const NProto::TGraphParams& graphParams, + const YandexQuery::StateLoadMode& stateLoadMode /* = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT */, + const YandexQuery::StreamingDisposition& streamingDisposition /* = {} */, + // vvv TaskController temporary params vvv + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const NYql::TDqConfiguration::TPtr& tcSettings, + const NYql::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod + ) +{ + return MakeHolder<TCheckpointCoordinator>( + coordinatorId, + storageProxy, + runActorId, + settings, + counters, + graphParams, + stateLoadMode, + streamingDisposition, + // vvv TaskController temporary params vvv + traceId, + executerId, + resultId, + tcSettings, + serviceCounters, + pingPeriod, + aggrPeriod + ); } } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h index 38ab67f90e8..7a626bcc7c5 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h @@ -13,6 +13,7 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/library/yql/dq/actors/compute/retry_queue.h> #include <ydb/library/yql/providers/dq/actors/events.h> +#include <ydb/library/yql/providers/dq/actors/task_controller_impl.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -21,19 +22,29 @@ namespace NYq { using namespace NActors; using namespace NYq::NConfig; -class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> { +class TCheckpointCoordinator : public NYql::TTaskControllerImpl<TCheckpointCoordinator> { public: TCheckpointCoordinator(TCoordinatorId coordinatorId, - const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const ::NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, - const YandexQuery::StreamingDisposition& streamingDisposition); - - void Handle(const NYql::NDqs::TEvReadyState::TPtr&); + const YandexQuery::StreamingDisposition& streamingDisposition, + // vvv TaskController temporary params vvv + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const NYql::TDqConfiguration::TPtr& tcSettings, + const NYql::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod + ); + + using NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnInternalError; + + void Handle(NYql::NDqs::TEvReadyState::TPtr&); void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&); void Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr&); void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&); @@ -47,7 +58,6 @@ public: void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev); void Handle(NActors::TEvents::TEvPoison::TPtr&); - void Handle(NActors::TEvents::TEvUndelivered::TPtr&); void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); void Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&); @@ -56,28 +66,38 @@ public: STRICT_STFUNC_EXC(DispatchEvent, hFunc(NYql::NDqs::TEvReadyState, Handle) + hFunc(NYql::NDqs::TEvQueryResponse, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnQueryResult) + hFunc(NYql::NDqs::TEvDqFailure, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnResultFailure) + + hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle) + hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle) + hFunc(TEvCheckpointStorage::TEvRegisterCoordinatorResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle) hFunc(TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle) - hFunc(TEvCheckpointCoordinator::TEvScheduleCheckpointing, Handle) hFunc(TEvCheckpointStorage::TEvCreateCheckpointResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle) hFunc(TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse, Handle) - hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle) hFunc(TEvCheckpointStorage::TEvCompleteCheckpointResponse, Handle) hFunc(TEvCheckpointStorage::TEvAbortCheckpointResponse, Handle) + + hFunc(NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvRestoreFromCheckpointResult, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvSaveTaskStateResult, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvStateCommitted, Handle) + hFunc(NYql::NDq::TEvDqCompute::TEvState, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnComputeActorState) + hFunc(NYql::NDq::TEvDq::TEvAbortExecution, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnAbortExecution) + hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle) + hFunc(NActors::TEvents::TEvPoison, Handle) - hFunc(NActors::TEvents::TEvUndelivered, Handle) + hFunc(NActors::TEvents::TEvUndelivered, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnUndelivered) + hFunc(NActors::TEvents::TEvWakeup, NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnWakeup) + hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle) - hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle) - hFunc(TEvCheckpointCoordinator::TEvRunGraph, Handle), + hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle), + ExceptionFunc(std::exception, HandleException) ) - void Bootstrap(); - static constexpr char ActorName[] = "YQ_CHECKPOINT_COORDINATOR"; private: @@ -146,7 +166,6 @@ private: }; const TCoordinatorId CoordinatorId; - const TActorId TaskControllerId; const TActorId StorageProxy; const TActorId RunActorId; std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator; @@ -177,6 +196,23 @@ private: YandexQuery::StreamingDisposition StreamingDisposition; }; -THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& executerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const ::NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT, const YandexQuery::StreamingDisposition& streamingDisposition = {}); +THolder<NActors::IActor> MakeCheckpointCoordinator( + TCoordinatorId coordinatorId, + const TActorId& storageProxy, + const TActorId& runActorId, + const TCheckpointCoordinatorConfig& settings, + const ::NMonitoring::TDynamicCounterPtr& counters, + const NProto::TGraphParams& graphParams, + const YandexQuery::StateLoadMode& stateLoadMode /* = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT */, + const YandexQuery::StreamingDisposition& streamingDisposition /* = {} */, + // vvv TaskController temporary params vvv + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const NYql::TDqConfiguration::TPtr& tcSettings, + const NYql::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod + ); } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index 30affc40524..96e2d697a82 100644 --- a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -89,21 +89,34 @@ struct TTestBootstrap : public TTestActorRuntime { Settings.SetCheckpointingPeriodMillis(TDuration::Hours(1).MilliSeconds()); Settings.SetMaxInflight(1); + NYql::TDqConfiguration::TPtr DqSettings = MakeIntrusive<NYql::TDqConfiguration>(); + SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG); - CheckpointCoordinator = Register(MakeCheckpointCoordinator(TCoordinatorId("my-graph-id", 42), {}, StorageProxy, RunActor, Settings, Counters, NProto::TGraphParams()).Release()); - WaitForBootstrap(); + CheckpointCoordinator = Register(MakeCheckpointCoordinator( + TCoordinatorId("my-graph-id", 42), + StorageProxy, + RunActor, + Settings, + Counters, + NProto::TGraphParams(), + YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT, + {}, + // + "my-graph-id", + {} /* ExecuterId */, + RunActor, + DqSettings, + ::NYql::NCommon::TServiceCounters(Counters, nullptr, ""), + TDuration::Seconds(3), + TDuration::Seconds(1) + ).Release()); Send(new IEventHandle(CheckpointCoordinator, {}, new NYql::NDqs::TEvReadyState(std::move(GraphState)))); EnableScheduleForActor(CheckpointCoordinator); } - - void WaitForBootstrap() { - NActors::TDispatchOptions options; - options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Bootstrap, 1); - DispatchEvents(options); - } }; + } // namespace namespace NYq { diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 3b9ee612411..85863d52604 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -34,14 +34,11 @@ namespace NYql::NDqs { Record = std::move(queryResult); } - TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId) + TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId) { *Record.MutableRequest() = request; NActors::ActorIdToProto(controlId, Record.MutableControlId()); NActors::ActorIdToProto(resultId, Record.MutableResultId()); - if (checkPointCoordinatorId) { - NActors::ActorIdToProto(checkPointCoordinatorId, Record.MutableCheckPointCoordinatorId()); - } } TEvReadyState::TEvReadyState(NActors::TActorId sourceId, TString type) { diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index f2269534474..df65b75b322 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -40,7 +40,7 @@ namespace NYql::NDqs { struct TEvGraphRequest : NActors::TEventPB<TEvGraphRequest, NDqProto::TGraphRequest, TDqExecuterEvents::ES_GRAPH> { TEvGraphRequest() = default; - TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId = {}); + TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId); }; struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> { diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 753306b961b..32a3c897de8 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -140,11 +140,9 @@ private: }); ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId()); ResultId = NActors::ActorIdFromProto(ev->Get()->Record.GetResultId()); - CheckPointCoordinatorId = NActors::ActorIdFromProto(ev->Get()->Record.GetCheckPointCoordinatorId()); // These actors will be killed at exit. AddChild(ControlId); AddChild(ResultId); - AddChild(CheckPointCoordinatorId); int workerCount = ev->Get()->Record.GetRequest().GetTask().size(); const bool enableComputeActor = Settings->EnableComputeActor.Get().GetOrElse(false); @@ -404,14 +402,10 @@ private: AllocationHistogram->Collect((ExecutionStart-StartTime).Seconds()); auto readyState1 = res->Record; - auto readyState2 = res->Record; Send(ControlId, res.Release()); if (ResultId != SelfId() && ResultId != ControlId) { Send(ResultId, new TEvReadyState(std::move(readyState1))); } - if (CheckPointCoordinatorId) { - Send(CheckPointCoordinatorId, new TEvReadyState(std::move(readyState2))); - } if (Timeout) { ExecutionTimeoutCookieHolder.Reset(ISchedulerCookie::Make2Way()); @@ -466,7 +460,6 @@ private: NActors::TActorId ControlId; NActors::TActorId ResultId; - NActors::TActorId CheckPointCoordinatorId; TExprNode::TPtr ExprRoot; THolder<IDqsExecutionPlanner> ExecutionPlanner; ui64 ResourceId = 0; diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 02b10d5affe..a9366629f02 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -1,51 +1,13 @@ #include "task_controller.h" -#include "execution_helpers.h" -#include "events.h" -#include "proto_builder.h" -#include "actor_helpers.h" -#include "executer_actor.h" -#include "grouped_issues.h" - -#include <ydb/library/yql/providers/dq/counters/counters.h> - -#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> - -#include <ydb/library/yql/public/issue/yql_issue_message.h> - -#include <ydb/library/yql/utils/actor_log/log.h> -#include <ydb/library/yql/utils/log/log.h> - -#include <ydb/public/lib/yson_value/ydb_yson_value.h> -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> - -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/event_pb.h> -#include <library/cpp/actors/core/executor_pool_basic.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/scheduler_basic.h> -#include <library/cpp/threading/future/future.h> -#include <library/cpp/protobuf/util/pb_io.h> - -#include <util/generic/size_literals.h> -#include <util/generic/ptr.h> -#include <util/string/split.h> -#include <util/system/types.h> +#include "task_controller_impl.h" namespace NYql { -using namespace NActors; -using namespace NDqs; - namespace { -class TTaskController: public TRichActor<TTaskController> { +class TTaskController: public TTaskControllerImpl<TTaskController> { public: - static constexpr ui64 PING_TIMER_TAG = 1; - static constexpr ui64 AGGR_TIMER_TAG = 2; - - static constexpr char ActorName[] = "YQL_DQ_TASK_CONTROLLER"; - - explicit TTaskController( + TTaskController( const TString& traceId, const NActors::TActorId& executerId, const NActors::TActorId& resultId, @@ -54,557 +16,17 @@ public: const TDuration& pingPeriod, const TDuration& aggrPeriod ) - : TRichActor<TTaskController>(&TTaskController::Handler) - , ExecuterId(executerId) - , ResultId(resultId) - , TraceId(traceId) - , Settings(settings) - , ServiceCounters(serviceCounters, "task_controller") - , PingPeriod(pingPeriod) - , AggrPeriod(aggrPeriod) - , Issues(CreateDefaultTimeProvider()) + : TTaskControllerImpl<TTaskController>( + traceId, + executerId, + resultId, + settings, + serviceCounters, + pingPeriod, + aggrPeriod, + &TTaskControllerImpl<TTaskController>::Handler) { - if (Settings) { - if (Settings->_AllResultsBytesLimit.Get()) { - YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); - } - if (Settings->_RowsLimitPerWrite.Get()) { - YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); - } - } - } - - ~TTaskController() override { - SetTaskCountMetric(0); - } - -private: - STRICT_STFUNC(Handler, { - hFunc(TEvReadyState, OnReadyState); - hFunc(TEvQueryResponse, OnQueryResult); - hFunc(TEvDqFailure, OnResultFailure); - hFunc(NDq::TEvDqCompute::TEvState, OnComputeActorState); - hFunc(NDq::TEvDq::TEvAbortExecution, OnAbortExecution); - cFunc(TEvents::TEvPoison::EventType, PassAway); - hFunc(TEvents::TEvUndelivered, OnUndelivered); - hFunc(TEvents::TEvWakeup, OnWakeup); - }) - - void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) { - auto it = TaskIds.find(ev->Sender); - if (it != TaskIds.end() && FinishedTasks.contains(it->second)) { - // ignore undelivered from finished CAs - return; - } - - TStringBuilder message; - message << "Undelivered Event " << ev->Get()->SourceType - << " from " << SelfId() << " (Self) to " << ev->Sender - << " Reason: " << ev->Get()->Reason << " Cookie: " << ev->Cookie; - OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message); - } - - void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { - YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - auto statusCode = ev->Get()->Record.GetStatusCode(); - TIssues issues = ev->Get()->GetIssues(); - YQL_CLOG(DEBUG, ProviderDq) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString(); - OnError(statusCode, issues); - } - - void SendNonFatalIssues() { - auto req = MakeHolder<TEvDqStats>(Issues.ToIssues()); - Send(ExecuterId, req.Release()); - } - - void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { - TActorId computeActor = ev->Sender; - auto& state = ev->Get()->Record; - ui64 taskId = state.GetTaskId(); - YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - YQL_CLOG(TRACE, ProviderDq) - << SelfId() - << " EvState TaskId: " << taskId - << " State: " << state.GetState() - << " PingCookie: " << ev->Cookie - << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode()); - - if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) { - if (ServiceCounters.Counters && !AggrPeriod) { - ExportStats(TaskStat, taskId); - } - } else if (state.HasStats() && state.GetStats().GetTasks().size()) { - YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId; - AddStats(state.GetStats()); - if (ServiceCounters.Counters && !AggrPeriod) { - ExportStats(TaskStat, taskId); - } - } - - - TIssues localIssues; - // TODO: don't convert issues to string - NYql::IssuesFromMessage(state.GetIssues(), localIssues); - - switch (state.GetState()) { - case NDqProto::COMPUTE_STATE_UNKNOWN: { - // TODO: use issues - TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId); - OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, message); - break; - } - case NDqProto::COMPUTE_STATE_FAILURE: { - Issues.AddIssues(localIssues); - OnError(state.GetStatusCode(), Issues.ToIssues()); - break; - } - case NDqProto::COMPUTE_STATE_EXECUTING: { - Issues.AddIssues(localIssues); - YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId; - if (!FinishedTasks.contains(taskId)) { - // may get late/reordered? message - Executing[taskId] = Now(); - } - SendNonFatalIssues(); - break; - } - case NDqProto::COMPUTE_STATE_FINISHED: { - Executing.erase(taskId); - FinishedTasks.insert(taskId); - YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Finish TaskId: " << taskId << ". Tasks finished: " << FinishedTasks.size() << "/" << Tasks.size(); - break; - } - } - - MaybeUpdateChannels(); - MaybeFinish(); - } - - void OnWakeup(TEvents::TEvWakeup::TPtr& ev) { - switch (ev->Get()->Tag) { - case PING_TIMER_TAG: - if (PingPeriod) { - auto now = Now(); - for (auto& taskActors: Executing) { - if (now > taskActors.second + PingPeriod) { - PingCookie++; - YQL_CLOG(TRACE, ProviderDq) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie; - Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie); - taskActors.second = now; - } - } - Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); - } - break; - case AGGR_TIMER_TAG: - if (AggrPeriod) { - if (ServiceCounters.Counters) { - ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); - } - Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); - } - break; - } - }; - - ::NMonitoring::TDynamicCounterPtr GroupForExport(const TCounters& stat, const TString& counterName, ui64 taskId, TString& name, std::map<TString, TString>& labels) { - Y_UNUSED(stat); - TString prefix; - if (NCommon::ParseCounterName(&prefix, &labels, &name, counterName)) { - if (prefix == "TaskRunner" && (taskId == 0 || labels["Task"] == ToString(taskId))) { - auto group = (taskId == 0) ? ServiceCounters.Counters : ServiceCounters.Counters->GetSubgroup("Stage", ToString(Stages[taskId])); - for (const auto& [k, v] : labels) { - group = group->GetSubgroup(k, v); - } - return group; - } - } - return nullptr; } - - static bool IsAggregatedStage(const std::map<TString, TString>& labels) { - const auto it = labels.find("Stage"); - return it != labels.end() && it->second == "Total"; - } - - void ExportStats(const TCounters& stat, ui64 taskId) { - YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary"); - TString name; - std::map<TString, TString> labels; - static const TString SourceLabel = "Source"; - static const TString SinkLabel = "Sink"; - for (const auto& [k, v] : stat.Get()) { - labels.clear(); - if (auto group = GroupForExport(stat, k, taskId, name, labels)) { - *group->GetCounter(name) = v.Count; - if (ServiceCounters.PublicCounters && taskId == 0 && IsAggregatedStage(labels)) { - TString publicCounterName; - bool isDeriv = false; - if (name == "MkqlMaxMemoryUsage") { - publicCounterName = "query.memory_usage_bytes"; - } else if (name == "CpuTimeUs") { - publicCounterName = "query.cpu_usage_us"; - isDeriv = true; - } else if (name == "Bytes") { - if (labels.count(SourceLabel)) publicCounterName = "query.input_bytes"; - else if (labels.count(SinkLabel)) publicCounterName = "query.output_bytes"; - isDeriv = true; - } else if (name == "RowsIn") { - if (labels.count(SourceLabel)) publicCounterName = "query.source_input_records"; - else if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; // RowsIn == RowsOut for Sinks - isDeriv = true; - } else if (name == "MultiHop_LateThrownEventsCount") { - publicCounterName = "query.late_events"; - isDeriv = true; - } - - if (publicCounterName) { - auto& counter = *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv); - if (name == "MultiHop_LateThrownEventsCount") { - // the only incremental sensor from TaskRunner - counter += v.Count; - } else { - counter = v.Count; - } - } - } - } - } - for (const auto& [k, v] : stat.GetHistograms()) { - labels.clear(); - if (auto group = GroupForExport(stat, k, taskId, name, labels)) { - auto hist = group->GetHistogram(name, NMonitoring::ExponentialHistogram(6, 10, 10)); - hist->Reset(); - for (const auto& [bound, value] : v) { - hist->Collect(bound, value); - } - } - } - } - - bool TryAddStatsFromExtra(const NDqProto::TDqComputeActorStats& x) { - NDqProto::TExtraStats extraStats; - if (x.HasExtra() && x.GetExtra().UnpackTo(&extraStats)) { - YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats from extra"; - for (const auto& [name, m] : extraStats.GetStats()) { - NYql::TCounters::TEntry value; - value.Sum = m.GetSum(); - value.Max = m.GetMax(); - value.Min = m.GetMin(); - //value.Avg = m.GetAvg(); - value.Count = m.GetCnt(); - TaskStat.AddCounter(name, value); - } - return true; - } - return false; - } - - void AddStats(const NDqProto::TDqComputeActorStats& x) { - YQL_ENSURE(x.GetTasks().size() == 1); - auto& s = x.GetTasks(0); - ui64 taskId = s.GetTaskId(); - -#define ADD_COUNTER(name) \ - if (stats.Get ## name()) { \ - TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \ - } - - std::map<TString, TString> labels = { - {"Task", ToString(taskId)} - }; - - auto& stats = s; - // basic stats - ADD_COUNTER(CpuTimeUs) - ADD_COUNTER(ComputeCpuTimeUs) - ADD_COUNTER(PendingInputTimeUs) - ADD_COUNTER(PendingOutputTimeUs) - ADD_COUNTER(FinishTimeUs) - ADD_COUNTER(InputRows) - ADD_COUNTER(InputBytes) - ADD_COUNTER(OutputRows) - ADD_COUNTER(OutputBytes) - - // profile stats - ADD_COUNTER(BuildCpuTimeUs) - ADD_COUNTER(WaitTimeUs) - ADD_COUNTER(WaitOutputTimeUs) - - for (const auto& ingress : s.GetIngress()) { - TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Ingress" + ingress.GetName() + "Bytes"), ingress.GetBytes()); - } - - for (const auto& egress : s.GetEgress()) { - TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Egress" + egress.GetName() + "Bytes"), egress.GetBytes()); - } - - if (auto v = x.GetMkqlMaxMemoryUsage()) { - TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "MkqlMaxMemoryUsage"), v); - } - - for (const auto& stat : s.GetMkqlStats()) { - TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, stat.GetName()), stat.GetValue()); - } - - if (stats.ComputeCpuTimeByRunSize()) { - auto& hist = TaskStat.GetHistogram(TaskStat.GetCounterName("TaskRunner", labels, "ComputeTimeByRunMs")); - for (const auto& bucket : s.GetComputeCpuTimeByRun()) { - hist[bucket.GetBound()] = bucket.GetValue(); - } - } - - // compilation stats -// ADD_COUNTER(MkqlTotalNodes) -// ADD_COUNTER(MkqlCodegenFunctions) -// ADD_COUNTER(CodeGenTotalInstructions) -// ADD_COUNTER(CodeGenTotalFunctions) -// -// ADD_COUNTER(CodeGenFullTime) -// ADD_COUNTER(CodeGenFinalizeTime) -// ADD_COUNTER(CodeGenModulePassTime) - -// if (stats.GetFinishTs() >= stats.GetStartTs()) { -// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); -// } - - for (const auto& stats : s.GetInputChannels()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"InputChannel", ToString(stats.GetChannelId())} - }; - - ADD_COUNTER(Chunks); - ADD_COUNTER(Bytes); - ADD_COUNTER(RowsIn); - ADD_COUNTER(RowsOut); - ADD_COUNTER(MaxMemoryUsage); - ADD_COUNTER(DeserializationTimeUs); - -// if (stats.GetFinishTs() >= stats.GetStartTs()) { -// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); -// } - } - - for (const auto& stats : s.GetOutputChannels()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"OutputChannel", ToString(stats.GetChannelId())} - }; - - ADD_COUNTER(Chunks) - ADD_COUNTER(Bytes); - ADD_COUNTER(RowsIn); - ADD_COUNTER(RowsOut); - ADD_COUNTER(MaxMemoryUsage); - - ADD_COUNTER(SerializationTimeUs); - ADD_COUNTER(BlockedByCapacity); - - ADD_COUNTER(SpilledBytes); - ADD_COUNTER(SpilledRows); - ADD_COUNTER(SpilledBlobs); - -// if (stats.GetFinishTs() >= stats.GetStartTs()) { -// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); -// } - } - - for (const auto& stats : s.GetSources()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"Source", ToString(stats.GetInputIndex())} - }; - - ADD_COUNTER(Chunks); - ADD_COUNTER(Bytes); - ADD_COUNTER(IngressBytes) - ADD_COUNTER(RowsIn); - ADD_COUNTER(RowsOut); - ADD_COUNTER(MaxMemoryUsage); - - ADD_COUNTER(ErrorsCount); - -// if (stats.GetFinishTs() >= stats.GetStartTs()) { -// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); -// } - } - - for (const auto& stats : s.GetSinks()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"Sink", ToString(stats.GetOutputIndex())} - }; - - ADD_COUNTER(Chunks) - ADD_COUNTER(Bytes); - ADD_COUNTER(EgressBytes) - ADD_COUNTER(RowsIn); - ADD_COUNTER(RowsOut); - ADD_COUNTER(MaxMemoryUsage); - - ADD_COUNTER(ErrorsCount); - -// if (stats.GetFinishTs() >= stats.GetStartTs()) { -// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); -// } - } - -#undef ADD_COUNTER - } - - void MaybeFinish() { - if (!Finished && !Tasks.empty() && FinishedTasks.size() == Tasks.size()) { - Finish(); - } - } - - void SetTaskCountMetric(ui64 count) { - if (!ServiceCounters.Counters) { - return; - } - *ServiceCounters.Counters->GetCounter("TaskCount") = count; - - if (!ServiceCounters.PublicCounters) { - return; - } - *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count; - } - - void OnReadyState(TEvReadyState::TPtr& ev) { - YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - - TaskStat.AddCounters(ev->Get()->Record); - - const auto& tasks = ev->Get()->Record.GetTask(); - const auto& actorIds = ev->Get()->Record.GetActorId(); - Y_VERIFY(tasks.size() == actorIds.size()); - - SetTaskCountMetric(tasks.size()); - - for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { - auto actorId = ActorIdFromProto(actorIds[i]); - auto& task = tasks[i]; - Tasks.emplace_back(task, actorId); - ActorIds.emplace(task.GetId(), actorId); - TaskIds.emplace(actorId, task.GetId()); - Yql::DqsProto::TTaskMeta taskMeta; - task.GetMeta().UnpackTo(&taskMeta); - Stages.emplace(task.GetId(), taskMeta.GetStageId()); - } - - YQL_CLOG(DEBUG, ProviderDq) << "Ready State: " << SelfId(); - - MaybeUpdateChannels(); - - if (PingPeriod) { - Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); - } - if (AggrPeriod) { - Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); - } - } - - void MaybeUpdateChannels() { - if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) { - return; - } - - YQL_CLOG(DEBUG, ProviderDq) << "Update channels"; - for (const auto& [task, actorId] : Tasks) { - auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>(); - - for (const auto& input : task.GetInputs()) { - for (const auto& channel : input.GetChannels()) { - *ev->Record.AddUpdate() = channel; - } - } - - for (const auto& output : task.GetOutputs()) { - for (const auto& channel : output.GetChannels()) { - *ev->Record.AddUpdate() = channel; - } - } - - YQL_CLOG(DEBUG, ProviderDq) << task.GetId() << " " << ev->Record.ShortDebugString(); - - Send(actorId, ev.Release()); - } - ChannelsUpdated = true; - } - - void OnResultFailure(TEvDqFailure::TPtr& ev) { - if (Finished) { - YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - YQL_CLOG(WARN, ProviderDq) << "TEvDqFailure IGNORED when Finished from " << ev->Sender; - } else { - FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported - Send(ExecuterId, ev->Release().Release()); - Finished = true; - } - } - - void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { - YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - YQL_CLOG(DEBUG, ProviderDq) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode); - if (Finished) { - YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); - YQL_CLOG(WARN, ProviderDq) << "OnError IGNORED when Finished"; - } else { - auto req = MakeHolder<TEvDqFailure>(statusCode, issues); - FinalStat().FlushCounters(req->Record); - Send(ExecuterId, req.Release()); - Finished = true; - } - } - - void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) { - auto issueCode = NCommon::NeedFallback(statusCode) - ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR - : TIssuesIds::DQ_GATEWAY_ERROR; - OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)})); - } - - void Finish() { - if (ServiceCounters.Counters && AggrPeriod) { - ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated - } - Send(ExecuterId, new TEvGraphFinished()); - Finished = true; - } - - void OnQueryResult(TEvQueryResponse::TPtr& ev) { - YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); - FinalStat().FlushCounters(ev->Get()->Record); - if (!Issues.Empty()) { - IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues()); - } - Send(ResultId, ev->Release().Release()); - } - - TCounters FinalStat() { - return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat; - } - - - bool ChannelsUpdated = false; - TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks; - THashSet<ui64> FinishedTasks; - THashMap<ui64, TInstant> Executing; - THashMap<ui64, TActorId> ActorIds; - THashMap<TActorId, ui64> TaskIds; - THashMap<ui64, ui64> Stages; - const NActors::TActorId ExecuterId; - const NActors::TActorId ResultId; - const TString TraceId; - TDqConfiguration::TPtr Settings; - bool Finished = false; - TCounters TaskStat; - NYql::NCommon::TServiceCounters ServiceCounters; - TDuration PingPeriod = TDuration::Zero(); - TDuration AggrPeriod = TDuration::Zero(); - NYql::NDq::GroupedIssues Issues; - ui64 PingCookie = 0; }; } /* namespace */ @@ -621,5 +43,4 @@ THolder<NActors::IActor> MakeTaskController( return MakeHolder<NDq::TLogWrapReceive>(new TTaskController(traceId, executerId, resultId, settings, serviceCounters, pingPeriod, aggrPeriod), traceId); } - } /* namespace NYql */ diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h new file mode 100644 index 00000000000..3d59b9c26d8 --- /dev/null +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -0,0 +1,638 @@ +#include "task_controller.h" +#include "execution_helpers.h" +#include "events.h" +#include "proto_builder.h" +#include "actor_helpers.h" +#include "executer_actor.h" +#include "grouped_issues.h" + +#include <ydb/library/yql/providers/dq/counters/counters.h> + +#include <ydb/library/yql/providers/dq/common/yql_dq_common.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +#include <ydb/library/yql/utils/actor_log/log.h> +#include <ydb/library/yql/utils/log/log.h> + +#include <ydb/public/lib/yson_value/ydb_yson_value.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/threading/future/future.h> +#include <library/cpp/protobuf/util/pb_io.h> + +#include <util/generic/size_literals.h> +#include <util/generic/ptr.h> +#include <util/string/split.h> +#include <util/system/types.h> + +namespace NYql { + +using namespace NActors; +using namespace NDqs; + +template<typename TDerived> +class TTaskControllerImpl: public NActors::TActor<TDerived> { +public: + using NActors::TActor<TDerived>::PassAway; + using NActors::TActor<TDerived>::Schedule; + using NActors::TActor<TDerived>::SelfId; + using NActors::TActor<TDerived>::Send; + + static constexpr ui64 PING_TIMER_TAG = 1; + static constexpr ui64 AGGR_TIMER_TAG = 2; + + static constexpr char ActorName[] = "YQL_DQ_TASK_CONTROLLER"; + + explicit TTaskControllerImpl( + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const TDqConfiguration::TPtr& settings, + const NYql::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod, + void (TDerived::*func)(TAutoPtr<NActors::IEventHandle>&, const NActors::TActorContext&) + ) + : NActors::TActor<TDerived>(func) + , ExecuterId(executerId) + , ResultId(resultId) + , TraceId(traceId) + , Settings(settings) + , ServiceCounters(serviceCounters, "task_controller") + , PingPeriod(pingPeriod) + , AggrPeriod(aggrPeriod) + , Issues(CreateDefaultTimeProvider()) + { + if (Settings) { + if (Settings->_AllResultsBytesLimit.Get()) { + YQL_CLOG(DEBUG, ProviderDq) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); + } + if (Settings->_RowsLimitPerWrite.Get()) { + YQL_CLOG(DEBUG, ProviderDq) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); + } + } + } + + ~TTaskControllerImpl() override { + SetTaskCountMetric(0); + } + +public: + STRICT_STFUNC(Handler, { + hFunc(TEvReadyState, OnReadyState); + hFunc(TEvQueryResponse, OnQueryResult); + hFunc(TEvDqFailure, OnResultFailure); + hFunc(NDq::TEvDqCompute::TEvState, OnComputeActorState); + hFunc(NDq::TEvDq::TEvAbortExecution, OnAbortExecution); + cFunc(TEvents::TEvPoison::EventType, PassAway); + hFunc(TEvents::TEvUndelivered, OnUndelivered); + hFunc(TEvents::TEvWakeup, OnWakeup); + }) + + void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) { + auto it = TaskIds.find(ev->Sender); + if (it != TaskIds.end() && FinishedTasks.contains(it->second)) { + // ignore undelivered from finished CAs + return; + } + + TStringBuilder message; + message << "Undelivered Event " << ev->Get()->SourceType + << " from " << SelfId() << " (Self) to " << ev->Sender + << " Reason: " << ev->Get()->Reason << " Cookie: " << ev->Cookie; + OnError(NYql::NDqProto::StatusIds::UNAVAILABLE, message); + } + + void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + auto statusCode = ev->Get()->Record.GetStatusCode(); + TIssues issues = ev->Get()->GetIssues(); + YQL_CLOG(DEBUG, ProviderDq) << "AbortExecution from " << ev->Sender << ":" << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << issues.ToOneLineString(); + OnError(statusCode, issues); + } + + void OnInternalError(const TString& message, const TIssues& subIssues = {}) { + OnError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, message, subIssues); + } + +private: + void SendNonFatalIssues() { + auto req = MakeHolder<TEvDqStats>(Issues.ToIssues()); + Send(ExecuterId, req.Release()); + } + +public: + void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { + TActorId computeActor = ev->Sender; + auto& state = ev->Get()->Record; + ui64 taskId = state.GetTaskId(); + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + YQL_CLOG(TRACE, ProviderDq) + << SelfId() + << " EvState TaskId: " << taskId + << " State: " << state.GetState() + << " PingCookie: " << ev->Cookie + << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode()); + + if (state.HasStats() && TryAddStatsFromExtra(state.GetStats())) { + if (ServiceCounters.Counters && !AggrPeriod) { + ExportStats(TaskStat, taskId); + } + } else if (state.HasStats() && state.GetStats().GetTasks().size()) { + YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats " << taskId; + AddStats(state.GetStats()); + if (ServiceCounters.Counters && !AggrPeriod) { + ExportStats(TaskStat, taskId); + } + } + + + TIssues localIssues; + // TODO: don't convert issues to string + NYql::IssuesFromMessage(state.GetIssues(), localIssues); + + switch (state.GetState()) { + case NDqProto::COMPUTE_STATE_UNKNOWN: { + // TODO: use issues + TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId); + OnError(NYql::NDqProto::StatusIds::BAD_REQUEST, message); + break; + } + case NDqProto::COMPUTE_STATE_FAILURE: { + Issues.AddIssues(localIssues); + OnError(state.GetStatusCode(), Issues.ToIssues()); + break; + } + case NDqProto::COMPUTE_STATE_EXECUTING: { + Issues.AddIssues(localIssues); + YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " Executing TaskId: " << taskId; + if (!FinishedTasks.contains(taskId)) { + // may get late/reordered? message + Executing[taskId] = Now(); + } + SendNonFatalIssues(); + break; + } + case NDqProto::COMPUTE_STATE_FINISHED: { + Executing.erase(taskId); + FinishedTasks.insert(taskId); + YQL_CLOG(DEBUG, ProviderDq) << " " << SelfId() << " Finish TaskId: " << taskId << ". Tasks finished: " << FinishedTasks.size() << "/" << Tasks.size(); + break; + } + } + + MaybeUpdateChannels(); + MaybeFinish(); + } + + void OnWakeup(TEvents::TEvWakeup::TPtr& ev) { + switch (ev->Get()->Tag) { + case PING_TIMER_TAG: + if (PingPeriod) { + auto now = Now(); + for (auto& taskActors: Executing) { + if (now > taskActors.second + PingPeriod) { + PingCookie++; + YQL_CLOG(TRACE, ProviderDq) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie; + Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie); + taskActors.second = now; + } + } + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); + } + break; + case AGGR_TIMER_TAG: + if (AggrPeriod) { + if (ServiceCounters.Counters) { + ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); + } + Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); + } + break; + } + }; + + ::NMonitoring::TDynamicCounterPtr GroupForExport(const TCounters& stat, const TString& counterName, ui64 taskId, TString& name, std::map<TString, TString>& labels) { + Y_UNUSED(stat); + TString prefix; + if (NCommon::ParseCounterName(&prefix, &labels, &name, counterName)) { + if (prefix == "TaskRunner" && (taskId == 0 || labels["Task"] == ToString(taskId))) { + auto group = (taskId == 0) ? ServiceCounters.Counters : ServiceCounters.Counters->GetSubgroup("Stage", ToString(Stages[taskId])); + for (const auto& [k, v] : labels) { + group = group->GetSubgroup(k, v); + } + return group; + } + } + return nullptr; + } + +private: + static bool IsAggregatedStage(const std::map<TString, TString>& labels) { + const auto it = labels.find("Stage"); + return it != labels.end() && it->second == "Total"; + } + + void ExportStats(const TCounters& stat, ui64 taskId) { + YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary"); + TString name; + std::map<TString, TString> labels; + static const TString SourceLabel = "Source"; + static const TString SinkLabel = "Sink"; + for (const auto& [k, v] : stat.Get()) { + labels.clear(); + if (auto group = GroupForExport(stat, k, taskId, name, labels)) { + *group->GetCounter(name) = v.Count; + if (ServiceCounters.PublicCounters && taskId == 0 && IsAggregatedStage(labels)) { + TString publicCounterName; + bool isDeriv = false; + if (name == "MkqlMaxMemoryUsage") { + publicCounterName = "query.memory_usage_bytes"; + } else if (name == "CpuTimeUs") { + publicCounterName = "query.cpu_usage_us"; + isDeriv = true; + } else if (name == "Bytes") { + if (labels.count(SourceLabel)) publicCounterName = "query.input_bytes"; + else if (labels.count(SinkLabel)) publicCounterName = "query.output_bytes"; + isDeriv = true; + } else if (name == "RowsIn") { + if (labels.count(SourceLabel)) publicCounterName = "query.source_input_records"; + else if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; // RowsIn == RowsOut for Sinks + isDeriv = true; + } else if (name == "MultiHop_LateThrownEventsCount") { + publicCounterName = "query.late_events"; + isDeriv = true; + } + + if (publicCounterName) { + auto& counter = *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv); + if (name == "MultiHop_LateThrownEventsCount") { + // the only incremental sensor from TaskRunner + counter += v.Count; + } else { + counter = v.Count; + } + } + } + } + } + for (const auto& [k, v] : stat.GetHistograms()) { + labels.clear(); + if (auto group = GroupForExport(stat, k, taskId, name, labels)) { + auto hist = group->GetHistogram(name, NMonitoring::ExponentialHistogram(6, 10, 10)); + hist->Reset(); + for (const auto& [bound, value] : v) { + hist->Collect(bound, value); + } + } + } + } + + bool TryAddStatsFromExtra(const NDqProto::TDqComputeActorStats& x) { + NDqProto::TExtraStats extraStats; + if (x.HasExtra() && x.GetExtra().UnpackTo(&extraStats)) { + YQL_CLOG(TRACE, ProviderDq) << " " << SelfId() << " AddStats from extra"; + for (const auto& [name, m] : extraStats.GetStats()) { + NYql::TCounters::TEntry value; + value.Sum = m.GetSum(); + value.Max = m.GetMax(); + value.Min = m.GetMin(); + //value.Avg = m.GetAvg(); + value.Count = m.GetCnt(); + TaskStat.AddCounter(name, value); + } + return true; + } + return false; + } + + void AddStats(const NDqProto::TDqComputeActorStats& x) { + YQL_ENSURE(x.GetTasks().size() == 1); + auto& s = x.GetTasks(0); + ui64 taskId = s.GetTaskId(); + +#define ADD_COUNTER(name) \ + if (stats.Get ## name()) { \ + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \ + } + + std::map<TString, TString> labels = { + {"Task", ToString(taskId)} + }; + + auto& stats = s; + // basic stats + ADD_COUNTER(CpuTimeUs) + ADD_COUNTER(ComputeCpuTimeUs) + ADD_COUNTER(PendingInputTimeUs) + ADD_COUNTER(PendingOutputTimeUs) + ADD_COUNTER(FinishTimeUs) + ADD_COUNTER(InputRows) + ADD_COUNTER(InputBytes) + ADD_COUNTER(OutputRows) + ADD_COUNTER(OutputBytes) + + // profile stats + ADD_COUNTER(BuildCpuTimeUs) + ADD_COUNTER(WaitTimeUs) + ADD_COUNTER(WaitOutputTimeUs) + + for (const auto& ingress : s.GetIngress()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Ingress" + ingress.GetName() + "Bytes"), ingress.GetBytes()); + } + + for (const auto& egress : s.GetEgress()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Egress" + egress.GetName() + "Bytes"), egress.GetBytes()); + } + + if (auto v = x.GetMkqlMaxMemoryUsage()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "MkqlMaxMemoryUsage"), v); + } + + for (const auto& stat : s.GetMkqlStats()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, stat.GetName()), stat.GetValue()); + } + + if (stats.ComputeCpuTimeByRunSize()) { + auto& hist = TaskStat.GetHistogram(TaskStat.GetCounterName("TaskRunner", labels, "ComputeTimeByRunMs")); + for (const auto& bucket : s.GetComputeCpuTimeByRun()) { + hist[bucket.GetBound()] = bucket.GetValue(); + } + } + + // compilation stats +// ADD_COUNTER(MkqlTotalNodes) +// ADD_COUNTER(MkqlCodegenFunctions) +// ADD_COUNTER(CodeGenTotalInstructions) +// ADD_COUNTER(CodeGenTotalFunctions) +// +// ADD_COUNTER(CodeGenFullTime) +// ADD_COUNTER(CodeGenFinalizeTime) +// ADD_COUNTER(CodeGenModulePassTime) + +// if (stats.GetFinishTs() >= stats.GetStartTs()) { +// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); +// } + + for (const auto& stats : s.GetInputChannels()) { + std::map<TString, TString> labels = { + {"Task", ToString(taskId)}, + {"InputChannel", ToString(stats.GetChannelId())} + }; + + ADD_COUNTER(Chunks); + ADD_COUNTER(Bytes); + ADD_COUNTER(RowsIn); + ADD_COUNTER(RowsOut); + ADD_COUNTER(MaxMemoryUsage); + ADD_COUNTER(DeserializationTimeUs); + +// if (stats.GetFinishTs() >= stats.GetStartTs()) { +// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); +// } + } + + for (const auto& stats : s.GetOutputChannels()) { + std::map<TString, TString> labels = { + {"Task", ToString(taskId)}, + {"OutputChannel", ToString(stats.GetChannelId())} + }; + + ADD_COUNTER(Chunks) + ADD_COUNTER(Bytes); + ADD_COUNTER(RowsIn); + ADD_COUNTER(RowsOut); + ADD_COUNTER(MaxMemoryUsage); + + ADD_COUNTER(SerializationTimeUs); + ADD_COUNTER(BlockedByCapacity); + + ADD_COUNTER(SpilledBytes); + ADD_COUNTER(SpilledRows); + ADD_COUNTER(SpilledBlobs); + +// if (stats.GetFinishTs() >= stats.GetStartTs()) { +// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); +// } + } + + for (const auto& stats : s.GetSources()) { + std::map<TString, TString> labels = { + {"Task", ToString(taskId)}, + {"Source", ToString(stats.GetInputIndex())} + }; + + ADD_COUNTER(Chunks); + ADD_COUNTER(Bytes); + ADD_COUNTER(IngressBytes) + ADD_COUNTER(RowsIn); + ADD_COUNTER(RowsOut); + ADD_COUNTER(MaxMemoryUsage); + + ADD_COUNTER(ErrorsCount); + +// if (stats.GetFinishTs() >= stats.GetStartTs()) { +// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); +// } + } + + for (const auto& stats : s.GetSinks()) { + std::map<TString, TString> labels = { + {"Task", ToString(taskId)}, + {"Sink", ToString(stats.GetOutputIndex())} + }; + + ADD_COUNTER(Chunks) + ADD_COUNTER(Bytes); + ADD_COUNTER(EgressBytes) + ADD_COUNTER(RowsIn); + ADD_COUNTER(RowsOut); + ADD_COUNTER(MaxMemoryUsage); + + ADD_COUNTER(ErrorsCount); + +// if (stats.GetFinishTs() >= stats.GetStartTs()) { +// TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); +// } + } + +#undef ADD_COUNTER + } + + void MaybeFinish() { + if (!Finished && !Tasks.empty() && FinishedTasks.size() == Tasks.size()) { + Finish(); + } + } + + void SetTaskCountMetric(ui64 count) { + if (!ServiceCounters.Counters) { + return; + } + *ServiceCounters.Counters->GetCounter("TaskCount") = count; + + if (!ServiceCounters.PublicCounters) { + return; + } + *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count; + } + +public: + void OnReadyState(TEvReadyState::TPtr& ev) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + + TaskStat.AddCounters(ev->Get()->Record); + + const auto& tasks = ev->Get()->Record.GetTask(); + const auto& actorIds = ev->Get()->Record.GetActorId(); + Y_VERIFY(tasks.size() == actorIds.size()); + + SetTaskCountMetric(tasks.size()); + + for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { + auto actorId = ActorIdFromProto(actorIds[i]); + auto& task = tasks[i]; + Tasks.emplace_back(task, actorId); + ActorIds.emplace(task.GetId(), actorId); + TaskIds.emplace(actorId, task.GetId()); + Yql::DqsProto::TTaskMeta taskMeta; + task.GetMeta().UnpackTo(&taskMeta); + Stages.emplace(task.GetId(), taskMeta.GetStageId()); + } + + YQL_CLOG(DEBUG, ProviderDq) << "Ready State: " << SelfId(); + + MaybeUpdateChannels(); + + if (PingPeriod) { + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); + } + if (AggrPeriod) { + Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); + } + } + +private: + void MaybeUpdateChannels() { + if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) { + return; + } + + YQL_CLOG(DEBUG, ProviderDq) << "Update channels"; + for (const auto& [task, actorId] : Tasks) { + auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>(); + + for (const auto& input : task.GetInputs()) { + for (const auto& channel : input.GetChannels()) { + *ev->Record.AddUpdate() = channel; + } + } + + for (const auto& output : task.GetOutputs()) { + for (const auto& channel : output.GetChannels()) { + *ev->Record.AddUpdate() = channel; + } + } + + YQL_CLOG(DEBUG, ProviderDq) << task.GetId() << " " << ev->Record.ShortDebugString(); + + Send(actorId, ev.Release()); + } + ChannelsUpdated = true; + } + +public: + void OnResultFailure(TEvDqFailure::TPtr& ev) { + if (Finished) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + YQL_CLOG(WARN, ProviderDq) << "TEvDqFailure IGNORED when Finished from " << ev->Sender; + } else { + FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported + Send(ExecuterId, ev->Release().Release()); + Finished = true; + } + } + + void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message, const TIssues& subIssues) { + TIssue issue(message); + for (const TIssue& i : subIssues) { + issue.AddSubIssue(MakeIntrusive<TIssue>(i)); + } + TIssues issues; + issues.AddIssue(std::move(issue)); + OnError(statusCode, issues); + } + + void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + YQL_CLOG(DEBUG, ProviderDq) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode); + if (Finished) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(TraceId); + YQL_CLOG(WARN, ProviderDq) << "OnError IGNORED when Finished"; + } else { + auto req = MakeHolder<TEvDqFailure>(statusCode, issues); + FinalStat().FlushCounters(req->Record); + Send(ExecuterId, req.Release()); + Finished = true; + } + } + + void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TString& message) { + auto issueCode = NCommon::NeedFallback(statusCode) + ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR + : TIssuesIds::DQ_GATEWAY_ERROR; + OnError(statusCode, TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)})); + } + +private: + void Finish() { + if (ServiceCounters.Counters && AggrPeriod) { + ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated + } + Send(ExecuterId, new TEvGraphFinished()); + Finished = true; + } + +public: + void OnQueryResult(TEvQueryResponse::TPtr& ev) { + YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); + FinalStat().FlushCounters(ev->Get()->Record); + if (!Issues.Empty()) { + IssuesToMessage(Issues.ToIssues(), ev->Get()->Record.MutableIssues()); + } + Send(ResultId, ev->Release().Release()); + } + +private: + TCounters FinalStat() { + return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat; + } + + + bool ChannelsUpdated = false; + TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks; + THashSet<ui64> FinishedTasks; + THashMap<ui64, TInstant> Executing; + THashMap<ui64, TActorId> ActorIds; + THashMap<TActorId, ui64> TaskIds; + THashMap<ui64, ui64> Stages; + const NActors::TActorId ExecuterId; + const NActors::TActorId ResultId; + const TString TraceId; + TDqConfiguration::TPtr Settings; + bool Finished = false; + TCounters TaskStat; + NYql::NCommon::TServiceCounters ServiceCounters; + TDuration PingPeriod = TDuration::Zero(); + TDuration AggrPeriod = TDuration::Zero(); + NYql::NDq::GroupedIssues Issues; + ui64 PingCookie = 0; +}; + +} /* namespace NYql */ diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index e657c03db54..783b1b5b624 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -74,7 +74,7 @@ message TFreeWorkersNotify { bool IsForwarded = 2; string TraceId = 6; - // repeated TWorker.TGuid FailedWorker = 7; // reserved + reserved 7; repeated string FailedWorkerGuid = 8; } @@ -196,7 +196,7 @@ message TGraphRequest { Yql.DqsProto.ExecuteGraphRequest Request = 1; NActorsProto.TActorId ControlId = 2; NActorsProto.TActorId ResultId = 3; - NActorsProto.TActorId CheckPointCoordinatorId = 4; // may be empty + reserved 4; } message TDqTaskRequest { |