about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Kirill Rysin <35688753+naspirato@users.noreply.github.com> 2025-02-10 15:21:05 +0100
committer GitHub <noreply@github.com> 2025-02-10 14:21:05 +0000
commit 0ebda4f23335dd897c4397bcc1f7b045dc007d38 (patch)
tree d171f34c7890671f8d602e48c7f9f6ac6e68965a
parent 544a6a716befd99dfc6299e8889cb66437a65a5b (diff)
download ydb-0ebda4f23335dd897c4397bcc1f7b045dc007d38.tar.gz
Test stability: workload_log fixes (#14384)
-rw-r--r-- ydb/tests/library/wardens/factories.py 4
-rw-r--r-- ydb/tests/library/wardens/logs.py 5
-rw-r--r-- ydb/tests/stability/tool/__main__.py 182
-rw-r--r-- ydb/tests/stability/tool/how_to.md 6
4 files changed, 148 insertions, 49 deletions
diff --git a/ydb/tests/library/wardens/factories.py b/ydb/tests/library/wardens/factories.py
index f689d8f514..59191508fa 100644
--- a/ydb/tests/library/wardens/factories.py
+++ b/ydb/tests/library/wardens/factories.py
@@ -11,7 +11,7 @@ from ydb.tests.library.wardens.hive import AllTabletsAliveLivenessWarden, BootQu
from ydb.tests.library.wardens.schemeshard import SchemeShardHasNoInFlightTransactions
-def safety_warden_factory(cluster, ssh_username, lines_after=5, cut=True):
+def safety_warden_factory(cluster, ssh_username, lines_after=5, cut=True, modification_days=1):
list_of_host_names = [node.host for node in cluster.nodes.values()]
wardens = [AllPDisksAreInValidStateSafetyWarden(cluster)]
wardens.extend(kikimr_grep_dmesg_safety_warden_factory(list_of_host_names, ssh_username))
@@ -24,7 +24,7 @@ def safety_warden_factory(cluster, ssh_username, lines_after=5, cut=True):
for directory, list_of_host_names in by_directory.items():
wardens.extend(
kikimr_start_logs_safety_warden_factory(
- list_of_host_names, ssh_username, directory, lines_after, cut
+ list_of_host_names, ssh_username, directory, lines_after, cut, modification_days
)
)
diff --git a/ydb/tests/library/wardens/logs.py b/ydb/tests/library/wardens/logs.py
index dd272efcec..572b45ad08 100644
--- a/ydb/tests/library/wardens/logs.py
+++ b/ydb/tests/library/wardens/logs.py
@@ -10,9 +10,9 @@ from ydb.tests.library.nemesis.safety_warden import GrepGzippedLogFilesForMarker
def kikimr_start_logs_safety_warden_factory(
- list_of_host_names, ssh_username, deploy_path, lines_after=5, cut=True
+ list_of_host_names, ssh_username, deploy_path, lines_after=5, cut=True, modification_days=1
):
- start_markers = ['VERIFY', 'FAIL', 'signal 11', 'signal 6', 'signal 15', 'uncaught exception']
+ start_markers = ['VERIFY', 'FAIL ', 'signal 11', 'signal 6', 'signal 15', 'uncaught exception', 'ERROR: AddressSanitizer', 'SIG']
username = ssh_username
return [
GrepLogFileForMarkers(
@@ -27,6 +27,7 @@ def kikimr_start_logs_safety_warden_factory(
list_of_host_names,
log_file_pattern=os.path.join(deploy_path, 'kikimr.start.*gz'),
list_of_markers=start_markers,
+ modification_days=modification_days,
username=username,
lines_after=lines_after,
cut=cut
diff --git a/ydb/tests/stability/tool/__main__.py b/ydb/tests/stability/tool/__main__.py
index 87dcdd2b7d..d2f3a5725e 100644
--- a/ydb/tests/stability/tool/__main__.py
+++ b/ydb/tests/stability/tool/__main__.py
@@ -105,7 +105,7 @@ class bcolors:
class StabilityCluster:
- def __init__(self, ssh_username, cluster_path, ydbd_path, ydbd_next_path=None):
+ def __init__(self, ssh_username, cluster_path, ydbd_path=None, ydbd_next_path=None):
self.working_dir = os.path.join(tempfile.gettempdir(), "ydb_stability")
os.makedirs(self.working_dir, exist_ok=True)
self.ssh_username = ssh_username
@@ -145,7 +145,7 @@ class StabilityCluster:
for line in traces.split('\n'):
line = re.sub(r' @ 0x[a-fA-F0-9]+', '', line)
# Убираем все до текста ошибки или указателя на строку кода
- match_verify = re.search(r'VERIFY|FAIL|signal 11|signal 6|signal 15|uncaught exception', line)
+ match_verify = re.search(r'VERIFY|FAIL|signal 11|signal 6|signal 15|uncaught exception|ERROR: AddressSanitizer|SIG', line)
match_code_file_line = re.search(r'\s+(\S+\.cpp:\d+).*', line)
if match_verify:
@@ -198,12 +198,11 @@ class StabilityCluster:
trace = trace + line + '\n'
return traces
- def get_all_errors(self):
- logging.getLogger().setLevel(logging.WARNING)
+ def get_all_errors(self, mode='all'):
all_results = []
- for node in self.kikimr_cluster.nodes.values():
- result = node.ssh_command("""
- ls -ltr /Berkanavt/kikimr*/logs/kikimr* |
+ if mode == 'all' or mode == 'raw' or mode == 'aggr':
+ command = """
+ ls -ltr /Berkanavt/kikim*/logs/kikimr* |
awk '{print $NF}' |
while read file; do
case "$file" in
@@ -212,27 +211,53 @@ class StabilityCluster:
*) cat "$file" ;;
esac
done |
- grep -E 'VERIFY|FAIL|signal 11|signal 6|signal 15|uncaught exception' -A 20
- """, raise_on_error=False)
+ grep -E 'VERIFY|FAIL |signal 11|signal 6|signal 15|uncaught exception|ERROR: AddressSanitizer|SIG' -A 40 -B 20
+ """
+ elif mode == 'last':
+ command = """
+ ls -ltr /Berkanavt/kikim*/logs/kikimr |
+ awk '{print $NF}' |
+ while read file; do
+ cat "$file" | grep -E 'VERIFY|FAIL |signal 11|signal 6|signal 15|uncaught exception|ERROR: AddressSanitizer|SIG' -A 40 -B 20 | tail -120
+ echo "--"
+ done
+ """
+ for node in self.kikimr_cluster.nodes.values():
+ result = node.ssh_command(command, raise_on_error=False)
if result:
all_results.append(result.decode('utf-8'))
all_results = self.process_lines(all_results)
return all_results
- def get_errors(self):
- errors = self.get_all_errors()
- unique_traces = self.find_unique_traces_with_counts(errors)
- for trace in unique_traces:
- print(f"Trace (Occurrences: {len(unique_traces[trace])}):\n{trace}\n{'-'*60}")
+ def get_errors(self, mode='raw'):
+ errors = self.get_all_errors(mode=mode)
+ if mode == 'raw' or mode == 'last':
+ print('Traces:')
+ for trace in errors:
+ print(f"{trace}\n{'-'*60}")
+ else:
+ unique_traces = self.find_unique_traces_with_counts(errors)
+ for trace in unique_traces:
+ print(f"Trace (Occurrences: {len(unique_traces[trace])}):\n{trace}\n{'-'*60}")
def perform_checks(self):
- safety_violations = safety_warden_factory(self.kikimr_cluster, self.ssh_username, lines_after=20, cut=False).list_of_safety_violations()
+ safety_violations = safety_warden_factory(self.kikimr_cluster, self.ssh_username, lines_after=20, cut=False, modification_days=3).list_of_safety_violations()
liveness_violations = liveness_warden_factory(self.kikimr_cluster, self.ssh_username).list_of_liveness_violations
coredumps_search_results = {}
for node in self.kikimr_cluster.nodes.values():
result = node.ssh_command('find /coredumps/ -type f | wc -l', raise_on_error=False)
coredumps_search_results[node.host.split(':')[0]] = int(result.decode('utf-8'))
+ minidumps_search_results = {}
+ for node in self.kikimr_cluster.nodes.values():
+ result = node.ssh_command('''
+ if [ -d "/Berkanavt/minidumps/" ]; then
+ find /Berkanavt/minidumps/ -type f | wc -l
+ else
+ echo 0
+ fi
+ ''', raise_on_error=False)
+ minidumps_search_results[node.host.split(':')[0]] = int(result.decode('utf-8'))
print("SAFETY WARDEN:")
for i, violation in enumerate(safety_violations):
@@ -249,6 +274,9 @@ class StabilityCluster:
print("COREDUMPS:")
for node in coredumps_search_results:
print(f' {node}: {coredumps_search_results[node]}')
+ print("MINIDUMPS:")
+ for node in coredumps_search_results:
+ print(f' {node}: {minidumps_search_results[node]}')
def start_nemesis(self):
for node in self.kikimr_cluster.nodes.values():
@@ -258,7 +286,7 @@ class StabilityCluster:
for node in self.kikimr_cluster.nodes.values():
node.ssh_command(
'sudo pkill screen',
- raise_on_error=True
+ raise_on_error=False
)
def stop_nemesis(self):
@@ -284,17 +312,14 @@ class StabilityCluster:
print(f'\t{state_object}:\t{status}')
def cleanup(self, mode='all'):
- if mode in ['all', 'logs']:
- self.kikimr_cluster.cleanup_logs()
for node in self.kikimr_cluster.nodes.values():
if mode in ['all', 'dumps']:
node.ssh_command('sudo rm -rf /coredumps/*', raise_on_error=False)
if mode in ['all', 'logs']:
+ node.ssh_command('sudo find /Berkanavt/kikimr*/logs/kikimr* -type f -exec rm -f {} +', raise_on_error=False)
node.ssh_command('sudo rm -rf /Berkanavt/nemesis/log/*', raise_on_error=False)
- if mode == 'all':
- self.stop_nemesis()
- node.ssh_command('sudo pkill screen', raise_on_error=False)
- node.ssh_command('sudo rm -rf /Berkanavt/kikimr/bin/*', raise_on_error=False)
+ if mode in ['all', 'logs']:
+ self.kikimr_cluster.cleanup_logs()
def deploy_ydb(self):
self.cleanup()
@@ -309,6 +334,7 @@ class StabilityCluster:
node.ssh_command("/Berkanavt/kikimr/bin/kikimr admin console validator disable bootstrap", raise_on_error=True)
self.deploy_tools()
+ self.get_state()
def deploy_tools(self):
for node in self.kikimr_cluster.nodes.values():
@@ -348,7 +374,7 @@ def parse_args():
)
parser.add_argument(
"--ydbd_path",
- required=True,
+ required=False,
type=path_type,
help="Path to ydbd",
)
@@ -371,7 +397,10 @@ def parse_args():
nargs="+",
choices=[
"get_errors",
+ "get_errors_aggr",
+ "get_errors_last",
"get_state",
+ "clean_workload",
"cleanup",
"cleanup_logs",
"cleanup_dumps",
@@ -379,7 +408,7 @@ def parse_args():
"deploy_tools",
"start_nemesis",
"stop_nemesis",
- "start_all_workloads",
+ "start_default_workloads",
"start_workload_simple_queue_row",
"start_workload_simple_queue_column",
"start_workload_olap_workload",
@@ -387,16 +416,37 @@ def parse_args():
"start_workload_log_column",
"start_workload_log_row",
"stop_workloads",
+ "stop_workload",
"perform_checks",
],
help="actions to execute",
)
+ args, unknown = parser.parse_known_args()
+ if "stop_workload" in args.actions:
+ parser.add_argument(
+ "--name",
+ type=str,
+ required=True,
+ help="Name of the workload to stop",
+ choices=list(DICT_OF_PROCESSES.keys())
+ )
+
+ if "clean_workload" in args.actions:
+ parser.add_argument(
+ "--name",
+ type=str,
+ required=True,
+ help="Name of the workload to stop",
+ choices=list(DICT_OF_PROCESSES.keys())
+ )
+
return parser.parse_args()
def main():
args = parse_args()
ssh_username = args.ssh_user
+ print('Initing cluster info')
stability_cluster = StabilityCluster(
ssh_username=ssh_username,
cluster_path=args.cluster_path,
@@ -405,8 +455,13 @@ def main():
)
for action in args.actions:
+ print(f'Start action {action}')
if action == "get_errors":
- stability_cluster.get_errors()
+ stability_cluster.get_errors(mode='raw')
+ if action == "get_errors_aggr":
+ stability_cluster.get_errors(mode='aggr')
+ if action == "get_errors_last":
+ stability_cluster.get_errors(mode='last')
if action == "get_state":
stability_cluster.get_state()
if action == "deploy_ydb":
@@ -419,7 +474,7 @@ def main():
stability_cluster.cleanup('dumps')
if action == "deploy_tools":
stability_cluster.deploy_tools()
- if action == "start_all_workloads":
+ if action == "start_default_workloads":
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
node.ssh_command(
'screen -s simple_queue_row -d -m bash -c "while true; do /Berkanavt/nemesis/bin/simple_queue --database /Root/db1 --mode row; done"',
@@ -434,6 +489,43 @@ def main():
raise_on_error=True
)
stability_cluster.get_state()
+ if action == "stop_workload":
+ workload_name = args.name
+ if DICT_OF_PROCESSES.get(workload_name):
+ for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
+ node.ssh_command(
+ f"ps aux | grep {workload_name} | grep -v grep | awk '{{print $2}}' | xargs kill -9",
+ raise_on_error=True)
+ else:
+ print(f"Unknown workload {workload_name}")
+ stability_cluster.get_state()
+ if "clean_workload" in action:
+ workload_name = args.name
+ if DICT_OF_PROCESSES.get(workload_name):
+ store_type_list = []
+ if 'column' in workload_name:
+ store_type_list.append('column')
+ elif 'row' in workload_name:
+ store_type_list.append('row')
+ else:
+ store_type_list = ['column', 'row']
+ if 'log_' in workload_name:
+ first_node = stability_cluster.kikimr_cluster.nodes[1]
+ for store_type in store_type_list:
+ first_node.ssh_command([
+ '/Berkanavt/nemesis/bin/ydb_cli',
+ '--endpoint', f'grpc://localhost:{first_node.grpc_port}',
+ '--database', '/Root/db1',
+ 'workload', 'log', 'clean',
+ '--path', f'log_workload_{store_type}',
+ ],
+ raise_on_error=True
+ )
+ else:
+ print(f"Not supported workload clean command for {workload_name}")
+ else:
+ print(f"Unknown workload {workload_name}")
+ stability_cluster.get_state()
if "start_workload_log" in action:
store_type_list = []
if action == 'start_workload_log_column':
@@ -448,27 +540,18 @@ def main():
'/Berkanavt/nemesis/bin/ydb_cli',
'--endpoint', f'grpc://localhost:{first_node.grpc_port}',
'--database', '/Root/db1',
- 'workload', 'log', 'clean',
- '--path', f'log_workload_{store_type}',
- ],
- raise_on_error=True
- )
- first_node.ssh_command([
- '/Berkanavt/nemesis/bin/ydb_cli',
- '--endpoint', f'grpc://localhost:{first_node.grpc_port}',
- '--database', '/Root/db1',
'workload', 'log', 'init',
'--len', '1000',
- '--int-cols', '20',
- '--key-cols', '20',
+ '--int-cols', '18',
+ '--key-cols', '18',
'--min-partitions', '100',
'--partition-size', '10',
'--auto-partition', '0',
'--store', store_type,
'--path', f'log_workload_{store_type}',
- '--ttl', '3600'
+ '--ttl', '20160'
],
- raise_on_error=True
+ raise_on_error=False
)
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
node.ssh_command([
@@ -478,9 +561,9 @@ def main():
'--database', '/Root/db1',
'workload', 'log', 'run', 'bulk_upsert',
'--len', '1000',
- '--int-cols', '20',
- '--key-cols', '20',
- '--threads', '20',
+ '--int-cols', '18',
+ '--key-cols', '18',
+ '--threads', '1',
'--timestamp_deviation', '180',
'--seconds', '86400',
'--path', f'log_workload_{store_type}',
@@ -488,6 +571,21 @@ def main():
],
raise_on_error=True
)
+ node.ssh_command([
+ f'screen -s workload_log_{store_type}_select -d -m bash -c "while true; do',
+ '/Berkanavt/nemesis/bin/ydb_cli',
+ '--verbose',
+ '--endpoint', f'grpc://localhost:{node.grpc_port}',
+ '--database', '/Root/db1',
+ 'workload', 'log', 'run', 'select',
+ '--client-timeout', '1800000',
+ '--threads', '1',
+ '--seconds', '86400',
+ '--path', f'log_workload_{store_type}',
+ '; done"'
+ ],
+ raise_on_error=True
+ )
stability_cluster.get_state()
if action == "start_workload_simple_queue_row":
for node_id, node in enumerate(stability_cluster.kikimr_cluster.nodes.values()):
diff --git a/ydb/tests/stability/tool/how_to.md b/ydb/tests/stability/tool/how_to.md
index c6595aaca5..8be1e3b80a 100644
--- a/ydb/tests/stability/tool/how_to.md
+++ b/ydb/tests/stability/tool/how_to.md
@@ -16,16 +16,16 @@
./tool deploy_tools --cluster_path=<path_to_cluster.yaml> --ydbd_path=<repo_root>/ydb/apps/ydbd/ydbd
```
5) start workload:
- - `start_all_workloads` - start all listed below workloads
+ - `start_default_workloads` - start all listed below workloads
- `start_workload_simple_queue_row`
- `start_workload_simple_queue_column`
- `start_workload_olap_workload`
- not included in `start_all_workloads`:
+ not included in `start_default_workloads`:
- `start_workload_log`
```
- ./tool start_all_workloads --cluster_path=<path_to_cluster.yaml> --ydbd_path=<repo_root>/ydb/apps/ydbd/ydbd
+ ./tool start_default_workloads --cluster_path=<path_to_cluster.yaml> --ydbd_path=<repo_root>/ydb/apps/ydbd/ydbd
```
to stop workload, use command `stop_workloads`