diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-26 17:30:55 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-26 17:30:55 +0300 |
commit | 633ac5815dc143f1e006ee94287596f6a734a679 (patch) | |
tree | 7b878033bb71f9b4cfc83cbb5ed1606e205e9d52 | |
parent | 226931a6797f969a0107ebfb340156a7c54b7eaf (diff) | |
download | ydb-633ac5815dc143f1e006ee94287596f6a734a679.tar.gz |
Handle empty allocation request and reply with error, do not assert it
ref:20ab039df2c25981830cce00cb052cabf836b3a3
-rw-r--r-- | ydb/core/yq/libs/actors/nodes_manager.cpp | 77 |
1 files changed, 42 insertions, 35 deletions
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index df7f804d26c..11ff9a8f495 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -78,47 +78,54 @@ private: ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc(); const auto &rec = ev->Get()->Record; const auto count = rec.GetCount(); - Y_ASSERT(count != 0); - auto resourceId = rec.GetResourceId(); - if (!resourceId) { - resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId(); - } - TVector<TPeer> nodes; - for (ui32 i = 0; i < count; ++i) { - TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; - if (!Peers.empty()) { - auto FirstPeer = NextPeer; - while (true) { - if (NextPeer >= Peers.size()) { - NextPeer = 0; - } + auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>(); + + if (count == 0) { + auto& error = *req->Record.MutableError(); + error.SetErrorCode(NYql::NDqProto::EMISMATCH); + error.SetMessage("Incorrect request - 0 nodes requested"); + } else { + auto resourceId = rec.GetResourceId(); + if (!resourceId) { + resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId(); + } - auto& nextNode = Peers[NextPeer]; - ++NextPeer; - - if (NextPeer == FirstPeer // we closed loop w/o success, fallback to round robin then - || nextNode.MemoryLimit == 0 // not limit defined for the node - || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // memory is enough - ) { - // adjust allocated size to place next tasks correctly, will be reset after next health check - nextNode.MemoryAllocated += MkqlInitialMemoryLimit; - node = nextNode; - break; + TVector<TPeer> nodes; + for (ui32 i = 0; i < count; ++i) { + TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; + if (!Peers.empty()) { + auto FirstPeer = NextPeer; + while (true) { + if (NextPeer >= Peers.size()) { + NextPeer = 0; + } + + auto& nextNode = Peers[NextPeer]; + ++NextPeer; + + if (NextPeer == FirstPeer // we closed loop w/o success, fallback to round robin then + || nextNode.MemoryLimit == 0 // not limit defined for the node + || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // memory is enough + ) { + // adjust allocated size to place next tasks correctly, will be reset after next health check + nextNode.MemoryAllocated += MkqlInitialMemoryLimit; + node = nextNode; + break; + } } } + nodes.push_back(node); } - nodes.push_back(node); - } - auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>(); - req->Record.ClearError(); - auto& group = *req->Record.MutableNodes(); - group.SetResourceId(resourceId); - for (const auto& node : nodes) { - auto* worker = group.AddWorker(); - *worker->MutableGuid() = node.InstanceId; - worker->SetNodeId(node.NodeId); + req->Record.ClearError(); + auto& group = *req->Record.MutableNodes(); + group.SetResourceId(resourceId); + for (const auto& node : nodes) { + auto* worker = group.AddWorker(); + *worker->MutableGuid() = node.InstanceId; + worker->SetNodeId(node.NodeId); + } } LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString()); |