aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-12-06 03:00:55 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-12-06 03:20:43 +0300
commite27eef2a27f8d4e2b352cf4d38a8e5bb61926fbc (patch)
treeb98b23a09869748bbc484144c56e6a18eb963999
parent2b27731e53b45e8ebb704a0deda15ce0527e7f79 (diff)
downloadydb-e27eef2a27f8d4e2b352cf4d38a8e5bb61926fbc.tar.gz
Enable/disable spilling in local worker manager
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp5
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.h3
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/api/protos/dqs.proto1
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp6
5 files changed, 12 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 1aa8610ba9..ea95feb7da 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -23,7 +23,8 @@ IActor* CreateComputeActor(
const TString& computeActorType,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
::NMonitoring::TDynamicCounterPtr taskCounters,
- NDqProto::EDqStatsMode statsMode)
+ NDqProto::EDqStatsMode statsMode,
+ bool enableSpilling)
{
auto memoryLimits = NDq::TComputeMemoryLimits();
memoryLimits.ChannelBufferSize = 1000000;
@@ -40,7 +41,7 @@ IActor* CreateComputeActor(
computeRuntimeSettings.ExtraMemoryAllocationPool = 3;
computeRuntimeSettings.FailOnUndelivery = false;
computeRuntimeSettings.StatsMode = (statsMode != NDqProto::DQ_STATS_MODE_UNSPECIFIED) ? statsMode : NDqProto::DQ_STATS_MODE_FULL;
- computeRuntimeSettings.UseSpilling = options.UseSpilling;
+ computeRuntimeSettings.UseSpilling = enableSpilling;
computeRuntimeSettings.AsyncInputPushLimit = 64_MB;
// clear fake actorids
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h
index 807e2d24c1..39c239245f 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.h
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.h
@@ -14,6 +14,7 @@ NActors::IActor* CreateComputeActor(
const TString& computeActorType,
const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory,
::NMonitoring::TDynamicCounterPtr taskCounters,
- NDqProto::EDqStatsMode statsMode);
+ NDqProto::EDqStatsMode statsMode,
+ bool enableSpilling);
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index 3e54991766..0d58319b12 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -188,6 +188,7 @@ private:
const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync");
+ bool enableSpilling = Settings->SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != TDqSettings::ESpillingEngine::Disable;
auto resourceAllocator = RegisterChild(CreateResourceAllocator(
GwmActorId, SelfId(), ControlId, workerCount,
@@ -201,6 +202,7 @@ private:
allocateRequest->Record.SetCreateComputeActor(enableComputeActor);
allocateRequest->Record.SetComputeActorType(computeActorType);
allocateRequest->Record.SetStatsMode(StatsMode);
+ allocateRequest->Record.SetEnableSpilling(enableSpilling);
if (enableComputeActor) {
ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId());
}
diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto
index a7d30d20e5..69c6b0441d 100644
--- a/ydb/library/yql/providers/dq/api/protos/dqs.proto
+++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto
@@ -39,6 +39,7 @@ message TAllocateWorkersRequest {
uint64 FreeWorkerAfterMs = 14;
NYql.NDqProto.EDqStatsMode StatsMode = 16;
+ bool EnableSpilling = 17; // false
}
message TWorkerGroup {
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index bc105955e3..a50b119519 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -219,6 +219,7 @@ private:
}
bool createComputeActor = ev->Get()->Record.GetCreateComputeActor();
TString computeActorType = ev->Get()->Record.GetComputeActorType();
+ bool enableSpilling = Options.UseSpilling && ev->Get()->Record.GetEnableSpilling();
if (createComputeActor && !Options.CanUseComputeActor) {
Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Compute Actor Disabled", NYql::NDqProto::StatusIds::BAD_REQUEST), 0, ev->Cookie);
@@ -296,14 +297,15 @@ private:
computeActorType,
Options.TaskRunnerActorFactory,
taskCounters,
- ev->Get()->Record.GetStatsMode()));
+ ev->Get()->Record.GetStatsMode(),
+ enableSpilling));
} else {
actor.Reset(CreateWorkerActor(
Options.RuntimeData,
traceId,
Options.TaskRunnerActorFactory,
Options.AsyncIoFactory,
- Options.UseSpilling));
+ enableSpilling));
}
allocationInfo.WorkerActors.emplace_back(RegisterChild(
actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr