author    snaury <[email protected]>  2023-04-13 12:55:20 +0300
committer snaury <[email protected]>  2023-04-13 12:55:20 +0300
commit    89ca66f252da3a017f748b167553026305555021 (patch)
tree      5efc707c78adea53a169b200b98868d8008c20f7
parent    9fc74dc5d2668478e58fe1ff338d095d58c60c75 (diff)
Add oplog to ydb serializable for problem diagnostics
-rw-r--r--  ydb/tests/tools/ydb_serializable/__main__.py     |  26
-rw-r--r--  ydb/tests/tools/ydb_serializable/lib/__init__.py | 260
2 files changed, 200 insertions(+), 86 deletions(-)
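The core of the change is an operation log on History: every database operation is wrapped in a log_op context manager that records start/end timestamps, the node value, a short description, optional results, and any exception that escaped the block. A minimal self-contained sketch of that pattern (class and context manager mirror the diff below; the driver at the bottom is hypothetical):

import json
import time
from contextlib import contextmanager

oplog = []

class OpLogEntry(object):
    def __init__(self, start_time, op, description):
        self.start_time = start_time
        self.end_time = None
        self.op = op
        self.description = description
        self._results = []

    def result(self, result):
        self._results.append(result)

    def done(self):
        if self.end_time is None:
            self.end_time = time.time()

    def to_json(self):
        return [self.start_time, self.end_time, self.op, self.description] + self._results

@contextmanager
def log_op(op, description):
    entry = OpLogEntry(time.time(), op, description)
    oplog.append(entry)
    try:
        try:
            yield entry
        finally:
            entry.done()  # end_time is recorded even when the op fails
    except BaseException as e:
        entry.result('Exception: %s' % (e,))
        raise

# Hypothetical driver: wrap an operation, attach its observed values.
with log_op(42, 'read+commit') as log:
    log.result({1: 42, 2: 42})

print(json.dumps([e.to_json() for e in oplog]))
# -> [[<start>, <end>, 42, "read+commit", {"1": 42, "2": 42}]]

Nesting the try blocks is what lets done() run on both paths while the except clause still sees the exception and records it as a result before re-raising.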
diff --git a/ydb/tests/tools/ydb_serializable/__main__.py b/ydb/tests/tools/ydb_serializable/__main__.py
index f03cc08bced..f750ecf4394 100644
--- a/ydb/tests/tools/ydb_serializable/__main__.py
+++ b/ydb/tests/tools/ydb_serializable/__main__.py
@@ -36,6 +36,7 @@ def main():
parser.add_argument('--processes', type=int, default=1, help='Number of processes to fork into, default is 1')
parser.add_argument('--print-unique-errors', dest='print_unique_errors', action='store_const', const=1, default=0, help='Print unique errors that happen during execution')
parser.add_argument('--print-unique-traceback', dest='print_unique_errors', action='store_const', const=2, help='Print traceback for unique errors that happen during execution')
+ parser.add_argument('--oplog-results', dest='oplog_results', action='store_true', default=False, help='Store operation results in oplog dumps')
args = parser.parse_args()
logger = DummyLogger()
@@ -53,8 +54,9 @@ def main():
options.read_table_ranges = args.read_table_ranges
options.ignore_read_table = args.ignore_read_table
options.read_table_snapshot = args.read_table_snapshot
+ options.oplog_results = args.oplog_results
- async def async_run_single():
+ async def async_run_single_inner():
iterations = args.iterations
async with DatabaseChecker(args.endpoint, args.database, path=args.path, logger=logger, print_unique_errors=args.print_unique_errors) as checker:
@@ -63,20 +65,36 @@ def main():
await checker.async_run(options)
except SerializabilityError as e:
e.history.write_to_file(os.path.join(args.output_path, os.path.basename(e.table) + '_history.json'))
+ e.history.write_log_to_file(os.path.join(args.output_path, os.path.basename(e.table) + '_log.json'))
raise
if iterations is not None:
iterations -= 1
- def run_single():
- def handler(signum, frame, sys=sys, logger=logger):
+ async def async_run_single():
+ loop = asyncio.get_event_loop()
+ task = asyncio.create_task(async_run_single_inner())
+
+ def handler(signum, frame, sys=sys, logger=logger, loop=loop, task=task):
logger.warning('Terminating on signal %d', signum)
+
+ def do_cancel():
+ if not task.done():
+ task.cancel()
+
+ loop.call_soon_threadsafe(do_cancel)
sys.exit(1)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
- asyncio.run(async_run_single())
+ await task
+
+ def run_single():
+ try:
+ asyncio.run(async_run_single())
+ except asyncio.exceptions.CancelledError:
+ sys.exit(1)
def run_multiple():
def handler(signum, frame, sys=sys, logger=logger):
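The reworked shutdown path above avoids calling sys.exit() as the only escape from a running event loop: the signal handler never touches the task directly, it schedules the cancellation on the loop via call_soon_threadsafe, and the synchronous entry point maps CancelledError to a nonzero exit. A self-contained sketch of the same pattern (a hypothetical standalone example, not the tool's code):

import asyncio
import signal
import sys

async def work():
    await asyncio.sleep(3600)  # stand-in for checker.async_run(...)

async def run():
    loop = asyncio.get_event_loop()
    task = asyncio.create_task(work())

    def handler(signum, frame, loop=loop, task=task):
        def do_cancel():
            if not task.done():
                task.cancel()
        # The handler may interrupt loop internals, so hop back onto
        # the loop before cancelling.
        loop.call_soon_threadsafe(do_cancel)

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, handler)
    await task

try:
    asyncio.run(run())
except asyncio.exceptions.CancelledError:
    sys.exit(1)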
diff --git a/ydb/tests/tools/ydb_serializable/lib/__init__.py b/ydb/tests/tools/ydb_serializable/lib/__init__.py
index 189156151f0..52b9ec7b8e3 100644
--- a/ydb/tests/tools/ydb_serializable/lib/__init__.py
+++ b/ydb/tests/tools/ydb_serializable/lib/__init__.py
@@ -2,6 +2,7 @@
import json
import time
import random
+from contextlib import contextmanager
from datetime import datetime
import asyncio
from ydb.tests.library.serializability.checker import SerializabilityError, SerializabilityChecker
@@ -122,6 +123,7 @@ class DummyLogger(object):
class History(object):
def __init__(self):
self.items = []
+ self.oplog = []
class Prepare(object):
def __init__(self, keys):
@@ -250,6 +252,41 @@ class History(object):
for item in self.items:
item.apply_to(checker)
+ class OpLogEntry(object):
+ def __init__(self, start_time, op, description):
+ self.start_time = start_time
+ self.end_time = None
+ self.op = op
+ self.description = description
+ self._results = []
+
+ def result(self, result):
+ self._results.append(result)
+
+ def done(self):
+ if self.end_time is None:
+ self.end_time = time.time()
+
+ def to_json(self):
+ return [self.start_time, self.end_time, self.op, self.description] + self._results
+
+ def log_op_begin(self, op, description):
+ entry = self.OpLogEntry(time.time(), op, description)
+ self.oplog.append(entry)
+ return entry
+
+ @contextmanager
+ def log_op(self, op, description):
+ entry = self.log_op_begin(op, description)
+ try:
+ try:
+ yield entry
+ finally:
+ entry.done()
+ except BaseException as e:
+ entry.result(f'Exception: {e}')
+ raise
+
@classmethod
def from_json(cls, items):
self = cls()
@@ -276,6 +313,19 @@ class History(object):
first = False
f.write(']\n')
+ def write_log_to_file(self, filename):
+ if not self.oplog:
+ return
+ with open(filename, 'w') as f:
+ first = True
+ f.write('[')
+ for item in self.oplog:
+ if not first:
+ f.write(',\n ')
+ f.write(json.dumps(item.to_json()))
+ first = False
+ f.write(']\n')
+
class DatabaseCheckerOptions(object):
def __init__(self):
@@ -291,6 +341,7 @@ class DatabaseCheckerOptions(object):
self.read_table_ranges = False
self.ignore_read_table = False
self.read_table_snapshot = None
+ self.oplog_results = False
class DatabaseChecker(object):
@@ -388,20 +439,24 @@ class DatabaseChecker(object):
observed_values = None
async with session.transaction(ydb.SerializableReadWrite()) as tx:
simple_tx = bool(random.randint(0, 1))
- rss = await tx.execute(
- read_query,
- parameters={
- '$data': [
- {'key': key} for key in keys
- ],
- },
- commit_tx=simple_tx,
- )
+ with history.log_op(node.value, 'read+commit' if simple_tx else 'read') as log:
+ rss = await tx.execute(
+ read_query,
+ parameters={
+ '$data': [
+ {'key': key} for key in keys
+ ],
+ },
+ commit_tx=simple_tx,
+ )
observed_values = {}
for row in rss[0].rows:
observed_values[row.key] = row.value
+ if options.oplog_results:
+ log.result(dict(observed_values))
if not simple_tx:
- await tx.commit()
+ with history.log_op(node.value, 'commit'):
+ await tx.commit()
try:
await self.async_retry_operation(perform, deadline)
@@ -423,17 +478,19 @@ class DatabaseChecker(object):
async def perform(session):
async with session.transaction(ydb.SerializableReadWrite()) as tx:
simple_tx = bool(random.randint(0, 1))
- await tx.execute(
- write_query,
- parameters={
- '$data': [
- {'key': key, 'value': node.value} for key in keys
- ],
- },
- commit_tx=simple_tx,
- )
+ with history.log_op(node.value, 'write+commit' if simple_tx else 'write'):
+ await tx.execute(
+ write_query,
+ parameters={
+ '$data': [
+ {'key': key, 'value': node.value} for key in keys
+ ],
+ },
+ commit_tx=simple_tx,
+ )
if not simple_tx:
- await tx.commit()
+ with history.log_op(node.value, 'commit'):
+ await tx.commit()
try:
await self.async_retry_operation(perform, deadline)
@@ -492,16 +549,25 @@ class DatabaseChecker(object):
if simple_tx:
query = rwr_query if read1_keys else wr_query
parameters = dict(**read1_params, **write_params, **read2_params)
- rss = await tx.execute(query, parameters, commit_tx=True)
+ with history.log_op(node.value, 'read+write+read+commit' if read1_keys else 'write+read+commit') as log:
+ rss = await tx.execute(query, parameters, commit_tx=True)
if read1_keys:
read1 = rss[0]
read2 = rss[1]
+ if options.oplog_results:
+ log.result({row.key: row.value for row in read1.rows})
+ log.result({row.key: row.value for row in read2.rows})
else:
read1 = None
read2 = rss[0]
+ if options.oplog_results:
+ log.result({row.key: row.value for row in read2.rows})
elif read1_keys:
- rss = await tx.execute(read1_query, read1_params, commit_tx=False)
+ with history.log_op(node.value, 'read') as log:
+ rss = await tx.execute(read1_query, read1_params, commit_tx=False)
read1 = rss[0]
+ if options.oplog_results:
+ log.result({row.key: row.value for row in read1.rows})
else:
read1 = None
if read1_keys:
@@ -510,9 +576,13 @@ class DatabaseChecker(object):
observed_values[row.key] = row.value
node.expected_read_keys = tuple(read1_keys)
if not simple_tx:
- await tx.execute(write_query, write_params, commit_tx=False)
- rss = await tx.execute(read2_query, read2_params, commit_tx=fuse_commit)
+ with history.log_op(node.value, 'write'):
+ await tx.execute(write_query, write_params, commit_tx=False)
+ with history.log_op(node.value, 'read+commit' if fuse_commit else 'read') as log:
+ rss = await tx.execute(read2_query, read2_params, commit_tx=fuse_commit)
read2 = rss[0]
+ if options.oplog_results:
+ log.result({row.key: row.value for row in read2.rows})
for row in read2.rows:
if row.key in write_keys_set:
if row.value != node.value:
@@ -523,7 +593,8 @@ class DatabaseChecker(object):
observed_values[row.key] = row.value
node.expected_read_keys = tuple(read_keys)
if not simple_tx and not fuse_commit:
- await tx.commit()
+ with history.log_op(node.value, 'commit'):
+ await tx.commit()
try:
await self.async_retry_operation(perform, deadline)
@@ -553,38 +624,49 @@ class DatabaseChecker(object):
# Read/Write tx may fail with TLI
async with session.transaction(ydb.SerializableReadWrite()) as tx:
simple_tx = bool(random.randint(0, 1))
+ fuse_commit = bool(random.randint(0, 1))
if simple_tx:
- rss = await tx.execute(
- read_write_query,
- parameters={
- '$reads': [
- {'key': key} for key in read_keys
- ],
- '$writes': [
- {'key': key, 'value': node.value} for key in write_keys
- ],
- },
- commit_tx=True,
- )
+ with history.log_op(node.value, 'read+write+commit' if fuse_commit else 'read+write') as log:
+ rss = await tx.execute(
+ read_write_query,
+ parameters={
+ '$reads': [
+ {'key': key} for key in read_keys
+ ],
+ '$writes': [
+ {'key': key, 'value': node.value} for key in write_keys
+ ],
+ },
+ commit_tx=fuse_commit,
+ )
+ if options.oplog_results:
+ log.result({row.key: row.value for row in rss[0].rows})
else:
- rss = await tx.execute(
- read_query,
- parameters={
- '$data': [
- {'key': key} for key in read_keys
- ],
- },
- commit_tx=False,
- )
- await tx.execute(
- write_query,
- parameters={
- '$data': [
- {'key': key, 'value': node.value} for key in write_keys
- ],
- },
- commit_tx=True,
- )
+ with history.log_op(node.value, 'read') as log:
+ rss = await tx.execute(
+ read_query,
+ parameters={
+ '$data': [
+ {'key': key} for key in read_keys
+ ],
+ },
+ commit_tx=False,
+ )
+ if options.oplog_results:
+ log.result({row.key: row.value for row in rss[0].rows})
+ with history.log_op(node.value, 'write+commit' if fuse_commit else 'write') as log:
+ await tx.execute(
+ write_query,
+ parameters={
+ '$data': [
+ {'key': key, 'value': node.value} for key in write_keys
+ ],
+ },
+ commit_tx=fuse_commit,
+ )
+ if not fuse_commit:
+ with history.log_op(node.value, 'commit'):
+ await tx.commit()
return rss
try:
@@ -620,17 +702,21 @@ class DatabaseChecker(object):
async def perform(session):
async with session.transaction(ydb.SerializableReadWrite()) as tx:
simple_tx = bool(random.randint(0, 1))
- rss = await tx.execute(
- read_query,
- parameters={
- '$data': [
- {'key': key} for key in keys
- ],
- },
- commit_tx=simple_tx,
- )
+ with history.log_op(node.value, 'read+commit' if simple_tx else 'read') as log:
+ rss = await tx.execute(
+ read_query,
+ parameters={
+ '$data': [
+ {'key': key} for key in keys
+ ],
+ },
+ commit_tx=simple_tx,
+ )
+ if options.oplog_results:
+ log.result({row.key: row.value for row in rss[0].rows})
if not simple_tx:
- await tx.commit()
+ with history.log_op(node.value, 'commit'):
+ await tx.commit()
return rss
try:
@@ -659,16 +745,20 @@ class DatabaseChecker(object):
async def perform(session):
async with session.transaction(ydb.SerializableReadWrite()) as tx:
simple_tx = bool(random.randint(0, 1))
- rss = await tx.execute(
- range_query,
- parameters={
- '$minKey': min_key,
- '$maxKey': max_key,
- },
- commit_tx=simple_tx,
- )
+ with history.log_op(node.value, 'range_read+commit' if simple_tx else 'range_read') as log:
+ rss = await tx.execute(
+ range_query,
+ parameters={
+ '$minKey': min_key,
+ '$maxKey': max_key,
+ },
+ commit_tx=simple_tx,
+ )
+ if options.oplog_results:
+ log.result({row.key: row.value for row in rss[0].rows})
if not simple_tx:
- await tx.commit()
+ with history.log_op(node.value, 'commit'):
+ await tx.commit()
return rss
try:
@@ -703,11 +793,14 @@ class DatabaseChecker(object):
async def perform(session):
values = {}
failed = False
- chunks = await session.read_table(table, key_range, use_snapshot=options.read_table_snapshot)
try:
- async for chunk in chunks:
- for row in chunk.rows:
- values[row.key] = row.value
+ with history.log_op(node.value, 'read_table') as log:
+ chunks = await session.read_table(table, key_range, use_snapshot=options.read_table_snapshot)
+ async for chunk in chunks:
+ for row in chunk.rows:
+ values[row.key] = row.value
+ if options.oplog_results:
+ log.result({row.key: row.value for row in chunk.rows})
except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
failed = True
return (values, failed)
@@ -791,10 +884,13 @@ class DatabaseChecker(object):
while True:
values = {}
try:
- chunks = await session.read_table(table)
- async for chunk in chunks:
- for row in chunk.rows:
- values[row.key] = row.value
+ with history.log_op(node.value, 'read_table_final') as log:
+ chunks = await session.read_table(table)
+ async for chunk in chunks:
+ for row in chunk.rows:
+ values[row.key] = row.value
+ if options.oplog_results:
+ log.result({row.key: row.value for row in chunk.rows})
except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
if self.logger is not None:
self.logger.info('Temporary failure, retrying...')
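When a SerializabilityError is raised with --oplog-results enabled, the oplog lands next to the history dump as a *_log.json file. A hypothetical post-mortem helper (not part of the commit) that loads such a dump and prints the operations ordered by start time; per OpLogEntry.to_json(), each entry is [start_time, end_time, op, description, *results]:

import json
import sys

def dump_oplog(path):
    with open(path) as f:
        entries = json.load(f)
    for entry in sorted(entries, key=lambda e: e[0]):
        start, end, op, description = entry[:4]
        results = entry[4:]
        duration = '%.6fs' % (end - start) if end is not None else '?'
        line = '%.6f %-10s node=%s %s' % (start, duration, op, description)
        if results:
            line += ' results=%s' % (results,)
        print(line)

if __name__ == '__main__':
    dump_oplog(sys.argv[1])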