diff options
author | aozeritsky <[email protected]> | 2023-06-14 10:47:53 +0300 |
---|---|---|
committer | aozeritsky <[email protected]> | 2023-06-14 10:47:53 +0300 |
commit | f6e723596f2e176356bbd53c0a87ee9168e14f22 (patch) | |
tree | 4e99b003dc6e004a904df62a2ee1a57134988edc | |
parent | 1b25a1dce9e349c29293525aab7a4c66b4aacccc (diff) |
Add new compute actor (CA) metrics
9 files changed, 138 insertions, 39 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index ed489059172..1d730bfc19f 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -129,8 +129,6 @@ void Init( actorRegistrator(NYql::NDq::MakeCheckpointStorageID(), checkpointStorage.release()); } - auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(yqCounters->GetSubgroup("subsystem", "worker_manager")); - TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = { NYql::GetCommonDqFactory(), NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver), @@ -204,6 +202,10 @@ void Init( } ui64 mkqlInitialMemoryLimit = 8_GB; + auto taskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr; + auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters( + yqCounters->GetSubgroup("subsystem", "worker_manager"), + taskCounters); if (protoConfig.GetResourceManager().GetEnabled()) { mkqlInitialMemoryLimit = protoConfig.GetResourceManager().GetMkqlInitialMemoryLimit(); @@ -217,7 +219,6 @@ void Init( } NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; - lwmOptions.DqTaskCounters = protoConfig.GetEnableTaskCounters() ? 
appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr; lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false); lwmOptions.AsyncIoFactory = asyncIoFactory; lwmOptions.FunctionRegistry = appData->FunctionRegistry; diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 558087acf71..1d94e3868af 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -320,7 +320,7 @@ private: WatermarkTakeInputChannelDataRequests[*watermark]++; } - DqComputeActorMetrics.ReportInputChannelWatermark( + MetricsReporter.ReportInputChannelWatermark( channelData.GetChannelId(), channelData.GetData().GetRows(), watermark); @@ -378,7 +378,7 @@ private: return Nothing(); } - DqComputeActorMetrics.ReportInjectedToTaskRunnerWatermark(pendingWatermark); + MetricsReporter.ReportInjectedToTaskRunnerWatermark(pendingWatermark); return pendingWatermark; } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index ac05aaf117e..95f69005200 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -192,7 +192,7 @@ protected: , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) , TaskCounters(taskCounters) - , DqComputeActorMetrics(taskCounters) + , MetricsReporter(taskCounters) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") , Running(!Task.GetCreateSuspended()) , PassExceptions(passExceptions) @@ -225,7 +225,7 @@ protected: , State(Task.GetCreateSuspended() ? 
NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) , TaskCounters(taskCounters) - , DqComputeActorMetrics(taskCounters) + , MetricsReporter(taskCounters) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") , Running(!Task.GetCreateSuspended()) { @@ -280,6 +280,8 @@ protected: } STFUNC(BaseStateFuncBody) { + MetricsReporter.ReportEvent(ev->GetTypeRewrite()); + switch (ev->GetTypeRewrite()) { hFunc(TEvDqCompute::TEvResumeExecution, HandleExecuteBase); hFunc(TEvDqCompute::TEvChannelsInfo, HandleExecuteBase); @@ -1683,7 +1685,7 @@ protected: ContinueExecute(); } - DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.RowCount(), watermark); + MetricsReporter.ReportAsyncInputData(inputIndex, batch.RowCount(), watermark); if (watermark) { const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived( @@ -2176,7 +2178,7 @@ protected: THolder<TDqMemoryQuota> MemoryQuota; TDqComputeActorWatermarks WatermarksTracker; ::NMonitoring::TDynamicCounterPtr TaskCounters; - TDqComputeActorMetrics DqComputeActorMetrics; + TDqComputeActorMetrics MetricsReporter; NWilson::TSpan ComputeActorSpan; TDuration SourceCpuTime; private: diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp index a6547c94aa5..ab43da974d9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp @@ -1,18 +1,76 @@ #include "dq_compute_actor_metrics.h" +#include "dq_compute_actor.h" + +#include <ydb/library/yql/dq/actors/dq.h> +#include <library/cpp/actors/core/interconnect.h> namespace NYql::NDq { -TDqComputeActorMetrics::TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters) { - if (!counters) { +TDqComputeActorMetrics::TDqComputeActorMetrics( + const NMonitoring::TDynamicCounterPtr& 
counters) + : Enable(!!counters) + , ComputeActorSubgroup(counters + ? counters->GetSubgroup("subsystem", "compute_actor") + : nullptr) +{ + if (ComputeActorSubgroup) { + InjectedToTaskRunnerWatermark = ComputeActorSubgroup->GetCounter("watermark_injected_ms"); + InjectedToOutputsWatermark = ComputeActorSubgroup->GetCounter("watermark_outputs_ms"); + WatermarkCollectLatency = ComputeActorSubgroup->GetHistogram( + "watermark_collect_ms", + NMonitoring::ExplicitHistogram({0, 15, 50, 100, 250, 500, 1000, 10'000, 100'000})); + +#define ADD_COUNTER(name) \ + name = ComputeActorSubgroup->GetCounter(#name) + + ADD_COUNTER(ResumeExecution); + ADD_COUNTER(ChannelsInfo); + ADD_COUNTER(AbortExecution); + ADD_COUNTER(Wakeup); + ADD_COUNTER(Undelivered); + ADD_COUNTER(ChannelData); + ADD_COUNTER(ChannelDataAck); + ADD_COUNTER(StateRequest); + ADD_COUNTER(CheckpointCoordinator); + ADD_COUNTER(InjectCheckpoint); + ADD_COUNTER(CommitState); + ADD_COUNTER(RestoreFromCheckpoint); + ADD_COUNTER(NodeDisconnected); + ADD_COUNTER(NodeConnected); + ADD_COUNTER(NewAsyncInputDataArrived); + ADD_COUNTER(AsyncInputError); + ADD_COUNTER(OtherEvent); + +#undef ADD_COUNTER + } +} + +void TDqComputeActorMetrics::ReportEvent(ui32 type) +{ + if (!Enable) { return; } - ComputeActorSubgroup = counters->GetSubgroup("subsystem", "compute_actor"); - InjectedToTaskRunnerWatermark = ComputeActorSubgroup->GetCounter("watermark_injected_ms"); - InjectedToOutputsWatermark = ComputeActorSubgroup->GetCounter("watermark_outputs_ms"); - WatermarkCollectLatency = ComputeActorSubgroup->GetHistogram( - "watermark_collect_ms", - NMonitoring::ExplicitHistogram({0, 15, 50, 100, 250, 500, 1000, 10'000, 100'000})); + switch (type) { + case TEvDqCompute::TEvResumeExecution::EventType: ResumeExecution->Inc(); break; + case TEvDqCompute::TEvChannelsInfo::EventType: ChannelsInfo->Inc(); break; + case TEvDq::TEvAbortExecution::EventType: AbortExecution->Inc(); break; + case NActors::TEvents::TEvWakeup::EventType: 
Wakeup->Inc(); break; + case NActors::TEvents::TEvUndelivered::EventType: Undelivered->Inc(); break; + case TEvDqCompute::TEvChannelData::EventType: ChannelData->Inc(); break; + case TEvDqCompute::TEvChannelDataAck::EventType: ChannelDataAck->Inc(); break; + case TEvDqCompute::TEvRun::EventType: Run->Inc(); break; + case TEvDqCompute::TEvStateRequest::EventType: StateRequest->Inc(); break; + case TEvDqCompute::TEvNewCheckpointCoordinator::EventType: CheckpointCoordinator->Inc(); break; + case TEvDqCompute::TEvInjectCheckpoint::EventType: InjectCheckpoint->Inc(); break; + case TEvDqCompute::TEvCommitState::EventType: CommitState->Inc(); break; + case TEvDqCompute::TEvRestoreFromCheckpoint::EventType: RestoreFromCheckpoint->Inc(); break; + case NActors::TEvInterconnect::TEvNodeDisconnected::EventType: NodeDisconnected->Inc(); break; + case NActors::TEvInterconnect::TEvNodeConnected::EventType: NodeConnected->Inc(); break; + case IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType: NewAsyncInputDataArrived->Inc(); break; + case IDqComputeActorAsyncInput::TEvAsyncInputError::EventType: AsyncInputError->Inc(); break; + default: OtherEvent->Inc(); break; + } } void TDqComputeActorMetrics::ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h index 3e9afbf64ae..69227c4430d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h @@ -10,6 +10,7 @@ struct TDqComputeActorMetrics { public: TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters); + void ReportEvent(ui32 type); void ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark); void ReportInputChannelWatermark(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark); void ReportInjectedToTaskRunnerWatermark(TInstant watermark); @@ -30,6 
+31,25 @@ private: NMonitoring::TDynamicCounters::TCounterPtr InjectedToOutputsWatermark; NMonitoring::THistogramPtr WatermarkCollectLatency; + NMonitoring::TDynamicCounters::TCounterPtr ResumeExecution; + NMonitoring::TDynamicCounters::TCounterPtr ChannelsInfo; + NMonitoring::TDynamicCounters::TCounterPtr AbortExecution; + NMonitoring::TDynamicCounters::TCounterPtr Wakeup; + NMonitoring::TDynamicCounters::TCounterPtr Undelivered; + NMonitoring::TDynamicCounters::TCounterPtr ChannelData; + NMonitoring::TDynamicCounters::TCounterPtr ChannelDataAck; + NMonitoring::TDynamicCounters::TCounterPtr Run; + NMonitoring::TDynamicCounters::TCounterPtr StateRequest; + NMonitoring::TDynamicCounters::TCounterPtr CheckpointCoordinator; + NMonitoring::TDynamicCounters::TCounterPtr InjectCheckpoint; + NMonitoring::TDynamicCounters::TCounterPtr CommitState; + NMonitoring::TDynamicCounters::TCounterPtr RestoreFromCheckpoint; + NMonitoring::TDynamicCounters::TCounterPtr NodeDisconnected; + NMonitoring::TDynamicCounters::TCounterPtr NodeConnected; + NMonitoring::TDynamicCounters::TCounterPtr NewAsyncInputDataArrived; + NMonitoring::TDynamicCounters::TCounterPtr AsyncInputError; + NMonitoring::TDynamicCounters::TCounterPtr OtherEvent; + THashMap<TInstant, TInstant> WatermarkStartedAt; }; diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp index cf150e270cf..2d325d0fe6f 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp @@ -2,12 +2,21 @@ namespace NYql::NDqs { -TWorkerManagerCounters::TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root) { - ActiveWorkers = root->GetCounter("ActiveWorkers"); - MkqlMemoryLimit = root->GetCounter("MkqlMemoryLimit"); - MkqlMemoryAllocated = root->GetCounter("MkqlMemoryAllocated"); - FreeGroupError = root->GetCounter("FreeGroupError"); -} 
+TWorkerManagerCounters::TWorkerManagerCounters( + ::NMonitoring::TDynamicCounterPtr root, + ::NMonitoring::TDynamicCounterPtr taskCounters) + : ActiveWorkers(root->GetCounter("ActiveWorkers")) + , MkqlMemoryLimit(root->GetCounter("MkqlMemoryLimit")) + , MkqlMemoryAllocated(root->GetCounter("MkqlMemoryAllocated")) + , FreeGroupError(root->GetCounter("FreeGroupError")) + , TaskCounters(taskCounters) +{ } + +TWorkerManagerCounters::TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root) + : TWorkerManagerCounters( + root->GetSubgroup("component", "lwm"), + root->GetSubgroup("component", "tasks")) +{ } TWorkerManagerCounters::TWorkerManagerCounters() : TWorkerManagerCounters(new ::NMonitoring::TDynamicCounters) diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h index 16f206e7696..07c6c6f4d6c 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h @@ -8,8 +8,10 @@ struct TWorkerManagerCounters { ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryAllocated; ::NMonitoring::TDynamicCounters::TCounterPtr FreeGroupError; + ::NMonitoring::TDynamicCounterPtr TaskCounters; explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root); + explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root, ::NMonitoring::TDynamicCounterPtr taskCounters); TWorkerManagerCounters(); }; diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 13ffc497ee4..88d05f5d4bc 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -40,7 +40,7 @@ static_assert(sizeof(TDqLocalResourceId) == 8); struct TMemoryQuotaManager : 
public NYql::NDq::TGuaranteeQuotaManager { - TMemoryQuotaManager(std::shared_ptr<NDq::TResourceQuoter> nodeQuoter, const NDq::TTxId& txId, ui64 limit) + TMemoryQuotaManager(std::shared_ptr<NDq::TResourceQuoter> nodeQuoter, const NDq::TTxId& txId, ui64 limit) : NYql::NDq::TGuaranteeQuotaManager(limit, limit) , NodeQuoter(nodeQuoter) , TxId(txId) { @@ -72,6 +72,7 @@ public: TLocalWorkerManager(const TLocalWorkerManagerOptions& options) : TWorkerManagerCommon<TLocalWorkerManager>(&TLocalWorkerManager::Handler) , Options(options) + , TaskCounters(Options.Counters.TaskCounters) , MemoryQuoter(std::make_shared<NDq::TResourceQuoter>(Options.MkqlTotalMemoryLimit)) { Options.Counters.MkqlMemoryLimit->Set(Options.MkqlTotalMemoryLimit); @@ -271,10 +272,10 @@ private: auto resultId = ActorIdFromProto(ev->Get()->Record.GetResultActorId()); ::NMonitoring::TDynamicCounterPtr taskCounters; - if (createComputeActor && Options.DqTaskCounters) { + if (createComputeActor && TaskCounters) { auto& info = TaskCountersMap[traceId]; if (!info.TaskCounters) { - info.TaskCounters = Options.DqTaskCounters->GetSubgroup("operation", traceId); + info.TaskCounters = TaskCounters->GetSubgroup("operation", traceId); } info.ReferenceCount += count; taskCounters = info.TaskCounters; @@ -288,7 +289,7 @@ private: actor.Reset(NYql::CreateComputeActor( Options, - std::make_shared<TMemoryQuotaManager>(MemoryQuoter, allocationInfo.TxId, quotas[i]), + std::make_shared<TMemoryQuotaManager>(MemoryQuoter, allocationInfo.TxId, quotas[i]), resultId, traceId, std::move(tasks[i]), @@ -328,6 +329,20 @@ private: Send(ev->Sender, response.Release()); } + void DropTaskCounters(const auto& info) { + auto traceId = std::get<TString>(info.TxId); + if (auto it = TaskCountersMap.find(traceId); it != TaskCountersMap.end()) { + if (it->second.ReferenceCount <= info.WorkerActors.size()) { + if (TaskCounters) { + TaskCounters->RemoveSubgroup("operation", traceId); + } + TaskCountersMap.erase(it); + } else { + 
it->second.ReferenceCount -= info.WorkerActors.size(); + } + } + } + void FreeGroup(ui64 id, NActors::TActorId sender = NActors::TActorId()) { YQL_CLOG(DEBUG, ProviderDq) << "Free Group " << id; auto it = AllocatedWorkers.find(id); @@ -341,17 +356,8 @@ private: YQL_CLOG(ERROR, ProviderDq) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId; } - auto traceId = std::get<TString>(it->second.TxId); - auto itt = TaskCountersMap.find(traceId); - if (itt != TaskCountersMap.end()) { - if (itt->second.ReferenceCount <= it->second.WorkerActors.size()) { - if (Options.DqTaskCounters) { - Options.DqTaskCounters->RemoveSubgroup("operation", traceId); - } - TaskCountersMap.erase(itt); - } else { - itt->second.ReferenceCount -= it->second.WorkerActors.size(); - } + if (Options.DropTaskCountersOnFinish) { + DropTaskCounters(it->second); } Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.size()); @@ -374,6 +380,7 @@ private: } TLocalWorkerManagerOptions Options; + NMonitoring::TDynamicCounterPtr TaskCounters; struct TAllocationInfo { TVector<NActors::TActorId> WorkerActors; diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 7aacd11b261..95ecae911e5 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -22,7 +22,6 @@ namespace NYql::NDqs { struct TLocalWorkerManagerOptions { TWorkerManagerCounters Counters; - ::NMonitoring::TDynamicCounterPtr DqTaskCounters; NTaskRunnerProxy::IProxyFactory::TPtr Factory; NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; @@ -39,6 +38,7 @@ namespace NYql::NDqs { bool CanUseComputeActor = true; NActors::TActorId QuoterServiceActorId; bool ComputeActorOwnsCounters = false; + bool 
DropTaskCountersOnFinish = true; }; NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options); |