aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ssmike <ssmike@ydb.tech> 2023-01-24 11:26:07 +0300
committer ssmike <ssmike@ydb.tech> 2023-01-24 11:26:07 +0300
commit 605baa4c326c4b7bf0a4f4b5434a4dad8066d0d5 (patch)
tree 3014135e5ef8013e1f17d141f15c5d73644b0777
parent 789a43e036d1e1a28c8444330ad13c0a9e7b5235 (diff)
download ydb-605baa4c326c4b7bf0a4f4b5434a4dad8066d0d5.tar.gz
Dont use resourcebroker in dataqueries
-rw-r--r-- ydb/core/kqp/node_service/kqp_node_service.cpp 31
-rw-r--r-- ydb/core/kqp/rm_service/kqp_rm_service.cpp 58
2 files changed, 71 insertions, 18 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 3e4e92632d..e219a4c91f 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -138,6 +138,15 @@ private:
return ReplyError(txId, request.Executer, msg, NKikimrKqp::TEvStartKqpTasksResponse::INTERNAL_ERROR);
}
+ NRm::EKqpMemoryPool memoryPool;
+ if (msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN) {
+ memoryPool = NRm::EKqpMemoryPool::ScanQuery;
+ } else if (msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA) {
+ memoryPool = NRm::EKqpMemoryPool::DataQuery;
+ } else {
+ memoryPool = NRm::EKqpMemoryPool::Unspecified;
+ }
+
ui32 requestChannels = 0;
for (auto& dqTask : *msg.MutableTasks()) {
auto estimation = EstimateTaskResources(dqTask, Config);
@@ -161,7 +170,7 @@ private:
LOG_D("TxId: " << txId << ", channels: " << requestChannels
<< ", computeActors: " << msg.GetTasks().size() << ", memory: " << request.TotalMemory);
- ui64 txMemory = State.GetTxMemory(txId, NRm::EKqpMemoryPool::ScanQuery) + request.TotalMemory;
+ ui64 txMemory = State.GetTxMemory(txId, memoryPool) + request.TotalMemory;
if (txMemory > Config.GetQueryMemoryLimit()) {
LOG_N("TxId: " << txId << ", requested too many memory: " << request.TotalMemory
<< "(" << txMemory << " for this Tx), limit: " << Config.GetQueryMemoryLimit());
@@ -177,7 +186,7 @@ private:
for (auto& task : request.InFlyTasks) {
NRm::TKqpResourcesRequest resourcesRequest;
resourcesRequest.ExecutionUnits = 1;
- resourcesRequest.MemoryPool = NRm::EKqpMemoryPool::ScanQuery;
+ resourcesRequest.MemoryPool = memoryPool;
// !!!!!!!!!!!!!!!!!!!!!
// we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing.
@@ -228,9 +237,9 @@ private:
memoryLimits.MkqlLightProgramMemoryLimit = Config.GetMkqlLightProgramMemoryLimit();
memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit();
if (Config.GetEnableInstantMkqlMemoryAlloc()) {
- memoryLimits.AllocateMemoryFn = [rm = ResourceManager()](const auto& txId, ui64 taskId, ui64 memory) {
+ memoryLimits.AllocateMemoryFn = [rm = ResourceManager(), memoryPool](const auto& txId, ui64 taskId, ui64 memory) {
NRm::TKqpResourcesRequest resources;
- resources.MemoryPool = NRm::EKqpMemoryPool::ScanQuery;
+ resources.MemoryPool = memoryPool;
resources.Memory = memory;
if (rm->AllocateResources(std::get<ui64>(txId), taskId, resources)) {
@@ -249,16 +258,8 @@ private:
request.Deadline = TAppData::TimeProvider->Now() + *runtimeSettingsBase.Timeout;
}
- if (msgRtSettings.GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN) {
- runtimeSettingsBase.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::ScanQuery;
- runtimeSettingsBase.FailOnUndelivery = false;
- } else if (msgRtSettings.GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA) {
- runtimeSettingsBase.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::DataQuery;
- runtimeSettingsBase.FailOnUndelivery = true;
- } else {
- runtimeSettingsBase.ExtraMemoryAllocationPool = NRm::EKqpMemoryPool::Unspecified;
- runtimeSettingsBase.FailOnUndelivery = true;
- }
+ runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool;
+ runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType() != NYql::NDqProto::TComputeRuntimeSettings::SCAN;
runtimeSettingsBase.StatsMode = msgRtSettings.GetStatsMode();
runtimeSettingsBase.UseLLVM = msgRtSettings.GetUseLLVM();
@@ -333,7 +334,7 @@ private:
Send(request.Executer, reply.Release(), IEventHandle::FlagTrackDelivery, txId);
- State.NewRequest(txId, requester, std::move(request), NRm::EKqpMemoryPool::ScanQuery);
+ State.NewRequest(txId, requester, std::move(request), memoryPool);
}
void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) {
diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
index 81f3ec8612..45023d56a2 100644
--- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp
+++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp
@@ -91,6 +91,8 @@ struct TTxState {
ui64 TxExternalDataQueryMemory = 0;
ui32 TxExecutionUnits = 0;
TInstant CreatedAt;
+
+ bool IsDataQuery = false;
};
struct TTxStatesBucket {
@@ -205,6 +207,10 @@ public:
bool AllocateResources(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
TKqpNotEnoughResources* details) override
{
+ if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
+ NotifyExternalResourcesAllocated(txId, taskId, resources);
+ return true;
+ }
Y_VERIFY(resources.MemoryPool == EKqpMemoryPool::ScanQuery);
if (Y_UNLIKELY(resources.Memory == 0 && resources.ExecutionUnits == 0)) {
return true;
@@ -350,12 +356,20 @@ public:
auto& txBucket = TxBucket(txId);
- with_lock (txBucket.Lock) {
+ {
+ TMaybe<TGuard<TMutex>> guard;
+ guard.ConstructInPlace(txBucket.Lock);
+
auto txIt = txBucket.Txs.find(txId);
if (txIt == txBucket.Txs.end()) {
return;
}
+ if (txIt->second.IsDataQuery) {
+ guard.Clear();
+ return NotifyExternalResourcesFreed(txId, taskId);
+ }
+
auto taskIt = txIt->second.Tasks.find(taskId);
if (taskIt == txIt->second.Tasks.end()) {
return;
@@ -403,11 +417,18 @@ public:
auto& txBucket = TxBucket(txId);
- with_lock (txBucket.Lock) {
+ {
+ TMaybe<TGuard<TMutex>> guard;
+ guard.ConstructInPlace(txBucket.Lock);
+
auto txIt = txBucket.Txs.find(txId);
if (txIt == txBucket.Txs.end()) {
return;
}
+ if (txIt->second.IsDataQuery) {
+ guard.Clear();
+ return NotifyExternalResourcesFreed(txId);
+ }
for (auto& [taskId, taskState] : txIt->second.Tasks) {
bool finished = ResourceBroker->FinishTaskInstant(
@@ -442,12 +463,14 @@ public:
void NotifyExternalResourcesAllocated(ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override {
LOG_D("TxId: " << txId << ", taskId: " << taskId << ". External allocation: " << resources.ToString());
- YQL_ENSURE(resources.ExecutionUnits == 0);
+ // we don't register data execution units for now
+ //YQL_ENSURE(resources.ExecutionUnits == 0);
YQL_ENSURE(resources.MemoryPool == EKqpMemoryPool::DataQuery);
auto& txBucket = TxBucket(txId);
with_lock (txBucket.Lock) {
auto& tx = txBucket.Txs[txId];
+ tx.IsDataQuery = true;
auto& task = tx.Tasks[taskId];
task.ExternalDataQueryMemory = resources.Memory;
@@ -501,6 +524,35 @@ public:
FireResourcesPublishing();
}
+    // Releases all external data-query memory recorded for transaction `txId`
+    // and removes the transaction's state from its bucket.
+    //
+    // Steps:
+    //   1. Under the bucket lock, sum ExternalDataQueryMemory over every task of
+    //      the transaction and erase the transaction entry.
+    //   2. Under `Lock`, subtract that sum from the ExternalDataQueryMemory total.
+    //   3. Subtract the same amount from the RmExternalMemory counter and call
+    //      FireResourcesPublishing().
+    //
+    // Returns without doing anything (after the debug log line) when the
+    // transaction is not present in its bucket.
+    void NotifyExternalResourcesFreed(ui64 txId) {
+        LOG_D("TxId: " << txId << ". External free.");
+
+        ui64 releaseMemory = 0;
+
+        auto& txBucket = TxBucket(txId);
+        with_lock (txBucket.Lock) {
+            auto txIt = txBucket.Txs.find(txId);
+            if (txIt == txBucket.Txs.end()) {
+                return;
+            }
+
+            // Iterate by const reference: a by-value loop variable would copy
+            // every (taskId, task state) pair just to read one field.
+            for (const auto& task : txIt->second.Tasks) {
+                releaseMemory += task.second.ExternalDataQueryMemory;
+            }
+            // Erase through the iterator already in hand instead of looking
+            // the key up a second time.
+            txBucket.Txs.erase(txIt);
+        } // with_lock (txBucket.Lock)
+
+        // The bucket lock has been released by this point, so the two locks
+        // are never held together in this method.
+        with_lock (Lock) {
+            Y_VERIFY_DEBUG(ExternalDataQueryMemory >= releaseMemory);
+            ExternalDataQueryMemory -= releaseMemory;
+        } // with_lock (Lock)
+
+        Counters->RmExternalMemory->Sub(releaseMemory);
+        Y_VERIFY_DEBUG(Counters->RmExternalMemory->Val() >= 0);
+
+        FireResourcesPublishing();
+    }
+
void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, "Schedule Snapshot request");
auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>();