diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-04-09 10:33:36 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-09 13:33:36 +0300 |
commit | 1abde1c19ee88b450e131f7d6882f7bd28bbda42 (patch) | |
tree | f431c28772a2ea441b440cef488fcfd2c28f85f0 | |
parent | 34e4a37cbe98992bea2f024ffd6f97ea59e8eceb (diff) | |
download | ydb-1abde1c19ee88b450e131f7d6882f7bd28bbda42.tar.gz |
ybdb_slice improvements (#3585)
-rw-r--r-- | ydb/tools/ydbd_slice/handlers.py | 118 | ||||
-rw-r--r-- | ydb/tools/ydbd_slice/nodes.py | 5 |
2 files changed, 67 insertions, 56 deletions
diff --git a/ydb/tools/ydbd_slice/handlers.py b/ydb/tools/ydbd_slice/handlers.py index 709561a1637..1ebb0fac52f 100644 --- a/ydb/tools/ydbd_slice/handlers.py +++ b/ydb/tools/ydbd_slice/handlers.py @@ -21,21 +21,26 @@ class CalledProcessError(subprocess.CalledProcessError): def format_drivers(nodes): - cmd = "sudo find /dev/disk/by-partlabel/ -maxdepth 1 -name 'kikimr_*' " \ - "-exec dd if=/dev/zero of={} bs=1M count=1 status=none \;" # noqa: W605 + cmd = r"sudo find /dev/disk/ -path '*/by-partlabel/kikimr_*' " \ + r"-exec dd if=/dev/zero of={} bs=1M count=1 status=none \;" nodes.execute_async(cmd) -def clear_registered_slots(nodes): - nodes.execute_async("sudo find /Berkanavt/ -maxdepth 1 -type d -name 'kikimr_*' -exec rm -rf -- {} \;") # noqa: W605 +def _ensure_berkanavt_exists(nodes): + cmd = r"sudo mkdir -p /Berkanavt" + nodes.execute_async(cmd) + + +def _clear_registered_slots(nodes): + nodes.execute_async(r"sudo find /Berkanavt/ -maxdepth 1 -type d -name 'kikimr_*' -exec rm -rf -- {} \;") -def clear_slot(nodes, slot): - cmd = "sudo find /Berkanavt/ -maxdepth 1 -type d -name kikimr_{slot} -exec rm -rf -- {{}} \;".format(slot=slot.slot) # noqa: W605 +def _clear_slot(nodes, slot): + cmd = r"sudo find /Berkanavt/ -maxdepth 1 -type d -name kikimr_{slot} -exec rm -rf -- {{}} \;".format(slot=slot.slot) nodes.execute_async(cmd) -def clear_logs(nodes): +def _clear_logs(nodes): cmd = "sudo service rsyslog stop; " \ "find /Berkanavt/ -mindepth 2 -maxdepth 2 -name logs | egrep '^/Berkanavt/kikimr' | sudo xargs -I% find % -mindepth 1 -delete; " \ "sudo service rsyslog start;" @@ -53,13 +58,13 @@ def slice_clear(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' in components: for slot in cluster_details.dynamic_slots.values(): - clear_slot(nodes, slot) + _clear_slot(nodes, slot) if 'kikimr' in components: format_drivers(nodes) -def invoke_scripts(dynamic_cfg_path, scripts): +def _invoke_scripts(dynamic_cfg_path, scripts): for script_name in scripts: script_path = os.path.join(dynamic_cfg_path, script_name) if os.path.isfile(script_path): @@ -71,13 +76,13 @@ def invoke_scripts(dynamic_cfg_path, scripts): raise CalledProcessError(er) -def dynamic_configure(configurations): +def _dynamic_configure(configurations): dynamic_cfg_path = configurations.create_dynamic_cfg() # wait for bs to configure time_remaining = 60 while True: try: - invoke_scripts(dynamic_cfg_path, ['init_storage.bash']) + _invoke_scripts(dynamic_cfg_path, ['init_storage.bash']) break except CalledProcessError: time_to_wait = min(time_remaining, 5) @@ -85,7 +90,7 @@ def dynamic_configure(configurations): raise time_remaining -= time_to_wait time.sleep(time_to_wait) - invoke_scripts( + _invoke_scripts( dynamic_cfg_path, ( "init_cms.bash", "init_compute.bash", @@ -96,34 +101,35 @@ def dynamic_configure(configurations): def slice_install(components, nodes, cluster_details, configurator, do_clear_logs, args, walle_provider): + _ensure_berkanavt_exists(nodes) slice_stop(components, nodes, cluster_details, walle_provider) if 'dynamic_slots' in components or 'kikimr' in components: - stop_all_slots(nodes) - clear_registered_slots(nodes) + _stop_all_slots(nodes) + _clear_registered_slots(nodes) if do_clear_logs: - clear_logs(nodes) + _clear_logs(nodes) if 'kikimr' in components: format_drivers(nodes) if 'bin' in components.get('kikimr', []): - update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin) + _update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin) if 'cfg' in components.get('kikimr', []): static_cfg_path = configurator.create_static_cfg() - update_cfg(nodes, static_cfg_path) - deploy_secrets(nodes, args.yav_version) + _update_cfg(nodes, static_cfg_path) + _deploy_secrets(nodes, args.yav_version) - start_static(nodes) - dynamic_configure(configurator) + _start_static(nodes) + _dynamic_configure(configurator) - deploy_slot_configs(components, nodes, cluster_details, walle_provider) - start_dynamic(components, nodes, cluster_details, walle_provider) + _deploy_slot_configs(components, nodes, cluster_details, walle_provider) + _start_dynamic(components, nodes, cluster_details, walle_provider) -def get_available_slots(components, nodes, cluster_details, walle_provider): +def _get_available_slots(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' not in components: return {} @@ -145,7 +151,7 @@ def get_available_slots(components, nodes, cluster_details, walle_provider): return (slots_per_domain, all_available_slots_count, ) -def deploy_slot_config_for_tenant(nodes, slot, tenant, node): +def _deploy_slot_config_for_tenant(nodes, slot, tenant, node): slot_dir = "/Berkanavt/kikimr_{slot}".format(slot=slot.slot) logs_dir = slot_dir + "/logs" slot_cfg = slot_dir + "/slot_cfg" @@ -176,11 +182,11 @@ mon={mon}""".format( nodes.execute_async(cmd, check_retcode=False, nodes=[node]) -def deploy_slot_configs(components, nodes, cluster_details, walle_provider): +def _deploy_slot_configs(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' not in components: return - slots_per_domain = get_available_slots(components, nodes, cluster_details, walle_provider)[0] + slots_per_domain = _get_available_slots(components, nodes, cluster_details, walle_provider)[0] for domain in cluster_details.domains: slots_taken = set() available_slots_per_zone = slots_per_domain[domain.domain_name] @@ -194,14 +200,14 @@ def deploy_slot_configs(components, nodes, cluster_details, walle_provider): if (slot, node) in slots_taken: continue slots_taken.add((slot, node)) - deploy_slot_config_for_tenant(nodes, slot, tenant, node) + _deploy_slot_config_for_tenant(nodes, slot, tenant, node) break except IndexError: logger.critical('insufficient slots allocated') return -def start_slot(nodes, slot): +def _start_slot(nodes, slot): cmd = "sudo sh -c \"if [ -x /sbin/start ]; "\ " then start kikimr-multi slot={slot} tenant=dynamic mbus={mbus} grpc={grpc} mon={mon} ic={ic}; "\ " else systemctl start kikimr-multi@{slot}; fi\"".format( @@ -214,7 +220,7 @@ def start_slot(nodes, slot): nodes.execute_async(cmd, check_retcode=False) -def start_slot_for_tenant(nodes, slot, tenant, host, node_bind=None): +def _start_slot_for_tenant(nodes, slot, tenant, host, node_bind=None): cmd = "sudo sh -c \"if [ -x /sbin/start ]; "\ " then start kikimr-multi slot={slot} tenant=/{domain}/{name} mbus={mbus} grpc={grpc} mon={mon} ic={ic}; "\ " else systemctl start kikimr-multi@{slot}; fi\"".format( @@ -231,7 +237,7 @@ def start_slot_for_tenant(nodes, slot, tenant, host, node_bind=None): nodes.execute_async(cmd, check_retcode=False, nodes=[host]) -def start_all_slots(nodes): +def _start_all_slots(nodes): cmd = "find /Berkanavt/ -maxdepth 1 -type d -name kikimr_* " \ " | while read x; do " \ " sudo sh -c \"if [ -x /sbin/start ]; "\ @@ -241,11 +247,11 @@ def start_all_slots(nodes): nodes.execute_async(cmd, check_retcode=False) -def start_static(nodes): - nodes.execute_async("sudo service kikimr start", check_retcode=False) +def _start_static(nodes): + nodes.execute_async("sudo service kikimr start", check_retcode=True) -def start_dynamic(components, nodes, cluster_details, walle_provider): +def _start_dynamic(components, nodes, cluster_details, walle_provider): if 'dynamic_slots' in components: def get_numa_nodes(nodes): @@ -260,7 +266,7 @@ def start_dynamic(components, nodes, cluster_details, walle_provider): numa_nodes = None # get_numa_nodes(nodes) numa_nodes_counters = {node: 0 for node in nodes.nodes_list} - (slots_per_domain, all_available_slots_count,) = get_available_slots(components, nodes, cluster_details, walle_provider) + (slots_per_domain, all_available_slots_count,) = _get_available_slots(components, nodes, cluster_details, walle_provider) for domain in cluster_details.domains: @@ -281,12 +287,12 @@ def start_dynamic(components, nodes, cluster_details, walle_provider): continue slots_taken.add((slot, node)) if domain.bind_slots_to_numa_nodes and numa_nodes[node] > 0: - start_slot_for_tenant(nodes, slot, tenant, host=node, + _start_slot_for_tenant(nodes, slot, tenant, host=node, node_bind=numa_nodes_counters[node]) numa_nodes_counters[node] += 1 numa_nodes_counters[node] %= numa_nodes[node] else: - start_slot_for_tenant(nodes, slot, tenant, host=node) + _start_slot_for_tenant(nodes, slot, tenant, host=node) break except IndexError: logger.critical('insufficient slots allocated') @@ -297,12 +303,12 @@ def start_dynamic(components, nodes, cluster_details, walle_provider): def slice_start(components, nodes, cluster_details, walle_provider): if 'kikimr' in components: - start_static(nodes) + _start_static(nodes) - start_dynamic(components, nodes, cluster_details, walle_provider) + _start_dynamic(components, nodes, cluster_details, walle_provider) -def stop_all_slots(nodes): +def _stop_all_slots(nodes): cmd = "find /Berkanavt/ -maxdepth 1 -type d -name kikimr_* " \ " | while read x; do " \ " sudo sh -c \"if [ -x /sbin/stop ]; "\ @@ -312,7 +318,7 @@ def stop_all_slots(nodes): nodes.execute_async(cmd, check_retcode=False) -def stop_slot_ret(nodes, slot): +def _stop_slot_ret(nodes, slot): cmd = "sudo sh -c \"if [ -x /sbin/stop ]; "\ " then stop kikimr-multi slot={slot}; "\ " else systemctl stop kikimr-multi@{slot}; fi\"".format( @@ -321,30 +327,30 @@ def stop_slot_ret(nodes, slot): return nodes.execute_async_ret(cmd, check_retcode=False) -def stop_slot(nodes, slot): - tasks = stop_slot_ret(nodes, slot) +def _stop_slot(nodes, slot): + tasks = _stop_slot_ret(nodes, slot) nodes._check_async_execution(tasks, False) -def stop_static(nodes): +def _stop_static(nodes): nodes.execute_async("sudo service kikimr stop", check_retcode=False) -def stop_dynamic(components, nodes, cluster_details): +def _stop_dynamic(components, nodes, cluster_details): if 'dynamic_slots' in components: tasks = [] for slot in cluster_details.dynamic_slots.values(): - tasks_slot = stop_slot_ret(nodes, slot) + tasks_slot = _stop_slot_ret(nodes, slot) for task in tasks_slot: tasks.append(task) nodes._check_async_execution(tasks, False) def slice_stop(components, nodes, cluster_details, walle_provider): - stop_dynamic(components, nodes, cluster_details) + _stop_dynamic(components, nodes, cluster_details) if 'kikimr' in components: - stop_static(nodes) + _stop_static(nodes) slice_kikimr_path = '/Berkanavt/kikimr/bin/kikimr' @@ -352,7 +358,7 @@ slice_cfg_path = '/Berkanavt/kikimr/cfg' slice_secrets_path = '/Berkanavt/kikimr/token' -def update_kikimr(nodes, bin_path, compressed_path): +def _update_kikimr(nodes, bin_path, compressed_path): bin_directory = os.path.dirname(bin_path) nodes.copy(bin_path, slice_kikimr_path, compressed_path=compressed_path) for lib in ['libiconv.so', 'liblibaio-dynamic.so', 'liblibidn-dynamic.so']: @@ -362,11 +368,11 @@ def update_kikimr(nodes, bin_path, compressed_path): nodes.copy(lib_path, remote_lib_path) -def update_cfg(nodes, cfg_path): +def _update_cfg(nodes, cfg_path): nodes.copy(cfg_path, slice_cfg_path, directory=True) -def deploy_secrets(nodes, yav_version): +def _deploy_secrets(nodes, yav_version): if not yav_version: return @@ -397,20 +403,20 @@ def deploy_secrets(nodes, yav_version): def slice_update(components, nodes, cluster_details, configurator, do_clear_logs, args, walle_provider): if do_clear_logs: - clear_logs(nodes) + _clear_logs(nodes) if 'kikimr' in components: if 'bin' in components.get('kikimr', []): - update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin) + _update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin) slice_stop(components, nodes, cluster_details, walle_provider) if 'kikimr' in components: if 'cfg' in components.get('kikimr', []): static = configurator.create_static_cfg() - update_cfg(nodes, static) - deploy_secrets(nodes, args.yav_version) + _update_cfg(nodes, static) + _deploy_secrets(nodes, args.yav_version) - deploy_slot_configs(components, nodes, cluster_details, walle_provider) + _deploy_slot_configs(components, nodes, cluster_details, walle_provider) slice_start(components, nodes, cluster_details, walle_provider) @@ -419,6 +425,6 @@ def slice_update_raw_configs(components, nodes, cluster_details, config_path, wa if 'kikimr' in components: if 'cfg' in components.get('kikimr', []): kikimr_cfg = os.path.join(config_path, 'kikimr-static') - update_cfg(nodes, kikimr_cfg) + _update_cfg(nodes, kikimr_cfg) slice_start(components, nodes, cluster_details, walle_provider) diff --git a/ydb/tools/ydbd_slice/nodes.py b/ydb/tools/ydbd_slice/nodes.py index 13a2b38d724..62a4a742d06 100644 --- a/ydb/tools/ydbd_slice/nodes.py +++ b/ydb/tools/ydbd_slice/nodes.py @@ -32,6 +32,8 @@ class Nodes(object): for cmd, process, host in running_jobs: out, err = process.communicate() + out = out.decode("utf-8", errors='replace') + err = err.decode("utf-8", errors='replace') retcode = process.poll() if retcode != 0: status_line = "execution '{cmd}' finished with '{retcode}' retcode".format( @@ -128,6 +130,9 @@ class Nodes(object): local_path = compressed_path original_remote_path = remote_path remote_path += '.zstd' + + self.execute_async("sudo mkdir -p {}".format(os.path.dirname(remote_path))) + hub = self._nodes[0] self._copy_on_node(local_path, hub, remote_path) self._copy_between_nodes(hub, remote_path, self._nodes[1:], remote_path) |