about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author hor911 <hor911@ydb.tech> 2022-07-20 17:34:03 +0300
committer hor911 <hor911@ydb.tech> 2022-07-20 17:34:03 +0300
commit a0e7ac0b914070c9f6f89a9d6c6ec43724ef70dc (patch)
tree 394f224ed070fe4293f8cd8e3df49c67a25041c0
parent 1488927ea39daea1a84a7f2996ea74ff46c67318 (diff)
download ydb-a0e7ac0b914070c9f6f89a9d6c6ec43724ef70dc.tar.gz
Export metrics from CA to separate service
-rw-r--r-- ydb/core/yq/libs/init/init.cpp 1
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp 10
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h 3
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp 11
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor.h 3
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 28
-rw-r--r-- ydb/library/yql/providers/dq/actors/compute_actor.cpp 9
-rw-r--r-- ydb/library/yql/providers/dq/actors/compute_actor.h 3
-rw-r--r-- ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp 23
-rw-r--r-- ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h 1
10 files changed, 66 insertions, 26 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index ff24490d58f..d3e57645317 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -171,6 +171,7 @@ void Init(
}
NYql::NDqs::TLocalWorkerManagerOptions lwmOptions;
lwmOptions.Counters = workerManagerCounters;
+ lwmOptions.DqTaskCounters = appData->Counters->GetSubgroup("counters", "dq_tasks");
lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false);
lwmOptions.AsyncIoFactory = asyncIoFactory;
lwmOptions.FunctionRegistry = appData->FunctionRegistry;
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 4cf80c62d9f..3456cf0bc92 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -33,8 +33,9 @@ public:
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false)
+ const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters)
, TaskRunnerActorFactory(taskRunnerActorFactory)
, ReadyToCheckpointFlag(false)
, SentStatsRequest(false)
@@ -652,10 +653,11 @@ IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
+ const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters)
{
return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, taskRunnerActorFactory);
+ functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
index 23b6c897193..53f485de3b9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -21,7 +21,8 @@ NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
+ const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr);
} // namespace NDq
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
index 6d82b5a9c98..4c99e02f9e2 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
@@ -36,8 +36,9 @@ public:
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const TTaskRunnerFactory& taskRunnerFactory)
- : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits)
+ const TTaskRunnerFactory& taskRunnerFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters)
+ : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, true, false, taskCounters)
, TaskRunnerFactory(taskRunnerFactory)
{}
@@ -70,10 +71,12 @@ private:
IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const TTaskRunnerFactory& taskRunnerFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters)
{
return new TDqComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory),
- functionRegistry, settings, memoryLimits, taskRunnerFactory);
+ functionRegistry, settings, memoryLimits, taskRunnerFactory, taskCounters);
}
} // namespace NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 5db7135fdef..c3cbaa6b435 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -259,7 +259,8 @@ NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const TTaskRunnerFactory& taskRunnerFactory);
+ const TTaskRunnerFactory& taskRunnerFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr);
} // namespace NDq
} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 4fa67082d1c..068b36662ed 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -167,7 +167,9 @@ protected:
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ bool ownMemoryQuota = true, bool passExceptions = false,
+ ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr)
: ExecuterId(executerId)
, TxId(txId)
, Task(std::move(task))
@@ -186,12 +188,14 @@ protected:
BasicStats = std::make_unique<TBasicStats>();
}
InitializeTask();
+ InitMonCounters(taskCounters);
}
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task,
IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
- const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits)
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr)
: ExecuterId(executerId)
, TxId(txId)
, Task(task)
@@ -208,6 +212,22 @@ protected:
BasicStats = std::make_unique<TBasicStats>();
}
InitializeTask();
+ InitMonCounters(taskCounters);
+ }
+
+ void InitMonCounters(::NMonitoring::TDynamicCounterPtr taskCounters) {
+ if (taskCounters) {
+ MkqlMemoryUsage = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryUsage");
+ MkqlMemoryLimit = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryLimit");
+ MonCountersProvided = true;
+ }
+ }
+
+ void UpdateMonCounters() {
+ if (MonCountersProvided) {
+ *MkqlMemoryUsage = GetProfileStats()->MkqlMaxUsedMemory;
+ *MkqlMemoryLimit = GetMkqlMemoryLimit();
+ }
}
void ReportEventElapsedTime() {
@@ -1798,6 +1818,10 @@ private:
bool Running = true;
TInstant LastSendStatsTime;
bool PassExceptions = false;
+protected:
+ bool MonCountersProvided = false;
+ ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage;
+ ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit;
};
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index ff4491c15b0..e1ca744ad80 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -22,7 +22,8 @@ IActor* CreateComputeActor(
const TString& operationId,
NYql::NDqProto::TDqTask&& task,
const TString& computeActorType,
- const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
+ const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters)
{
auto memoryLimits = NDq::TComputeMemoryLimits();
memoryLimits.ChannelBufferSize = 1000000;
@@ -69,7 +70,8 @@ IActor* CreateComputeActor(
options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
- taskRunnerFactory);
+ taskRunnerFactory,
+ taskCounters);
} else {
return NYql::NDq::CreateDqAsyncComputeActor(
executerId,
@@ -79,7 +81,8 @@ IActor* CreateComputeActor(
options.FunctionRegistry,
computeRuntimeSettings,
memoryLimits,
- taskRunnerActorFactory);
+ taskRunnerActorFactory,
+ taskCounters);
}
}
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h
index 2bf024dfc3a..b0eaf4f6880 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.h
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.h
@@ -13,6 +13,7 @@ NActors::IActor* CreateComputeActor(
const TString& operationId,
NYql::NDqProto::TDqTask&& task,
const TString& computeActorType,
- const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
+ const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
+ ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr);
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index a515b687479..52ce8ee2095 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -245,17 +245,20 @@ private:
THolder<NActors::IActor> actor;
if (createComputeActor) {
+ auto id = tasks[i].GetId();
+ auto stageId = tasks[i].GetStageId();
YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType;
- actor.Reset(
- NYql::CreateComputeActor(
- Options,
- Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr,
- Options.MkqlTotalMemoryLimit ? FreeMemoryFn : nullptr,
- resultId,
- traceId,
- std::move(tasks[i]),
- computeActorType,
- Options.TaskRunnerActorFactory));
+ auto taskCounters = Options.DqTaskCounters ? Options.DqTaskCounters->GetSubgroup("operation", traceId)->GetSubgroup("stage", ToString(stageId))->GetSubgroup("id", ToString(id)) : nullptr;
+ actor.Reset(NYql::CreateComputeActor(
+ Options,
+ Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr,
+ Options.MkqlTotalMemoryLimit ? FreeMemoryFn : nullptr,
+ resultId,
+ traceId,
+ std::move(tasks[i]),
+ computeActorType,
+ Options.TaskRunnerActorFactory,
+ taskCounters));
} else {
actor.Reset(CreateWorkerActor(
Options.RuntimeData,
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
index 83b88f1792e..b2fcdfdef07 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h
@@ -20,6 +20,7 @@ namespace NYql::NDqs {
struct TLocalWorkerManagerOptions {
TWorkerManagerCounters Counters;
+ ::NMonitoring::TDynamicCounterPtr DqTaskCounters;
NTaskRunnerProxy::IProxyFactory::TPtr Factory;
NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr;