diff options
author | hor911 <hor911@ydb.tech> | 2022-10-04 22:46:37 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-10-04 22:46:37 +0300 |
commit | ee9ef0e95c1ddf52aad66fee7a0f64b8d49b1300 (patch) | |
tree | 9f8c529d6efeb6c21cceaca473c71d216cc8c172 | |
parent | 2c64e058a844f9cf03275337e962759d9dd52367 (diff) | |
download | ydb-ee9ef0e95c1ddf52aad66fee7a0f64b8d49b1300.tar.gz |
Update statistics before finish
3 files changed, 15 insertions, 0 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 1113dd051fd..4ae2f85225e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -796,6 +796,8 @@ private: } void AskContinueRun(std::unique_ptr<NTaskRunnerActor::TEvContinueRun> continueRunEvent) { + continueRunEvent->SinkIds = GetIds(SinksMap); + if (!UseCpuQuota()) { Send(TaskRunnerActorId, continueRunEvent.release()); return; diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 8fbb410b1ec..bea7d5becf2 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -334,6 +334,7 @@ struct TEvContinueRun TMaybe<TWatermarkRequest> WatermarkRequest = Nothing(); TMaybe<TCheckpointRequest> CheckpointRequest = Nothing(); bool CheckpointOnly = false; + TVector<ui32> SinkIds; }; struct TEvAsyncInputPushFinished diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 8f235feabf7..123bc2783ae 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -209,6 +209,18 @@ private: MemoryQuota->TryShrinkMemory(guard.GetMutex()); } + { + auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds)); + + TaskRunner->UpdateStats(); + THashMap<ui32, const TDqAsyncOutputBufferStats*> sinkStats; + for (const auto sinkId : st->SinkIds) { + sinkStats[sinkId] = TaskRunner->GetSink(sinkId)->GetStats(); + } + st->Stats = TDqTaskRunnerStatsView(TaskRunner->GetStats(), std::move(sinkStats)); + Send(ev->Sender, st.Release()); + } + Send( ev->Sender, new TEvTaskRunFinished( |