aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Yurchuk <maxim-yurchuk@ydb.tech>2024-04-09 10:33:36 +0000
committerGitHub <noreply@github.com>2024-04-09 13:33:36 +0300
commit1abde1c19ee88b450e131f7d6882f7bd28bbda42 (patch)
treef431c28772a2ea441b440cef488fcfd2c28f85f0
parent34e4a37cbe98992bea2f024ffd6f97ea59e8eceb (diff)
downloadydb-1abde1c19ee88b450e131f7d6882f7bd28bbda42.tar.gz
ybdb_slice improvements (#3585)
-rw-r--r--ydb/tools/ydbd_slice/handlers.py118
-rw-r--r--ydb/tools/ydbd_slice/nodes.py5
2 files changed, 67 insertions, 56 deletions
diff --git a/ydb/tools/ydbd_slice/handlers.py b/ydb/tools/ydbd_slice/handlers.py
index 709561a1637..1ebb0fac52f 100644
--- a/ydb/tools/ydbd_slice/handlers.py
+++ b/ydb/tools/ydbd_slice/handlers.py
@@ -21,21 +21,26 @@ class CalledProcessError(subprocess.CalledProcessError):
def format_drivers(nodes):
- cmd = "sudo find /dev/disk/by-partlabel/ -maxdepth 1 -name 'kikimr_*' " \
- "-exec dd if=/dev/zero of={} bs=1M count=1 status=none \;" # noqa: W605
+ cmd = r"sudo find /dev/disk/ -path '*/by-partlabel/kikimr_*' " \
+ r"-exec dd if=/dev/zero of={} bs=1M count=1 status=none \;"
nodes.execute_async(cmd)
-def clear_registered_slots(nodes):
- nodes.execute_async("sudo find /Berkanavt/ -maxdepth 1 -type d -name 'kikimr_*' -exec rm -rf -- {} \;") # noqa: W605
+def _ensure_berkanavt_exists(nodes):
+ cmd = r"sudo mkdir -p /Berkanavt"
+ nodes.execute_async(cmd)
+
+
+def _clear_registered_slots(nodes):
+ nodes.execute_async(r"sudo find /Berkanavt/ -maxdepth 1 -type d -name 'kikimr_*' -exec rm -rf -- {} \;")
-def clear_slot(nodes, slot):
- cmd = "sudo find /Berkanavt/ -maxdepth 1 -type d -name kikimr_{slot} -exec rm -rf -- {{}} \;".format(slot=slot.slot) # noqa: W605
+def _clear_slot(nodes, slot):
+ cmd = r"sudo find /Berkanavt/ -maxdepth 1 -type d -name kikimr_{slot} -exec rm -rf -- {{}} \;".format(slot=slot.slot)
nodes.execute_async(cmd)
-def clear_logs(nodes):
+def _clear_logs(nodes):
cmd = "sudo service rsyslog stop; " \
"find /Berkanavt/ -mindepth 2 -maxdepth 2 -name logs | egrep '^/Berkanavt/kikimr' | sudo xargs -I% find % -mindepth 1 -delete; " \
"sudo service rsyslog start;"
@@ -53,13 +58,13 @@ def slice_clear(components, nodes, cluster_details, walle_provider):
if 'dynamic_slots' in components:
for slot in cluster_details.dynamic_slots.values():
- clear_slot(nodes, slot)
+ _clear_slot(nodes, slot)
if 'kikimr' in components:
format_drivers(nodes)
-def invoke_scripts(dynamic_cfg_path, scripts):
+def _invoke_scripts(dynamic_cfg_path, scripts):
for script_name in scripts:
script_path = os.path.join(dynamic_cfg_path, script_name)
if os.path.isfile(script_path):
@@ -71,13 +76,13 @@ def invoke_scripts(dynamic_cfg_path, scripts):
raise CalledProcessError(er)
-def dynamic_configure(configurations):
+def _dynamic_configure(configurations):
dynamic_cfg_path = configurations.create_dynamic_cfg()
# wait for bs to configure
time_remaining = 60
while True:
try:
- invoke_scripts(dynamic_cfg_path, ['init_storage.bash'])
+ _invoke_scripts(dynamic_cfg_path, ['init_storage.bash'])
break
except CalledProcessError:
time_to_wait = min(time_remaining, 5)
@@ -85,7 +90,7 @@ def dynamic_configure(configurations):
raise
time_remaining -= time_to_wait
time.sleep(time_to_wait)
- invoke_scripts(
+ _invoke_scripts(
dynamic_cfg_path, (
"init_cms.bash",
"init_compute.bash",
@@ -96,34 +101,35 @@ def dynamic_configure(configurations):
def slice_install(components, nodes, cluster_details, configurator, do_clear_logs, args, walle_provider):
+ _ensure_berkanavt_exists(nodes)
slice_stop(components, nodes, cluster_details, walle_provider)
if 'dynamic_slots' in components or 'kikimr' in components:
- stop_all_slots(nodes)
- clear_registered_slots(nodes)
+ _stop_all_slots(nodes)
+ _clear_registered_slots(nodes)
if do_clear_logs:
- clear_logs(nodes)
+ _clear_logs(nodes)
if 'kikimr' in components:
format_drivers(nodes)
if 'bin' in components.get('kikimr', []):
- update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin)
+ _update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin)
if 'cfg' in components.get('kikimr', []):
static_cfg_path = configurator.create_static_cfg()
- update_cfg(nodes, static_cfg_path)
- deploy_secrets(nodes, args.yav_version)
+ _update_cfg(nodes, static_cfg_path)
+ _deploy_secrets(nodes, args.yav_version)
- start_static(nodes)
- dynamic_configure(configurator)
+ _start_static(nodes)
+ _dynamic_configure(configurator)
- deploy_slot_configs(components, nodes, cluster_details, walle_provider)
- start_dynamic(components, nodes, cluster_details, walle_provider)
+ _deploy_slot_configs(components, nodes, cluster_details, walle_provider)
+ _start_dynamic(components, nodes, cluster_details, walle_provider)
-def get_available_slots(components, nodes, cluster_details, walle_provider):
+def _get_available_slots(components, nodes, cluster_details, walle_provider):
if 'dynamic_slots' not in components:
return {}
@@ -145,7 +151,7 @@ def get_available_slots(components, nodes, cluster_details, walle_provider):
return (slots_per_domain, all_available_slots_count, )
-def deploy_slot_config_for_tenant(nodes, slot, tenant, node):
+def _deploy_slot_config_for_tenant(nodes, slot, tenant, node):
slot_dir = "/Berkanavt/kikimr_{slot}".format(slot=slot.slot)
logs_dir = slot_dir + "/logs"
slot_cfg = slot_dir + "/slot_cfg"
@@ -176,11 +182,11 @@ mon={mon}""".format(
nodes.execute_async(cmd, check_retcode=False, nodes=[node])
-def deploy_slot_configs(components, nodes, cluster_details, walle_provider):
+def _deploy_slot_configs(components, nodes, cluster_details, walle_provider):
if 'dynamic_slots' not in components:
return
- slots_per_domain = get_available_slots(components, nodes, cluster_details, walle_provider)[0]
+ slots_per_domain = _get_available_slots(components, nodes, cluster_details, walle_provider)[0]
for domain in cluster_details.domains:
slots_taken = set()
available_slots_per_zone = slots_per_domain[domain.domain_name]
@@ -194,14 +200,14 @@ def deploy_slot_configs(components, nodes, cluster_details, walle_provider):
if (slot, node) in slots_taken:
continue
slots_taken.add((slot, node))
- deploy_slot_config_for_tenant(nodes, slot, tenant, node)
+ _deploy_slot_config_for_tenant(nodes, slot, tenant, node)
break
except IndexError:
logger.critical('insufficient slots allocated')
return
-def start_slot(nodes, slot):
+def _start_slot(nodes, slot):
cmd = "sudo sh -c \"if [ -x /sbin/start ]; "\
" then start kikimr-multi slot={slot} tenant=dynamic mbus={mbus} grpc={grpc} mon={mon} ic={ic}; "\
" else systemctl start kikimr-multi@{slot}; fi\"".format(
@@ -214,7 +220,7 @@ def start_slot(nodes, slot):
nodes.execute_async(cmd, check_retcode=False)
-def start_slot_for_tenant(nodes, slot, tenant, host, node_bind=None):
+def _start_slot_for_tenant(nodes, slot, tenant, host, node_bind=None):
cmd = "sudo sh -c \"if [ -x /sbin/start ]; "\
" then start kikimr-multi slot={slot} tenant=/{domain}/{name} mbus={mbus} grpc={grpc} mon={mon} ic={ic}; "\
" else systemctl start kikimr-multi@{slot}; fi\"".format(
@@ -231,7 +237,7 @@ def start_slot_for_tenant(nodes, slot, tenant, host, node_bind=None):
nodes.execute_async(cmd, check_retcode=False, nodes=[host])
-def start_all_slots(nodes):
+def _start_all_slots(nodes):
cmd = "find /Berkanavt/ -maxdepth 1 -type d -name kikimr_* " \
" | while read x; do " \
" sudo sh -c \"if [ -x /sbin/start ]; "\
@@ -241,11 +247,11 @@ def start_all_slots(nodes):
nodes.execute_async(cmd, check_retcode=False)
-def start_static(nodes):
- nodes.execute_async("sudo service kikimr start", check_retcode=False)
+def _start_static(nodes):
+ nodes.execute_async("sudo service kikimr start", check_retcode=True)
-def start_dynamic(components, nodes, cluster_details, walle_provider):
+def _start_dynamic(components, nodes, cluster_details, walle_provider):
if 'dynamic_slots' in components:
def get_numa_nodes(nodes):
@@ -260,7 +266,7 @@ def start_dynamic(components, nodes, cluster_details, walle_provider):
numa_nodes = None # get_numa_nodes(nodes)
numa_nodes_counters = {node: 0 for node in nodes.nodes_list}
- (slots_per_domain, all_available_slots_count,) = get_available_slots(components, nodes, cluster_details, walle_provider)
+ (slots_per_domain, all_available_slots_count,) = _get_available_slots(components, nodes, cluster_details, walle_provider)
for domain in cluster_details.domains:
@@ -281,12 +287,12 @@ def start_dynamic(components, nodes, cluster_details, walle_provider):
continue
slots_taken.add((slot, node))
if domain.bind_slots_to_numa_nodes and numa_nodes[node] > 0:
- start_slot_for_tenant(nodes, slot, tenant, host=node,
+ _start_slot_for_tenant(nodes, slot, tenant, host=node,
node_bind=numa_nodes_counters[node])
numa_nodes_counters[node] += 1
numa_nodes_counters[node] %= numa_nodes[node]
else:
- start_slot_for_tenant(nodes, slot, tenant, host=node)
+ _start_slot_for_tenant(nodes, slot, tenant, host=node)
break
except IndexError:
logger.critical('insufficient slots allocated')
@@ -297,12 +303,12 @@ def start_dynamic(components, nodes, cluster_details, walle_provider):
def slice_start(components, nodes, cluster_details, walle_provider):
if 'kikimr' in components:
- start_static(nodes)
+ _start_static(nodes)
- start_dynamic(components, nodes, cluster_details, walle_provider)
+ _start_dynamic(components, nodes, cluster_details, walle_provider)
-def stop_all_slots(nodes):
+def _stop_all_slots(nodes):
cmd = "find /Berkanavt/ -maxdepth 1 -type d -name kikimr_* " \
" | while read x; do " \
" sudo sh -c \"if [ -x /sbin/stop ]; "\
@@ -312,7 +318,7 @@ def stop_all_slots(nodes):
nodes.execute_async(cmd, check_retcode=False)
-def stop_slot_ret(nodes, slot):
+def _stop_slot_ret(nodes, slot):
cmd = "sudo sh -c \"if [ -x /sbin/stop ]; "\
" then stop kikimr-multi slot={slot}; "\
" else systemctl stop kikimr-multi@{slot}; fi\"".format(
@@ -321,30 +327,30 @@ def stop_slot_ret(nodes, slot):
return nodes.execute_async_ret(cmd, check_retcode=False)
-def stop_slot(nodes, slot):
- tasks = stop_slot_ret(nodes, slot)
+def _stop_slot(nodes, slot):
+ tasks = _stop_slot_ret(nodes, slot)
nodes._check_async_execution(tasks, False)
-def stop_static(nodes):
+def _stop_static(nodes):
nodes.execute_async("sudo service kikimr stop", check_retcode=False)
-def stop_dynamic(components, nodes, cluster_details):
+def _stop_dynamic(components, nodes, cluster_details):
if 'dynamic_slots' in components:
tasks = []
for slot in cluster_details.dynamic_slots.values():
- tasks_slot = stop_slot_ret(nodes, slot)
+ tasks_slot = _stop_slot_ret(nodes, slot)
for task in tasks_slot:
tasks.append(task)
nodes._check_async_execution(tasks, False)
def slice_stop(components, nodes, cluster_details, walle_provider):
- stop_dynamic(components, nodes, cluster_details)
+ _stop_dynamic(components, nodes, cluster_details)
if 'kikimr' in components:
- stop_static(nodes)
+ _stop_static(nodes)
slice_kikimr_path = '/Berkanavt/kikimr/bin/kikimr'
@@ -352,7 +358,7 @@ slice_cfg_path = '/Berkanavt/kikimr/cfg'
slice_secrets_path = '/Berkanavt/kikimr/token'
-def update_kikimr(nodes, bin_path, compressed_path):
+def _update_kikimr(nodes, bin_path, compressed_path):
bin_directory = os.path.dirname(bin_path)
nodes.copy(bin_path, slice_kikimr_path, compressed_path=compressed_path)
for lib in ['libiconv.so', 'liblibaio-dynamic.so', 'liblibidn-dynamic.so']:
@@ -362,11 +368,11 @@ def update_kikimr(nodes, bin_path, compressed_path):
nodes.copy(lib_path, remote_lib_path)
-def update_cfg(nodes, cfg_path):
+def _update_cfg(nodes, cfg_path):
nodes.copy(cfg_path, slice_cfg_path, directory=True)
-def deploy_secrets(nodes, yav_version):
+def _deploy_secrets(nodes, yav_version):
if not yav_version:
return
@@ -397,20 +403,20 @@ def deploy_secrets(nodes, yav_version):
def slice_update(components, nodes, cluster_details, configurator, do_clear_logs, args, walle_provider):
if do_clear_logs:
- clear_logs(nodes)
+ _clear_logs(nodes)
if 'kikimr' in components:
if 'bin' in components.get('kikimr', []):
- update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin)
+ _update_kikimr(nodes, configurator.kikimr_bin, configurator.kikimr_compressed_bin)
slice_stop(components, nodes, cluster_details, walle_provider)
if 'kikimr' in components:
if 'cfg' in components.get('kikimr', []):
static = configurator.create_static_cfg()
- update_cfg(nodes, static)
- deploy_secrets(nodes, args.yav_version)
+ _update_cfg(nodes, static)
+ _deploy_secrets(nodes, args.yav_version)
- deploy_slot_configs(components, nodes, cluster_details, walle_provider)
+ _deploy_slot_configs(components, nodes, cluster_details, walle_provider)
slice_start(components, nodes, cluster_details, walle_provider)
@@ -419,6 +425,6 @@ def slice_update_raw_configs(components, nodes, cluster_details, config_path, wa
if 'kikimr' in components:
if 'cfg' in components.get('kikimr', []):
kikimr_cfg = os.path.join(config_path, 'kikimr-static')
- update_cfg(nodes, kikimr_cfg)
+ _update_cfg(nodes, kikimr_cfg)
slice_start(components, nodes, cluster_details, walle_provider)
diff --git a/ydb/tools/ydbd_slice/nodes.py b/ydb/tools/ydbd_slice/nodes.py
index 13a2b38d724..62a4a742d06 100644
--- a/ydb/tools/ydbd_slice/nodes.py
+++ b/ydb/tools/ydbd_slice/nodes.py
@@ -32,6 +32,8 @@ class Nodes(object):
for cmd, process, host in running_jobs:
out, err = process.communicate()
+ out = out.decode("utf-8", errors='replace')
+ err = err.decode("utf-8", errors='replace')
retcode = process.poll()
if retcode != 0:
status_line = "execution '{cmd}' finished with '{retcode}' retcode".format(
@@ -128,6 +130,9 @@ class Nodes(object):
local_path = compressed_path
original_remote_path = remote_path
remote_path += '.zstd'
+
+ self.execute_async("sudo mkdir -p {}".format(os.path.dirname(remote_path)))
+
hub = self._nodes[0]
self._copy_on_node(local_path, hub, remote_path)
self._copy_between_nodes(hub, remote_path, self._nodes[1:], remote_path)