diff options
| author | Aleksandr Khoroshilov <[email protected]> | 2022-06-17 02:25:38 +0300 | 
|---|---|---|
| committer | Aleksandr Khoroshilov <[email protected]> | 2022-06-17 02:25:38 +0300 | 
| commit | ddc339a55fe43529cacdde3dd3232df49210a1db (patch) | |
| tree | 9528ac745b6bab06079f3eefaa517245ca0941df | |
| parent | 96f5c2e1b598247a3a9f11198acfe7f3081e4f11 (diff) | |
Check for self-load as well in placement
ref:448d52ca5fdd793734e0acb841d1b500d7adb269
| -rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 32 | 
1 files changed, 25 insertions, 7 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index 11ff9a8f495..b8bbe99f752 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -91,9 +91,13 @@ private:                  resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();              } +            bool placementFailure = false; +            ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic()); +            ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());              TVector<TPeer> nodes;              for (ui32 i = 0; i < count; ++i) {                  TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; +                bool selfPlacement = true;                  if (!Peers.empty()) {                      auto FirstPeer = NextPeer;                      while (true) { @@ -111,20 +115,34 @@ private:                              // adjust allocated size to place next tasks correctly, will be reset after next health check                              nextNode.MemoryAllocated += MkqlInitialMemoryLimit;                              node = nextNode; +                            selfPlacement = false;                              break;                          }                      }                  } +                if (selfPlacement) { +                    if (memoryLimit == 0 || memoryLimit > memoryAllocated + MkqlInitialMemoryLimit) { +                        memoryAllocated += MkqlInitialMemoryLimit; +                    } else { +                        placementFailure = true; +                        auto& error = *req->Record.MutableError(); +                        error.SetErrorCode(NYql::NDqProto::EMISMATCH); +                        error.SetMessage("Not enough free memory in the cluster"); +                        break; +                    } +                }                  nodes.push_back(node);              } -            req->Record.ClearError(); -            auto& group = *req->Record.MutableNodes(); -            group.SetResourceId(resourceId); -            for (const auto& node : nodes) { -                auto* worker = group.AddWorker(); -                *worker->MutableGuid() = node.InstanceId; -                worker->SetNodeId(node.NodeId); +            if (!placementFailure) { +                req->Record.ClearError(); +                auto& group = *req->Record.MutableNodes(); +                group.SetResourceId(resourceId); +                for (const auto& node : nodes) { +                    auto* worker = group.AddWorker(); +                    *worker->MutableGuid() = node.InstanceId; +                    worker->SetNodeId(node.NodeId); +                }              }          }          LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());  | 
