diff options
author | hor911 <hor911@ydb.tech> | 2022-07-20 17:34:03 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-07-20 17:34:03 +0300 |
commit | a0e7ac0b914070c9f6f89a9d6c6ec43724ef70dc (patch) | |
tree | 394f224ed070fe4293f8cd8e3df49c67a25041c0 | |
parent | 1488927ea39daea1a84a7f2996ea74ff46c67318 (diff) | |
download | ydb-a0e7ac0b914070c9f6f89a9d6c6ec43724ef70dc.tar.gz |
Export metrics from CA to separate service
10 files changed, 66 insertions, 26 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index ff24490d58f..d3e57645317 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -171,6 +171,7 @@ void Init( } NYql::NDqs::TLocalWorkerManagerOptions lwmOptions; lwmOptions.Counters = workerManagerCounters; + lwmOptions.DqTaskCounters = appData->Counters->GetSubgroup("counters", "dq_tasks"); lwmOptions.Factory = NYql::NTaskRunnerProxy::CreateFactory(appData->FunctionRegistry, dqCompFactory, dqTaskTransformFactory, false); lwmOptions.AsyncIoFactory = asyncIoFactory; lwmOptions.FunctionRegistry = appData->FunctionRegistry; diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 4cf80c62d9f..3456cf0bc92 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -33,8 +33,9 @@ public: IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) - : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false) + const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, /* ownMemoryQuota = */ false, false, taskCounters) , TaskRunnerActorFactory(taskRunnerActorFactory) , ReadyToCheckpointFlag(false) , SentStatsRequest(false) @@ -652,10 +653,11 @@ IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) + const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters) { return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, taskRunnerActorFactory); + functionRegistry, settings, memoryLimits, taskRunnerActorFactory, taskCounters); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h index 23b6c897193..53f485de3b9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h @@ -21,7 +21,8 @@ NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory); + const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr); } // namespace NDq } // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp index 6d82b5a9c98..4c99e02f9e2 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp @@ -36,8 +36,9 @@ public: IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const TTaskRunnerFactory& taskRunnerFactory) - : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits) + const TTaskRunnerFactory& taskRunnerFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters) + : TBase(executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, true, false, taskCounters) , TaskRunnerFactory(taskRunnerFactory) {} @@ -70,10 +71,12 @@ private: IActor* CreateDqComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, const TTaskRunnerFactory& taskRunnerFactory) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + const TTaskRunnerFactory& taskRunnerFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters) { return new TDqComputeActor(executerId, txId, std::move(task), std::move(asyncIoFactory), - functionRegistry, settings, memoryLimits, taskRunnerFactory); + functionRegistry, settings, memoryLimits, taskRunnerFactory, taskCounters); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 5db7135fdef..c3cbaa6b435 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -259,7 +259,8 @@ NActors::IActor* CreateDqComputeActor(const NActors::TActorId& executerId, const IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const TTaskRunnerFactory& taskRunnerFactory); + const TTaskRunnerFactory& taskRunnerFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr); } // namespace NDq } // namespace NYql diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 4fa67082d1c..068b36662ed 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -167,7 +167,9 @@ protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, bool ownMemoryQuota = true, bool passExceptions = false) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + bool ownMemoryQuota = true, bool passExceptions = false, + ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr) : ExecuterId(executerId) , TxId(txId) , Task(std::move(task)) @@ -186,12 +188,14 @@ protected: BasicStats = std::make_unique<TBasicStats>(); } InitializeTask(); + InitMonCounters(taskCounters); } TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, + ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr) : ExecuterId(executerId) , TxId(txId) , Task(task) @@ -208,6 +212,22 @@ protected: BasicStats = std::make_unique<TBasicStats>(); } InitializeTask(); + InitMonCounters(taskCounters); + } + + void InitMonCounters(::NMonitoring::TDynamicCounterPtr taskCounters) { + if (taskCounters) { + MkqlMemoryUsage = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryUsage"); + MkqlMemoryLimit = taskCounters->GetSubgroup("subsystem", "mkql")->GetCounter("MemoryLimit"); + MonCountersProvided = true; + } + } + + void UpdateMonCounters() { + if (MonCountersProvided) { + *MkqlMemoryUsage = GetProfileStats()->MkqlMaxUsedMemory; + *MkqlMemoryLimit = GetMkqlMemoryLimit(); + } } void ReportEventElapsedTime() { @@ -1798,6 +1818,10 @@ private: bool Running = true; TInstant LastSendStatsTime; bool PassExceptions = false; +protected: + bool MonCountersProvided = false; + ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryUsage; + ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; }; } // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index ff4491c15b0..e1ca744ad80 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -22,7 +22,8 @@ IActor* CreateComputeActor( const TString& operationId, NYql::NDqProto::TDqTask&& task, const TString& computeActorType, - const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) + const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters) { auto memoryLimits = NDq::TComputeMemoryLimits(); memoryLimits.ChannelBufferSize = 1000000; @@ -69,7 +70,8 @@ IActor* CreateComputeActor( options.FunctionRegistry, computeRuntimeSettings, memoryLimits, - taskRunnerFactory); + taskRunnerFactory, + taskCounters); } else { return NYql::NDq::CreateDqAsyncComputeActor( executerId, @@ -79,7 +81,8 @@ IActor* CreateComputeActor( options.FunctionRegistry, computeRuntimeSettings, memoryLimits, - taskRunnerActorFactory); + taskRunnerActorFactory, + taskCounters); } } diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h index 2bf024dfc3a..b0eaf4f6880 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.h +++ b/ydb/library/yql/providers/dq/actors/compute_actor.h @@ -13,6 +13,7 @@ NActors::IActor* CreateComputeActor( const TString& operationId, NYql::NDqProto::TDqTask&& task, const TString& computeActorType, - const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory); + const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, + ::NMonitoring::TDynamicCounterPtr taskCounters = nullptr); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index a515b687479..52ce8ee2095 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -245,17 +245,20 @@ private: THolder<NActors::IActor> actor; if (createComputeActor) { + auto id = tasks[i].GetId(); + auto stageId = tasks[i].GetStageId(); YQL_CLOG(DEBUG, ProviderDq) << "Create compute actor: " << computeActorType; - actor.Reset( - NYql::CreateComputeActor( - Options, - Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr, - Options.MkqlTotalMemoryLimit ? FreeMemoryFn : nullptr, - resultId, - traceId, - std::move(tasks[i]), - computeActorType, - Options.TaskRunnerActorFactory)); + auto taskCounters = Options.DqTaskCounters ? Options.DqTaskCounters->GetSubgroup("operation", traceId)->GetSubgroup("stage", ToString(stageId))->GetSubgroup("id", ToString(id)) : nullptr; + actor.Reset(NYql::CreateComputeActor( + Options, + Options.MkqlTotalMemoryLimit ? AllocateMemoryFn : nullptr, + Options.MkqlTotalMemoryLimit ? FreeMemoryFn : nullptr, + resultId, + traceId, + std::move(tasks[i]), + computeActorType, + Options.TaskRunnerActorFactory, + taskCounters)); } else { actor.Reset(CreateWorkerActor( Options.RuntimeData, diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 83b88f1792e..b2fcdfdef07 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -20,6 +20,7 @@ namespace NYql::NDqs { struct TLocalWorkerManagerOptions { TWorkerManagerCounters Counters; + ::NMonitoring::TDynamicCounterPtr DqTaskCounters; NTaskRunnerProxy::IProxyFactory::TPtr Factory; NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry = nullptr; |