author     snaury <snaury@ydb.tech>  2023-03-27 15:00:32 +0300
committer  snaury <snaury@ydb.tech>  2023-03-27 15:00:32 +0300
commit     4c07fb6ba6c6e74bd2effc77c5ed3a9ef4d17237 (patch)
tree       80f213ae0780826f3921fe73dd33d8a5818e043d
parent     a4bf94d2c76a6ddfc2d701208ee5482ab1f70bd6 (diff)
download   ydb-4c07fb6ba6c6e74bd2effc77c5ed3a9ef4d17237.tar.gz
Switch ydb_serializable to asyncio
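
The change replaces tornado coroutines with native asyncio throughout the
checker. The mechanical translation pattern, roughly (an illustrative sketch,
simplified from the hunks below):

    # before: tornado-style coroutine with a hand-rolled session cache
    @gen.coroutine
    def async_retry_operation(self, callable, deadline):
        with (yield self.async_session()) as session:
            result = yield callable(session)
        raise gen.Return(result)

    # after: native asyncio coroutine; sessions come from ydb.aio.SessionPool
    async def async_retry_operation(self, callable, deadline):
        async with self.pool.checkout() as session:
            result = await callable(session)
        return result

Queries that used to be module-level string constants are now built per table
by the generate_query_* helpers, and the custom SessionContext machinery is
replaced by ydb.aio.SessionPool.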
-rw-r--r--  ydb/tests/functional/serializable/test.py         16
-rw-r--r--  ydb/tests/tools/ydb_serializable/__main__.py       24
-rw-r--r--  ydb/tests/tools/ydb_serializable/lib/__init__.py  394
-rw-r--r--  ydb/tests/tools/ydb_serializable/lib/ya.make        3
4 files changed, 201 insertions, 236 deletions
diff --git a/ydb/tests/functional/serializable/test.py b/ydb/tests/functional/serializable/test.py
index dcb297f73f..2b2bb2d779 100644
--- a/ydb/tests/functional/serializable/test.py
+++ b/ydb/tests/functional/serializable/test.py
@@ -96,11 +96,15 @@ def test_local():
with open('ydb_database.txt') as r:
database = r.read()
- checker = DatabaseChecker(endpoint, database)
- options = DatabaseCheckerOptions()
+ async def async_wrapper():
+ async with DatabaseChecker(endpoint, database) as checker:
+ options = DatabaseCheckerOptions()
- options.read_table_ranges = False
- checker.run(options)
+ options.read_table_ranges = False
+ await checker.async_run(options)
- options.read_table_ranges = True
- checker.run(options)
+ options.read_table_ranges = True
+ await checker.async_run(options)
+
+ import asyncio
+ asyncio.run(async_wrapper())
diff --git a/ydb/tests/tools/ydb_serializable/__main__.py b/ydb/tests/tools/ydb_serializable/__main__.py
index 9f107cacde..8ae95dc9b8 100644
--- a/ydb/tests/tools/ydb_serializable/__main__.py
+++ b/ydb/tests/tools/ydb_serializable/__main__.py
@@ -4,6 +4,7 @@ import time
import signal
import multiprocessing
from argparse import ArgumentParser
+import asyncio
from ydb.tests.tools.ydb_serializable.lib import (
DatabaseChecker,
DatabaseCheckerOptions,
@@ -49,20 +50,13 @@ def main():
options.ignore_read_table = args.ignore_read_table
options.read_table_snapshot = args.read_table_snapshot
- def run_single():
- def handler(signum, frame, sys=sys, logger=logger):
- logger.warning('Terminating on signal %d', signum)
- sys.exit(1)
-
- signal.signal(signal.SIGINT, handler)
- signal.signal(signal.SIGTERM, handler)
-
+ async def async_run_single():
iterations = args.iterations
- with DatabaseChecker(args.endpoint, args.database, path=args.path, logger=logger) as checker:
+ async with DatabaseChecker(args.endpoint, args.database, path=args.path, logger=logger) as checker:
while iterations is None or iterations > 0:
try:
- checker.run(options)
+ await checker.async_run(options)
except SerializabilityError as e:
e.history.write_to_file(os.path.join(args.output_path, os.path.basename(e.table) + '_history.json'))
raise
@@ -70,6 +64,16 @@ def main():
if iterations is not None:
iterations -= 1
+ def run_single():
+ def handler(signum, frame, sys=sys, logger=logger):
+ logger.warning('Terminating on signal %d', signum)
+ sys.exit(1)
+
+ signal.signal(signal.SIGINT, handler)
+ signal.signal(signal.SIGTERM, handler)
+
+ asyncio.run(async_run_single())
+
def run_multiple():
def handler(signum, frame, sys=sys, logger=logger):
logger.warning('Terminating on signal %d', signum)
diff --git a/ydb/tests/tools/ydb_serializable/lib/__init__.py b/ydb/tests/tools/ydb_serializable/lib/__init__.py
index 0ad3abdaf9..791b8cc139 100644
--- a/ydb/tests/tools/ydb_serializable/lib/__init__.py
+++ b/ydb/tests/tools/ydb_serializable/lib/__init__.py
@@ -3,68 +3,92 @@ import json
import time
import random
from datetime import datetime
-from tornado import gen
-from tornado.ioloop import IOLoop
+import asyncio
from ydb.tests.library.serializability.checker import SerializabilityError, SerializabilityChecker
-import ydb
-
-KEY_PREFIX_TYPE = ydb.TupleType().add_element(ydb.OptionalType(ydb.PrimitiveType.Uint64))
-
-
-QUERY_POINT_READS = '''\
---!syntax_v1
-
-DECLARE $data AS List<Struct<
- key: Uint64>>;
-
-SELECT t.key AS key, t.value AS value
-FROM AS_TABLE($data) AS d
-INNER JOIN `{TABLE}` AS t ON t.key = d.key;
-'''
-
-
-QUERY_POINT_WRITES = '''\
---!syntax_v1
-
-DECLARE $data AS List<Struct<
- key: Uint64,
- value: Uint64>>;
-
-UPSERT INTO `{TABLE}`
-SELECT key, value
-FROM AS_TABLE($data);
-'''
+import ydb.aio
-QUERY_POINT_READS_WRITES = '''\
---!syntax_v1
-
-DECLARE $reads AS List<Struct<
- key: Uint64>>;
-DECLARE $writes AS List<Struct<
- key: Uint64,
- value: Uint64>>;
-
-SELECT t.key AS key, t.value AS value
-FROM AS_TABLE($reads) AS r
-INNER JOIN `{TABLE}` AS t ON t.key = r.key;
-
-UPSERT INTO `{TABLE}`
-SELECT key, value
-FROM AS_TABLE($writes);
-'''
-
-
-QUERY_RANGE_READS = '''\
---!syntax_v1
+KEY_PREFIX_TYPE = ydb.TupleType().add_element(ydb.OptionalType(ydb.PrimitiveType.Uint64))
-DECLARE $minKey AS Uint64;
-DECLARE $maxKey AS Uint64;
-SELECT key, value
-FROM `{TABLE}`
-WHERE key >= $minKey AND key <= $maxKey;
-'''
+def generate_query_point_reads(table, var='$data'):
+ text = '''\
+ DECLARE {var} AS List<Struct<
+ key: Uint64>>;
+
+ SELECT t.key AS key, t.value AS value
+ FROM AS_TABLE({var}) AS d
+ INNER JOIN `{TABLE}` AS t ON t.key = d.key;
+ '''.format(TABLE=table, var=var)
+ row_type = (
+ ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64))
+ return ydb.DataQuery(text, {
+ var: ydb.ListType(row_type),
+ })
+
+
+def generate_query_point_writes(table, var='$data'):
+ text = '''\
+ DECLARE {var} AS List<Struct<
+ key: Uint64,
+ value: Uint64>>;
+
+ UPSERT INTO `{TABLE}`
+ SELECT key, value
+ FROM AS_TABLE({var});
+ '''.format(TABLE=table, var=var)
+ row_type = (
+ ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)
+ .add_member('value', ydb.PrimitiveType.Uint64))
+ return ydb.DataQuery(text, {
+ var: ydb.ListType(row_type),
+ })
+
+
+def generate_query_point_reads_writes(table, readsvar='$reads', writesvar='$writes'):
+ text = '''\
+ DECLARE {readsvar} AS List<Struct<
+ key: Uint64>>;
+ DECLARE {writesvar} AS List<Struct<
+ key: Uint64,
+ value: Uint64>>;
+
+ SELECT t.key AS key, t.value AS value
+ FROM AS_TABLE({readsvar}) AS d
+ INNER JOIN `{TABLE}` AS t ON t.key = d.key;
+
+ UPSERT INTO `{TABLE}`
+ SELECT key, value
+ FROM AS_TABLE({writesvar});
+ '''.format(TABLE=table, readsvar=readsvar, writesvar=writesvar)
+ reads_row_type = (
+ ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64))
+ writes_row_type = (
+ ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)
+ .add_member('value', ydb.PrimitiveType.Uint64))
+ return ydb.DataQuery(text, {
+ readsvar: ydb.ListType(reads_row_type),
+ writesvar: ydb.ListType(writes_row_type),
+ })
+
+
+def generate_query_range_reads(table, minvar='$minKey', maxvar='$maxKey'):
+ text = '''\
+ DECLARE {minvar} AS Uint64;
+ DECLARE {maxvar} AS Uint64;
+
+ SELECT key, value
+ FROM `{TABLE}`
+ WHERE key >= {minvar} AND key <= {maxvar};
+ '''.format(TABLE=table, minvar=minvar, maxvar=maxvar)
+ return ydb.DataQuery(text, {
+ minvar: ydb.PrimitiveType.Uint64,
+ maxvar: ydb.PrimitiveType.Uint64,
+ })
def generate_random_name(cnt=20):
@@ -254,91 +278,55 @@ class DatabaseChecker(object):
self.database = database
self.path = path
self.logger = logger
+ self.pool = None
- ydb.interceptor.monkey_patch_event_handler()
-
- self.driver = ydb.Driver(ydb.ConnectionParams(endpoint, database))
- self.driver.wait()
+ async def async_init(self):
+ if self.pool is None:
+ self.driver = ydb.aio.Driver(ydb.ConnectionParams(self.endpoint, self.database))
+ await self.driver.wait(fail_fast=True)
+ self.pool = ydb.aio.SessionPool(self.driver, 2 ** 32)
- self.sessions = []
-
- def __enter__(self):
+ async def __aenter__(self):
+ await self.async_init()
return self
- def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
- self.driver.stop()
-
- class SessionContext(object):
- def __init__(self, checker, session):
- self._checker = checker
- self._session = session
+ async def __aexit__(self, exc_type=None, exc_val=None, exc_tb=None):
+ await self.pool.stop()
+ await self.driver.stop()
- def __enter__(self):
- return self._session
-
- def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
- assert self._session is not None
- if exc_type is not None or self._session.closing():
- # self._session.delete()
- self._session = None
- else:
- self._checker.sessions.append(self._session)
- self._session = None
-
- @gen.coroutine
- def async_session(self):
- if self.sessions:
- session = self.sessions.pop()
- else:
- session = yield self.driver.table_client.session().async_create()
- raise gen.Return(self.SessionContext(self, session))
-
- def sync_session(self):
- if self.sessions:
- session = self.sessions.pop()
- else:
- session = self.driver.table_client.session().create()
- return self.SessionContext(self, session)
-
- @gen.coroutine
- def async_retry_operation(self, callable, deadline):
+ async def async_retry_operation(self, callable, deadline):
while time.time() < deadline:
try:
- with (yield self.async_session()) as session:
+ async with self.pool.checkout() as session:
try:
- result = yield callable(session)
+ result = await callable(session)
except (ydb.Aborted, ydb.Undetermined, ydb.NotFound, ydb.InternalError):
raise # these are not retried
except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
continue # retry
except ydb.BadSession:
continue # retry
- raise gen.Return(result)
+ return result
else:
raise ydb.Aborted('deadline reached')
- @gen.coroutine
- def async_perform_point_reads(self, history, table, options, checker, deadline):
- read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), {
- '$data': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)),
- })
+ async def async_perform_point_reads(self, history, table, options, checker, deadline):
+ read_query = generate_query_point_reads(table)
while time.time() < deadline:
keys = checker.select_read_from_write_keys(cnt=random.randint(1, options.shards))
if not keys:
# There are not enough in-progress writes to this table yet, spin a little
- yield gen.sleep(0.000001)
+ await asyncio.sleep(0.000001)
continue
node = history.add(History.Begin('reads', None, read_keys=keys)).apply_to(checker)
- @gen.coroutine
- def perform(session):
+ async def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
- rss = yield tx.async_execute(
+ rss = await tx.execute(
read_query,
parameters={
'$data': [
@@ -348,17 +336,17 @@ class DatabaseChecker(object):
commit_tx=simple_tx,
)
if not simple_tx:
- yield tx.async_commit()
- raise gen.Return(rss)
+ await tx.commit()
+ return rss
finally:
if tx.tx_id is not None:
try:
- yield tx.async_rollback()
+ await tx.rollback()
except ydb.Error:
pass
try:
- rss = yield self.async_retry_operation(perform, deadline)
+ rss = await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('reads', node.value)).apply_to(checker)
except ydb.Undetermined:
@@ -370,25 +358,19 @@ class DatabaseChecker(object):
history.add(History.Commit('reads', node.value, values)).apply_to(checker)
- @gen.coroutine
- def async_perform_point_writes(self, history, table, options, checker, deadline):
- write_query = ydb.DataQuery(QUERY_POINT_WRITES.format(TABLE=table), {
- '$data': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)
- .add_member('value', ydb.PrimitiveType.Uint64)),
- })
+ async def async_perform_point_writes(self, history, table, options, checker, deadline):
+ write_query = generate_query_point_writes(table)
while time.time() < deadline:
keys = checker.select_write_keys(cnt=random.randint(1, options.shards))
node = history.add(History.Begin('writes', None, write_keys=keys)).apply_to(checker)
- @gen.coroutine
- def perform(session):
+ async def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
- yield tx.async_execute(
+ await tx.execute(
write_query,
parameters={
'$data': [
@@ -398,16 +380,16 @@ class DatabaseChecker(object):
commit_tx=simple_tx,
)
if not simple_tx:
- yield tx.async_commit()
+ await tx.commit()
finally:
if tx.tx_id is not None:
try:
- yield tx.async_rollback()
+ await tx.rollback()
except ydb.Error:
pass
try:
- yield self.async_retry_operation(perform, deadline)
+ await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('writes', node.value)).apply_to(checker)
except ydb.Undetermined:
@@ -417,24 +399,10 @@ class DatabaseChecker(object):
checker.release_write_keys(keys)
- @gen.coroutine
- def async_perform_point_reads_writes(self, history, table, options, checker, deadline, keysets):
- read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), {
- '$data': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)),
- })
- write_query = ydb.DataQuery(QUERY_POINT_WRITES.format(TABLE=table), {
- '$data': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)
- .add_member('value', ydb.PrimitiveType.Uint64)),
- })
- read_write_query = ydb.DataQuery(QUERY_POINT_READS_WRITES.format(TABLE=table), {
- '$reads': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)),
- '$writes': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)
- .add_member('value', ydb.PrimitiveType.Uint64)),
- })
+ async def async_perform_point_reads_writes(self, history, table, options, checker, deadline, keysets):
+ read_query = generate_query_point_reads(table)
+ write_query = generate_query_point_writes(table)
+ read_write_query = generate_query_point_reads_writes(table)
while time.time() < deadline:
read_keys = checker.select_read_keys(cnt=random.randint(1, options.shards))
@@ -444,14 +412,13 @@ class DatabaseChecker(object):
node = history.add(History.Begin('reads+writes', None, read_keys=read_keys, write_keys=write_keys)).apply_to(checker)
- @gen.coroutine
- def perform(session):
+ async def perform(session):
# Read/Write tx may fail with TLI
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
if simple_tx:
- rss = yield tx.async_execute(
+ rss = await tx.execute(
read_write_query,
parameters={
'$reads': [
@@ -464,7 +431,7 @@ class DatabaseChecker(object):
commit_tx=True,
)
else:
- rss = yield tx.async_execute(
+ rss = await tx.execute(
read_query,
parameters={
'$data': [
@@ -473,7 +440,7 @@ class DatabaseChecker(object):
},
commit_tx=False,
)
- yield tx.async_execute(
+ await tx.execute(
write_query,
parameters={
'$data': [
@@ -482,16 +449,16 @@ class DatabaseChecker(object):
},
commit_tx=True,
)
- raise gen.Return(rss)
+ return rss
finally:
if tx.tx_id is not None:
try:
- yield tx.async_rollback()
+ await tx.rollback()
except ydb.Error:
pass
try:
- rss = yield self.async_retry_operation(perform, deadline)
+ rss = await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('reads+writes', node.value)).apply_to(checker)
except ydb.Undetermined:
@@ -507,29 +474,24 @@ class DatabaseChecker(object):
checker.release_write_keys(write_keys)
- @gen.coroutine
- def async_perform_verifying_reads(self, history, table, options, checker, deadline, keysets):
- read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), {
- '$data': ydb.ListType(ydb.StructType()
- .add_member('key', ydb.PrimitiveType.Uint64)),
- })
+ async def async_perform_verifying_reads(self, history, table, options, checker, deadline, keysets):
+ read_query = generate_query_point_reads(table)
while time.time() < deadline:
if not keysets:
# There are not enough in-progress writes to this table yet, spin a little
- yield gen.sleep(0.000001)
+ await asyncio.sleep(0.000001)
continue
keys = random.choice(list(keysets))
node = history.add(History.Begin('reads_of_writes', None, read_keys=keys)).apply_to(checker)
- @gen.coroutine
- def perform(session):
+ async def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
- rss = yield tx.async_execute(
+ rss = await tx.execute(
read_query,
parameters={
'$data': [
@@ -539,17 +501,17 @@ class DatabaseChecker(object):
commit_tx=simple_tx,
)
if not simple_tx:
- yield tx.async_commit()
- raise gen.Return(rss)
+ await tx.commit()
+ return rss
finally:
if tx.tx_id is not None:
try:
- yield tx.async_rollback()
+ await tx.rollback()
except ydb.Error:
pass
try:
- rss = yield self.async_retry_operation(perform, deadline)
+ rss = await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('reads_of_writes', node.value)).apply_to(checker)
except ydb.Undetermined:
@@ -561,12 +523,8 @@ class DatabaseChecker(object):
history.add(History.Commit('reads_of_writes', node.value, values)).apply_to(checker)
- @gen.coroutine
- def async_perform_range_reads(self, history, table, options, checker, deadline):
- range_query = ydb.DataQuery(QUERY_RANGE_READS.format(TABLE=table), {
- '$minKey': ydb.PrimitiveType.Uint64,
- '$maxKey': ydb.PrimitiveType.Uint64,
- })
+ async def async_perform_range_reads(self, history, table, options, checker, deadline):
+ range_query = generate_query_range_reads(table)
while time.time() < deadline:
min_key = random.randint(0, options.keys)
@@ -575,12 +533,11 @@ class DatabaseChecker(object):
node = history.add(History.Begin('read_range', None, read_keys=read_keys)).apply_to(checker)
- @gen.coroutine
- def perform(session):
+ async def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
- rss = yield tx.async_execute(
+ rss = await tx.execute(
range_query,
parameters={
'$minKey': min_key,
@@ -589,17 +546,17 @@ class DatabaseChecker(object):
commit_tx=simple_tx,
)
if not simple_tx:
- yield tx.async_commit()
- raise gen.Return(rss)
+ await tx.commit()
+ return rss
finally:
if tx.tx_id is not None:
try:
- yield tx.async_rollback()
+ await tx.rollback()
except ydb.Error:
pass
try:
- rss = yield self.async_retry_operation(perform, deadline)
+ rss = await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('read_range', node.value)).apply_to(checker)
except ydb.Undetermined:
@@ -611,8 +568,7 @@ class DatabaseChecker(object):
history.add(History.Commit('read_range', node.value, values)).apply_to(checker)
- @gen.coroutine
- def async_perform_read_tables(self, history, table, options, checker, deadline):
+ async def async_perform_read_tables(self, history, table, options, checker, deadline):
while time.time() < deadline:
if options.read_table_ranges:
min_key = random.randint(0, options.keys)
@@ -628,24 +584,20 @@ class DatabaseChecker(object):
if not options.ignore_read_table:
node = history.add(History.Begin('read_table', None, read_keys=read_keys)).apply_to(checker)
- @gen.coroutine
- def perform(session):
+ async def perform(session):
values = {}
failed = False
- for chunk_future in session.async_read_table(table, key_range, use_snapshot=options.read_table_snapshot):
- try:
- chunk = yield chunk_future
- except StopIteration:
- break
- except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
- failed = True
- break
- for row in chunk.rows:
- values[row.key] = row.value
- raise gen.Return((values, failed))
+ chunks = await session.read_table(table, key_range, use_snapshot=options.read_table_snapshot)
+ try:
+ async for chunk in chunks:
+ for row in chunk.rows:
+ values[row.key] = row.value
+ except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
+ failed = True
+ return (values, failed)
try:
- values, failed = yield self.async_retry_operation(perform, deadline)
+ values, failed = await self.async_retry_operation(perform, deadline)
except ydb.Aborted:
history.add(History.Abort('read_table', node.value)).apply_to(checker)
except ydb.Undetermined:
@@ -655,8 +607,7 @@ class DatabaseChecker(object):
# We mark ReadTable committed even when it fails
history.add(History.Commit('read_table', node.value, values, flags='partial_read' if failed else None)).apply_to(checker)
- @gen.coroutine
- def async_perform_test(self, history, table, options, checker):
+ async def async_perform_test(self, history, table, options, checker):
futures = []
deadline = time.time() + options.seconds
@@ -678,12 +629,12 @@ class DatabaseChecker(object):
for _ in range(options.readtablers):
futures.append(self.async_perform_read_tables(history, table, options, checker, deadline=deadline))
- waiter = gen.WaitIterator(*futures)
- while not waiter.done():
- yield waiter.next()
+ random.shuffle(futures)
- def before_test(self, table, options):
- with self.sync_session() as session:
+ await asyncio.gather(*futures)
+
+ async def async_before_test(self, table, options):
+ async def perform(session):
splits = []
for i in range(options.shards - 1):
splits.append(ydb.KeyBound((options.keys * (i + 1) // options.shards,)))
@@ -707,10 +658,12 @@ class DatabaseChecker(object):
if self.logger is not None:
self.logger.info('Creating %s', table)
- session.create_table(table, description)
+ await session.create_table(table, description)
+
+ await self.pool.retry_operation(perform)
- def before_verify(self, history, table, options, checker):
- with self.sync_session() as session:
+ async def async_before_verify(self, history, table, options, checker):
+ async def perform(session):
node = history.add(History.Begin('read_table_final', None, read_keys=checker.keys, linearizable='global')).apply_to(checker)
if self.logger is not None:
@@ -719,7 +672,8 @@ class DatabaseChecker(object):
while True:
values = {}
try:
- for chunk in session.read_table(table):
+ chunks = await session.read_table(table)
+ async for chunk in chunks:
for row in chunk.rows:
values[row.key] = row.value
except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
@@ -731,20 +685,24 @@ class DatabaseChecker(object):
history.add(History.Commit('read_table_final', node.value, values)).apply_to(checker)
- def after_test(self, table, options):
- with self.sync_session() as session:
+ await self.pool.retry_operation(perform)
+
+ async def async_after_test(self, table, options):
+ async def perform(session):
# Avoid leaving tables around
if self.logger is not None:
self.logger.info('Dropping %s', table)
- session.drop_table(table)
+ await session.drop_table(table)
+
+ await self.pool.retry_operation(perform)
- def run(self, options=None):
+ async def async_run(self, options=None):
if options is None:
options = DatabaseCheckerOptions()
table = self.path + '/' + datetime.now().strftime('%Y%m%d_%H%M%S_') + generate_random_name()
- self.before_test(table, options)
+ await self.async_before_test(table, options)
keep_data = False
@@ -757,9 +715,9 @@ class DatabaseChecker(object):
if self.logger is not None:
self.logger.info('Starting load test...')
- IOLoop.current().run_sync(lambda: self.async_perform_test(history, table, options, checker))
+ await self.async_perform_test(history, table, options, checker)
- self.before_verify(history, table, options, checker)
+ await self.async_before_verify(history, table, options, checker)
if self.logger is not None:
self.logger.info(
@@ -779,7 +737,7 @@ class DatabaseChecker(object):
finally:
if not keep_data:
- self.after_test(table, options)
+ await self.async_after_test(table, options)
if self.logger is not None:
self.logger.info('OK')
diff --git a/ydb/tests/tools/ydb_serializable/lib/ya.make b/ydb/tests/tools/ydb_serializable/lib/ya.make
index 1d7266c897..a0b5200d7b 100644
--- a/ydb/tests/tools/ydb_serializable/lib/ya.make
+++ b/ydb/tests/tools/ydb_serializable/lib/ya.make
@@ -1,9 +1,8 @@
PY3_LIBRARY()
PEERDIR(
- contrib/python/tornado/tornado-4
ydb/tests/library
- ydb/public/sdk/python
+ ydb/public/sdk/python3
)
PY_SRCS(__init__.py)
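
For reference, a minimal standalone sketch of driving the migrated checker,
assuming only the names visible in this diff (the endpoint and database values
are placeholders):

    import asyncio

    from ydb.tests.tools.ydb_serializable.lib import (
        DatabaseChecker,
        DatabaseCheckerOptions,
    )

    async def main():
        # DatabaseChecker is now an async context manager: __aenter__ creates
        # the ydb.aio.Driver and SessionPool, __aexit__ stops both.
        async with DatabaseChecker('grpc://localhost:2136', '/Root') as checker:
            options = DatabaseCheckerOptions()
            await checker.async_run(options)

    asyncio.run(main())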