diff options
author | snaury <snaury@ydb.tech> | 2023-01-25 14:35:42 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-01-25 14:35:42 +0300 |
commit | 9891487626af4bc05ee05ac05f06238704fac139 (patch) | |
tree | 9972d158c4b7ddeb3e62705b151904c5d9c428e0 | |
parent | c82bf5bd4f14e4b564537b009e78e549e888976a (diff) | |
download | ydb-9891487626af4bc05ee05ac05f06238704fac139.tar.gz |
Add retries to ydb_serializable to support session rebalancing
-rw-r--r-- | ydb/tests/tools/ydb_serializable/lib/__init__.py | 305 |
1 files changed, 187 insertions, 118 deletions
diff --git a/ydb/tests/tools/ydb_serializable/lib/__init__.py b/ydb/tests/tools/ydb_serializable/lib/__init__.py index 4816864dc6..0ad3abdaf9 100644 --- a/ydb/tests/tools/ydb_serializable/lib/__init__.py +++ b/ydb/tests/tools/ydb_serializable/lib/__init__.py @@ -301,20 +301,40 @@ class DatabaseChecker(object): return self.SessionContext(self, session) @gen.coroutine - def async_perform_point_reads(self, history, table, options, checker, deadline): - with (yield self.async_session()) as session: - read_query = yield session.async_prepare(QUERY_POINT_READS.format( - TABLE=table)) - - while time.time() < deadline: - keys = checker.select_read_from_write_keys(cnt=random.randint(1, options.shards)) - if not keys: - # There are not enough in-progress writes to this table yet, spin a little - yield gen.sleep(0.000001) - continue - - node = history.add(History.Begin('reads', None, read_keys=keys)).apply_to(checker) + def async_retry_operation(self, callable, deadline): + while time.time() < deadline: + try: + with (yield self.async_session()) as session: + try: + result = yield callable(session) + except (ydb.Aborted, ydb.Undetermined, ydb.NotFound, ydb.InternalError): + raise # these are not retried + except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError): + continue # retry + except ydb.BadSession: + continue # retry + raise gen.Return(result) + else: + raise ydb.Aborted('deadline reached') + @gen.coroutine + def async_perform_point_reads(self, history, table, options, checker, deadline): + read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), { + '$data': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64)), + }) + + while time.time() < deadline: + keys = checker.select_read_from_write_keys(cnt=random.randint(1, options.shards)) + if not keys: + # There are not enough in-progress writes to this table yet, spin a little + yield gen.sleep(0.000001) + continue + + node = history.add(History.Begin('reads', None, read_keys=keys)).apply_to(checker) + + @gen.coroutine + def perform(session): tx = session.transaction(ydb.SerializableReadWrite()) try: simple_tx = bool(random.randint(0, 1)) @@ -329,16 +349,7 @@ class DatabaseChecker(object): ) if not simple_tx: yield tx.async_commit() - except ydb.Aborted: - history.add(History.Abort('reads', node.value)).apply_to(checker) - except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError): - pass # transaction outcome unknown - else: - values = {} - for row in rss[0].rows: - values[row.key] = row.value - - history.add(History.Commit('reads', node.value, values)).apply_to(checker) + raise gen.Return(rss) finally: if tx.tx_id is not None: try: @@ -346,17 +357,34 @@ class DatabaseChecker(object): except ydb.Error: pass + try: + rss = yield self.async_retry_operation(perform, deadline) + except ydb.Aborted: + history.add(History.Abort('reads', node.value)).apply_to(checker) + except ydb.Undetermined: + pass # transaction outcome unknown + else: + values = {} + for row in rss[0].rows: + values[row.key] = row.value + + history.add(History.Commit('reads', node.value, values)).apply_to(checker) + @gen.coroutine def async_perform_point_writes(self, history, table, options, checker, deadline): - with (yield self.async_session()) as session: - write_query = yield session.async_prepare(QUERY_POINT_WRITES.format( - TABLE=table)) + write_query = ydb.DataQuery(QUERY_POINT_WRITES.format(TABLE=table), { + '$data': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64) + .add_member('value', ydb.PrimitiveType.Uint64)), + }) - while time.time() < deadline: - keys = checker.select_write_keys(cnt=random.randint(1, options.shards)) + while time.time() < deadline: + keys = checker.select_write_keys(cnt=random.randint(1, options.shards)) - node = history.add(History.Begin('writes', None, write_keys=keys)).apply_to(checker) + node = history.add(History.Begin('writes', None, write_keys=keys)).apply_to(checker) + @gen.coroutine + def perform(session): tx = session.transaction(ydb.SerializableReadWrite()) try: simple_tx = bool(random.randint(0, 1)) @@ -371,12 +399,6 @@ class DatabaseChecker(object): ) if not simple_tx: yield tx.async_commit() - except ydb.Aborted: - history.add(History.Abort('writes', node.value)).apply_to(checker) - except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError): - pass # transaction outcome unknown - else: - history.add(History.Commit('writes', node.value)).apply_to(checker) finally: if tx.tx_id is not None: try: @@ -384,26 +406,46 @@ class DatabaseChecker(object): except ydb.Error: pass - checker.release_write_keys(keys) + try: + yield self.async_retry_operation(perform, deadline) + except ydb.Aborted: + history.add(History.Abort('writes', node.value)).apply_to(checker) + except ydb.Undetermined: + pass # transaction outcome unknown + else: + history.add(History.Commit('writes', node.value)).apply_to(checker) + + checker.release_write_keys(keys) @gen.coroutine def async_perform_point_reads_writes(self, history, table, options, checker, deadline, keysets): - with (yield self.async_session()) as session: - read_query = yield session.async_prepare(QUERY_POINT_READS.format( - TABLE=table)) - write_query = yield session.async_prepare(QUERY_POINT_WRITES.format( - TABLE=table)) - read_write_query = yield session.async_prepare(QUERY_POINT_READS_WRITES.format( - TABLE=table)) - - while time.time() < deadline: - read_keys = checker.select_read_keys(cnt=random.randint(1, options.shards)) - write_keys = checker.select_write_keys(cnt=random.randint(1, options.shards)) - - keysets.add(tuple(sorted(write_keys))) - - node = history.add(History.Begin('reads+writes', None, read_keys=read_keys, write_keys=write_keys)).apply_to(checker) - + read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), { + '$data': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64)), + }) + write_query = ydb.DataQuery(QUERY_POINT_WRITES.format(TABLE=table), { + '$data': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64) + .add_member('value', ydb.PrimitiveType.Uint64)), + }) + read_write_query = ydb.DataQuery(QUERY_POINT_READS_WRITES.format(TABLE=table), { + '$reads': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64)), + '$writes': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64) + .add_member('value', ydb.PrimitiveType.Uint64)), + }) + + while time.time() < deadline: + read_keys = checker.select_read_keys(cnt=random.randint(1, options.shards)) + write_keys = checker.select_write_keys(cnt=random.randint(1, options.shards)) + + keysets.add(tuple(sorted(write_keys))) + + node = history.add(History.Begin('reads+writes', None, read_keys=read_keys, write_keys=write_keys)).apply_to(checker) + + @gen.coroutine + def perform(session): # Read/Write tx may fail with TLI tx = session.transaction(ydb.SerializableReadWrite()) try: @@ -440,16 +482,7 @@ class DatabaseChecker(object): }, commit_tx=True, ) - except ydb.Aborted: - history.add(History.Abort('reads+writes', node.value)).apply_to(checker) - except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError): - pass # transaction outcome unknown - else: - values = {} - for row in rss[0].rows: - values[row.key] = row.value - - history.add(History.Commit('reads+writes', node.value, values)).apply_to(checker) + raise gen.Return(rss) finally: if tx.tx_id is not None: try: @@ -457,26 +490,42 @@ class DatabaseChecker(object): except ydb.Error: pass - keysets.discard(tuple(sorted(write_keys))) + try: + rss = yield self.async_retry_operation(perform, deadline) + except ydb.Aborted: + history.add(History.Abort('reads+writes', node.value)).apply_to(checker) + except ydb.Undetermined: + pass # transaction outcome unknown + else: + values = {} + for row in rss[0].rows: + values[row.key] = row.value + + history.add(History.Commit('reads+writes', node.value, values)).apply_to(checker) - checker.release_write_keys(write_keys) + keysets.discard(tuple(sorted(write_keys))) + + checker.release_write_keys(write_keys) @gen.coroutine def async_perform_verifying_reads(self, history, table, options, checker, deadline, keysets): - with (yield self.async_session()) as session: - read_query = yield session.async_prepare(QUERY_POINT_READS.format( - TABLE=table)) - - while time.time() < deadline: - if not keysets: - # There are not enough in-progress writes to this table yet, spin a little - yield gen.sleep(0.000001) - continue + read_query = ydb.DataQuery(QUERY_POINT_READS.format(TABLE=table), { + '$data': ydb.ListType(ydb.StructType() + .add_member('key', ydb.PrimitiveType.Uint64)), + }) - keys = random.choice(list(keysets)) + while time.time() < deadline: + if not keysets: + # There are not enough in-progress writes to this table yet, spin a little + yield gen.sleep(0.000001) + continue - node = history.add(History.Begin('reads_of_writes', None, read_keys=keys)).apply_to(checker) + keys = random.choice(list(keysets)) + node = history.add(History.Begin('reads_of_writes', None, read_keys=keys)).apply_to(checker) + + @gen.coroutine + def perform(session): tx = session.transaction(ydb.SerializableReadWrite()) try: simple_tx = bool(random.randint(0, 1)) @@ -491,16 +540,7 @@ class DatabaseChecker(object): ) if not simple_tx: yield tx.async_commit() - except ydb.Aborted: - history.add(History.Abort('reads_of_writes', node.value)).apply_to(checker) - except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError): - pass # transaction outcome unknown - else: - values = {} - for row in rss[0].rows: - values[row.key] = row.value - - history.add(History.Commit('reads_of_writes', node.value, values)).apply_to(checker) + raise gen.Return(rss) finally: if tx.tx_id is not None: try: @@ -508,19 +548,35 @@ class DatabaseChecker(object): except ydb.Error: pass + try: + rss = yield self.async_retry_operation(perform, deadline) + except ydb.Aborted: + history.add(History.Abort('reads_of_writes', node.value)).apply_to(checker) + except ydb.Undetermined: + pass # transaction outcome unknown + else: + values = {} + for row in rss[0].rows: + values[row.key] = row.value + + history.add(History.Commit('reads_of_writes', node.value, values)).apply_to(checker) + @gen.coroutine def async_perform_range_reads(self, history, table, options, checker, deadline): - with (yield self.async_session()) as session: - range_query = yield session.async_prepare(QUERY_RANGE_READS.format( - TABLE=table)) + range_query = ydb.DataQuery(QUERY_RANGE_READS.format(TABLE=table), { + '$minKey': ydb.PrimitiveType.Uint64, + '$maxKey': ydb.PrimitiveType.Uint64, + }) - while time.time() < deadline: - min_key = random.randint(0, options.keys) - max_key = random.randint(min_key, options.keys) - read_keys = list(range(min_key, max_key + 1)) + while time.time() < deadline: + min_key = random.randint(0, options.keys) + max_key = random.randint(min_key, options.keys) + read_keys = list(range(min_key, max_key + 1)) - node = history.add(History.Begin('read_range', None, read_keys=read_keys)).apply_to(checker) + node = history.add(History.Begin('read_range', None, read_keys=read_keys)).apply_to(checker) + @gen.coroutine + def perform(session): tx = session.transaction(ydb.SerializableReadWrite()) try: simple_tx = bool(random.randint(0, 1)) @@ -534,16 +590,7 @@ class DatabaseChecker(object): ) if not simple_tx: yield tx.async_commit() - except ydb.Aborted: - history.add(History.Abort('read_range', node.value)).apply_to(checker) - except (ydb.Unavailable, ydb.Overloaded, ydb.ConnectionError): - pass # transaction outcome unknown - else: - values = {} - for row in rss[0].rows: - values[row.key] = row.value - - history.add(History.Commit('read_range', node.value, values)).apply_to(checker) + raise gen.Return(rss) finally: if tx.tx_id is not None: try: @@ -551,24 +598,38 @@ class DatabaseChecker(object): except ydb.Error: pass + try: + rss = yield self.async_retry_operation(perform, deadline) + except ydb.Aborted: + history.add(History.Abort('read_range', node.value)).apply_to(checker) + except ydb.Undetermined: + pass # transaction outcome unknown + else: + values = {} + for row in rss[0].rows: + values[row.key] = row.value + + history.add(History.Commit('read_range', node.value, values)).apply_to(checker) + @gen.coroutine def async_perform_read_tables(self, history, table, options, checker, deadline): - with (yield self.async_session()) as session: - while time.time() < deadline: - if options.read_table_ranges: - min_key = random.randint(0, options.keys) - max_key = random.randint(min_key, options.keys) - read_keys = list(range(min_key, max_key + 1)) - key_range = ydb.KeyRange( - ydb.KeyBound((min_key,), KEY_PREFIX_TYPE, inclusive=True), - ydb.KeyBound((max_key,), KEY_PREFIX_TYPE, inclusive=True)) - else: - read_keys = sorted(checker.keys) - key_range = None + while time.time() < deadline: + if options.read_table_ranges: + min_key = random.randint(0, options.keys) + max_key = random.randint(min_key, options.keys) + read_keys = list(range(min_key, max_key + 1)) + key_range = ydb.KeyRange( + ydb.KeyBound((min_key,), KEY_PREFIX_TYPE, inclusive=True), + ydb.KeyBound((max_key,), KEY_PREFIX_TYPE, inclusive=True)) + else: + read_keys = sorted(checker.keys) + key_range = None - if not options.ignore_read_table: - node = history.add(History.Begin('read_table', None, read_keys=read_keys)).apply_to(checker) + if not options.ignore_read_table: + node = history.add(History.Begin('read_table', None, read_keys=read_keys)).apply_to(checker) + @gen.coroutine + def perform(session): values = {} failed = False for chunk_future in session.async_read_table(table, key_range, use_snapshot=options.read_table_snapshot): @@ -581,7 +642,15 @@ class DatabaseChecker(object): break for row in chunk.rows: values[row.key] = row.value - + raise gen.Return((values, failed)) + + try: + values, failed = yield self.async_retry_operation(perform, deadline) + except ydb.Aborted: + history.add(History.Abort('read_table', node.value)).apply_to(checker) + except ydb.Undetermined: + pass # transaction outcome unknown + else: if not options.ignore_read_table: # We mark ReadTable committed even when it fails history.add(History.Commit('read_table', node.value, values, flags='partial_read' if failed else None)).apply_to(checker) |