diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-12-06 03:00:55 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-12-06 03:20:43 +0300 |
commit | e27eef2a27f8d4e2b352cf4d38a8e5bb61926fbc (patch) | |
tree | b98b23a09869748bbc484144c56e6a18eb963999 | |
parent | 2b27731e53b45e8ebb704a0deda15ce0527e7f79 (diff) | |
download | ydb-e27eef2a27f8d4e2b352cf4d38a8e5bb61926fbc.tar.gz |
Enable/disable spilling in local worker manager
5 files changed, 12 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index 1aa8610ba9..ea95feb7da 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -23,7 +23,8 @@ IActor* CreateComputeActor( const TString& computeActorType, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters, - NDqProto::EDqStatsMode statsMode) + NDqProto::EDqStatsMode statsMode, + bool enableSpilling) { auto memoryLimits = NDq::TComputeMemoryLimits(); memoryLimits.ChannelBufferSize = 1000000; @@ -40,7 +41,7 @@ IActor* CreateComputeActor( computeRuntimeSettings.ExtraMemoryAllocationPool = 3; computeRuntimeSettings.FailOnUndelivery = false; computeRuntimeSettings.StatsMode = (statsMode != NDqProto::DQ_STATS_MODE_UNSPECIFIED) ? statsMode : NDqProto::DQ_STATS_MODE_FULL; - computeRuntimeSettings.UseSpilling = options.UseSpilling; + computeRuntimeSettings.UseSpilling = enableSpilling; computeRuntimeSettings.AsyncInputPushLimit = 64_MB; // clear fake actorids diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.h b/ydb/library/yql/providers/dq/actors/compute_actor.h index 807e2d24c1..39c239245f 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.h +++ b/ydb/library/yql/providers/dq/actors/compute_actor.h @@ -14,6 +14,7 @@ NActors::IActor* CreateComputeActor( const TString& computeActorType, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory, ::NMonitoring::TDynamicCounterPtr taskCounters, - NDqProto::EDqStatsMode statsMode); + NDqProto::EDqStatsMode statsMode, + bool enableSpilling); } // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 3e54991766..0d58319b12 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -188,6 +188,7 @@ private: const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync"); + bool enableSpilling = Settings->SpillingEngine.Get().GetOrElse(TDqSettings::TDefault::SpillingEngine) != TDqSettings::ESpillingEngine::Disable; auto resourceAllocator = RegisterChild(CreateResourceAllocator( GwmActorId, SelfId(), ControlId, workerCount, @@ -201,6 +202,7 @@ private: allocateRequest->Record.SetCreateComputeActor(enableComputeActor); allocateRequest->Record.SetComputeActorType(computeActorType); allocateRequest->Record.SetStatsMode(StatsMode); + allocateRequest->Record.SetEnableSpilling(enableSpilling); if (enableComputeActor) { ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId()); } diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index a7d30d20e5..69c6b0441d 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -39,6 +39,7 @@ message TAllocateWorkersRequest { uint64 FreeWorkerAfterMs = 14; NYql.NDqProto.EDqStatsMode StatsMode = 16; + bool EnableSpilling = 17; // false } message TWorkerGroup { diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index bc105955e3..a50b119519 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -219,6 +219,7 @@ private: } bool createComputeActor = ev->Get()->Record.GetCreateComputeActor(); TString computeActorType = ev->Get()->Record.GetComputeActorType(); + bool enableSpilling = Options.UseSpilling && ev->Get()->Record.GetEnableSpilling(); if (createComputeActor && !Options.CanUseComputeActor) { Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Compute Actor Disabled", NYql::NDqProto::StatusIds::BAD_REQUEST), 0, ev->Cookie); @@ -296,14 +297,15 @@ private: computeActorType, Options.TaskRunnerActorFactory, taskCounters, - ev->Get()->Record.GetStatsMode())); + ev->Get()->Record.GetStatsMode(), + enableSpilling)); } else { actor.Reset(CreateWorkerActor( Options.RuntimeData, traceId, Options.TaskRunnerActorFactory, Options.AsyncIoFactory, - Options.UseSpilling)); + enableSpilling)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr |