diff options
author | rekby <rekby@ydb.tech> | 2023-03-10 20:34:57 +0300 |
---|---|---|
committer | rekby <rekby@ydb.tech> | 2023-03-10 20:34:57 +0300 |
commit | 68707ff3df03cb20da93f18b8e608a699aa6009f (patch) | |
tree | a9a91254a3e055a87cef1ae8227d792edd54c603 | |
parent | 784fe0389a93a37ce2a541c2c50fded6903251df (diff) | |
download | ydb-68707ff3df03cb20da93f18b8e608a699aa6009f.tar.gz |
Remove ydb tornado from tests
-rw-r--r-- | ydb/tests/functional/api/test_tornado_frameworks.py | 72 | ||||
-rw-r--r-- | ydb/tests/functional/rename/common.py | 48 | ||||
-rw-r--r-- | ydb/tests/functional/rename/test.py | 177 |
3 files changed, 100 insertions, 197 deletions
diff --git a/ydb/tests/functional/api/test_tornado_frameworks.py b/ydb/tests/functional/api/test_tornado_frameworks.py deleted file mode 100644 index 2b7808239b9..00000000000 --- a/ydb/tests/functional/api/test_tornado_frameworks.py +++ /dev/null @@ -1,72 +0,0 @@ -# -*- coding: utf-8 -*- -import tornado.ioloop -from hamcrest import assert_that, is_, not_, none - -from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory -import ydb -import ydb.tornado - - -async def raising_error(obj): - if obj.num <= 3: - obj.num += 1 - raise ydb.BadSession("Test bad session") - return 7 - - -async def cor(): - class RaisingError(object): - num = 0 - - o = RaisingError() - - return await ydb.tornado.retry_operation(lambda: raising_error(o)) - - -class TestTornadoFrameworks(object): - @classmethod - def setup_class(cls): - cls.cluster = kikimr_cluster_factory() - cls.cluster.start() - cls.driver = ydb.Driver( - ydb.DriverConfig( - database='/Root', - endpoint="%s:%s" % ( - cls.cluster.nodes[1].host, cls.cluster.nodes[1].port - ) - ) - ) - cls.driver.wait() - - @classmethod - def teardown_class(cls): - if hasattr(cls, 'cluster'): - cls.cluster.stop() - - if hasattr(cls, 'driver'): - cls.driver.stop() - - def test_raising_error(self): - result = tornado.ioloop.IOLoop.current().run_sync( - cor, - ) - - assert result == 7 - - def test_retry_operation(self): - result = tornado.ioloop.IOLoop.current().run_sync( - lambda: ydb.tornado.retry_operation( - lambda: ydb.tornado.as_tornado_future( - self.driver.table_client.session().async_create() - ) - ) - ) - - assert_that( - result.session_id, - is_( - not_( - none() - ) - ) - ) diff --git a/ydb/tests/functional/rename/common.py b/ydb/tests/functional/rename/common.py index b0085107178..92daa2bbab1 100644 --- a/ydb/tests/functional/rename/common.py +++ b/ydb/tests/functional/rename/common.py @@ -1,11 +1,9 @@ # -*- coding: utf-8 -*- import logging import six -import ydb - -from tornado import gen -from ydb.tornado import as_tornado_future +import asyncio +import ydb logger = logging.getLogger(__name__) @@ -16,10 +14,11 @@ robust_retries = ydb.RetrySettings() \ async def async_retry_operation(callee, retry_settings=None, *args, **kwargs): - opt_generator = ydb.retry_operation_impl(callee, retry_settings, *args, **kwargs) + opt_generator = ydb.retry_operation_impl( + callee, retry_settings, *args, **kwargs) for next_opt in opt_generator: if isinstance(next_opt, ydb.YdbRetryOperationSleepOpt): - await gen.sleep(next_opt.timeout) + await asyncio.sleep(next_opt.timeout) else: try: return await next_opt.result @@ -30,35 +29,25 @@ async def async_retry_operation(callee, retry_settings=None, *args, **kwargs): next_opt.set_exception(e) -async def async_execute_serializable_job(pool: ydb.SessionPool, query, parameters): +async def async_execute_serializable_job(pool: ydb.aio.SessionPool, query, parameters): async def calle(pool, query, parameters): - with pool.async_checkout() as async_session: - session = await as_tornado_future(async_session) - prepared_query = await as_tornado_future(session.async_prepare(query)) + async with pool.checkout() as session: + prepared_query = await session.prepare(query) with session.transaction(ydb.SerializableReadWrite()) as tx: - result = await as_tornado_future( - tx.async_execute( - prepared_query, - parameters=parameters, - commit_tx=True - ) - ) + result = await tx.execute(prepared_query, parameters=parameters, commit_tx=True) return result return await async_retry_operation(calle, robust_retries, pool, query, parameters) -async def async_execute_stale_ro_job(pool: ydb.SessionPool, query, parameters): +async def async_execute_stale_ro_job(pool: ydb.aio.SessionPool, query, parameters): async def calle(pool, query, parameters): - with pool.async_checkout() as async_session: - session = await as_tornado_future(async_session) - prepared_query = await as_tornado_future(session.async_prepare(query)) + async with pool.checkout() as session: + prepared_query = await session.prepare(query) with session.transaction(ydb.StaleReadOnly()) as tx: - result = await as_tornado_future( - tx.async_execute( - prepared_query, - parameters=parameters, - commit_tx=True - ) + result = await tx.execute( + prepared_query, + parameters=parameters, + commit_tx=True ) return result return await async_retry_operation(calle, robust_retries, pool, query, parameters) @@ -66,9 +55,8 @@ async def async_execute_stale_ro_job(pool: ydb.SessionPool, query, parameters): async def async_scheme_job(pool: ydb.SessionPool, query): async def calle(pool, query): - with pool.async_checkout() as async_session: - session = await as_tornado_future(async_session) - result = await as_tornado_future(session.async_execute_scheme(query)) + async with pool.checkout() as session: + result = await session.execute_scheme(query) return result return await async_retry_operation(calle, robust_retries, pool, query) diff --git a/ydb/tests/functional/rename/test.py b/ydb/tests/functional/rename/test.py index 00699a9cce2..9b9769fbf8e 100644 --- a/ydb/tests/functional/rename/test.py +++ b/ydb/tests/functional/rename/test.py @@ -1,13 +1,12 @@ # -*- coding: utf-8 -*- import logging import os +import asyncio import pytest -from tornado import gen -from tornado.ioloop import IOLoop import ydb -from ydb.tornado import as_tornado_future +import ydb.aio from ydb.tests.library.common.types import Erasure, from_bytes from ydb.tests.library.harness.util import LogLevels @@ -51,71 +50,53 @@ DROP TABLE `{table}` """) -def create_simple_table(pool, path): - async def async_create(): - await async_scheme_job(pool, SIMPLE_TABLE_TEMPLATE.format(table=path)) +async def create_simple_table(pool, path): + await async_scheme_job(pool, SIMPLE_TABLE_TEMPLATE.format(table=path)) - return async_create +async def create_indexed_table(pool, path): + await async_scheme_job(pool, INDEXED_TABLE_TEMPLATE.format(table=path)) -def create_indexed_table(pool, path): - async def async_create(): - await async_scheme_job(pool, INDEXED_TABLE_TEMPLATE.format(table=path)) - return async_create +async def create_indexed_async_table(pool, path): + await async_scheme_job(pool, INDEXED_ASYNC_TABLE_TEMPLATE.format(table=path)) -def create_indexed_async_table(pool, path): - async def async_create(): - await async_scheme_job(pool, INDEXED_ASYNC_TABLE_TEMPLATE.format(table=path)) - - return async_create +async def replace_table(pool, src, dst, *args): + async with pool.checkout() as session: + await session.rename_tables( + [ + ydb.table.RenameItem( + source_path=src, + destination_path=dst, + replace_destination=True, + ) + ] + ) -def replace_table(pool, src, dst, *args): - async def async_move(): - with pool.async_checkout() as async_session: - session = await as_tornado_future(async_session) - await as_tornado_future( - session.async_rename_tables( - [ - ydb.table.RenameItem( - source_path=src, - destination_path=dst, - replace_destination=True, - ) - ] - ) - ) - return async_move - - -def substitute_table(pool, src, dst, backup): - async def async_move(): - try: - await async_scheme_job(pool, DROP_TABLE_TEMPLATE.format(table=backup)) - except ydb.issues.SchemeError: - pass - - with pool.async_checkout() as async_session: - session = await as_tornado_future(async_session) - await as_tornado_future( - session.async_rename_tables( - [ - ydb.table.RenameItem( - source_path=dst, - destination_path=backup, - replace_destination=False, - ), - ydb.table.RenameItem( - source_path=src, - destination_path=dst, - replace_destination=False, - ) - ] +async def substitute_table(pool, src, dst, backup): + try: + await async_scheme_job(pool, DROP_TABLE_TEMPLATE.format(table=backup)) + except ydb.issues.SchemeError: + pass + + with pool.async_checkout() as async_session: + session = await async_session + await session.rename_tables( + [ + ydb.table.RenameItem( + source_path=dst, + destination_path=backup, + replace_destination=False, + ), + ydb.table.RenameItem( + source_path=src, + destination_path=dst, + replace_destination=False, ) - ) - return async_move + ] + ) class Simple: @@ -123,16 +104,16 @@ class Simple: self._driver_configs = driver_configs self._database = driver_configs.database - self._driver = ydb.Driver(self._driver_configs) - self._driver.wait(timeout=5) - - self._pool = ydb.SessionPool(driver=self._driver, size=10) - self._create_method = create_method self._replace_method = replace_method self._select_from_index = select_from_index + async def driver_init_async(self): + self._driver = ydb.aio.Driver(self._driver_configs) + await self._driver.wait(timeout=15) + self._pool = ydb.aio.SessionPool(driver=self._driver, size=10) + upsert_table_template = ( r""" DECLARE $key AS Uint64; @@ -160,8 +141,8 @@ class Simple: tables = ["table_main", "table_tmp", "table_backup"] - def prepare(self): - IOLoop.current().run_sync(lambda: self.async_prepare("table_main")) + async def prepare(self): + await self.async_prepare("table_main") @staticmethod def _value_for_table_row(table, key): @@ -173,7 +154,7 @@ class Simple: async def async_prepare(self, table): logger.info("begin") - await self._create_method(self._pool, table)() + await self._create_method(self._pool, table) logger.info("create_method done") coros = [] @@ -183,13 +164,13 @@ class Simple: '$key': idx, '$value': self._value_for_table_row(table, idx) } - coros.append(async_execute_serializable_job(self._pool, query, parameters)) + coros.append(asyncio.create_task(async_execute_serializable_job(self._pool, query, parameters))) logger.info("wait coros") - await gen.multi(coros) + await asyncio.wait(coros) - def move(self): - IOLoop.current().run_sync(lambda: self.async_prepare("table_tmp")) + async def move(self): + await self.async_prepare("table_tmp") coros = [] @@ -199,14 +180,14 @@ class Simple: '$key': idx, '$value': self._value_for_table_row("table_main", idx) } - coros.append(async_repeat_n_times(async_execute_serializable_job, 20, self._pool, write_query, parameters)) + coros.append(asyncio.create_task(async_repeat_n_times(async_execute_serializable_job, 20, self._pool, write_query, parameters))) read_query = self.select_table_template.format(table="table_main") for idx in range(20): parameters = { '$key': 10+idx, } - coros.append(async_repeat_n_times(async_execute_serializable_job, 15, self._pool, read_query, parameters)) + coros.append(asyncio.create_task(async_repeat_n_times(async_execute_serializable_job, 15, self._pool, read_query, parameters))) if self._select_from_index: read_index_query = self.select_index_table_template.format(table="table_main") @@ -214,21 +195,23 @@ class Simple: parameters = { '$value': self._value_for_table_row("table_main", idx) } - coros.append(async_repeat_n_times(async_execute_stale_ro_job, 15, self._pool, read_index_query, parameters)) + coros.append(asyncio.create_task(async_repeat_n_times(async_execute_stale_ro_job, 15, self._pool, read_index_query, parameters))) coros.append( - self._replace_method( - self._pool, - os.path.join(self._database, "table_tmp"), - os.path.join(self._database, "table_main"), - os.path.join(self._database, "table_backup") - )() + asyncio.create_task( + self._replace_method( + self._pool, + os.path.join(self._database, "table_tmp"), + os.path.join(self._database, "table_main"), + os.path.join(self._database, "table_backup") + ) + ) ) async def calle(): - await gen.multi(coros) + await asyncio.wait(coros) - IOLoop.current().run_sync(lambda: calle()) + await calle() # local configuration for the ydb cluster (fetched by ydb_cluster_configuration fixture) @@ -256,21 +239,25 @@ CLUSTER_CONFIG = dict( substitute_table, ]) def test_client_gets_retriable_errors_when_rename(create_method, select_from_index, replace_method, ydb_database, ydb_endpoint): - database = ydb_database - logger.info(" database is %s", database) + async def test(): + database = ydb_database + logger.info(" database is %s", database) - driver_configs = ydb.DriverConfig( - ydb_endpoint, - database - ) + driver_configs = ydb.DriverConfig( + ydb_endpoint, + database + ) + + scenario = Simple(driver_configs, create_method, replace_method, select_from_index) + await scenario.driver_init_async() - scenario = Simple(driver_configs, create_method, replace_method, select_from_index) + logger.info(" database is %s: PREPARE", database) + await scenario.prepare() - logger.info(" database is %s: PREPARE", database) - scenario.prepare() + logger.info(" database is %s: MOVE 1", database) + await scenario.move() - logger.info(" database is %s: MOVE 1", database) - scenario.move() + logger.info(" database is %s: MOVE 2", database) + await scenario.move() - logger.info(" database is %s: MOVE 2", database) - scenario.move() + asyncio.run(test()) |