diff options
author | Sergey J <wintchester@gmail.com> | 2024-10-18 23:05:24 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-18 23:05:24 +0300 |
commit | 07f2e60d02d95eab14a86a4b9469db1af7795001 (patch) | |
tree | f18d4950c44f0b7061980cde19e03443a16f87b8 | |
parent | 857b20e06a4717c0b6cad0f64cb53a226f1ce894 (diff) | |
download | ydb-07f2e60d02d95eab14a86a4b9469db1af7795001.tar.gz |
add some queue for nodes operations (#10620)
Co-authored-by: Sergey J <wint@yandex-team.ru>
-rw-r--r-- | ydb/tools/ydbd_slice/__init__.py | 9 | ||||
-rw-r--r-- | ydb/tools/ydbd_slice/nodes.py | 21 |
2 files changed, 28 insertions, 2 deletions
diff --git a/ydb/tools/ydbd_slice/__init__.py b/ydb/tools/ydbd_slice/__init__.py
index d74ae53358..e11c1dd9e9 100644
--- a/ydb/tools/ydbd_slice/__init__.py
+++ b/ydb/tools/ydbd_slice/__init__.py
@@ -315,7 +315,7 @@ def deduce_nodes_from_args(args, walle_provider, ssh_user):
         sys.exit("unable to deduce hosts")
 
     logger.info("use nodes '%s'", result)
-    return nodes.Nodes(result, args.dry_run, ssh_user=ssh_user)
+    return nodes.Nodes(result, args.dry_run, ssh_user=ssh_user, queue_size=args.cmd_queue_size)
 
 
 def ya_build(arcadia_root, artifact, opts, dry_run):
@@ -1187,6 +1187,13 @@ def main(walle_provider=None):
         default="ver-01gswscgce37hdbqyssjm3nd7x",
         help=''
     )
+    parser.add_argument(
+        "--cmd-queue-size",
+        metavar="SIZE",
+        type=int,
+        default=0,
+        help='the size of the command queue (for ssh commands), which limits their parallel execution on remote nodes'
+    )
 
     modes = parser.add_subparsers()
     walle_provider = walle_provider or NopHostsInformationProvider()
diff --git a/ydb/tools/ydbd_slice/nodes.py b/ydb/tools/ydbd_slice/nodes.py
index 3b743543f7..e0ae5c8ae2 100644
--- a/ydb/tools/ydbd_slice/nodes.py
+++ b/ydb/tools/ydbd_slice/nodes.py
@@ -2,13 +2,14 @@ import os
 import sys
 import logging
 import subprocess
+import queue
 
 
 logger = logging.getLogger(__name__)
 
 
 class Nodes(object):
-    def __init__(self, nodes, dry_run=False, ssh_user=None):
+    def __init__(self, nodes, dry_run=False, ssh_user=None, queue_size=0):
         assert isinstance(nodes, list)
         assert len(nodes) > 0
         assert isinstance(nodes[0], str)
@@ -16,6 +17,8 @@ class Nodes(object):
         self._dry_run = bool(dry_run)
         self._ssh_user = ssh_user
         self._logger = logger.getChild(self.__class__.__name__)
+        self._queue = queue.Queue(queue_size)
+        self._qsize = queue_size
 
     @property
     def nodes_list(self):
@@ -83,7 +86,23 @@ class Nodes(object):
 
             actual_cmd = self._get_ssh_command_prefix() + [host, cmd]
             process = subprocess.Popen(actual_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+            if self._qsize > 0:
+                self._queue.put((actual_cmd, process, host))
+                if not self._queue.full():
+                    continue
+                if not self._queue.empty():
+                    actual_cmd, process, host = self._queue.get()
+                    process.wait()
+
             running_jobs.append((actual_cmd, process, host))
+
+        if self._qsize > 0:
+            while not self._queue.empty():
+                actual_cmd, process, host = self._queue.get()
+                process.wait()
+                running_jobs.append((actual_cmd, process, host))
+
         return running_jobs
 
     def execute_async(self, cmd, check_retcode=True, nodes=None, results=None):