diff options
| author | hor911 <[email protected]> | 2023-02-26 14:40:53 +0300 |
|---|---|---|
| committer | hor911 <[email protected]> | 2023-02-26 14:40:53 +0300 |
| commit | 3240d5de137fa0c817be1ad64eff5c4bd109e675 (patch) | |
| tree | 3b89547fdd5d4dd6099e332638ab9af8d2f9d5ea | |
| parent | e125aeee8224246e6b9cdb56873324dfd22d24c2 (diff) | |
Flexible task memory settings
| -rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 17 | ||||
| -rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 41 | ||||
| -rw-r--r-- | ydb/core/yq/libs/actors/proxy.h | 7 | ||||
| -rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 60 | ||||
| -rw-r--r-- | ydb/core/yq/libs/actors/run_actor_params.cpp | 14 | ||||
| -rw-r--r-- | ydb/core/yq/libs/actors/run_actor_params.h | 14 | ||||
| -rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 7 | ||||
| -rw-r--r-- | ydb/library/yql/dq/proto/dq_tasks.proto | 1 | ||||
| -rw-r--r-- | ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp | 23 |
9 files changed, 86 insertions, 98 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index e524c4af6cd..b4d43ff946c 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -106,6 +106,13 @@ private: ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic()); TVector<TPeer> nodes; for (ui32 i = 0; i < count; ++i) { + ui64 totalMemoryLimit = 0; + if (rec.TaskSize() > i) { + totalMemoryLimit = rec.GetTask(i).GetInitialTaskMemoryLimit(); + } + if (totalMemoryLimit == 0) { + totalMemoryLimit = MkqlInitialMemoryLimit; + } TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter}; bool selfPlacement = true; if (!Peers.empty()) { @@ -120,13 +127,13 @@ private: if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match && ( nextNode.MemoryLimit == 0 // memory is NOT limited - || nextNode.MemoryLimit >= nextNode.MemoryAllocated + MkqlInitialMemoryLimit) // or enough + || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough ) { // adjust allocated size to place next tasks correctly, will be reset after next health check - nextNode.MemoryAllocated += MkqlInitialMemoryLimit; + nextNode.MemoryAllocated += totalMemoryLimit; if (nextNode.NodeId == SelfId().NodeId()) { // eventually synced self allocation info - memoryAllocated += MkqlInitialMemoryLimit; + memoryAllocated += totalMemoryLimit; } node = nextNode; selfPlacement = false; @@ -139,8 +146,8 @@ private: } } if (selfPlacement) { - if (memoryLimit == 0 || memoryLimit >= memoryAllocated + MkqlInitialMemoryLimit) { - memoryAllocated += MkqlInitialMemoryLimit; + if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) { + memoryAllocated += totalMemoryLimit; } else { placementFailure = true; auto& error = *req->Record.MutableError(); diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index d9d16da106e..5abae610f40 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -109,12 +109,7 @@ public: TPendingFetcher( const NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - const ::NYq::NConfig::TCommonConfig& commonConfig, - const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, - const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig, - const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig, - const ::NYq::NConfig::TPingerConfig& pingerConfig, - const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const ::NYq::NConfig::TConfig& config, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, @@ -129,12 +124,7 @@ public: ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) - , CommonConfig(commonConfig) - , CheckpointCoordinatorConfig(checkpointCoordinatorConfig) - , PrivateApiConfig(privateApiConfig) - , GatewaysConfig(gatewaysConfig) - , PingerConfig(pingerConfig) - , RateLimiterConfig(rateLimiterConfig) + , Config(config) , FunctionRegistry(functionRegistry) , TimeProvider(timeProvider) , RandomProvider(randomProvider) @@ -360,9 +350,7 @@ private: FunctionRegistry, RandomProvider, ModuleResolver, ModuleResolver->GetNextUniqueId(), DqCompFactory, PqCmConnections, - CommonConfig, CheckpointCoordinatorConfig, - PrivateApiConfig, GatewaysConfig, PingerConfig, - RateLimiterConfig, + Config, task.text(), task.scope(), task.user_token(), DatabaseResolver, queryId, task.user_id(), GetOwnerId(), task.generation(), @@ -373,7 +361,7 @@ private: task.query_type(), task.query_syntax(), task.execute_mode(), - GetEntityIdAsString(CommonConfig.GetIdsPrefix(), EEntityType::RESULT), + GetEntityIdAsString(Config.GetCommon().GetIdsPrefix(), EEntityType::RESULT), task.state_load_mode(), task.disposition(), task.status(), @@ -414,12 +402,7 @@ private: NYq::TYqSharedResources::TPtr YqSharedResources; NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; - NYq::NConfig::TCommonConfig CommonConfig; - NYq::NConfig::TCheckpointCoordinatorConfig CheckpointCoordinatorConfig; - NYq::NConfig::TPrivateApiConfig PrivateApiConfig; - NYq::NConfig::TGatewaysConfig GatewaysConfig; - NYq::NConfig::TPingerConfig PingerConfig; - NYq::NConfig::TRateLimiterConfig RateLimiterConfig; + NYq::NConfig::TConfig Config; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; TIntrusivePtr<ITimeProvider> TimeProvider; @@ -467,12 +450,7 @@ private: NActors::IActor* CreatePendingFetcher( const NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - const ::NYq::NConfig::TCommonConfig& commonConfig, - const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, - const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig, - const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig, - const ::NYq::NConfig::TPingerConfig& pingerConfig, - const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const ::NYq::NConfig::TConfig& config, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, @@ -488,12 +466,7 @@ NActors::IActor* CreatePendingFetcher( return new TPendingFetcher( yqSharedResources, credentialsProviderFactory, - commonConfig, - checkpointCoordinatorConfig, - privateApiConfig, - gatewaysConfig, - pingerConfig, - rateLimiterConfig, + config, functionRegistry, timeProvider, randomProvider, diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h index 09ff70aa6e3..049b4a70597 100644 --- a/ydb/core/yq/libs/actors/proxy.h +++ b/ydb/core/yq/libs/actors/proxy.h @@ -39,12 +39,7 @@ NActors::TActorId MakePendingFetcherId(ui32 nodeId); NActors::IActor* CreatePendingFetcher( const NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - const ::NYq::NConfig::TCommonConfig& commonConfig, - const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, - const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig, - const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig, - const ::NYq::NConfig::TPingerConfig& pingerConfig, - const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const ::NYq::NConfig::TConfig& config, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 0f8508b2843..ae2c384d556 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -282,10 +282,10 @@ public: , Params(std::move(params)) , CreatedAt(Params.CreatedAt) , QueryCounters(queryCounters) - , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled()) - , MaxTasksPerStage(Params.CommonConfig.GetMaxTasksPerStage() ? Params.CommonConfig.GetMaxTasksPerStage() : 500) - , MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? Params.CommonConfig.GetMaxTasksPerOperation() : 40) - , Compressor(Params.CommonConfig.GetQueryArtifactsCompressionMethod(), Params.CommonConfig.GetQueryArtifactsCompressionMinSize()) + , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.Config.GetCheckpointCoordinator().GetEnabled()) + , MaxTasksPerStage(Params.Config.GetCommon().GetMaxTasksPerStage() ? Params.Config.GetCommon().GetMaxTasksPerStage() : 500) + , MaxTasksPerOperation(Params.Config.GetCommon().GetMaxTasksPerOperation() ? Params.Config.GetCommon().GetMaxTasksPerOperation() : 40) + , Compressor(Params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), Params.Config.GetCommon().GetQueryArtifactsCompressionMinSize()) { QueryCounters.SetUptimePublicAndServiceCounter(0); } @@ -308,7 +308,7 @@ public: Params.QueryId, Params.Owner, SelfId(), - Params.PingerConfig, + Params.Config.GetPinger(), Params.Deadline, QueryCounters, CreatedAt @@ -545,7 +545,7 @@ private: break; case YandexQuery::QueryMeta::STARTING: QueryStateUpdateRequest.mutable_resources()->set_rate_limiter( - Params.RateLimiterConfig.GetEnabled() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); + Params.Config.GetRateLimiter().GetEnabled() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); QueryStateUpdateRequest.mutable_resources()->set_compilation(Fq::Private::TaskResources::PREPARE); // know nothing about read rules yet Params.Status = YandexQuery::QueryMeta::RUNNING; // ??? @@ -702,6 +702,32 @@ private: } } + void FillMemoryInfo() { + auto mkqlDefaultLimit = Params.Config.GetResourceManager().GetMkqlInitialMemoryLimit(); + if (mkqlDefaultLimit == 0) { + mkqlDefaultLimit = 8_GB; + } + + auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight(); + if (s3ReadDefaultInflightLimit == 0) { + s3ReadDefaultInflightLimit = 200_MB; + } + + for (auto& graphParams : DqGraphParams) { + for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) { + if (task.GetInitialTaskMemoryLimit() == 0) { + ui64 limitTotal = mkqlDefaultLimit; + for (auto& input : *task.MutableInputs()) { + if (input.HasSource() && input.GetSource().GetType() == "S3Source") { + limitTotal += s3ReadDefaultInflightLimit; + } + } + task.SetInitialTaskMemoryLimit(limitTotal); + } + } + } + } + void Handle(NActors::TEvents::TEvWakeup::TPtr& ev) { auto tag = (RunActorWakeupTag) ev->Get()->Tag; switch (tag) { @@ -1272,7 +1298,7 @@ private: } bool StartRateLimiterResourceDeleterIfCan() { - if (!RateLimiterResourceDeleterId && !RateLimiterResourceCreatorId && FinalizingStatusIsWritten && QueryResponseArrived && Params.RateLimiterConfig.GetEnabled()) { + if (!RateLimiterResourceDeleterId && !RateLimiterResourceCreatorId && FinalizingStatusIsWritten && QueryResponseArrived && Params.Config.GetRateLimiter().GetEnabled()) { LOG_D("Start rate limiter resource deleter"); RateLimiterResourceDeleterId = Register(CreateRateLimiterResourceDeleter(SelfId(), Params.Owner, Params.QueryId, Params.Scope, Params.TenantName)); return true; @@ -1363,7 +1389,7 @@ private: ::NYq::TCoordinatorId(Params.QueryId + "-" + ToString(DqGraphIndex), Params.PreviousQueryRevision), NYql::NDq::MakeCheckpointStorageID(), SelfId(), - Params.CheckpointCoordinatorConfig, + Params.Config.GetCheckpointCoordinator(), QueryCounters.Counters, dqGraphParams, Params.StateLoadMode, @@ -1408,7 +1434,7 @@ private: // Copy settings from config // They are stronger than settings from this function. - dqSettings = Params.GatewaysConfig.GetDq().GetDefaultSettings(); + dqSettings = Params.Config.GetGateways().GetDq().GetDefaultSettings(); THashSet<TString> settingsInConfig; for (const auto& s : dqSettings) { @@ -1454,13 +1480,13 @@ private: } void AddClustersFromConfig(NYql::TGatewaysConfig& gatewaysConfig, THashMap<TString, TString>& clusters) const { - for (const auto& pq : Params.GatewaysConfig.GetPq().GetClusterMapping()) { + for (const auto& pq : Params.Config.GetGateways().GetPq().GetClusterMapping()) { auto& clusterCfg = *gatewaysConfig.MutablePq()->AddClusterMapping(); clusterCfg = pq; clusters.emplace(clusterCfg.GetName(), PqProviderName); } - for (const auto& solomon : Params.GatewaysConfig.GetSolomon().GetClusterMapping()) { + for (const auto& solomon : Params.Config.GetGateways().GetSolomon().GetClusterMapping()) { auto& clusterCfg = *gatewaysConfig.MutableSolomon()->AddClusterMapping(); clusterCfg = solomon; clusters.emplace(clusterCfg.GetName(), SolomonProviderName); @@ -1570,7 +1596,7 @@ private: notFinished = true; } - if (!RateLimiterResourceWasDeleted && Params.RateLimiterConfig.GetEnabled()) { + if (!RateLimiterResourceWasDeleted && Params.Config.GetRateLimiter().GetEnabled()) { StartRateLimiterResourceDeleterIfCan(); notFinished = true; } @@ -1632,19 +1658,19 @@ private: SetupDqSettings(*gatewaysConfig.MutableDq()); // the main idea of having Params.GatewaysConfig is to copy clusters only // but in this case we have to copy S3 provider limits - *gatewaysConfig.MutableS3() = Params.GatewaysConfig.GetS3(); + *gatewaysConfig.MutableS3() = Params.Config.GetGateways().GetS3(); gatewaysConfig.MutableS3()->ClearClusterMapping(); THashMap<TString, TString> clusters; - TString monitoringEndpoint = Params.CommonConfig.GetMonitoringEndpoint(); + TString monitoringEndpoint = Params.Config.GetCommon().GetMonitoringEndpoint(); //todo: consider cluster name clashes AddClustersFromConfig(gatewaysConfig, clusters); AddSystemClusters(gatewaysConfig, clusters, Params.AuthToken); AddClustersFromConnections(YqConnections, - Params.CommonConfig.GetUseBearerForYdb(), - Params.CommonConfig.GetObjectStorageEndpoint(), + Params.Config.GetCommon().GetUseBearerForYdb(), + Params.Config.GetCommon().GetObjectStorageEndpoint(), monitoringEndpoint, Params.AuthToken, Params.AccountIdSignatures, @@ -1654,7 +1680,7 @@ private: TVector<TDataProviderInitializer> dataProvidersInit; const std::shared_ptr<IDatabaseAsyncResolver> dbResolver = std::make_shared<TDatabaseAsyncResolverImpl>(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver, - Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId); + Params.Config.GetCommon().GetYdbMvpCloudEndpoint(), Params.Config.GetCommon().GetMdbGateway(), Params.Config.GetCommon().GetMdbTransformHost(), Params.QueryId); { // TBD: move init to better place QueryStateUpdateRequest.set_scope(Params.Scope.ToString()); diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index 94f0af5d02b..7c634164f0e 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -14,12 +14,7 @@ TRunActorParams::TRunActorParams( ui64 nextUniqueId, NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory, ::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections, - const ::NYq::NConfig::TCommonConfig& commonConfig, - const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, - const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig, - const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig, - const ::NYq::NConfig::TPingerConfig& pingerConfig, - const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const ::NYq::NConfig::TConfig& config, const TString& sql, const NYdb::NFq::TScope& scope, const TString& authToken, @@ -65,12 +60,7 @@ TRunActorParams::TRunActorParams( , NextUniqueId(nextUniqueId) , DqCompFactory(dqCompFactory) , PqCmConnections(std::move(pqCmConnections)) - , CommonConfig(commonConfig) - , CheckpointCoordinatorConfig(checkpointCoordinatorConfig) - , PrivateApiConfig(privateApiConfig) - , GatewaysConfig(gatewaysConfig) - , PingerConfig(pingerConfig) - , RateLimiterConfig(rateLimiterConfig) + , Config(config) , Sql(sql) , Scope(scope) , AuthToken(authToken) diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index c6195babecf..d2115f9f03c 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -31,12 +31,7 @@ struct TRunActorParams { // TODO2 : Change name ui64 nextUniqueId, NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory, ::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections, - const ::NYq::NConfig::TCommonConfig& commonConfig, - const ::NYq::NConfig::TCheckpointCoordinatorConfig& checkpointCoordinatorConfig, - const ::NYq::NConfig::TPrivateApiConfig& privateApiConfig, - const ::NYq::NConfig::TGatewaysConfig& gatewaysConfig, - const ::NYq::NConfig::TPingerConfig& pingerConfig, - const ::NYq::NConfig::TRateLimiterConfig& rateLimiterConfig, + const ::NYq::NConfig::TConfig& config, const TString& sql, const NYdb::NFq::TScope& scope, const TString& authToken, @@ -87,12 +82,7 @@ struct TRunActorParams { // TODO2 : Change name NKikimr::NMiniKQL::TComputationNodeFactory DqCompFactory; ::NPq::NConfigurationManager::IConnections::TPtr PqCmConnections; - const ::NYq::NConfig::TCommonConfig CommonConfig; - const ::NYq::NConfig::TCheckpointCoordinatorConfig CheckpointCoordinatorConfig; - const ::NYq::NConfig::TPrivateApiConfig PrivateApiConfig; - const ::NYq::NConfig::TGatewaysConfig GatewaysConfig; - const ::NYq::NConfig::TPingerConfig PingerConfig; - const ::NYq::NConfig::TRateLimiterConfig RateLimiterConfig; + const ::NYq::NConfig::TConfig Config; const TString Sql; const NYdb::NFq::TScope Scope; const TString AuthToken; diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 97930d951cb..049a77b401a 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -286,12 +286,7 @@ void Init( auto fetcher = CreatePendingFetcher( yqSharedResources, NKikimr::CreateYdbCredentialsProviderFactory, - protoConfig.GetCommon(), - protoConfig.GetCheckpointCoordinator(), - protoConfig.GetPrivateApi(), - protoConfig.GetGateways(), - protoConfig.GetPinger(), - protoConfig.GetRateLimiter(), + protoConfig, appData->FunctionRegistry, TAppData::TimeProvider, TAppData::RandomProvider, diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index b812255752f..9365589705c 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -174,4 +174,5 @@ message TDqTask { bool CreateSuspended = 8; string RateLimiter = 10; string RateLimiterResource = 11; + uint64 InitialTaskMemoryLimit = 12; } diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index f39ba024ca7..7da4c641583 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -215,7 +215,23 @@ private: auto count = ev->Get()->Record.GetCount(); Y_VERIFY(count > 0); - bool canAllocate = MemoryQuoter->Allocate(traceId, 0, count * Options.MkqlInitialMemoryLimit); + auto& tasks = *ev->Get()->Record.MutableTask(); + + ui64 totalInitialTaskMemoryLimit = 0; + if (createComputeActor) { + Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count)); + for (auto& task : tasks) { + auto taskLimit = task.GetInitialTaskMemoryLimit(); + if (taskLimit == 0) { + taskLimit = Options.MkqlInitialMemoryLimit; + } + totalInitialTaskMemoryLimit += taskLimit; + } + } else { + totalInitialTaskMemoryLimit = count * Options.MkqlInitialMemoryLimit; + } + + bool canAllocate = MemoryQuoter->Allocate(traceId, 0, totalInitialTaskMemoryLimit); if (!canAllocate) { Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Not enough memory to allocate tasks", NYql::NDqProto::StatusIds::OVERLOADED), 0, ev->Cookie); return; @@ -232,11 +248,6 @@ private: TInstant::Now() + TDuration::MilliSeconds(ev->Get()->Record.GetFreeWorkerAfterMs()); } - auto& tasks = *ev->Get()->Record.MutableTask(); - - if (createComputeActor) { - Y_VERIFY(static_cast<int>(tasks.size()) == static_cast<int>(count)); - } auto resultId = ActorIdFromProto(ev->Get()->Record.GetResultActorId()); ::NMonitoring::TDynamicCounterPtr taskCounters; |
