diff options
author | ssmike <ssmike@ydb.tech> | 2023-01-24 11:26:07 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-01-24 11:26:07 +0300 |
commit | 605baa4c326c4b7bf0a4f4b5434a4dad8066d0d5 (patch) | |
tree | 3014135e5ef8013e1f17d141f15c5d73644b0777 | |
parent | 789a43e036d1e1a28c8444330ad13c0a9e7b5235 (diff) | |
download | ydb-605baa4c326c4b7bf0a4f4b5434a4dad8066d0d5.tar.gz |
Dont use resourcebroker in dataqueries
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/rm_service/kqp_rm_service.cpp | 58 |
2 files changed, 71 insertions, 18 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 3e4e92632d..e219a4c91f 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -138,6 +138,15 @@ private: return ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR); } + NRm::EKqpMemoryPool memoryPool; + if (msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN) { + memoryPool = NRm::EKqpMemoryPool::ScanQuery; + } else if (msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA) { + memoryPool = NRm::EKqpMemoryPool::DataQuery; + } else { + memoryPool = NRm::EKqpMemoryPool::Unspecified; + } + ui32 requestChannels = 0; for (auto& dqTask : *msg.MutableTasks()) { auto estimation = EstimateTaskResources(dqTask, Config); @@ -161,7 +170,7 @@ private: LOG_D("TxId: " << txId << ", channels: " << requestChannels << ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.TotalMemory); - ui64 txMemory = State.GetTxMemory(txId, NRm::EKqpMemoryPool::ScanQuery) + request.TotalMemory; + ui64 txMemory = State.GetTxMemory(txId, memoryPool) + request.TotalMemory; if (txMemory > Config.GetQueryMemoryLimit()) { LOG_N("TxId: " << txId << ", requested too many memory: " << request.TotalMemory << "(" << txMemory << " for this Tx), limit: " << Config.GetQueryMemoryLimit()); @@ -177,7 +186,7 @@ private: for (auto& task : request.InFlyTasks) { NRm::TKqpResourcesRequest resourcesRequest; resourcesRequest.ExecutionUnits = 1; - resourcesRequest.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; + resourcesRequest.MemoryPool = memoryPool; // !!!!!!!!!!!!!!!!!!!!! // we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing. @@ -228,9 +237,9 @@ private: memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit(); memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit(); if (Config.GetEnableInstantMkqlMemoryAlloc()) { - memoryLimits.AllocateMemoryFn = [rm = ResourceManager()](const auto& txId, ui64 taskId, ui64 memory) { + memoryLimits.AllocateMemoryFn = [rm = ResourceManager(), memoryPool](const auto& txId, ui64 taskId, ui64 memory) { NRm::TKqpResourcesRequest resources; - resources.MemoryPool = NRm::EKqpMemoryPool::ScanQuery; + resources.MemoryPool = memoryPool; resources.Memory = memory; if (rm->AllocateResources(std::get<ui64>(txId), taskId, resources)) { @@ -249,16 +258,8 @@ private: request.Deadline = TAppData::TimeProvider->Now() + *runtimeSettingsBase.Timeout; } - if (msgRtSettings.GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN) { - runtimeSettingsBase.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::ScanQuery; - runtimeSettingsBase.FailOnUndelivery = false; - } else if (msgRtSettings.GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA) { - runtimeSettingsBase.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery; - runtimeSettingsBase.FailOnUndelivery = true; - } else { - runtimeSettingsBase.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified; - runtimeSettingsBase.FailOnUndelivery = true; - } + runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool; + runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType() != NYql::NDqProto::TComputeRuntimeSettings::SCAN; runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode(); runtimeSettingsBase.UseLLVM = msgRtSettings.GetUseLLVM(); @@ -333,7 +334,7 @@ private: Send(request.Executer, reply.Release(), IEventHandle::FlagTrackDelivery, txId); - State.NewRequest(txId, requester, std::move(request), NRm::EKqpMemoryPool::ScanQuery); + State.NewRequest(txId, requester, std::move(request), memoryPool); } void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) { diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index 81f3ec8612..45023d56a2 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -91,6 +91,8 @@ struct TTxState { ui64 TxExternalDataQueryMemory = 0; ui32 TxExecutionUnits = 0; TInstant CreatedAt; + + bool IsDataQuery = false; }; struct TTxStatesBucket { @@ -205,6 +207,10 @@ public: bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources, TKqpNotEnoughResources* details) override { + if (resources.MemoryPool == EKqpMemoryPool::DataQuery) { + NotifyExternalResourcesAllocated(txId, taskId, resources); + return true; + } Y_VERIFY(resources.MemoryPool == EKqpMemoryPool::ScanQuery); if (Y_UNLIKELY(resources.Memory == 0 && resources.ExecutionUnits == 0)) { return true; @@ -350,12 +356,20 @@ public: auto& txBucket = TxBucket(txId); - with_lock (txBucket.Lock) { + { + TMaybe<TGuard<TMutex>> guard; + guard.ConstructInPlace(txBucket.Lock); + auto txIt = txBucket.Txs.find(txId); if (txIt == txBucket.Txs.end()) { return; } + if (txIt->second.IsDataQuery) { + guard.Clear(); + return NotifyExternalResourcesFreed(txId, taskId); + } + auto taskIt = txIt->second.Tasks.find(taskId); if (taskIt == txIt->second.Tasks.end()) { return; @@ -403,11 +417,18 @@ public: auto& txBucket = TxBucket(txId); - with_lock (txBucket.Lock) { + { + TMaybe<TGuard<TMutex>> guard; + guard.ConstructInPlace(txBucket.Lock); + auto txIt = txBucket.Txs.find(txId); if (txIt == txBucket.Txs.end()) { return; } + if (txIt->second.IsDataQuery) { + guard.Clear(); + return NotifyExternalResourcesFreed(txId); + } for (auto& [taskId, taskState] : txIt->second.Tasks) { bool finished = ResourceBroker->FinishTaskInstant( @@ -442,12 +463,14 @@ public: void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override { LOG_D("TxId: " << txId << ", taskId: " << taskId << ". External allocation: " << resources.ToString()); - YQL_ENSURE(resources.ExecutionUnits == 0); + // we don't register data execution units for now + //YQL_ENSURE(resources.ExecutionUnits == 0); YQL_ENSURE(resources.MemoryPool == EKqpMemoryPool::DataQuery); auto& txBucket = TxBucket(txId); with_lock (txBucket.Lock) { auto& tx = txBucket.Txs[txId]; + tx.IsDataQuery = true; auto& task = tx.Tasks[taskId]; task.ExternalDataQueryMemory = resources.Memory; @@ -501,6 +524,35 @@ public: FireResourcesPublishing(); } + void NotifyExternalResourcesFreed(ui64 txId) { + LOG_D("TxId: " << txId << ". External free."); + + ui64 releaseMemory = 0; + + auto& txBucket = TxBucket(txId); + with_lock (txBucket.Lock) { + auto txIt = txBucket.Txs.find(txId); + if (txIt == txBucket.Txs.end()) { + return; + } + + for (auto task : txIt->second.Tasks) { + releaseMemory += task.second.ExternalDataQueryMemory; + } + txBucket.Txs.erase(txId); + } // with_lock (txBucket.Lock) + + with_lock (Lock) { + Y_VERIFY_DEBUG(ExternalDataQueryMemory >= releaseMemory); + ExternalDataQueryMemory -= releaseMemory; + } // with_lock (Lock) + + Counters->RmExternalMemory->Sub(releaseMemory); + Y_VERIFY_DEBUG(Counters->RmExternalMemory->Val() >= 0); + + FireResourcesPublishing(); + } + void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, "Schedule Snapshot request"); auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>(); |