aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <aozeritsky@gmail.com>2022-05-25 21:32:50 +0300
committerAlexey Ozeritskiy <aozeritsky@gmail.com>2022-05-25 21:32:50 +0300
commit483dcbe8de1b955ec0556102f0d44ec0d461b8a0 (patch)
tree55fe8e9394aa3d17243fc86ceddef3dc4c0f6592
parent97c5f124000eaf506008d3126105f289255ff8ee (diff)
downloadydb-483dcbe8de1b955ec0556102f0d44ec0d461b8a0.tar.gz
YQL-14903: ProcessInit for compute actor
ref:b325ed6b0f8e476685dbc26b97be8c3d8f0b372a
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp3
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp15
2 files changed, 13 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index d99a3cc12b1..a771742a79c 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -227,8 +227,6 @@ private:
TFailureInjector::Reach("dq_task_failure", [] {::_exit(1); });
Y_VERIFY(!TaskRunnerActor);
-
- Stat.StartCounter(Stat.GetCounterName("Actor", {{"ClusterName", RuntimeData ? RuntimeData->ClusterName : "local"}}, "ProcessInit"));
Y_VERIFY(!Executer);
Executer = ev->Sender;
Task = ev->Get()->Record.GetTask();
@@ -257,7 +255,6 @@ private:
}
void OnTaskRunnerCreated(TEvTaskRunnerCreateFinished::TPtr& ev, const TActorContext& ) {
- Stat.FlushCounter(Stat.GetCounterName("Actor", {{"ClusterName", RuntimeData ? RuntimeData->ClusterName : "local"}}, "ProcessInit"));
TaskRunnerPrepared = true;
try {
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index f86368a8565..43a12111f4e 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -56,6 +56,7 @@ public:
, Settings(MakeIntrusive<TDqConfiguration>())
, StageId(0)
, RuntimeData(runtimeData)
+ , ClusterName(RuntimeData ? RuntimeData->ClusterName : "local")
{
}
@@ -423,6 +424,8 @@ private:
auto cookie = ev->Cookie;
auto taskId = ev->Get()->Task.GetId();
auto& inputs = ev->Get()->Task.GetInputs();
+ auto startTime = TInstant::Now();
+
for (auto inputId = 0; inputId < inputs.size(); inputId++) {
auto& input = inputs[inputId];
if (input.HasSource()) {
@@ -451,17 +454,24 @@ private:
Settings->FreezeDefaults();
StageId = taskMeta.GetStageId();
}
- Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId](){
+ Invoker->Invoke([taskRunner=TaskRunner, replyTo, selfId, cookie, actorSystem, settings=Settings, stageId=StageId, startTime, clusterName = ClusterName](){
try {
//auto guard = taskRunner->BindAllocator(); // only for local mode
auto result = taskRunner->Prepare();
+ auto sensors = GetSensors(result);
+ auto sensorName = TCounters::GetCounterName(
+ "Actor",
+ {{"ClusterName", clusterName}},
+ "ProcessInit");
+ i64 val = (TInstant::Now()-startTime).MilliSeconds();
+ sensors.push_back({sensorName, val, val, val, val, 1});
auto event = MakeHolder<TEvTaskRunnerCreateFinished>(
taskRunner->GetSecureParams(),
taskRunner->GetTaskParams(),
taskRunner->GetTypeEnv(),
taskRunner->GetHolderFactory(),
- GetSensors(result));
+ sensors);
actorSystem->Send(
new IEventHandle(
@@ -552,6 +562,7 @@ private:
TIntrusivePtr<TDqConfiguration> Settings;
ui64 StageId;
TWorkerRuntimeData* RuntimeData;
+ TString ClusterName;
};
class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {