summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2023-02-26 14:40:53 +0300
committerhor911 <[email protected]>2023-02-26 14:40:53 +0300
commit3240d5de137fa0c817be1ad64eff5c4bd109e675 (patch)
tree3b89547fdd5d4dd6099e332638ab9af8d2f9d5ea
parente125aeee8224246e6b9cdb56873324dfd22d24c2 (diff)
Flexible task memory settings
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp17
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp41
-rw-r--r--ydb/core/yq/libs/actors/proxy.h7
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp60
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp14
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h14
-rw-r--r--ydb/core/yq/libs/init/init.cpp7
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto1
-rw-r--r--ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp23
9 files changed, 86 insertions, 98 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index e524c4af6cd..b4d43ff946c 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -106,6 +106,13 @@ private:
ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
TVector<TPeer> nodes;
for (ui32 i = 0; i < count; ++i) {
+ ui64 totalMemoryLimit = 0;
+ if (rec.TaskSize() > i) {
+ totalMemoryLimit = rec.GetTask(i).GetInitialTaskMemoryLimit();
+ }
+ if (totalMemoryLimit == 0) {
+ totalMemoryLimit = MkqlInitialMemoryLimit;
+ }
TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
bool selfPlacement = true;
if (!Peers.empty()) {
@@ -120,13 +127,13 @@ private:
if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
&& ( nextNode.MemoryLimit == 0 // memory is NOT limited
- || nextNode.MemoryLimit >= nextNode.MemoryAllocated + MkqlInitialMemoryLimit) // or enough
+ || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
) {
// adjust allocated size to place next tasks correctly, will be reset after next health check
- nextNode.MemoryAllocated += MkqlInitialMemoryLimit;
+ nextNode.MemoryAllocated += totalMemoryLimit;
if (nextNode.NodeId == SelfId().NodeId()) {
// eventually synced self allocation info
- memoryAllocated += MkqlInitialMemoryLimit;
+ memoryAllocated += totalMemoryLimit;
}
node = nextNode;
selfPlacement = false;
@@ -139,8 +146,8 @@ private:
}
}
if (selfPlacement) {
- if (memoryLimit == 0 || memoryLimit >= memoryAllocated + MkqlInitialMemoryLimit) {
- memoryAllocated += MkqlInitialMemoryLimit;
+ if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
+ memoryAllocated += totalMemoryLimit;
} else {
placementFailure = true;
auto& error = *req->Record.MutableError();
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index d9d16da106e..5abae610f40 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -109,12 +109,7 @@ public:
TPendingFetcher(
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const ::NYq::NConfig::TCommonConfig& commonConfig,
- const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig,
- const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig,
- const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig,
- const ::NYq::NConfig::TPingerConfig& pingerConfig,
- const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const ::NYq::NConfig::TConfig& config,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider> randomProvider,
@@ -129,12 +124,7 @@ public:
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
- , CommonConfig(commonConfig)
- , CheckpointCoordinatorConfig(checkpointCoordinatorConfig)
- , PrivateApiConfig(privateApiConfig)
- , GatewaysConfig(gatewaysConfig)
- , PingerConfig(pingerConfig)
- , RateLimiterConfig(rateLimiterConfig)
+ , Config(config)
, FunctionRegistry(functionRegistry)
, TimeProvider(timeProvider)
, RandomProvider(randomProvider)
@@ -360,9 +350,7 @@ private:
FunctionRegistry, RandomProvider,
ModuleResolver, ModuleResolver->GetNextUniqueId(),
DqCompFactory, PqCmConnections,
- CommonConfig, CheckpointCoordinatorConfig,
- PrivateApiConfig, GatewaysConfig, PingerConfig,
- RateLimiterConfig,
+ Config,
task.text(), task.scope(), task.user_token(),
DatabaseResolver, queryId,
task.user_id(), GetOwnerId(), task.generation(),
@@ -373,7 +361,7 @@ private:
task.query_type(),
task.query_syntax(),
task.execute_mode(),
- GetEntityIdAsString(CommonConfig.GetIdsPrefix(), EEntityType::RESULT),
+ GetEntityIdAsString(Config.GetCommon().GetIdsPrefix(), EEntityType::RESULT),
task.state_load_mode(),
task.disposition(),
task.status(),
@@ -414,12 +402,7 @@ private:
NYq::TYqSharedResources::TPtr YqSharedResources;
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
- NYq::NConfig::TCommonConfig CommonConfig;
- NYq::NConfig::TCheckpointCoordinatorConfig CheckpointCoordinatorConfig;
- NYq::NConfig::TPrivateApiConfig PrivateApiConfig;
- NYq::NConfig::TGatewaysConfig GatewaysConfig;
- NYq::NConfig::TPingerConfig PingerConfig;
- NYq::NConfig::TRateLimiterConfig RateLimiterConfig;
+ NYq::NConfig::TConfig Config;
const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;
TIntrusivePtr<ITimeProvider> TimeProvider;
@@ -467,12 +450,7 @@ private:
NActors::IActor* CreatePendingFetcher(
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const ::NYq::NConfig::TCommonConfig& commonConfig,
- const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig,
- const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig,
- const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig,
- const ::NYq::NConfig::TPingerConfig& pingerConfig,
- const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const ::NYq::NConfig::TConfig& config,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider> randomProvider,
@@ -488,12 +466,7 @@ NActors::IActor* CreatePendingFetcher(
return new TPendingFetcher(
yqSharedResources,
credentialsProviderFactory,
- commonConfig,
- checkpointCoordinatorConfig,
- privateApiConfig,
- gatewaysConfig,
- pingerConfig,
- rateLimiterConfig,
+ config,
functionRegistry,
timeProvider,
randomProvider,
diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h
index 09ff70aa6e3..049b4a70597 100644
--- a/ydb/core/yq/libs/actors/proxy.h
+++ b/ydb/core/yq/libs/actors/proxy.h
@@ -39,12 +39,7 @@ NActors::TActorId MakePendingFetcherId(ui32 nodeId);
NActors::IActor* CreatePendingFetcher(
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const ::NYq::NConfig::TCommonConfig& commonConfig,
- const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig,
- const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig,
- const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig,
- const ::NYq::NConfig::TPingerConfig& pingerConfig,
- const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const ::NYq::NConfig::TConfig& config,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider> randomProvider,
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 0f8508b2843..ae2c384d556 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -282,10 +282,10 @@ public:
, Params(std::move(params))
, CreatedAt(Params.CreatedAt)
, QueryCounters(queryCounters)
- , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled())
- , MaxTasksPerStage(Params.CommonConfig.GetMaxTasksPerStage() ? Params.CommonConfig.GetMaxTasksPerStage() : 500)
- , MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? Params.CommonConfig.GetMaxTasksPerOperation() : 40)
- , Compressor(Params.CommonConfig.GetQueryArtifactsCompressionMethod(), Params.CommonConfig.GetQueryArtifactsCompressionMinSize())
+ , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.Config.GetCheckpointCoordinator().GetEnabled())
+ , MaxTasksPerStage(Params.Config.GetCommon().GetMaxTasksPerStage() ? Params.Config.GetCommon().GetMaxTasksPerStage() : 500)
+ , MaxTasksPerOperation(Params.Config.GetCommon().GetMaxTasksPerOperation() ? Params.Config.GetCommon().GetMaxTasksPerOperation() : 40)
+ , Compressor(Params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), Params.Config.GetCommon().GetQueryArtifactsCompressionMinSize())
{
QueryCounters.SetUptimePublicAndServiceCounter(0);
}
@@ -308,7 +308,7 @@ public:
Params.QueryId,
Params.Owner,
SelfId(),
- Params.PingerConfig,
+ Params.Config.GetPinger(),
Params.Deadline,
QueryCounters,
CreatedAt
@@ -545,7 +545,7 @@ private:
break;
case YandexQuery::QueryMeta::STARTING:
QueryStateUpdateRequest.mutable_resources()->set_rate_limiter(
- Params.RateLimiterConfig.GetEnabled() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED);
+ Params.Config.GetRateLimiter().GetEnabled() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED);
QueryStateUpdateRequest.mutable_resources()->set_compilation(Fq::Private::TaskResources::PREPARE);
// know nothing about read rules yet
Params.Status = YandexQuery::QueryMeta::RUNNING; // ???
@@ -702,6 +702,32 @@ private:
}
}
+ void FillMemoryInfo() {
+ auto mkqlDefaultLimit = Params.Config.GetResourceManager().GetMkqlInitialMemoryLimit();
+ if (mkqlDefaultLimit == 0) {
+ mkqlDefaultLimit = 8_GB;
+ }
+
+ auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight();
+ if (s3ReadDefaultInflightLimit == 0) {
+ s3ReadDefaultInflightLimit = 200_MB;
+ }
+
+ for (auto& graphParams : DqGraphParams) {
+ for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) {
+ if (task.GetInitialTaskMemoryLimit() == 0) {
+ ui64 limitTotal = mkqlDefaultLimit;
+ for (auto& input : *task.MutableInputs()) {
+ if (input.HasSource() && input.GetSource().GetType() == "S3Source") {
+ limitTotal += s3ReadDefaultInflightLimit;
+ }
+ }
+ task.SetInitialTaskMemoryLimit(limitTotal);
+ }
+ }
+ }
+ }
+
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev) {
auto tag = (RunActorWakeupTag) ev->Get()->Tag;
switch (tag) {
@@ -1272,7 +1298,7 @@ private:
}
bool StartRateLimiterResourceDeleterIfCan() {
- if (!RateLimiterResourceDeleterId && !RateLimiterResourceCreatorId && FinalizingStatusIsWritten && QueryResponseArrived && Params.RateLimiterConfig.GetEnabled()) {
+ if (!RateLimiterResourceDeleterId && !RateLimiterResourceCreatorId && FinalizingStatusIsWritten && QueryResponseArrived && Params.Config.GetRateLimiter().GetEnabled()) {
LOG_D("Start rate limiter resource deleter");
RateLimiterResourceDeleterId = Register(CreateRateLimiterResourceDeleter(SelfId(), Params.Owner, Params.QueryId, Params.Scope, Params.TenantName));
return true;
@@ -1363,7 +1389,7 @@ private:
::NYq::TCoordinatorId(Params.QueryId + "-" + ToString(DqGraphIndex), Params.PreviousQueryRevision),
NYql::NDq::MakeCheckpointStorageID(),
SelfId(),
- Params.CheckpointCoordinatorConfig,
+ Params.Config.GetCheckpointCoordinator(),
QueryCounters.Counters,
dqGraphParams,
Params.StateLoadMode,
@@ -1408,7 +1434,7 @@ private:
// Copy settings from config
// They are stronger than settings from this function.
- dqSettings = Params.GatewaysConfig.GetDq().GetDefaultSettings();
+ dqSettings = Params.Config.GetGateways().GetDq().GetDefaultSettings();
THashSet<TString> settingsInConfig;
for (const auto& s : dqSettings) {
@@ -1454,13 +1480,13 @@ private:
}
void AddClustersFromConfig(NYql::TGatewaysConfig& gatewaysConfig, THashMap<TString, TString>& clusters) const {
- for (const auto& pq : Params.GatewaysConfig.GetPq().GetClusterMapping()) {
+ for (const auto& pq : Params.Config.GetGateways().GetPq().GetClusterMapping()) {
auto& clusterCfg = *gatewaysConfig.MutablePq()->AddClusterMapping();
clusterCfg = pq;
clusters.emplace(clusterCfg.GetName(), PqProviderName);
}
- for (const auto& solomon : Params.GatewaysConfig.GetSolomon().GetClusterMapping()) {
+ for (const auto& solomon : Params.Config.GetGateways().GetSolomon().GetClusterMapping()) {
auto& clusterCfg = *gatewaysConfig.MutableSolomon()->AddClusterMapping();
clusterCfg = solomon;
clusters.emplace(clusterCfg.GetName(), SolomonProviderName);
@@ -1570,7 +1596,7 @@ private:
notFinished = true;
}
- if (!RateLimiterResourceWasDeleted && Params.RateLimiterConfig.GetEnabled()) {
+ if (!RateLimiterResourceWasDeleted && Params.Config.GetRateLimiter().GetEnabled()) {
StartRateLimiterResourceDeleterIfCan();
notFinished = true;
}
@@ -1632,19 +1658,19 @@ private:
SetupDqSettings(*gatewaysConfig.MutableDq());
// the main idea of having Params.GatewaysConfig is to copy clusters only
// but in this case we have to copy S3 provider limits
- *gatewaysConfig.MutableS3() = Params.GatewaysConfig.GetS3();
+ *gatewaysConfig.MutableS3() = Params.Config.GetGateways().GetS3();
gatewaysConfig.MutableS3()->ClearClusterMapping();
THashMap<TString, TString> clusters;
- TString monitoringEndpoint = Params.CommonConfig.GetMonitoringEndpoint();
+ TString monitoringEndpoint = Params.Config.GetCommon().GetMonitoringEndpoint();
//todo: consider cluster name clashes
AddClustersFromConfig(gatewaysConfig, clusters);
AddSystemClusters(gatewaysConfig, clusters, Params.AuthToken);
AddClustersFromConnections(YqConnections,
- Params.CommonConfig.GetUseBearerForYdb(),
- Params.CommonConfig.GetObjectStorageEndpoint(),
+ Params.Config.GetCommon().GetUseBearerForYdb(),
+ Params.Config.GetCommon().GetObjectStorageEndpoint(),
monitoringEndpoint,
Params.AuthToken,
Params.AccountIdSignatures,
@@ -1654,7 +1680,7 @@ private:
TVector<TDataProviderInitializer> dataProvidersInit;
const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver,
- Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId);
+ Params.Config.GetCommon().GetYdbMvpCloudEndpoint(), Params.Config.GetCommon().GetMdbGateway(), Params.Config.GetCommon().GetMdbTransformHost(), Params.QueryId);
{
// TBD: move init to better place
QueryStateUpdateRequest.set_scope(Params.Scope.ToString());
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index 94f0af5d02b..7c634164f0e 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -14,12 +14,7 @@ TRunActorParams::TRunActorParams(
ui64 nextUniqueId,
NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const ::NYq::NConfig::TCommonConfig& commonConfig,
- const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig,
- const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig,
- const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig,
- const ::NYq::NConfig::TPingerConfig& pingerConfig,
- const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const ::NYq::NConfig::TConfig& config,
const TString& sql,
const NYdb::NFq::TScope& scope,
const TString& authToken,
@@ -65,12 +60,7 @@ TRunActorParams::TRunActorParams(
, NextUniqueId(nextUniqueId)
, DqCompFactory(dqCompFactory)
, PqCmConnections(std::move(pqCmConnections))
- , CommonConfig(commonConfig)
- , CheckpointCoordinatorConfig(checkpointCoordinatorConfig)
- , PrivateApiConfig(privateApiConfig)
- , GatewaysConfig(gatewaysConfig)
- , PingerConfig(pingerConfig)
- , RateLimiterConfig(rateLimiterConfig)
+ , Config(config)
, Sql(sql)
, Scope(scope)
, AuthToken(authToken)
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index c6195babecf..d2115f9f03c 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -31,12 +31,7 @@ struct TRunActorParams { // TODO2 : Change name
ui64 nextUniqueId,
NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const ::NYq::NConfig::TCommonConfig& commonConfig,
- const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig,
- const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig,
- const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig,
- const ::NYq::NConfig::TPingerConfig& pingerConfig,
- const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig,
+ const ::NYq::NConfig::TConfig& config,
const TString& sql,
const NYdb::NFq::TScope& scope,
const TString& authToken,
@@ -87,12 +82,7 @@ struct TRunActorParams { // TODO2 : Change name
NKikimr::NMiniKQL::TComputationNodeFactory DqCompFactory;
::NPq::NConfigurationManager::IConnections::TPtr PqCmConnections;
- const ::NYq::NConfig::TCommonConfig CommonConfig;
- const ::NYq::NConfig::TCheckpointCoordinatorConfig CheckpointCoordinatorConfig;
- const ::NYq::NConfig::TPrivateApiConfig PrivateApiConfig;
- const ::NYq::NConfig::TGatewaysConfig GatewaysConfig;
- const ::NYq::NConfig::TPingerConfig PingerConfig;
- const ::NYq::NConfig::TRateLimiterConfig RateLimiterConfig;
+ const ::NYq::NConfig::TConfig Config;
const TString Sql;
const NYdb::NFq::TScope Scope;
const TString AuthToken;
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 97930d951cb..049a77b401a 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -286,12 +286,7 @@ void Init(
auto fetcher = CreatePendingFetcher(
yqSharedResources,
NKikimr::CreateYdbCredentialsProviderFactory,
- protoConfig.GetCommon(),
- protoConfig.GetCheckpointCoordinator(),
- protoConfig.GetPrivateApi(),
- protoConfig.GetGateways(),
- protoConfig.GetPinger(),
- protoConfig.GetRateLimiter(),
+ protoConfig,
appData->FunctionRegistry,
TAppData::TimeProvider,
TAppData::RandomProvider,
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index b812255752f..9365589705c 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -174,4 +174,5 @@ message TDqTask {
bool CreateSuspended = 8;
string RateLimiter = 10;
string RateLimiterResource = 11;
+ uint64 InitialTaskMemoryLimit = 12;
}
diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
index f39ba024ca7..7da4c641583 100644
--- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
+++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp
@@ -215,7 +215,23 @@ private:
auto count = ev->Get()->Record.GetCount();
Y_VERIFY(count > 0);
- bool canAllocate = MemoryQuoter->Allocate(traceId, 0, count * Options.MkqlInitialMemoryLimit);
+ auto& tasks = *ev->Get()->Record.MutableTask();
+
+ ui64 totalInitialTaskMemoryLimit = 0;
+ if (createComputeActor) {
+ Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count));
+ for (auto& task : tasks) {
+ auto taskLimit = task.GetInitialTaskMemoryLimit();
+ if (taskLimit == 0) {
+ taskLimit = Options.MkqlInitialMemoryLimit;
+ }
+ totalInitialTaskMemoryLimit += taskLimit;
+ }
+ } else {
+ totalInitialTaskMemoryLimit = count * Options.MkqlInitialMemoryLimit;
+ }
+
+ bool canAllocate = MemoryQuoter->Allocate(traceId, 0, totalInitialTaskMemoryLimit);
if (!canAllocate) {
Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Not enough memory to allocate tasks", NYql::NDqProto::StatusIds::OVERLOADED), 0, ev->Cookie);
return;
@@ -232,11 +248,6 @@ private:
TInstant::Now() + TDuration::MilliSeconds(ev->Get()->Record.GetFreeWorkerAfterMs());
}
- auto& tasks = *ev->Get()->Record.MutableTask();
-
- if (createComputeActor) {
- Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count));
- }
auto resultId = ActorIdFromProto(ev->Get()->Record.GetResultActorId());
::NMonitoring::TDynamicCounterPtr taskCounters;