aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrekby <rekby@ydb.tech>2023-03-10 20:34:57 +0300
committerrekby <rekby@ydb.tech>2023-03-10 20:34:57 +0300
commit68707ff3df03cb20da93f18b8e608a699aa6009f (patch)
treea9a91254a3e055a87cef1ae8227d792edd54c603
parent784fe0389a93a37ce2a541c2c50fded6903251df (diff)
downloadydb-68707ff3df03cb20da93f18b8e608a699aa6009f.tar.gz
Remove ydb tornado from tests
-rw-r--r--ydb/tests/functional/api/test_tornado_frameworks.py72
-rw-r--r--ydb/tests/functional/rename/common.py48
-rw-r--r--ydb/tests/functional/rename/test.py177
3 files changed, 100 insertions, 197 deletions
diff --git a/ydb/tests/functional/api/test_tornado_frameworks.py b/ydb/tests/functional/api/test_tornado_frameworks.py
deleted file mode 100644
index 2b7808239b9..00000000000
--- a/ydb/tests/functional/api/test_tornado_frameworks.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# -*- coding: utf-8 -*-
-import tornado.ioloop
-from hamcrest import assert_that, is_, not_, none
-
-from ydb.tests.library.harness.kikimr_cluster import kikimr_cluster_factory
-import ydb
-import ydb.tornado
-
-
-async def raising_error(obj):
- if obj.num <= 3:
- obj.num += 1
- raise ydb.BadSession("Test bad session")
- return 7
-
-
-async def cor():
- class RaisingError(object):
- num = 0
-
- o = RaisingError()
-
- return await ydb.tornado.retry_operation(lambda: raising_error(o))
-
-
-class TestTornadoFrameworks(object):
- @classmethod
- def setup_class(cls):
- cls.cluster = kikimr_cluster_factory()
- cls.cluster.start()
- cls.driver = ydb.Driver(
- ydb.DriverConfig(
- database='/Root',
- endpoint="%s:%s" % (
- cls.cluster.nodes[1].host, cls.cluster.nodes[1].port
- )
- )
- )
- cls.driver.wait()
-
- @classmethod
- def teardown_class(cls):
- if hasattr(cls, 'cluster'):
- cls.cluster.stop()
-
- if hasattr(cls, 'driver'):
- cls.driver.stop()
-
- def test_raising_error(self):
- result = tornado.ioloop.IOLoop.current().run_sync(
- cor,
- )
-
- assert result == 7
-
- def test_retry_operation(self):
- result = tornado.ioloop.IOLoop.current().run_sync(
- lambda: ydb.tornado.retry_operation(
- lambda: ydb.tornado.as_tornado_future(
- self.driver.table_client.session().async_create()
- )
- )
- )
-
- assert_that(
- result.session_id,
- is_(
- not_(
- none()
- )
- )
- )
diff --git a/ydb/tests/functional/rename/common.py b/ydb/tests/functional/rename/common.py
index b0085107178..92daa2bbab1 100644
--- a/ydb/tests/functional/rename/common.py
+++ b/ydb/tests/functional/rename/common.py
@@ -1,11 +1,9 @@
# -*- coding: utf-8 -*-
import logging
import six
-import ydb
-
-from tornado import gen
-from ydb.tornado import as_tornado_future
+import asyncio
+import ydb
logger = logging.getLogger(__name__)
@@ -16,10 +14,11 @@ robust_retries = ydb.RetrySettings() \
async def async_retry_operation(callee, retry_settings=None, *args, **kwargs):
- opt_generator = ydb.retry_operation_impl(callee, retry_settings, *args, **kwargs)
+ opt_generator = ydb.retry_operation_impl(
+ callee, retry_settings, *args, **kwargs)
for next_opt in opt_generator:
if isinstance(next_opt, ydb.YdbRetryOperationSleepOpt):
- await gen.sleep(next_opt.timeout)
+ await asyncio.sleep(next_opt.timeout)
else:
try:
return await next_opt.result
@@ -30,35 +29,25 @@ async def async_retry_operation(callee, retry_settings=None, *args, **kwargs):
next_opt.set_exception(e)
-async def async_execute_serializable_job(pool: ydb.SessionPool, query, parameters):
+async def async_execute_serializable_job(pool: ydb.aio.SessionPool, query, parameters):
async def calle(pool, query, parameters):
- with pool.async_checkout() as async_session:
- session = await as_tornado_future(async_session)
- prepared_query = await as_tornado_future(session.async_prepare(query))
+ async with pool.checkout() as session:
+ prepared_query = await session.prepare(query)
with session.transaction(ydb.SerializableReadWrite()) as tx:
- result = await as_tornado_future(
- tx.async_execute(
- prepared_query,
- parameters=parameters,
- commit_tx=True
- )
- )
+ result = await tx.execute(prepared_query, parameters=parameters, commit_tx=True)
return result
return await async_retry_operation(calle, robust_retries, pool, query, parameters)
-async def async_execute_stale_ro_job(pool: ydb.SessionPool, query, parameters):
+async def async_execute_stale_ro_job(pool: ydb.aio.SessionPool, query, parameters):
async def calle(pool, query, parameters):
- with pool.async_checkout() as async_session:
- session = await as_tornado_future(async_session)
- prepared_query = await as_tornado_future(session.async_prepare(query))
+ async with pool.checkout() as session:
+ prepared_query = await session.prepare(query)
with session.transaction(ydb.StaleReadOnly()) as tx:
- result = await as_tornado_future(
- tx.async_execute(
- prepared_query,
- parameters=parameters,
- commit_tx=True
- )
+ result = await tx.execute(
+ prepared_query,
+ parameters=parameters,
+ commit_tx=True
)
return result
return await async_retry_operation(calle, robust_retries, pool, query, parameters)
@@ -66,9 +55,8 @@ async def async_execute_stale_ro_job(pool: ydb.SessionPool, query, parameters):
async def async_scheme_job(pool: ydb.SessionPool, query):
async def calle(pool, query):
- with pool.async_checkout() as async_session:
- session = await as_tornado_future(async_session)
- result = await as_tornado_future(session.async_execute_scheme(query))
+ async with pool.checkout() as session:
+ result = await session.execute_scheme(query)
return result
return await async_retry_operation(calle, robust_retries, pool, query)
diff --git a/ydb/tests/functional/rename/test.py b/ydb/tests/functional/rename/test.py
index 00699a9cce2..9b9769fbf8e 100644
--- a/ydb/tests/functional/rename/test.py
+++ b/ydb/tests/functional/rename/test.py
@@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
import logging
import os
+import asyncio
import pytest
-from tornado import gen
-from tornado.ioloop import IOLoop
import ydb
-from ydb.tornado import as_tornado_future
+import ydb.aio
from ydb.tests.library.common.types import Erasure, from_bytes
from ydb.tests.library.harness.util import LogLevels
@@ -51,71 +50,53 @@ DROP TABLE `{table}`
""")
-def create_simple_table(pool, path):
- async def async_create():
- await async_scheme_job(pool, SIMPLE_TABLE_TEMPLATE.format(table=path))
+async def create_simple_table(pool, path):
+ await async_scheme_job(pool, SIMPLE_TABLE_TEMPLATE.format(table=path))
- return async_create
+async def create_indexed_table(pool, path):
+ await async_scheme_job(pool, INDEXED_TABLE_TEMPLATE.format(table=path))
-def create_indexed_table(pool, path):
- async def async_create():
- await async_scheme_job(pool, INDEXED_TABLE_TEMPLATE.format(table=path))
- return async_create
+async def create_indexed_async_table(pool, path):
+ await async_scheme_job(pool, INDEXED_ASYNC_TABLE_TEMPLATE.format(table=path))
-def create_indexed_async_table(pool, path):
- async def async_create():
- await async_scheme_job(pool, INDEXED_ASYNC_TABLE_TEMPLATE.format(table=path))
-
- return async_create
+async def replace_table(pool, src, dst, *args):
+ async with pool.checkout() as session:
+ await session.rename_tables(
+ [
+ ydb.table.RenameItem(
+ source_path=src,
+ destination_path=dst,
+ replace_destination=True,
+ )
+ ]
+ )
-def replace_table(pool, src, dst, *args):
- async def async_move():
- with pool.async_checkout() as async_session:
- session = await as_tornado_future(async_session)
- await as_tornado_future(
- session.async_rename_tables(
- [
- ydb.table.RenameItem(
- source_path=src,
- destination_path=dst,
- replace_destination=True,
- )
- ]
- )
- )
- return async_move
-
-
-def substitute_table(pool, src, dst, backup):
- async def async_move():
- try:
- await async_scheme_job(pool, DROP_TABLE_TEMPLATE.format(table=backup))
- except ydb.issues.SchemeError:
- pass
-
- with pool.async_checkout() as async_session:
- session = await as_tornado_future(async_session)
- await as_tornado_future(
- session.async_rename_tables(
- [
- ydb.table.RenameItem(
- source_path=dst,
- destination_path=backup,
- replace_destination=False,
- ),
- ydb.table.RenameItem(
- source_path=src,
- destination_path=dst,
- replace_destination=False,
- )
- ]
+async def substitute_table(pool, src, dst, backup):
+ try:
+ await async_scheme_job(pool, DROP_TABLE_TEMPLATE.format(table=backup))
+ except ydb.issues.SchemeError:
+ pass
+
+ with pool.async_checkout() as async_session:
+ session = await async_session
+ await session.rename_tables(
+ [
+ ydb.table.RenameItem(
+ source_path=dst,
+ destination_path=backup,
+ replace_destination=False,
+ ),
+ ydb.table.RenameItem(
+ source_path=src,
+ destination_path=dst,
+ replace_destination=False,
)
- )
- return async_move
+ ]
+ )
class Simple:
@@ -123,16 +104,16 @@ class Simple:
self._driver_configs = driver_configs
self._database = driver_configs.database
- self._driver = ydb.Driver(self._driver_configs)
- self._driver.wait(timeout=5)
-
- self._pool = ydb.SessionPool(driver=self._driver, size=10)
-
self._create_method = create_method
self._replace_method = replace_method
self._select_from_index = select_from_index
+ async def driver_init_async(self):
+ self._driver = ydb.aio.Driver(self._driver_configs)
+ await self._driver.wait(timeout=15)
+ self._pool = ydb.aio.SessionPool(driver=self._driver, size=10)
+
upsert_table_template = (
r"""
DECLARE $key AS Uint64;
@@ -160,8 +141,8 @@ class Simple:
tables = ["table_main", "table_tmp", "table_backup"]
- def prepare(self):
- IOLoop.current().run_sync(lambda: self.async_prepare("table_main"))
+ async def prepare(self):
+ await self.async_prepare("table_main")
@staticmethod
def _value_for_table_row(table, key):
@@ -173,7 +154,7 @@ class Simple:
async def async_prepare(self, table):
logger.info("begin")
- await self._create_method(self._pool, table)()
+ await self._create_method(self._pool, table)
logger.info("create_method done")
coros = []
@@ -183,13 +164,13 @@ class Simple:
'$key': idx,
'$value': self._value_for_table_row(table, idx)
}
- coros.append(async_execute_serializable_job(self._pool, query, parameters))
+ coros.append(asyncio.create_task(async_execute_serializable_job(self._pool, query, parameters)))
logger.info("wait coros")
- await gen.multi(coros)
+ await asyncio.wait(coros)
- def move(self):
- IOLoop.current().run_sync(lambda: self.async_prepare("table_tmp"))
+ async def move(self):
+ await self.async_prepare("table_tmp")
coros = []
@@ -199,14 +180,14 @@ class Simple:
'$key': idx,
'$value': self._value_for_table_row("table_main", idx)
}
- coros.append(async_repeat_n_times(async_execute_serializable_job, 20, self._pool, write_query, parameters))
+ coros.append(asyncio.create_task(async_repeat_n_times(async_execute_serializable_job, 20, self._pool, write_query, parameters)))
read_query = self.select_table_template.format(table="table_main")
for idx in range(20):
parameters = {
'$key': 10+idx,
}
- coros.append(async_repeat_n_times(async_execute_serializable_job, 15, self._pool, read_query, parameters))
+ coros.append(asyncio.create_task(async_repeat_n_times(async_execute_serializable_job, 15, self._pool, read_query, parameters)))
if self._select_from_index:
read_index_query = self.select_index_table_template.format(table="table_main")
@@ -214,21 +195,23 @@ class Simple:
parameters = {
'$value': self._value_for_table_row("table_main", idx)
}
- coros.append(async_repeat_n_times(async_execute_stale_ro_job, 15, self._pool, read_index_query, parameters))
+ coros.append(asyncio.create_task(async_repeat_n_times(async_execute_stale_ro_job, 15, self._pool, read_index_query, parameters)))
coros.append(
- self._replace_method(
- self._pool,
- os.path.join(self._database, "table_tmp"),
- os.path.join(self._database, "table_main"),
- os.path.join(self._database, "table_backup")
- )()
+ asyncio.create_task(
+ self._replace_method(
+ self._pool,
+ os.path.join(self._database, "table_tmp"),
+ os.path.join(self._database, "table_main"),
+ os.path.join(self._database, "table_backup")
+ )
+ )
)
async def calle():
- await gen.multi(coros)
+ await asyncio.wait(coros)
- IOLoop.current().run_sync(lambda: calle())
+ await calle()
# local configuration for the ydb cluster (fetched by ydb_cluster_configuration fixture)
@@ -256,21 +239,25 @@ CLUSTER_CONFIG = dict(
substitute_table,
])
def test_client_gets_retriable_errors_when_rename(create_method, select_from_index, replace_method, ydb_database, ydb_endpoint):
- database = ydb_database
- logger.info(" database is %s", database)
+ async def test():
+ database = ydb_database
+ logger.info(" database is %s", database)
- driver_configs = ydb.DriverConfig(
- ydb_endpoint,
- database
- )
+ driver_configs = ydb.DriverConfig(
+ ydb_endpoint,
+ database
+ )
+
+ scenario = Simple(driver_configs, create_method, replace_method, select_from_index)
+ await scenario.driver_init_async()
- scenario = Simple(driver_configs, create_method, replace_method, select_from_index)
+ logger.info(" database is %s: PREPARE", database)
+ await scenario.prepare()
- logger.info(" database is %s: PREPARE", database)
- scenario.prepare()
+ logger.info(" database is %s: MOVE 1", database)
+ await scenario.move()
- logger.info(" database is %s: MOVE 1", database)
- scenario.move()
+ logger.info(" database is %s: MOVE 2", database)
+ await scenario.move()
- logger.info(" database is %s: MOVE 2", database)
- scenario.move()
+ asyncio.run(test())