about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Sergey J <wintchester@gmail.com> 2024-10-18 23:05:24 +0300
committer GitHub <noreply@github.com> 2024-10-18 23:05:24 +0300
commit 07f2e60d02d95eab14a86a4b9469db1af7795001 (patch)
tree f18d4950c44f0b7061980cde19e03443a16f87b8
parent 857b20e06a4717c0b6cad0f64cb53a226f1ce894 (diff)
download ydb-07f2e60d02d95eab14a86a4b9469db1af7795001.tar.gz
add some queue for nodes operations (#10620)
Co-authored-by: Sergey J <wint@yandex-team.ru>
-rw-r--r-- ydb/tools/ydbd_slice/__init__.py | 9
-rw-r--r-- ydb/tools/ydbd_slice/nodes.py | 21
2 files changed, 28 insertions, 2 deletions
diff --git a/ydb/tools/ydbd_slice/__init__.py b/ydb/tools/ydbd_slice/__init__.py
index d74ae53358..e11c1dd9e9 100644
--- a/ydb/tools/ydbd_slice/__init__.py
+++ b/ydb/tools/ydbd_slice/__init__.py
@@ -315,7 +315,7 @@ def deduce_nodes_from_args(args, walle_provider, ssh_user):
sys.exit("unable to deduce hosts")
logger.info("use nodes '%s'", result)
- return nodes.Nodes(result, args.dry_run, ssh_user=ssh_user)
+ return nodes.Nodes(result, args.dry_run, ssh_user=ssh_user, queue_size=args.cmd_queue_size)
def ya_build(arcadia_root, artifact, opts, dry_run):
@@ -1187,6 +1187,13 @@ def main(walle_provider=None):
default="ver-01gswscgce37hdbqyssjm3nd7x",
help=''
)
+ parser.add_argument(
+ "--cmd-queue-size",
+ metavar="SIZE",
+ type=int,
+ default=0,
+ help='the size of the command queue (for ssh commands), which limits their parallel execution on remote nodes'
+ )
modes = parser.add_subparsers()
walle_provider = walle_provider or NopHostsInformationProvider()
diff --git a/ydb/tools/ydbd_slice/nodes.py b/ydb/tools/ydbd_slice/nodes.py
index 3b743543f7..e0ae5c8ae2 100644
--- a/ydb/tools/ydbd_slice/nodes.py
+++ b/ydb/tools/ydbd_slice/nodes.py
@@ -2,13 +2,14 @@ import os
import sys
import logging
import subprocess
+import queue
logger = logging.getLogger(__name__)
class Nodes(object):
- def __init__(self, nodes, dry_run=False, ssh_user=None):
+ def __init__(self, nodes, dry_run=False, ssh_user=None, queue_size=0):
assert isinstance(nodes, list)
assert len(nodes) > 0
assert isinstance(nodes[0], str)
@@ -16,6 +17,8 @@ class Nodes(object):
self._dry_run = bool(dry_run)
self._ssh_user = ssh_user
self._logger = logger.getChild(self.__class__.__name__)
+ self._queue = queue.Queue(queue_size)
+ self._qsize = queue_size
@property
def nodes_list(self):
@@ -83,7 +86,23 @@ class Nodes(object):
actual_cmd = self._get_ssh_command_prefix() + [host, cmd]
process = subprocess.Popen(actual_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ if self._qsize > 0:
+ self._queue.put((actual_cmd, process, host))
+ if not self._queue.full():
+ continue
+ if not self._queue.empty():
+ actual_cmd, process, host = self._queue.get()
+ process.wait()
+
running_jobs.append((actual_cmd, process, host))
+
+ if self._qsize > 0:
+ while not self._queue.empty():
+ actual_cmd, process, host = self._queue.get()
+ process.wait()
+ running_jobs.append((actual_cmd, process, host))
+
return running_jobs
def execute_async(self, cmd, check_retcode=True, nodes=None, results=None):