| author | snaury <[email protected]> | 2023-04-13 12:55:20 +0300 |
|---|---|---|
| committer | snaury <[email protected]> | 2023-04-13 12:55:20 +0300 |
| commit | 89ca66f252da3a017f748b167553026305555021 (patch) | |
| tree | 5efc707c78adea53a169b200b98868d8008c20f7 | |
| parent | 9fc74dc5d2668478e58fe1ff338d095d58c60c75 (diff) | |
Add oplog to ydb serializable for problem diagnostics
-rw-r--r-- | ydb/tests/tools/ydb_serializable/__main__.py | 26
-rw-r--r-- | ydb/tests/tools/ydb_serializable/lib/__init__.py | 260 |
2 files changed, 200 insertions, 86 deletions
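
In brief, the patch threads a context-manager-based operation log through every checker transaction: each wrapped operation becomes an `OpLogEntry` recording wall-clock start/end times, the node value, an operation description (e.g. `'read'`, `'write+commit'`), and optional results, with exceptions captured as results too. A minimal self-contained sketch of the pattern follows; note that the module-level `oplog` list and the demo usage at the bottom are illustrative simplifications, whereas in the patch these live on the `History` class:

```python
import time
from contextlib import contextmanager


class OpLogEntry:
    """One logged operation; mirrors History.OpLogEntry from the patch."""

    def __init__(self, start_time, op, description):
        self.start_time = start_time
        self.end_time = None  # set by done(), even when the operation raises
        self.op = op
        self.description = description
        self._results = []

    def result(self, result):
        self._results.append(result)

    def done(self):
        if self.end_time is None:
            self.end_time = time.time()

    def to_json(self):
        # Flat entry: [start, end, op, description, result, ...]
        return [self.start_time, self.end_time, self.op, self.description] + self._results


oplog = []  # the patch keeps this list on History.oplog


@contextmanager
def log_op(op, description):
    entry = OpLogEntry(time.time(), op, description)
    oplog.append(entry)
    try:
        try:
            yield entry
        finally:
            entry.done()  # end time is recorded on success and failure alike
    except BaseException as e:
        entry.result(f'Exception: {e}')  # failed operations stay visible in the dump
        raise


# Illustrative usage only:
with log_op(42, 'read') as log:
    log.result({1: 42, 2: 42})
print([e.to_json() for e in oplog])
```
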
diff --git a/ydb/tests/tools/ydb_serializable/__main__.py b/ydb/tests/tools/ydb_serializable/__main__.py
index f03cc08bced..f750ecf4394 100644
--- a/ydb/tests/tools/ydb_serializable/__main__.py
+++ b/ydb/tests/tools/ydb_serializable/__main__.py
@@ -36,6 +36,7 @@ def main():
     parser.add_argument('--processes', type=int, default=1, help='Number of processes to fork into, default is 1')
     parser.add_argument('--print-unique-errors', dest='print_unique_errors', action='store_const', const=1, default=0, help='Print unique errors that happen during execution')
     parser.add_argument('--print-unique-traceback', dest='print_unique_errors', action='store_const', const=2, help='Print traceback for unique errors that happen during execution')
+    parser.add_argument('--oplog-results', dest='oplog_results', action='store_true', default=False, help='Store operation results in oplog dumps')

     args = parser.parse_args()
     logger = DummyLogger()
@@ -53,8 +54,9 @@ def main():
     options.read_table_ranges = args.read_table_ranges
     options.ignore_read_table = args.ignore_read_table
     options.read_table_snapshot = args.read_table_snapshot
+    options.oplog_results = args.oplog_results

-    async def async_run_single():
+    async def async_run_single_inner():
         iterations = args.iterations

         async with DatabaseChecker(args.endpoint, args.database, path=args.path, logger=logger, print_unique_errors=args.print_unique_errors) as checker:
@@ -63,20 +65,36 @@ def main():
                 await checker.async_run(options)
             except SerializabilityError as e:
                 e.history.write_to_file(os.path.join(args.output_path, os.path.basename(e.table) + '_history.json'))
+                e.history.write_log_to_file(os.path.join(args.output_path, os.path.basename(e.table) + '_log.json'))
                 raise

             if iterations is not None:
                 iterations -= 1

-    def run_single():
-        def handler(signum, frame, sys=sys, logger=logger):
+    async def async_run_single():
+        loop = asyncio.get_event_loop()
+        task = asyncio.create_task(async_run_single_inner())
+
+        def handler(signum, frame, sys=sys, logger=logger, loop=loop, task=task):
             logger.warning('Terminating on signal %d', signum)
+
+            def do_cancel():
+                if not task.done():
+                    task.cancel()
+
+            loop.call_soon_threadsafe(do_cancel)
             sys.exit(1)

         signal.signal(signal.SIGINT, handler)
         signal.signal(signal.SIGTERM, handler)

-        asyncio.run(async_run_single())
+        await task
+
+    def run_single():
+        try:
+            asyncio.run(async_run_single())
+        except asyncio.exceptions.CancelledError:
+            sys.exit(1)

     def run_multiple():
         def handler(signum, frame, sys=sys, logger=logger):
diff --git a/ydb/tests/tools/ydb_serializable/lib/__init__.py b/ydb/tests/tools/ydb_serializable/lib/__init__.py
index 189156151f0..52b9ec7b8e3 100644
--- a/ydb/tests/tools/ydb_serializable/lib/__init__.py
+++ b/ydb/tests/tools/ydb_serializable/lib/__init__.py
@@ -2,6 +2,7 @@
 import json
 import time
 import random
+from contextlib import contextmanager
 from datetime import datetime
 import asyncio
 from ydb.tests.library.serializability.checker import SerializabilityError, SerializabilityChecker
@@ -122,6 +123,7 @@ class DummyLogger(object):
 class History(object):
     def __init__(self):
         self.items = []
+        self.oplog = []

     class Prepare(object):
         def __init__(self, keys):
@@ -250,6 +252,41 @@ class History(object):
         for item in self.items:
             item.apply_to(checker)

+    class OpLogEntry(object):
+        def __init__(self, start_time, op, description):
+            self.start_time = start_time
+            self.end_time = None
+            self.op = op
+            self.description = description
+            self._results = []
+
+        def result(self, result):
+            self._results.append(result)
+
+        def done(self):
+            if self.end_time is None:
+                self.end_time = time.time()
+
+        def to_json(self):
+            return [self.start_time, self.end_time, self.op, self.description] + self._results
+
+    def log_op_begin(self, op, description):
+        entry = self.OpLogEntry(time.time(), op, description)
+        self.oplog.append(entry)
+        return entry
+
+    @contextmanager
+    def log_op(self, op, description):
+        entry = self.log_op_begin(op, description)
+        try:
+            try:
+                yield entry
+            finally:
+                entry.done()
+        except BaseException as e:
+            entry.result(f'Exception: {e}')
+            raise
+
     @classmethod
     def from_json(cls, items):
         self = cls()
@@ -276,6 +313,19 @@ class History(object):
                 first = False
             f.write(']\n')

+    def write_log_to_file(self, filename):
+        if not self.oplog:
+            return
+        with open(filename, 'w') as f:
+            first = True
+            f.write('[')
+            for item in self.oplog:
+                if not first:
+                    f.write(',\n ')
+                f.write(json.dumps(item.to_json()))
+                first = False
+            f.write(']\n')
+

 class DatabaseCheckerOptions(object):
     def __init__(self):
@@ -291,6 +341,7 @@ class DatabaseCheckerOptions(object):
         self.read_table_ranges = False
         self.ignore_read_table = False
         self.read_table_snapshot = None
+        self.oplog_results = False


 class DatabaseChecker(object):
@@ -388,20 +439,24 @@ class DatabaseChecker(object):
             observed_values = None
             async with session.transaction(ydb.SerializableReadWrite()) as tx:
                 simple_tx = bool(random.randint(0, 1))
-                rss = await tx.execute(
-                    read_query,
-                    parameters={
-                        '$data': [
-                            {'key': key} for key in keys
-                        ],
-                    },
-                    commit_tx=simple_tx,
-                )
+                with history.log_op(node.value, 'read+commit' if simple_tx else 'read') as log:
+                    rss = await tx.execute(
+                        read_query,
+                        parameters={
+                            '$data': [
+                                {'key': key} for key in keys
+                            ],
+                        },
+                        commit_tx=simple_tx,
+                    )
                 observed_values = {}
                 for row in rss[0].rows:
                     observed_values[row.key] = row.value
+                if options.oplog_results:
+                    log.result(dict(observed_values))
                 if not simple_tx:
-                    await tx.commit()
+                    with history.log_op(node.value, 'commit'):
+                        await tx.commit()

         try:
             await self.async_retry_operation(perform, deadline)
@@ -423,17 +478,19 @@ class DatabaseChecker(object):
         async def perform(session):
             async with session.transaction(ydb.SerializableReadWrite()) as tx:
                 simple_tx = bool(random.randint(0, 1))
-                await tx.execute(
-                    write_query,
-                    parameters={
-                        '$data': [
-                            {'key': key, 'value': node.value} for key in keys
-                        ],
-                    },
-                    commit_tx=simple_tx,
-                )
+                with history.log_op(node.value, 'write+commit' if simple_tx else 'write'):
+                    await tx.execute(
+                        write_query,
+                        parameters={
+                            '$data': [
+                                {'key': key, 'value': node.value} for key in keys
+                            ],
+                        },
+                        commit_tx=simple_tx,
+                    )
                 if not simple_tx:
-                    await tx.commit()
+                    with history.log_op(node.value, 'commit'):
+                        await tx.commit()

         try:
             await self.async_retry_operation(perform, deadline)
@@ -492,16 +549,25 @@ class DatabaseChecker(object):
                 if simple_tx:
                     query = rwr_query if read1_keys else wr_query
                     parameters = dict(**read1_params, **write_params, **read2_params)
-                    rss = await tx.execute(query, parameters, commit_tx=True)
+                    with history.log_op(node.value, 'read+write+read+commit' if read1_keys else 'write+read+commit') as log:
+                        rss = await tx.execute(query, parameters, commit_tx=True)
                     if read1_keys:
                         read1 = rss[0]
                         read2 = rss[1]
+                        if options.oplog_results:
+                            log.result({row.key: row.value for row in read1.rows})
+                            log.result({row.key: row.value for row in read2.rows})
                     else:
                         read1 = None
                         read2 = rss[0]
+                        if options.oplog_results:
+                            log.result({row.key: row.value for row in read2.rows})
                 elif read1_keys:
-                    rss = await tx.execute(read1_query, read1_params, commit_tx=False)
+                    with history.log_op(node.value, 'read') as log:
+                        rss = await tx.execute(read1_query, read1_params, commit_tx=False)
                     read1 = rss[0]
+                    if options.oplog_results:
+                        log.result({row.key: row.value for row in read1.rows})
                 else:
                     read1 = None
                 if read1_keys:
@@ -510,9 +576,13 @@ class DatabaseChecker(object):
                         observed_values[row.key] = row.value
                     node.expected_read_keys = tuple(read1_keys)
                 if not simple_tx:
-                    await tx.execute(write_query, write_params, commit_tx=False)
-                    rss = await tx.execute(read2_query, read2_params, commit_tx=fuse_commit)
+                    with history.log_op(node.value, 'write'):
+                        await tx.execute(write_query, write_params, commit_tx=False)
+                    with history.log_op(node.value, 'read+commit' if fuse_commit else 'read') as log:
+                        rss = await tx.execute(read2_query, read2_params, commit_tx=fuse_commit)
                     read2 = rss[0]
+                    if options.oplog_results:
+                        log.result({row.key: row.value for row in read2.rows})
                 for row in read2.rows:
                     if row.key in write_keys_set:
                         if row.value != node.value:
@@ -523,7 +593,8 @@ class DatabaseChecker(object):
                         observed_values[row.key] = row.value
                 node.expected_read_keys = tuple(read_keys)
                 if not simple_tx and not fuse_commit:
-                    await tx.commit()
+                    with history.log_op(node.value, 'commit'):
+                        await tx.commit()

         try:
             await self.async_retry_operation(perform, deadline)
@@ -553,38 +624,49 @@ class DatabaseChecker(object):
             # Read/Write tx may fail with TLI
             async with session.transaction(ydb.SerializableReadWrite()) as tx:
                 simple_tx = bool(random.randint(0, 1))
+                fuse_commit = bool(random.randint(0, 1))
                 if simple_tx:
-                    rss = await tx.execute(
-                        read_write_query,
-                        parameters={
-                            '$reads': [
-                                {'key': key} for key in read_keys
-                            ],
-                            '$writes': [
-                                {'key': key, 'value': node.value} for key in write_keys
-                            ],
-                        },
-                        commit_tx=True,
-                    )
+                    with history.log_op(node.value, 'read+write+commit' if fuse_commit else 'read+write') as log:
+                        rss = await tx.execute(
+                            read_write_query,
+                            parameters={
+                                '$reads': [
+                                    {'key': key} for key in read_keys
+                                ],
+                                '$writes': [
+                                    {'key': key, 'value': node.value} for key in write_keys
+                                ],
+                            },
+                            commit_tx=fuse_commit,
+                        )
+                    if options.oplog_results:
+                        log.result({row.key: row.value for row in rss[0].rows})
                 else:
-                    rss = await tx.execute(
-                        read_query,
-                        parameters={
-                            '$data': [
-                                {'key': key} for key in read_keys
-                            ],
-                        },
-                        commit_tx=False,
-                    )
-                    await tx.execute(
-                        write_query,
-                        parameters={
-                            '$data': [
-                                {'key': key, 'value': node.value} for key in write_keys
-                            ],
-                        },
-                        commit_tx=True,
-                    )
+                    with history.log_op(node.value, 'read') as log:
+                        rss = await tx.execute(
+                            read_query,
+                            parameters={
+                                '$data': [
+                                    {'key': key} for key in read_keys
+                                ],
+                            },
+                            commit_tx=False,
+                        )
+                    if options.oplog_results:
+                        log.result({row.key: row.value for row in rss[0].rows})
+                    with history.log_op(node.value, 'write+commit' if fuse_commit else 'write') as log:
+                        await tx.execute(
+                            write_query,
+                            parameters={
+                                '$data': [
+                                    {'key': key, 'value': node.value} for key in write_keys
+                                ],
+                            },
+                            commit_tx=fuse_commit,
+                        )
+                if not fuse_commit:
+                    with history.log_op(node.value, 'commit'):
+                        await tx.commit()
                 return rss

         try:
@@ -620,17 +702,21 @@ class DatabaseChecker(object):
         async def perform(session):
             async with session.transaction(ydb.SerializableReadWrite()) as tx:
                 simple_tx = bool(random.randint(0, 1))
-                rss = await tx.execute(
-                    read_query,
-                    parameters={
-                        '$data': [
-                            {'key': key} for key in keys
-                        ],
-                    },
-                    commit_tx=simple_tx,
-                )
+                with history.log_op(node.value, 'read+commit' if simple_tx else 'read') as log:
+                    rss = await tx.execute(
+                        read_query,
+                        parameters={
+                            '$data': [
+                                {'key': key} for key in keys
+                            ],
+                        },
+                        commit_tx=simple_tx,
+                    )
+                if options.oplog_results:
+                    log.result({row.key: row.value for row in rss[0].rows})
                 if not simple_tx:
-                    await tx.commit()
+                    with history.log_op(node.value, 'commit'):
+                        await tx.commit()
                 return rss

         try:
@@ -659,16 +745,20 @@ class DatabaseChecker(object):
         async def perform(session):
             async with session.transaction(ydb.SerializableReadWrite()) as tx:
                 simple_tx = bool(random.randint(0, 1))
-                rss = await tx.execute(
-                    range_query,
-                    parameters={
-                        '$minKey': min_key,
-                        '$maxKey': max_key,
-                    },
-                    commit_tx=simple_tx,
-                )
+                with history.log_op(node.value, 'range_read+commit' if simple_tx else 'range_read') as log:
+                    rss = await tx.execute(
+                        range_query,
+                        parameters={
+                            '$minKey': min_key,
+                            '$maxKey': max_key,
+                        },
+                        commit_tx=simple_tx,
+                    )
+                if options.oplog_results:
+                    log.result({row.key: row.value for row in rss[0].rows})
                 if not simple_tx:
-                    await tx.commit()
+                    with history.log_op(node.value, 'commit'):
+                        await tx.commit()
                 return rss

         try:
@@ -703,11 +793,14 @@ class DatabaseChecker(object):
         async def perform(session):
             values = {}
             failed = False
-            chunks = await session.read_table(table, key_range, use_snapshot=options.read_table_snapshot)
             try:
-                async for chunk in chunks:
-                    for row in chunk.rows:
-                        values[row.key] = row.value
+                with history.log_op(node.value, 'read_table') as log:
+                    chunks = await session.read_table(table, key_range, use_snapshot=options.read_table_snapshot)
+                    async for chunk in chunks:
+                        for row in chunk.rows:
+                            values[row.key] = row.value
+                        if options.oplog_results:
+                            log.result({row.key: row.value for row in chunk.rows})
            except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
                failed = True
            return (values, failed)
@@ -791,10 +884,13 @@ class DatabaseChecker(object):
         while True:
             values = {}
             try:
-                chunks = await session.read_table(table)
-                async for chunk in chunks:
-                    for row in chunk.rows:
-                        values[row.key] = row.value
+                with history.log_op(node.value, 'read_table_final') as log:
+                    chunks = await session.read_table(table)
+                    async for chunk in chunks:
+                        for row in chunk.rows:
+                            values[row.key] = row.value
+                        if options.oplog_results:
+                            log.result({row.key: row.value for row in chunk.rows})
             except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
                 if self.logger is not None:
                     self.logger.info('Temporary failure, retrying...')
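
For reference, `write_log_to_file` serializes `History.oplog` next to the `_history.json` dump as a JSON array of `to_json` entries, i.e. `[start_time, end_time, op, description, result...]`. An illustrative, entirely made-up excerpt of such a `*_log.json` file, assuming `--oplog-results` was passed (key types become strings because `json.dumps` stringifies dict keys):

```json
[[1681379720.15, 1681379720.42, 17, "read+commit", {"3": 17, "5": 12}],
 [1681379720.20, 1681379720.63, 18, "write"],
 [1681379720.64, 1681379720.91, 18, "commit", "Exception: Transaction locks invalidated"]]
```
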