aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-01-25 14:35:42 +0300
committersnaury <snaury@ydb.tech>2023-01-25 14:35:42 +0300
commit9891487626af4bc05ee05ac05f06238704fac139 (patch)
tree9972d158c4b7ddeb3e62705b151904c5d9c428e0
parentc82bf5bd4f14e4b564537b009e78e549e888976a (diff)
downloadydb-9891487626af4bc05ee05ac05f06238704fac139.tar.gz
Add retries to ydb_serializable to support session rebalancing
-rw-r--r--ydb/tests/tools/ydb_serializable/lib/__init__.py305
1 files changed, 187 insertions, 118 deletions
diff --git a/ydb/tests/tools/ydb_serializable/lib/__init__.py b/ydb/tests/tools/ydb_serializable/lib/__init__.py
index 4816864dc6..0ad3abdaf9 100644
--- a/ydb/tests/tools/ydb_serializable/lib/__init__.py
+++ b/ydb/tests/tools/ydb_serializable/lib/__init__.py
@@ -301,20 +301,40 @@ class DatabaseChecker(object):
return self.SessionContext(self, session)
@gen.coroutine
- def async_perform_point_reads(self, history, table, options, checker, deadline):
- with (yield self.async_session()) as session:
- read_query = yield session.async_prepare(QUERY_POINT_READS.format(
- TABLE=table))
-
- while time.time() < deadline:
- keys = checker.select_read_from_write_keys(cnt=random.randint(1, options.shards))
- if not keys:
- # There are not enough in-progress writes to this table yet, spin a little
- yield gen.sleep(0.000001)
- continue
-
- node = history.add(History.Begin('reads', None, read_keys=keys)).apply_to(checker)
+ def async_retry_operation(self, callable, deadline):
+ while time.time() < deadline:
+ try:
+ with (yield self.async_session()) as session:
+ try:
+ result = yield callable(session)
+ except (ydb.Aborted, ydb.Undetermined, ydb.NotFound, ydb.InternalError):
+ raise # these are not retried
+ except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
+ continue # retry
+ except ydb.BadSession:
+ continue # retry
+ raise gen.Return(result)
+ else:
+ raise ydb.Aborted('deadline reached')
+ @gen.coroutine
+ def async_perform_point_reads(self, history, table, options, checker, deadline):
+ read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), {
+ '$data': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)),
+ })
+
+ while time.time() < deadline:
+ keys = checker.select_read_from_write_keys(cnt=random.randint(1, options.shards))
+ if not keys:
+ # There are not enough in-progress writes to this table yet, spin a little
+ yield gen.sleep(0.000001)
+ continue
+
+ node = history.add(History.Begin('reads', None, read_keys=keys)).apply_to(checker)
+
+ @gen.coroutine
+ def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
@@ -329,16 +349,7 @@ class DatabaseChecker(object):
)
if not simple_tx:
yield tx.async_commit()
- except ydb.Aborted:
- history.add(History.Abort('reads', node.value)).apply_to(checker)
- except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
- pass # transaction outcome unknown
- else:
- values = {}
- for row in rss[0].rows:
- values[row.key] = row.value
-
- history.add(History.Commit('reads', node.value, values)).apply_to(checker)
+ raise gen.Return(rss)
finally:
if tx.tx_id is not None:
try:
@@ -346,17 +357,34 @@ class DatabaseChecker(object):
except ydb.Error:
pass
+ try:
+ rss = yield self.async_retry_operation(perform, deadline)
+ except ydb.Aborted:
+ history.add(History.Abort('reads', node.value)).apply_to(checker)
+ except ydb.Undetermined:
+ pass # transaction outcome unknown
+ else:
+ values = {}
+ for row in rss[0].rows:
+ values[row.key] = row.value
+
+ history.add(History.Commit('reads', node.value, values)).apply_to(checker)
+
@gen.coroutine
def async_perform_point_writes(self, history, table, options, checker, deadline):
- with (yield self.async_session()) as session:
- write_query = yield session.async_prepare(QUERY_POINT_WRITES.format(
- TABLE=table))
+ write_query = ydb.DataQuery(QUERY_POINT_WRITES.format(TABLE=table), {
+ '$data': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)
+ .add_member('value', ydb.PrimitiveType.Uint64)),
+ })
- while time.time() < deadline:
- keys = checker.select_write_keys(cnt=random.randint(1, options.shards))
+ while time.time() < deadline:
+ keys = checker.select_write_keys(cnt=random.randint(1, options.shards))
- node = history.add(History.Begin('writes', None, write_keys=keys)).apply_to(checker)
+ node = history.add(History.Begin('writes', None, write_keys=keys)).apply_to(checker)
+ @gen.coroutine
+ def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
@@ -371,12 +399,6 @@ class DatabaseChecker(object):
)
if not simple_tx:
yield tx.async_commit()
- except ydb.Aborted:
- history.add(History.Abort('writes', node.value)).apply_to(checker)
- except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
- pass # transaction outcome unknown
- else:
- history.add(History.Commit('writes', node.value)).apply_to(checker)
finally:
if tx.tx_id is not None:
try:
@@ -384,26 +406,46 @@ class DatabaseChecker(object):
except ydb.Error:
pass
- checker.release_write_keys(keys)
+ try:
+ yield self.async_retry_operation(perform, deadline)
+ except ydb.Aborted:
+ history.add(History.Abort('writes', node.value)).apply_to(checker)
+ except ydb.Undetermined:
+ pass # transaction outcome unknown
+ else:
+ history.add(History.Commit('writes', node.value)).apply_to(checker)
+
+ checker.release_write_keys(keys)
@gen.coroutine
def async_perform_point_reads_writes(self, history, table, options, checker, deadline, keysets):
- with (yield self.async_session()) as session:
- read_query = yield session.async_prepare(QUERY_POINT_READS.format(
- TABLE=table))
- write_query = yield session.async_prepare(QUERY_POINT_WRITES.format(
- TABLE=table))
- read_write_query = yield session.async_prepare(QUERY_POINT_READS_WRITES.format(
- TABLE=table))
-
- while time.time() < deadline:
- read_keys = checker.select_read_keys(cnt=random.randint(1, options.shards))
- write_keys = checker.select_write_keys(cnt=random.randint(1, options.shards))
-
- keysets.add(tuple(sorted(write_keys)))
-
- node = history.add(History.Begin('reads+writes', None, read_keys=read_keys, write_keys=write_keys)).apply_to(checker)
-
+ read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), {
+ '$data': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)),
+ })
+ write_query = ydb.DataQuery(QUERY_POINT_WRITES.format(TABLE=table), {
+ '$data': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)
+ .add_member('value', ydb.PrimitiveType.Uint64)),
+ })
+ read_write_query = ydb.DataQuery(QUERY_POINT_READS_WRITES.format(TABLE=table), {
+ '$reads': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)),
+ '$writes': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)
+ .add_member('value', ydb.PrimitiveType.Uint64)),
+ })
+
+ while time.time() < deadline:
+ read_keys = checker.select_read_keys(cnt=random.randint(1, options.shards))
+ write_keys = checker.select_write_keys(cnt=random.randint(1, options.shards))
+
+ keysets.add(tuple(sorted(write_keys)))
+
+ node = history.add(History.Begin('reads+writes', None, read_keys=read_keys, write_keys=write_keys)).apply_to(checker)
+
+ @gen.coroutine
+ def perform(session):
# Read/Write tx may fail with TLI
tx = session.transaction(ydb.SerializableReadWrite())
try:
@@ -440,16 +482,7 @@ class DatabaseChecker(object):
},
commit_tx=True,
)
- except ydb.Aborted:
- history.add(History.Abort('reads+writes', node.value)).apply_to(checker)
- except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
- pass # transaction outcome unknown
- else:
- values = {}
- for row in rss[0].rows:
- values[row.key] = row.value
-
- history.add(History.Commit('reads+writes', node.value, values)).apply_to(checker)
+ raise gen.Return(rss)
finally:
if tx.tx_id is not None:
try:
@@ -457,26 +490,42 @@ class DatabaseChecker(object):
except ydb.Error:
pass
- keysets.discard(tuple(sorted(write_keys)))
+ try:
+ rss = yield self.async_retry_operation(perform, deadline)
+ except ydb.Aborted:
+ history.add(History.Abort('reads+writes', node.value)).apply_to(checker)
+ except ydb.Undetermined:
+ pass # transaction outcome unknown
+ else:
+ values = {}
+ for row in rss[0].rows:
+ values[row.key] = row.value
+
+ history.add(History.Commit('reads+writes', node.value, values)).apply_to(checker)
- checker.release_write_keys(write_keys)
+ keysets.discard(tuple(sorted(write_keys)))
+
+ checker.release_write_keys(write_keys)
@gen.coroutine
def async_perform_verifying_reads(self, history, table, options, checker, deadline, keysets):
- with (yield self.async_session()) as session:
- read_query = yield session.async_prepare(QUERY_POINT_READS.format(
- TABLE=table))
-
- while time.time() < deadline:
- if not keysets:
- # There are not enough in-progress writes to this table yet, spin a little
- yield gen.sleep(0.000001)
- continue
+ read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), {
+ '$data': ydb.ListType(ydb.StructType()
+ .add_member('key', ydb.PrimitiveType.Uint64)),
+ })
- keys = random.choice(list(keysets))
+ while time.time() < deadline:
+ if not keysets:
+ # There are not enough in-progress writes to this table yet, spin a little
+ yield gen.sleep(0.000001)
+ continue
- node = history.add(History.Begin('reads_of_writes', None, read_keys=keys)).apply_to(checker)
+ keys = random.choice(list(keysets))
+ node = history.add(History.Begin('reads_of_writes', None, read_keys=keys)).apply_to(checker)
+
+ @gen.coroutine
+ def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
@@ -491,16 +540,7 @@ class DatabaseChecker(object):
)
if not simple_tx:
yield tx.async_commit()
- except ydb.Aborted:
- history.add(History.Abort('reads_of_writes', node.value)).apply_to(checker)
- except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
- pass # transaction outcome unknown
- else:
- values = {}
- for row in rss[0].rows:
- values[row.key] = row.value
-
- history.add(History.Commit('reads_of_writes', node.value, values)).apply_to(checker)
+ raise gen.Return(rss)
finally:
if tx.tx_id is not None:
try:
@@ -508,19 +548,35 @@ class DatabaseChecker(object):
except ydb.Error:
pass
+ try:
+ rss = yield self.async_retry_operation(perform, deadline)
+ except ydb.Aborted:
+ history.add(History.Abort('reads_of_writes', node.value)).apply_to(checker)
+ except ydb.Undetermined:
+ pass # transaction outcome unknown
+ else:
+ values = {}
+ for row in rss[0].rows:
+ values[row.key] = row.value
+
+ history.add(History.Commit('reads_of_writes', node.value, values)).apply_to(checker)
+
@gen.coroutine
def async_perform_range_reads(self, history, table, options, checker, deadline):
- with (yield self.async_session()) as session:
- range_query = yield session.async_prepare(QUERY_RANGE_READS.format(
- TABLE=table))
+ range_query = ydb.DataQuery(QUERY_RANGE_READS.format(TABLE=table), {
+ '$minKey': ydb.PrimitiveType.Uint64,
+ '$maxKey': ydb.PrimitiveType.Uint64,
+ })
- while time.time() < deadline:
- min_key = random.randint(0, options.keys)
- max_key = random.randint(min_key, options.keys)
- read_keys = list(range(min_key, max_key + 1))
+ while time.time() < deadline:
+ min_key = random.randint(0, options.keys)
+ max_key = random.randint(min_key, options.keys)
+ read_keys = list(range(min_key, max_key + 1))
- node = history.add(History.Begin('read_range', None, read_keys=read_keys)).apply_to(checker)
+ node = history.add(History.Begin('read_range', None, read_keys=read_keys)).apply_to(checker)
+ @gen.coroutine
+ def perform(session):
tx = session.transaction(ydb.SerializableReadWrite())
try:
simple_tx = bool(random.randint(0, 1))
@@ -534,16 +590,7 @@ class DatabaseChecker(object):
)
if not simple_tx:
yield tx.async_commit()
- except ydb.Aborted:
- history.add(History.Abort('read_range', node.value)).apply_to(checker)
- except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError):
- pass # transaction outcome unknown
- else:
- values = {}
- for row in rss[0].rows:
- values[row.key] = row.value
-
- history.add(History.Commit('read_range', node.value, values)).apply_to(checker)
+ raise gen.Return(rss)
finally:
if tx.tx_id is not None:
try:
@@ -551,24 +598,38 @@ class DatabaseChecker(object):
except ydb.Error:
pass
+ try:
+ rss = yield self.async_retry_operation(perform, deadline)
+ except ydb.Aborted:
+ history.add(History.Abort('read_range', node.value)).apply_to(checker)
+ except ydb.Undetermined:
+ pass # transaction outcome unknown
+ else:
+ values = {}
+ for row in rss[0].rows:
+ values[row.key] = row.value
+
+ history.add(History.Commit('read_range', node.value, values)).apply_to(checker)
+
@gen.coroutine
def async_perform_read_tables(self, history, table, options, checker, deadline):
- with (yield self.async_session()) as session:
- while time.time() < deadline:
- if options.read_table_ranges:
- min_key = random.randint(0, options.keys)
- max_key = random.randint(min_key, options.keys)
- read_keys = list(range(min_key, max_key + 1))
- key_range = ydb.KeyRange(
- ydb.KeyBound((min_key,), KEY_PREFIX_TYPE, inclusive=True),
- ydb.KeyBound((max_key,), KEY_PREFIX_TYPE, inclusive=True))
- else:
- read_keys = sorted(checker.keys)
- key_range = None
+ while time.time() < deadline:
+ if options.read_table_ranges:
+ min_key = random.randint(0, options.keys)
+ max_key = random.randint(min_key, options.keys)
+ read_keys = list(range(min_key, max_key + 1))
+ key_range = ydb.KeyRange(
+ ydb.KeyBound((min_key,), KEY_PREFIX_TYPE, inclusive=True),
+ ydb.KeyBound((max_key,), KEY_PREFIX_TYPE, inclusive=True))
+ else:
+ read_keys = sorted(checker.keys)
+ key_range = None
- if not options.ignore_read_table:
- node = history.add(History.Begin('read_table', None, read_keys=read_keys)).apply_to(checker)
+ if not options.ignore_read_table:
+ node = history.add(History.Begin('read_table', None, read_keys=read_keys)).apply_to(checker)
+ @gen.coroutine
+ def perform(session):
values = {}
failed = False
for chunk_future in session.async_read_table(table, key_range, use_snapshot=options.read_table_snapshot):
@@ -581,7 +642,15 @@ class DatabaseChecker(object):
break
for row in chunk.rows:
values[row.key] = row.value
-
+ raise gen.Return((values, failed))
+
+ try:
+ values, failed = yield self.async_retry_operation(perform, deadline)
+ except ydb.Aborted:
+ history.add(History.Abort('read_table', node.value)).apply_to(checker)
+ except ydb.Undetermined:
+ pass # transaction outcome unknown
+ else:
if not options.ignore_read_table:
# We mark ReadTable committed even when it fails
history.add(History.Commit('read_table', node.value, values, flags='partial_read' if failed else None)).apply_to(checker)