summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <[email protected]>2023-06-14 10:47:53 +0300
committeraozeritsky <[email protected]>2023-06-14 10:47:53 +0300
commitf6e723596f2e176356bbd53c0a87ee9168e14f22 (patch)
tree4e99b003dc6e004a904df62a2ee1a57134988edc
parent1b25a1dce9e349c29293525aab7a4c66b4aacccc (diff)
Add new CA metrics
-rw-r--r--ydb/core/fq/libs/init/init.cpp7
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h10
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp74
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h20
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp21
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/interface/counters.h2
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp37
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h2
9 files changed, 138 insertions, 39 deletions
diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp
index ed489059172..1d730bfc19f 100644
--- a/ydb/core/fq/libs/init/init.cpp
+++ b/ydb/core/fq/libs/init/init.cpp
@@ -129,8 +129,6 @@ void Init(
actorRegistrator(NYql::NDq::MakeCheckpointStorageID(), checkpointStorage.release());
}
- auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(yqCounters->GetSubgroup("subsystem", "worker_manager"));
-
TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = {
NYql::GetCommonDqFactory(),
NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver),
@@ -204,6 +202,10 @@ void Init(
}
ui64 mkqlInitialMemoryLimit = 8_GB;
+ auto taskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr;
+ auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(
+ yqCounters->GetSubgroup("subsystem", "worker_manager"),
+ taskCounters);
if (protoConfig.GetResourceManager().GetEnabled()) {
mkqlInitialMemoryLimit = protoConfig.GetResourceManager().GetMkqlInitialMemoryLimit();
@@ -217,7 +219,6 @@ void Init(
}
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
- lwmOptions.DqTaskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr;
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, nullptr, false);
lwmOptions.AsyncIoFactory = asyncIoFactory;
lwmOptions.FunctionRegistry = appData->FunctionRegistry;
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 558087acf71..1d94e3868af 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -320,7 +320,7 @@ private:
WatermarkTakeInputChannelDataRequests[*watermark]++;
}
- DqComputeActorMetrics.ReportInputChannelWatermark(
+ MetricsReporter.ReportInputChannelWatermark(
channelData.GetChannelId(),
channelData.GetData().GetRows(),
watermark);
@@ -378,7 +378,7 @@ private:
return Nothing();
}
- DqComputeActorMetrics.ReportInjectedToTaskRunnerWatermark(pendingWatermark);
+ MetricsReporter.ReportInjectedToTaskRunnerWatermark(pendingWatermark);
return pendingWatermark;
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index ac05aaf117e..95f69005200 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -192,7 +192,7 @@ protected:
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
, TaskCounters(taskCounters)
- , DqComputeActorMetrics(taskCounters)
+ , MetricsReporter(taskCounters)
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
@@ -225,7 +225,7 @@ protected:
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, WatermarksTracker(this->SelfId(), TxId, Task.GetId())
, TaskCounters(taskCounters)
- , DqComputeActorMetrics(taskCounters)
+ , MetricsReporter(taskCounters)
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
, Running(!Task.GetCreateSuspended())
{
@@ -280,6 +280,8 @@ protected:
}
STFUNC(BaseStateFuncBody) {
+ MetricsReporter.ReportEvent(ev->GetTypeRewrite());
+
switch (ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvResumeExecution, HandleExecuteBase);
hFunc(TEvDqCompute::TEvChannelsInfo, HandleExecuteBase);
@@ -1683,7 +1685,7 @@ protected:
ContinueExecute();
}
- DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.RowCount(), watermark);
+ MetricsReporter.ReportAsyncInputData(inputIndex, batch.RowCount(), watermark);
if (watermark) {
const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived(
@@ -2176,7 +2178,7 @@ protected:
THolder<TDqMemoryQuota> MemoryQuota;
TDqComputeActorWatermarks WatermarksTracker;
::NMonitoring::TDynamicCounterPtr TaskCounters;
- TDqComputeActorMetrics DqComputeActorMetrics;
+ TDqComputeActorMetrics MetricsReporter;
NWilson::TSpan ComputeActorSpan;
TDuration SourceCpuTime;
private:
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
index a6547c94aa5..ab43da974d9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
@@ -1,18 +1,76 @@
#include "dq_compute_actor_metrics.h"
+#include "dq_compute_actor.h"
+
+#include <ydb/library/yql/dq/actors/dq.h>
+#include <library/cpp/actors/core/interconnect.h>
namespace NYql::NDq {
-TDqComputeActorMetrics::TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters) {
- if (!counters) {
+TDqComputeActorMetrics::TDqComputeActorMetrics(
+ const NMonitoring::TDynamicCounterPtr& counters)
+ : Enable(!!counters)
+ , ComputeActorSubgroup(counters
+ ? counters->GetSubgroup("subsystem", "compute_actor")
+ : nullptr)
+{
+ if (ComputeActorSubgroup) {
+ InjectedToTaskRunnerWatermark = ComputeActorSubgroup->GetCounter("watermark_injected_ms");
+ InjectedToOutputsWatermark = ComputeActorSubgroup->GetCounter("watermark_outputs_ms");
+ WatermarkCollectLatency = ComputeActorSubgroup->GetHistogram(
+ "watermark_collect_ms",
+ NMonitoring::ExplicitHistogram({0, 15, 50, 100, 250, 500, 1000, 10'000, 100'000}));
+
+#define ADD_COUNTER(name) \
+ name = ComputeActorSubgroup->GetCounter(#name)
+
+ ADD_COUNTER(ResumeExecution);
+ ADD_COUNTER(ChannelsInfo);
+ ADD_COUNTER(AbortExecution);
+ ADD_COUNTER(Wakeup);
+ ADD_COUNTER(Undelivered);
+ ADD_COUNTER(ChannelData);
+ ADD_COUNTER(ChannelDataAck);
+ ADD_COUNTER(StateRequest);
+ ADD_COUNTER(CheckpointCoordinator);
+ ADD_COUNTER(InjectCheckpoint);
+ ADD_COUNTER(CommitState);
+ ADD_COUNTER(RestoreFromCheckpoint);
+ ADD_COUNTER(NodeDisconnected);
+ ADD_COUNTER(NodeConnected);
+ ADD_COUNTER(NewAsyncInputDataArrived);
+ ADD_COUNTER(AsyncInputError);
+ ADD_COUNTER(OtherEvent);
+
+#undef ADD_COUNTER
+ }
+}
+
+void TDqComputeActorMetrics::ReportEvent(ui32 type)
+{
+ if (!Enable) {
return;
}
- ComputeActorSubgroup = counters->GetSubgroup("subsystem", "compute_actor");
- InjectedToTaskRunnerWatermark = ComputeActorSubgroup->GetCounter("watermark_injected_ms");
- InjectedToOutputsWatermark = ComputeActorSubgroup->GetCounter("watermark_outputs_ms");
- WatermarkCollectLatency = ComputeActorSubgroup->GetHistogram(
- "watermark_collect_ms",
- NMonitoring::ExplicitHistogram({0, 15, 50, 100, 250, 500, 1000, 10'000, 100'000}));
+ switch (type) {
+ case TEvDqCompute::TEvResumeExecution::EventType: ResumeExecution->Inc(); break;
+ case TEvDqCompute::TEvChannelsInfo::EventType: ChannelsInfo->Inc(); break;
+ case TEvDq::TEvAbortExecution::EventType: AbortExecution->Inc(); break;
+ case NActors::TEvents::TEvWakeup::EventType: Wakeup->Inc(); break;
+ case NActors::TEvents::TEvUndelivered::EventType: Undelivered->Inc(); break;
+ case TEvDqCompute::TEvChannelData::EventType: ChannelData->Inc(); break;
+ case TEvDqCompute::TEvChannelDataAck::EventType: ChannelDataAck->Inc(); break;
+ case TEvDqCompute::TEvRun::EventType: Run->Inc(); break;
+ case TEvDqCompute::TEvStateRequest::EventType: StateRequest->Inc(); break;
+ case TEvDqCompute::TEvNewCheckpointCoordinator::EventType: CheckpointCoordinator->Inc(); break;
+ case TEvDqCompute::TEvInjectCheckpoint::EventType: InjectCheckpoint->Inc(); break;
+ case TEvDqCompute::TEvCommitState::EventType: CommitState->Inc(); break;
+ case TEvDqCompute::TEvRestoreFromCheckpoint::EventType: RestoreFromCheckpoint->Inc(); break;
+ case NActors::TEvInterconnect::TEvNodeDisconnected::EventType: NodeDisconnected->Inc(); break;
+ case NActors::TEvInterconnect::TEvNodeConnected::EventType: NodeConnected->Inc(); break;
+ case IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::EventType: NewAsyncInputDataArrived->Inc(); break;
+ case IDqComputeActorAsyncInput::TEvAsyncInputError::EventType: AsyncInputError->Inc(); break;
+ default: OtherEvent->Inc(); break;
+ }
}
void TDqComputeActorMetrics::ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
index 3e9afbf64ae..69227c4430d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
@@ -10,6 +10,7 @@ struct TDqComputeActorMetrics {
public:
TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters);
+ void ReportEvent(ui32 type);
void ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark);
void ReportInputChannelWatermark(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark);
void ReportInjectedToTaskRunnerWatermark(TInstant watermark);
@@ -30,6 +31,25 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr InjectedToOutputsWatermark;
NMonitoring::THistogramPtr WatermarkCollectLatency;
+ NMonitoring::TDynamicCounters::TCounterPtr ResumeExecution;
+ NMonitoring::TDynamicCounters::TCounterPtr ChannelsInfo;
+ NMonitoring::TDynamicCounters::TCounterPtr AbortExecution;
+ NMonitoring::TDynamicCounters::TCounterPtr Wakeup;
+ NMonitoring::TDynamicCounters::TCounterPtr Undelivered;
+ NMonitoring::TDynamicCounters::TCounterPtr ChannelData;
+ NMonitoring::TDynamicCounters::TCounterPtr ChannelDataAck;
+ NMonitoring::TDynamicCounters::TCounterPtr Run;
+ NMonitoring::TDynamicCounters::TCounterPtr StateRequest;
+ NMonitoring::TDynamicCounters::TCounterPtr CheckpointCoordinator;
+ NMonitoring::TDynamicCounters::TCounterPtr InjectCheckpoint;
+ NMonitoring::TDynamicCounters::TCounterPtr CommitState;
+ NMonitoring::TDynamicCounters::TCounterPtr RestoreFromCheckpoint;
+ NMonitoring::TDynamicCounters::TCounterPtr NodeDisconnected;
+ NMonitoring::TDynamicCounters::TCounterPtr NodeConnected;
+ NMonitoring::TDynamicCounters::TCounterPtr NewAsyncInputDataArrived;
+ NMonitoring::TDynamicCounters::TCounterPtr AsyncInputError;
+ NMonitoring::TDynamicCounters::TCounterPtr OtherEvent;
+
THashMap<TInstant, TInstant> WatermarkStartedAt;
};
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp
index cf150e270cf..2d325d0fe6f 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp
@@ -2,12 +2,21 @@
namespace NYql::NDqs {
-TWorkerManagerCounters::TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root) {
- ActiveWorkers = root->GetCounter("ActiveWorkers");
- MkqlMemoryLimit = root->GetCounter("MkqlMemoryLimit");
- MkqlMemoryAllocated = root->GetCounter("MkqlMemoryAllocated");
- FreeGroupError = root->GetCounter("FreeGroupError");
-}
+TWorkerManagerCounters::TWorkerManagerCounters(
+ ::NMonitoring::TDynamicCounterPtr root,
+ ::NMonitoring::TDynamicCounterPtr taskCounters)
+ : ActiveWorkers(root->GetCounter("ActiveWorkers"))
+ , MkqlMemoryLimit(root->GetCounter("MkqlMemoryLimit"))
+ , MkqlMemoryAllocated(root->GetCounter("MkqlMemoryAllocated"))
+ , FreeGroupError(root->GetCounter("FreeGroupError"))
+ , TaskCounters(taskCounters)
+{ }
+
+TWorkerManagerCounters::TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root)
+ : TWorkerManagerCounters(
+ root->GetSubgroup("component", "lwm"),
+ root->GetSubgroup("component", "tasks"))
+{ }
TWorkerManagerCounters::TWorkerManagerCounters()
: TWorkerManagerCounters(new ::NMonitoring::TDynamicCounters)
diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h
index 16f206e7696..07c6c6f4d6c 100644
--- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h
+++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h
@@ -8,8 +8,10 @@ struct TWorkerManagerCounters {
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryAllocated;
::NMonitoring::TDynamicCounters::TCounterPtr FreeGroupError;
+ ::NMonitoring::TDynamicCounterPtr TaskCounters;
explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root);
+ explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root, ::NMonitoring::TDynamicCounterPtr taskCounters);
TWorkerManagerCounters();
};
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index 13ffc497ee4..88d05f5d4bc 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -40,7 +40,7 @@ static_assert(sizeof(TDqLocalResourceId) == 8);
struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
- TMemoryQuotaManager(std::shared_ptr<NDq::TResourceQuoter> nodeQuoter, const NDq::TTxId& txId, ui64 limit)
+ TMemoryQuotaManager(std::shared_ptr<NDq::TResourceQuoter> nodeQuoter, const NDq::TTxId& txId, ui64 limit)
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
, NodeQuoter(nodeQuoter)
, TxId(txId) {
@@ -72,6 +72,7 @@ public:
TLocalWorkerManager(const TLocalWorkerManagerOptions& options)
: TWorkerManagerCommon<TLocalWorkerManager>(&TLocalWorkerManager::Handler)
, Options(options)
+ , TaskCounters(Options.Counters.TaskCounters)
, MemoryQuoter(std::make_shared<NDq::TResourceQuoter>(Options.MkqlTotalMemoryLimit))
{
Options.Counters.MkqlMemoryLimit->Set(Options.MkqlTotalMemoryLimit);
@@ -271,10 +272,10 @@ private:
auto resultId = ActorIdFromProto(ev->Get()->Record.GetResultActorId());
::NMonitoring::TDynamicCounterPtr taskCounters;
- if (createComputeActor && Options.DqTaskCounters) {
+ if (createComputeActor && TaskCounters) {
auto& info = TaskCountersMap[traceId];
if (!info.TaskCounters) {
- info.TaskCounters = Options.DqTaskCounters->GetSubgroup("operation", traceId);
+ info.TaskCounters = TaskCounters->GetSubgroup("operation", traceId);
}
info.ReferenceCount += count;
taskCounters = info.TaskCounters;
@@ -288,7 +289,7 @@ private:
actor.Reset(NYql::CreateComputeActor(
Options,
- std::make_shared<TMemoryQuotaManager>(MemoryQuoter, allocationInfo.TxId, quotas[i]),
+ std::make_shared<TMemoryQuotaManager>(MemoryQuoter, allocationInfo.TxId, quotas[i]),
resultId,
traceId,
std::move(tasks[i]),
@@ -328,6 +329,20 @@ private:
Send(ev->Sender, response.Release());
}
+ void DropTaskCounters(const auto& info) {
+ auto traceId = std::get<TString>(info.TxId);
+ if (auto it = TaskCountersMap.find(traceId); it != TaskCountersMap.end()) {
+ if (it->second.ReferenceCount <= info.WorkerActors.size()) {
+ if (TaskCounters) {
+ TaskCounters->RemoveSubgroup("operation", traceId);
+ }
+ TaskCountersMap.erase(it);
+ } else {
+ it->second.ReferenceCount -= info.WorkerActors.size();
+ }
+ }
+ }
+
void FreeGroup(ui64 id, NActors::TActorId sender = NActors::TActorId()) {
YQL_CLOG(DEBUG, ProviderDq) << "Free Group " << id;
auto it = AllocatedWorkers.find(id);
@@ -341,17 +356,8 @@ private:
YQL_CLOG(ERROR, ProviderDq) << "Free Group " << id << " mismatched alloc-free senders: " << it->second.Sender << " and " << sender << " TxId: " << it->second.TxId;
}
- auto traceId = std::get<TString>(it->second.TxId);
- auto itt = TaskCountersMap.find(traceId);
- if (itt != TaskCountersMap.end()) {
- if (itt->second.ReferenceCount <= it->second.WorkerActors.size()) {
- if (Options.DqTaskCounters) {
- Options.DqTaskCounters->RemoveSubgroup("operation", traceId);
- }
- TaskCountersMap.erase(itt);
- } else {
- itt->second.ReferenceCount -= it->second.WorkerActors.size();
- }
+ if (Options.DropTaskCountersOnFinish) {
+ DropTaskCounters(it->second);
}
Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.size());
@@ -374,6 +380,7 @@ private:
}
TLocalWorkerManagerOptions Options;
+ NMonitoring::TDynamicCounterPtr TaskCounters;
struct TAllocationInfo {
TVector<NActors::TActorId> WorkerActors;
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index 7aacd11b261..95ecae911e5 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -22,7 +22,6 @@ namespace NYql::NDqs {
struct TLocalWorkerManagerOptions {
TWorkerManagerCounters Counters;
- ::NMonitoring::TDynamicCounterPtr DqTaskCounters;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;
@@ -39,6 +38,7 @@ namespace NYql::NDqs {
bool CanUseComputeActor = true;
NActors::TActorId QuoterServiceActorId;
bool ComputeActorOwnsCounters = false;
+ bool DropTaskCountersOnFinish = true;
};
NActors::IActor* CreateLocalWorkerManager(const TLocalWorkerManagerOptions& options);