diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-06-17 02:25:38 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-06-17 02:25:38 +0300 |
commit | ddc339a55fe43529cacdde3dd3232df49210a1db (patch) | |
tree | 9528ac745b6bab06079f3eefaa517245ca0941df | |
parent | 96f5c2e1b598247a3a9f11198acfe7f3081e4f11 (diff) | |
download | ydb-ddc339a55fe43529cacdde3dd3232df49210a1db.tar.gz |
Check for self-load as well in placement
ref:448d52ca5fdd793734e0acb841d1b500d7adb269
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 32 |
1 files changed, 25 insertions, 7 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index 11ff9a8f495..b8bbe99f752 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -91,9 +91,13 @@ private: resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId(); } + bool placementFailure = false; + ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic()); + ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic()); TVector<TPeer> nodes; for (ui32 i = 0; i < count; ++i) { TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; + bool selfPlacement = true; if (!Peers.empty()) { auto FirstPeer = NextPeer; while (true) { @@ -111,20 +115,34 @@ private: // adjust allocated size to place next tasks correctly, will be reset after next health check nextNode.MemoryAllocated += MkqlInitialMemoryLimit; node = nextNode; + selfPlacement = false; break; } } } + if (selfPlacement) { + if (memoryLimit == 0 || memoryLimit > memoryAllocated + MkqlInitialMemoryLimit) { + memoryAllocated += MkqlInitialMemoryLimit; + } else { + placementFailure = true; + auto& error = *req->Record.MutableError(); + error.SetErrorCode(NYql::NDqProto::EMISMATCH); + error.SetMessage("Not enough free memory in the cluster"); + break; + } + } nodes.push_back(node); } - req->Record.ClearError(); - auto& group = *req->Record.MutableNodes(); - group.SetResourceId(resourceId); - for (const auto& node : nodes) { - auto* worker = group.AddWorker(); - *worker->MutableGuid() = node.InstanceId; - worker->SetNodeId(node.NodeId); + if (!placementFailure) { + req->Record.ClearError(); + auto& group = *req->Record.MutableNodes(); + group.SetResourceId(resourceId); + for (const auto& node : nodes) { + auto* worker = group.AddWorker(); + *worker->MutableGuid() = node.InstanceId; + worker->SetNodeId(node.NodeId); + } } } LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString()); |