diff options
author | Alexey Ozeritskiy <[email protected]> | 2022-05-25 19:13:20 +0300 |
---|---|---|
committer | Alexey Ozeritskiy <[email protected]> | 2022-05-25 19:13:20 +0300 |
commit | ab6d08ba5049a701adf054e48943c85d90607319 (patch) | |
tree | 8d7a649f6a5e959de2eebdf893445b87aebc08bd | |
parent | 10ee71fe578e9241b1e9198ef9467a7e5ca59b9f (diff) |
Use rusage-fields for CA execution mode
ref:208a43cbd87d312a0b62e456865a4f9eb7e50441
8 files changed, 43 insertions, 47 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 40db3c57e7d..2cd69388977 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -657,8 +657,8 @@ add_subdirectory(ydb/library/yql/providers/dq/counters) add_subdirectory(ydb/library/yql/providers/dq/task_runner) add_subdirectory(ydb/library/yql/providers/dq/task_runner_actor) add_subdirectory(ydb/library/yql/dq/actors/task_runner) -add_subdirectory(ydb/library/yql/providers/dq/worker_manager) add_subdirectory(ydb/library/yql/providers/dq/runtime) +add_subdirectory(ydb/library/yql/providers/dq/worker_manager) add_subdirectory(ydb/library/yql/providers/dq/worker_manager/interface) add_subdirectory(ydb/core/yq/libs/common) add_subdirectory(ydb/core/yq/libs/control_plane_storage/events) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index f55fb7f1213..ba3ec09dd9c 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -737,8 +737,8 @@ add_subdirectory(ydb/library/yql/providers/dq/counters) add_subdirectory(ydb/library/yql/providers/dq/task_runner) add_subdirectory(ydb/library/yql/providers/dq/task_runner_actor) add_subdirectory(ydb/library/yql/dq/actors/task_runner) -add_subdirectory(ydb/library/yql/providers/dq/worker_manager) add_subdirectory(ydb/library/yql/providers/dq/runtime) +add_subdirectory(ydb/library/yql/providers/dq/worker_manager) add_subdirectory(ydb/library/yql/providers/dq/worker_manager/interface) add_subdirectory(ydb/core/yq/libs/common) add_subdirectory(ydb/core/yq/libs/control_plane_storage/events) diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 82a68507bff..3c060393b76 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -58,12 +58,6 @@ struct TTaskRunnerActorSensorEntry { i64 Count = 0; }; -struct TTaskRunnerActorRusage { - i64 Utime = 0; - i64 Stime = 0; - i64 MajorPageFaults = 0; -}; - using TTaskRunnerActorSensors = TVector<TTaskRunnerActorSensorEntry>; struct TEvError @@ -202,14 +196,12 @@ struct TEvTaskRunFinished THashMap<ui32, ui64>&& inputMap, THashMap<ui32, ui64>&& sourcesMap, const TTaskRunnerActorSensors& sensors = {}, - const TTaskRunnerActorRusage& rusage = {}, const TDqMemoryQuota::TProfileStats& profileStats = {}, ui64 mkqlMemoryLimit = 0, THolder<NDqProto::TMiniKqlProgramState>&& programState = nullptr, bool checkpointRequestedFromTaskRunner = false) : RunStatus(runStatus) , Sensors(sensors) - , Rusage(rusage) , InputChannelFreeSpace(std::move(inputMap)) , SourcesFreeSpace(std::move(sourcesMap)) , ProfileStats(profileStats) @@ -220,7 +212,6 @@ struct TEvTaskRunFinished NDq::ERunStatus RunStatus; TTaskRunnerActorSensors Sensors; - TTaskRunnerActorRusage Rusage; THashMap<ui32, ui64> InputChannelFreeSpace; THashMap<ui32, ui64> SourcesFreeSpace; @@ -264,7 +255,7 @@ struct TEvChannelPopFinished struct TCheckpointRequest { TCheckpointRequest(TVector<ui32>&& channelIds, TVector<ui32>&& sinkIds, const NDqProto::TCheckpoint& checkpoint) : ChannelIds(std::move(channelIds)) - , SinkIds(std::move(sinkIds)) + , SinkIds(std::move(sinkIds)) , Checkpoint(checkpoint) { } @@ -278,13 +269,13 @@ struct TEvContinueRun TEvContinueRun() = default; - explicit TEvContinueRun(TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly) + explicit TEvContinueRun(TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly) : ChannelId(0) , MemLimit(0) , FreeSpace(0) - , CheckpointRequest(std::move(checkpointRequest)) + , CheckpointRequest(std::move(checkpointRequest)) , CheckpointOnly(checkpointOnly) - { } + { } TEvContinueRun(ui32 channelId, ui64 freeSpace) : ChannelId(channelId) diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 4c8afa0cb09..310a1c0c6ef 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -197,7 +197,6 @@ private: std::move(inputChannelFreeSpace), std::move(sourcesFreeSpace), {}, - {}, MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(), MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0, std::move(mkqlProgramState), diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 22d9bc86682..d99a3cc12b1 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -527,14 +527,6 @@ private: void OnRunFinished(TEvTaskRunFinished::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ctx); auto res = ev->Get()->RunStatus; - if (RuntimeData) { - ::TRusage delta; - delta.Stime = TDuration::MicroSeconds(ev->Get()->Rusage.Stime); - delta.Utime = TDuration::MicroSeconds(ev->Get()->Rusage.Utime); - delta.MajorPageFaults = ev->Get()->Rusage.MajorPageFaults; - RuntimeData->AddRusageDelta(delta); - } - Stat.AddCounters2(ev->Get()->Sensors); switch (res) { diff --git a/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt b/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt index 0831df79d43..b138e813537 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt @@ -17,6 +17,7 @@ target_link_libraries(providers-dq-task_runner_actor PUBLIC cpp-actors-core dq-actors-task_runner dq-api-protos + providers-dq-runtime yql-utils-actors providers-dq-task_runner ) diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index a2321bf2529..f86368a8565 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -2,7 +2,8 @@ #include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/providers/dq/actors/actor_helpers.h> -#include "ydb/library/yql/providers/dq/actors/events.h" +#include <ydb/library/yql/providers/dq/actors/events.h> +#include <ydb/library/yql/providers/dq/runtime/runtime_data.h> #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/counters/counters.h> #include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h> @@ -31,16 +32,6 @@ TTaskRunnerActorSensors GetSensors(const T& t) { return result; } -template<typename T> -TTaskRunnerActorRusage GetRusage(const T& t) { - TTaskRunnerActorRusage rusage = { - t.GetRusage().GetUtime(), - t.GetRusage().GetStime(), - t.GetRusage().GetMajorPageFaults() - }; - return rusage; -} - } // namespace class TTaskRunnerActor @@ -54,7 +45,8 @@ public: ITaskRunnerActor::ICallbacks* parent, const NTaskRunnerProxy::IProxyFactory::TPtr& factory, const ITaskRunnerInvoker::TPtr& invoker, - const TString& traceId) + const TString& traceId, + TWorkerRuntimeData* runtimeData) : TActor<TTaskRunnerActor>(&TTaskRunnerActor::Handler) , Parent(parent) , TraceId(traceId) @@ -62,7 +54,9 @@ public: , Invoker(invoker) , Local(Invoker->IsLocal()) , Settings(MakeIntrusive<TDqConfiguration>()) - , StageId(0) { + , StageId(0) + , RuntimeData(runtimeData) + { } ~TTaskRunnerActor() { } @@ -495,7 +489,7 @@ private: auto sourcesMap = Sources; - Invoker->Invoke([selfId, cookie, actorSystem, replyTo, taskRunner=TaskRunner, inputMap, sourcesMap, memLimit=ev->Get()->MemLimit, settings=Settings, stageId=StageId]() mutable { + Invoker->Invoke([selfId, cookie, actorSystem, replyTo, taskRunner=TaskRunner, inputMap, sourcesMap, memLimit=ev->Get()->MemLimit, settings=Settings, stageId=StageId, runtimeData=RuntimeData]() mutable { try { // auto guard = taskRunner->BindAllocator(); // only for local mode // guard.GetMutex()->SetLimit(memLimit); @@ -522,10 +516,17 @@ private: res, std::move(inputChannelFreeSpace), std::move(sourcesFreeSpace), - GetSensors(response), - GetRusage(response)), + GetSensors(response)), /*flags=*/0, cookie)); + + if (runtimeData) { + ::TRusage delta; + delta.Stime = TDuration::MicroSeconds(response.GetRusage().GetStime()); + delta.Utime = TDuration::MicroSeconds(response.GetRusage().GetUtime()); + delta.MajorPageFaults = response.GetRusage().GetMajorPageFaults(); + runtimeData->AddRusageDelta(delta); + } } catch (...) { auto status = taskRunner->GetStatus(); actorSystem->Send( @@ -550,15 +551,18 @@ private: THashSet<ui32> Sources; TIntrusivePtr<TDqConfiguration> Settings; ui64 StageId; + TWorkerRuntimeData* RuntimeData; }; class TTaskRunnerActorFactory: public ITaskRunnerActorFactory { public: TTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, - const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory) + const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, + TWorkerRuntimeData* runtimeData) : ProxyFactory(proxyFactory) , InvokerFactory(invokerFactory) + , RuntimeData(runtimeData) { } std::tuple<ITaskRunnerActor*, NActors::IActor*> Create( @@ -567,7 +571,7 @@ public: THashSet<ui32>&&, THolder<NYql::NDq::TDqMemoryQuota>&&) override { - auto* actor = new TTaskRunnerActor(parent, ProxyFactory, InvokerFactory->Create(), traceId); + auto* actor = new TTaskRunnerActor(parent, ProxyFactory, InvokerFactory->Create(), traceId, RuntimeData); return std::make_tuple( static_cast<ITaskRunnerActor*>(actor), static_cast<NActors::IActor*>(actor) @@ -577,13 +581,15 @@ public: private: NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory; NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory; + TWorkerRuntimeData* RuntimeData; }; ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, - const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory) + const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, + TWorkerRuntimeData* runtimeData) { - return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory)); + return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData)); } } // namespace NTaskRunnerActor diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h index 103dbeabd68..045e64f42e2 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h @@ -18,14 +18,21 @@ #include <ydb/library/yql/providers/dq/task_runner/task_runner_invoker.h> #include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h> -namespace NYql::NDq { +namespace NYql { + +struct TWorkerRuntimeData; + +namespace NDq { namespace NTaskRunnerActor { ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory( const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory, - const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory); + const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory, + TWorkerRuntimeData* runtimeData = nullptr); } // namespace NTaskRunnerActor -} // namespace NYql::NDq +} // namespace NDq + +} // namespace NYql |