author     Aleksandr Khoroshilov <hor911@gmail.com>    2022-04-26 17:30:55 +0300
committer  Aleksandr Khoroshilov <hor911@gmail.com>    2022-04-26 17:30:55 +0300
commit     633ac5815dc143f1e006ee94287596f6a734a679 (patch)
tree       7b878033bb71f9b4cfc83cbb5ed1606e205e9d52
parent     226931a6797f969a0107ebfb340156a7c54b7eaf (diff)
download   ydb-633ac5815dc143f1e006ee94287596f6a734a679.tar.gz
Handle empty allocation request and reply with an error instead of asserting
ref: 20ab039df2c25981830cce00cb052cabf836b3a3
-rw-r--r--  ydb/core/yq/libs/actors/nodes_manager.cpp  77
1 file changed, 42 insertions(+), 35 deletions(-)
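
In short, the patch makes an EvAllocateWorkersRequest with Count == 0 produce an error reply instead of tripping Y_ASSERT. The following condensed sketch is assembled only from the added (+) lines of the hunk below and shows the new shape of the handler; context lines and the round-robin peer selection are elided, so it is not a verbatim copy of the final file:

    auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
    if (count == 0) {
        // Empty request: fill the Error field instead of asserting on count != 0
        auto& error = *req->Record.MutableError();
        error.SetErrorCode(NYql::NDqProto::EMISMATCH);
        error.SetMessage("Incorrect request - 0 nodes requested");
    } else {
        // Non-empty request: the pre-existing path, now nested under else:
        // pick peers round-robin into `nodes`, then fill the Nodes group
        req->Record.ClearError();
        auto& group = *req->Record.MutableNodes();
        group.SetResourceId(resourceId);
        for (const auto& node : nodes) {
            auto* worker = group.AddWorker();
            *worker->MutableGuid() = node.InstanceId;
            worker->SetNodeId(node.NodeId);
        }
    }
    LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());
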
diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp
index df7f804d26c..11ff9a8f495 100644
--- a/ydb/core/yq/libs/actors/nodes_manager.cpp
+++ b/ydb/core/yq/libs/actors/nodes_manager.cpp
@@ -78,47 +78,54 @@ private:
ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc();
const auto &rec = ev->Get()->Record;
const auto count = rec.GetCount();
- Y_ASSERT(count != 0);
- auto resourceId = rec.GetResourceId();
- if (!resourceId) {
- resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
- }
- TVector<TPeer> nodes;
- for (ui32 i = 0; i < count; ++i) {
- TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0};
- if (!Peers.empty()) {
- auto FirstPeer = NextPeer;
- while (true) {
- if (NextPeer >= Peers.size()) {
- NextPeer = 0;
- }
+ auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
+
+ if (count == 0) {
+ auto& error = *req->Record.MutableError();
+ error.SetErrorCode(NYql::NDqProto::EMISMATCH);
+ error.SetMessage("Incorrect request - 0 nodes requested");
+ } else {
+ auto resourceId = rec.GetResourceId();
+ if (!resourceId) {
+ resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+ }
- auto& nextNode = Peers[NextPeer];
- ++NextPeer;
-
-                if (NextPeer == FirstPeer // closed the loop without success, fall back to plain round robin
-                    || nextNode.MemoryLimit == 0 // no memory limit defined for the node
-                    || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // enough memory available
- ) {
- // adjust allocated size to place next tasks correctly, will be reset after next health check
- nextNode.MemoryAllocated += MkqlInitialMemoryLimit;
- node = nextNode;
- break;
+ TVector<TPeer> nodes;
+ for (ui32 i = 0; i < count; ++i) {
+ TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0};
+ if (!Peers.empty()) {
+ auto FirstPeer = NextPeer;
+ while (true) {
+ if (NextPeer >= Peers.size()) {
+ NextPeer = 0;
+ }
+
+ auto& nextNode = Peers[NextPeer];
+ ++NextPeer;
+
+                    if (NextPeer == FirstPeer // closed the loop without success, fall back to plain round robin
+                        || nextNode.MemoryLimit == 0 // no memory limit defined for the node
+                        || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // enough memory available
+ ) {
+ // adjust allocated size to place next tasks correctly, will be reset after next health check
+ nextNode.MemoryAllocated += MkqlInitialMemoryLimit;
+ node = nextNode;
+ break;
+ }
}
}
+ nodes.push_back(node);
}
- nodes.push_back(node);
- }
- auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
- req->Record.ClearError();
- auto& group = *req->Record.MutableNodes();
- group.SetResourceId(resourceId);
- for (const auto& node : nodes) {
- auto* worker = group.AddWorker();
- *worker->MutableGuid() = node.InstanceId;
- worker->SetNodeId(node.NodeId);
+ req->Record.ClearError();
+ auto& group = *req->Record.MutableNodes();
+ group.SetResourceId(resourceId);
+ for (const auto& node : nodes) {
+ auto* worker = group.AddWorker();
+ *worker->MutableGuid() = node.InstanceId;
+ worker->SetNodeId(node.NodeId);
+ }
}
LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());
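
A side effect of the change is that receivers of TEvAllocateWorkersResponse can no longer assume the Nodes group is always present. Below is a minimal, hypothetical receiver-side sketch; it assumes the Record offers HasError()/GetError()/GetNodes() accessors in the same generated style as the MutableError()/ClearError()/MutableNodes() calls seen in the patch, and the handler name and LOG_E macro are illustrative only:

    // Hypothetical receiver-side sketch, not part of the patch.
    void Handle(NDqs::TEvAllocateWorkersResponse::TPtr& ev) {
        const auto& record = ev->Get()->Record;
        if (record.HasError()) {
            // e.g. EMISMATCH with "Incorrect request - 0 nodes requested"
            LOG_E("Worker allocation failed: " << record.GetError().GetMessage());
            return;
        }
        // Success: record.GetNodes() carries the resource id and the allocated workers.
    }
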