diff options
| -rw-r--r-- | ydb/tests/functional/serverless/test_serverless.py | 67 |
1 files changed, 32 insertions, 35 deletions
diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py index dcc6c4e619c..1eda8db92a0 100644 --- a/ydb/tests/functional/serverless/test_serverless.py +++ b/ydb/tests/functional/serverless/test_serverless.py @@ -7,6 +7,8 @@ import copy import pytest import subprocess import datetime +import random +import string from ydb.retries import ( RetrySettings, @@ -490,32 +492,25 @@ def test_database_with_column_disk_quotas(ydb_hostel_db, ydb_disk_small_quoted_s session.execute_scheme( f""" CREATE TABLE `{path}` ( - id Uint64 NOT NULL, + ts Timestamp NOT NULL, value_string Utf8, - PRIMARY KEY(id) + PRIMARY KEY(ts) ) - PARTITION BY HASH(id) + PARTITION BY HASH(ts) WITH ( STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1, - TTL=interval("PT1M") on id as seconds + TTL=interval("PT1M") on ts ) """ ) class BulkUpsertRow(object): - __slots__ = ('id', 'value_string') + __slots__ = ('ts', 'value_string') - def __init__(self, id, value_string): - self.id = id - self.value_string = value_string - - @gen.coroutine - def async_bulk_upsert(path, rows): - column_types = ydb.BulkUpsertColumns() \ - .add_column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)) \ - .add_column('value_string', ydb.OptionalType(ydb.PrimitiveType.Utf8)) - yield driver.table_client.async_bulk_upsert(path, rows, column_types) + def __init__(self, ts): + self.ts = ts + self.value_string = ''.join(random.choices(string.ascii_lowercase, k=1024)) driver.scheme_client.make_directory(os.path.join(database, "dirA0")) with ydb.QuerySessionPool(driver) as qpool: @@ -523,34 +518,36 @@ def test_database_with_column_disk_quotas(ydb_hostel_db, ydb_disk_small_quoted_s with ydb.SessionPool(driver) as pool: pool.retry_operation_sync(create_table, None, path) - data = 'a' * 7000000 - for start in range(0, 1): - IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(int(datetime.datetime.now().timestamp()), data)])) - - for _ in range(120): - time.sleep(1) - described = ydb_cluster.client.describe(database, '') - logger.debug('database state after write_keys: %s', described) - if described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded: + logger.info("Write data into table") + column_types = ydb.BulkUpsertColumns() \ + .add_column('ts', ydb.OptionalType(ydb.PrimitiveType.Timestamp)) \ + .add_column('value_string', ydb.OptionalType(ydb.PrimitiveType.Utf8)) + now = datetime.datetime.now() + for i in range(1000): + bulk_size = 1000 # rows + rows = [BulkUpsertRow((now + datetime.timedelta(microseconds=dt))) for dt in range(i * bulk_size, (i + 1) * bulk_size)] + try: + driver.table_client.bulk_upsert(path, rows, column_types) + except ydb.issues.Overloaded: + described = ydb_cluster.client.describe(database, '') + logger.debug('database state when oveloaded: %s', described) + assert described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded, 'database did not move into DiskQuotaExceeded state' break else: - assert False, 'database did not move into DiskQuotaExceeded state' + assert False, 'database did not move into Overloaded state' - # Writes should be denied when database moves into DiskQuotaExceeded state - time.sleep(1) - logger.debug("start insert") + logger.info("Upsert data whith SQL") with pytest.raises(ydb.issues.Overloaded, match=r'.*overload data error.*'): qpool.execute_with_retries( - "UPSERT INTO `{}`(id, value_string) VALUES({}, 'xxx')".format(path, int(datetime.datetime.now().timestamp()) + 100), + "UPSERT INTO `{}` (ts, value_string) VALUES(Timestamp('2020-01-01T00:00:00.000000Z'), 'xxx')".format(path), retry_settings=RetrySettings(max_retries=0)) - logger.debug("finish insert") - with pytest.raises(ydb.issues.Overloaded, match=r'.*System overloaded.*'): - IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(0, 'test')])) - for _ in range(300): - time.sleep(1) + logger.info("Waiting for data deleted by TTL") + + for _ in range(30): + time.sleep(10) described = ydb_cluster.client.describe(database, '') - logger.debug('database state after erase_keys: %s', described) + logger.debug('database state when waiting for TTL: %s', described) if not described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded: break else: |
