summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Ozeritskiy <[email protected]>2022-05-25 19:13:20 +0300
committerAlexey Ozeritskiy <[email protected]>2022-05-25 19:13:20 +0300
commitab6d08ba5049a701adf054e48943c85d90607319 (patch)
tree8d7a649f6a5e959de2eebdf893445b87aebc08bd
parent10ee71fe578e9241b1e9198ef9467a7e5ca59b9f (diff)
Use rusage-fields for CA execution mode
ref:208a43cbd87d312a0b62e456865a4f9eb7e50441
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h17
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp1
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp8
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp46
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h13
8 files changed, 43 insertions, 47 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 40db3c57e7d..2cd69388977 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -657,8 +657,8 @@ add_subdirectory(ydb/library/yql/providers/dq/counters)
add_subdirectory(ydb/library/yql/providers/dq/task_runner)
add_subdirectory(ydb/library/yql/providers/dq/task_runner_actor)
add_subdirectory(ydb/library/yql/dq/actors/task_runner)
-add_subdirectory(ydb/library/yql/providers/dq/worker_manager)
add_subdirectory(ydb/library/yql/providers/dq/runtime)
+add_subdirectory(ydb/library/yql/providers/dq/worker_manager)
add_subdirectory(ydb/library/yql/providers/dq/worker_manager/interface)
add_subdirectory(ydb/core/yq/libs/common)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/events)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index f55fb7f1213..ba3ec09dd9c 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -737,8 +737,8 @@ add_subdirectory(ydb/library/yql/providers/dq/counters)
add_subdirectory(ydb/library/yql/providers/dq/task_runner)
add_subdirectory(ydb/library/yql/providers/dq/task_runner_actor)
add_subdirectory(ydb/library/yql/dq/actors/task_runner)
-add_subdirectory(ydb/library/yql/providers/dq/worker_manager)
add_subdirectory(ydb/library/yql/providers/dq/runtime)
+add_subdirectory(ydb/library/yql/providers/dq/worker_manager)
add_subdirectory(ydb/library/yql/providers/dq/worker_manager/interface)
add_subdirectory(ydb/core/yq/libs/common)
add_subdirectory(ydb/core/yq/libs/control_plane_storage/events)
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 82a68507bff..3c060393b76 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -58,12 +58,6 @@ struct TTaskRunnerActorSensorEntry {
i64 Count = 0;
};
-struct TTaskRunnerActorRusage {
- i64 Utime = 0;
- i64 Stime = 0;
- i64 MajorPageFaults = 0;
-};
-
using TTaskRunnerActorSensors = TVector<TTaskRunnerActorSensorEntry>;
struct TEvError
@@ -202,14 +196,12 @@ struct TEvTaskRunFinished
THashMap<ui32, ui64>&& inputMap,
THashMap<ui32, ui64>&& sourcesMap,
const TTaskRunnerActorSensors& sensors = {},
- const TTaskRunnerActorRusage& rusage = {},
const TDqMemoryQuota::TProfileStats& profileStats = {},
ui64 mkqlMemoryLimit = 0,
THolder<NDqProto::TMiniKqlProgramState>&& programState = nullptr,
bool checkpointRequestedFromTaskRunner = false)
: RunStatus(runStatus)
, Sensors(sensors)
- , Rusage(rusage)
, InputChannelFreeSpace(std::move(inputMap))
, SourcesFreeSpace(std::move(sourcesMap))
, ProfileStats(profileStats)
@@ -220,7 +212,6 @@ struct TEvTaskRunFinished
NDq::ERunStatus RunStatus;
TTaskRunnerActorSensors Sensors;
- TTaskRunnerActorRusage Rusage;
THashMap<ui32, ui64> InputChannelFreeSpace;
THashMap<ui32, ui64> SourcesFreeSpace;
@@ -264,7 +255,7 @@ struct TEvChannelPopFinished
struct TCheckpointRequest {
TCheckpointRequest(TVector<ui32>&& channelIds, TVector<ui32>&& sinkIds, const NDqProto::TCheckpoint& checkpoint)
: ChannelIds(std::move(channelIds))
- , SinkIds(std::move(sinkIds))
+ , SinkIds(std::move(sinkIds))
, Checkpoint(checkpoint) {
}
@@ -278,13 +269,13 @@ struct TEvContinueRun
TEvContinueRun() = default;
- explicit TEvContinueRun(TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly)
+ explicit TEvContinueRun(TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly)
: ChannelId(0)
, MemLimit(0)
, FreeSpace(0)
- , CheckpointRequest(std::move(checkpointRequest))
+ , CheckpointRequest(std::move(checkpointRequest))
, CheckpointOnly(checkpointOnly)
- { }
+ { }
TEvContinueRun(ui32 channelId, ui64 freeSpace)
: ChannelId(channelId)
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 4c8afa0cb09..310a1c0c6ef 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -197,7 +197,6 @@ private:
std::move(inputChannelFreeSpace),
std::move(sourcesFreeSpace),
{},
- {},
MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(),
MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0,
std::move(mkqlProgramState),
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 22d9bc86682..d99a3cc12b1 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -527,14 +527,6 @@ private:
void OnRunFinished(TEvTaskRunFinished::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);
auto res = ev->Get()->RunStatus;
- if (RuntimeData) {
- ::TRusage delta;
- delta.Stime = TDuration::MicroSeconds(ev->Get()->Rusage.Stime);
- delta.Utime = TDuration::MicroSeconds(ev->Get()->Rusage.Utime);
- delta.MajorPageFaults = ev->Get()->Rusage.MajorPageFaults;
- RuntimeData->AddRusageDelta(delta);
- }
-
Stat.AddCounters2(ev->Get()->Sensors);
switch (res) {
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt b/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt
index 0831df79d43..b138e813537 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt
+++ b/ydb/library/yql/providers/dq/task_runner_actor/CMakeLists.txt
@@ -17,6 +17,7 @@ target_link_libraries(providers-dq-task_runner_actor PUBLIC
cpp-actors-core
dq-actors-task_runner
dq-api-protos
+ providers-dq-runtime
yql-utils-actors
providers-dq-task_runner
)
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index a2321bf2529..f86368a8565 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -2,7 +2,8 @@
#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/providers/dq/actors/actor_helpers.h>
-#include "ydb/library/yql/providers/dq/actors/events.h"
+#include <ydb/library/yql/providers/dq/actors/events.h>
+#include <ydb/library/yql/providers/dq/runtime/runtime_data.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/counters/counters.h>
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h>
@@ -31,16 +32,6 @@ TTaskRunnerActorSensors GetSensors(const T& t) {
return result;
}
-template<typename T>
-TTaskRunnerActorRusage GetRusage(const T& t) {
- TTaskRunnerActorRusage rusage = {
- t.GetRusage().GetUtime(),
- t.GetRusage().GetStime(),
- t.GetRusage().GetMajorPageFaults()
- };
- return rusage;
-}
-
} // namespace
class TTaskRunnerActor
@@ -54,7 +45,8 @@ public:
ITaskRunnerActor::ICallbacks* parent,
const NTaskRunnerProxy::IProxyFactory::TPtr& factory,
const ITaskRunnerInvoker::TPtr& invoker,
- const TString& traceId)
+ const TString& traceId,
+ TWorkerRuntimeData* runtimeData)
: TActor<TTaskRunnerActor>(&TTaskRunnerActor::Handler)
, Parent(parent)
, TraceId(traceId)
@@ -62,7 +54,9 @@ public:
, Invoker(invoker)
, Local(Invoker->IsLocal())
, Settings(MakeIntrusive<TDqConfiguration>())
- , StageId(0) {
+ , StageId(0)
+ , RuntimeData(runtimeData)
+ {
}
~TTaskRunnerActor() { }
@@ -495,7 +489,7 @@ private:
auto sourcesMap = Sources;
- Invoker->Invoke([selfId, cookie, actorSystem, replyTo, taskRunner=TaskRunner, inputMap, sourcesMap, memLimit=ev->Get()->MemLimit, settings=Settings, stageId=StageId]() mutable {
+ Invoker->Invoke([selfId, cookie, actorSystem, replyTo, taskRunner=TaskRunner, inputMap, sourcesMap, memLimit=ev->Get()->MemLimit, settings=Settings, stageId=StageId, runtimeData=RuntimeData]() mutable {
try {
// auto guard = taskRunner->BindAllocator(); // only for local mode
// guard.GetMutex()->SetLimit(memLimit);
@@ -522,10 +516,17 @@ private:
res,
std::move(inputChannelFreeSpace),
std::move(sourcesFreeSpace),
- GetSensors(response),
- GetRusage(response)),
+ GetSensors(response)),
/*flags=*/0,
cookie));
+
+ if (runtimeData) {
+ ::TRusage delta;
+ delta.Stime = TDuration::MicroSeconds(response.GetRusage().GetStime());
+ delta.Utime = TDuration::MicroSeconds(response.GetRusage().GetUtime());
+ delta.MajorPageFaults = response.GetRusage().GetMajorPageFaults();
+ runtimeData->AddRusageDelta(delta);
+ }
} catch (...) {
auto status = taskRunner->GetStatus();
actorSystem->Send(
@@ -550,15 +551,18 @@ private:
THashSet<ui32> Sources;
TIntrusivePtr<TDqConfiguration> Settings;
ui64 StageId;
+ TWorkerRuntimeData* RuntimeData;
};
class TTaskRunnerActorFactory: public ITaskRunnerActorFactory {
public:
TTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
- const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory)
+ const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
+ TWorkerRuntimeData* runtimeData)
: ProxyFactory(proxyFactory)
, InvokerFactory(invokerFactory)
+ , RuntimeData(runtimeData)
{ }
std::tuple<ITaskRunnerActor*, NActors::IActor*> Create(
@@ -567,7 +571,7 @@ public:
THashSet<ui32>&&,
THolder<NYql::NDq::TDqMemoryQuota>&&) override
{
- auto* actor = new TTaskRunnerActor(parent, ProxyFactory, InvokerFactory->Create(), traceId);
+ auto* actor = new TTaskRunnerActor(parent, ProxyFactory, InvokerFactory->Create(), traceId, RuntimeData);
return std::make_tuple(
static_cast<ITaskRunnerActor*>(actor),
static_cast<NActors::IActor*>(actor)
@@ -577,13 +581,15 @@ public:
private:
NTaskRunnerProxy::IProxyFactory::TPtr ProxyFactory;
NDqs::ITaskRunnerInvokerFactory::TPtr InvokerFactory;
+ TWorkerRuntimeData* RuntimeData;
};
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
- const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory)
+ const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
+ TWorkerRuntimeData* runtimeData)
{
- return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory));
+ return ITaskRunnerActorFactory::TPtr(new TTaskRunnerActorFactory(proxyFactory, invokerFactory, runtimeData));
}
} // namespace NTaskRunnerActor
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
index 103dbeabd68..045e64f42e2 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.h
@@ -18,14 +18,21 @@
#include <ydb/library/yql/providers/dq/task_runner/task_runner_invoker.h>
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h>
-namespace NYql::NDq {
+namespace NYql {
+
+struct TWorkerRuntimeData;
+
+namespace NDq {
namespace NTaskRunnerActor {
ITaskRunnerActorFactory::TPtr CreateTaskRunnerActorFactory(
const NTaskRunnerProxy::IProxyFactory::TPtr& proxyFactory,
- const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory);
+ const NDqs::ITaskRunnerInvokerFactory::TPtr& invokerFactory,
+ TWorkerRuntimeData* runtimeData = nullptr);
} // namespace NTaskRunnerActor
-} // namespace NYql::NDq
+} // namespace NDq
+
+} // namespace NYql