aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-06-17 02:25:38 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-06-17 02:25:38 +0300
commitddc339a55fe43529cacdde3dd3232df49210a1db (patch)
tree9528ac745b6bab06079f3eefaa517245ca0941df
parent96f5c2e1b598247a3a9f11198acfe7f3081e4f11 (diff)
downloadydb-ddc339a55fe43529cacdde3dd3232df49210a1db.tar.gz
Check for self-load as well in placement
ref:448d52ca5fdd793734e0acb841d1b500d7adb269
-rw-r--r--ydb/core/yq/libs/actors/nodes_manager.cpp32
1 files changed, 25 insertions, 7 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index 11ff9a8f495..b8bbe99f752 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -91,9 +91,13 @@ private:
resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
}
+ bool placementFailure = false;
+ ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
+ ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
TVector<TPeer> nodes;
for (ui32 i = 0; i < count; ++i) {
TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0};
+ bool selfPlacement = true;
if (!Peers.empty()) {
auto FirstPeer = NextPeer;
while (true) {
@@ -111,20 +115,34 @@ private:
// adjust allocated size to place next tasks correctly, will be reset after next health check
nextNode.MemoryAllocated += MkqlInitialMemoryLimit;
node = nextNode;
+ selfPlacement = false;
break;
}
}
}
+ if (selfPlacement) {
+ if (memoryLimit == 0 || memoryLimit > memoryAllocated + MkqlInitialMemoryLimit) {
+ memoryAllocated += MkqlInitialMemoryLimit;
+ } else {
+ placementFailure = true;
+ auto& error = *req->Record.MutableError();
+ error.SetErrorCode(NYql::NDqProto::EMISMATCH);
+ error.SetMessage("Not enough free memory in the cluster");
+ break;
+ }
+ }
nodes.push_back(node);
}
- req->Record.ClearError();
- auto& group = *req->Record.MutableNodes();
- group.SetResourceId(resourceId);
- for (const auto& node : nodes) {
- auto* worker = group.AddWorker();
- *worker->MutableGuid() = node.InstanceId;
- worker->SetNodeId(node.NodeId);
+ if (!placementFailure) {
+ req->Record.ClearError();
+ auto& group = *req->Record.MutableNodes();
+ group.SetResourceId(resourceId);
+ for (const auto& node : nodes) {
+ auto* worker = group.AddWorker();
+ *worker->MutableGuid() = node.InstanceId;
+ worker->SetNodeId(node.NodeId);
+ }
}
}
LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());