summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/tests/functional/serverless/test_serverless.py67
1 files changed, 32 insertions, 35 deletions
diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py
index dcc6c4e619c..1eda8db92a0 100644
--- a/ydb/tests/functional/serverless/test_serverless.py
+++ b/ydb/tests/functional/serverless/test_serverless.py
@@ -7,6 +7,8 @@ import copy
import pytest
import subprocess
import datetime
+import random
+import string
from ydb.retries import (
RetrySettings,
@@ -490,32 +492,25 @@ def test_database_with_column_disk_quotas(ydb_hostel_db, ydb_disk_small_quoted_s
session.execute_scheme(
f"""
CREATE TABLE `{path}` (
- id Uint64 NOT NULL,
+ ts Timestamp NOT NULL,
value_string Utf8,
- PRIMARY KEY(id)
+ PRIMARY KEY(ts)
)
- PARTITION BY HASH(id)
+ PARTITION BY HASH(ts)
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
- TTL=interval("PT1M") on id as seconds
+ TTL=interval("PT1M") on ts
)
"""
)
class BulkUpsertRow(object):
- __slots__ = ('id', 'value_string')
+ __slots__ = ('ts', 'value_string')
- def __init__(self, id, value_string):
- self.id = id
- self.value_string = value_string
-
- @gen.coroutine
- def async_bulk_upsert(path, rows):
- column_types = ydb.BulkUpsertColumns() \
- .add_column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)) \
- .add_column('value_string', ydb.OptionalType(ydb.PrimitiveType.Utf8))
- yield driver.table_client.async_bulk_upsert(path, rows, column_types)
+ def __init__(self, ts):
+ self.ts = ts
+ self.value_string = ''.join(random.choices(string.ascii_lowercase, k=1024))
driver.scheme_client.make_directory(os.path.join(database, "dirA0"))
with ydb.QuerySessionPool(driver) as qpool:
@@ -523,34 +518,36 @@ def test_database_with_column_disk_quotas(ydb_hostel_db, ydb_disk_small_quoted_s
with ydb.SessionPool(driver) as pool:
pool.retry_operation_sync(create_table, None, path)
- data = 'a' * 7000000
- for start in range(0, 1):
- IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(int(datetime.datetime.now().timestamp()), data)]))
-
- for _ in range(120):
- time.sleep(1)
- described = ydb_cluster.client.describe(database, '')
- logger.debug('database state after write_keys: %s', described)
- if described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded:
+ logger.info("Write data into table")
+ column_types = ydb.BulkUpsertColumns() \
+ .add_column('ts', ydb.OptionalType(ydb.PrimitiveType.Timestamp)) \
+ .add_column('value_string', ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ now = datetime.datetime.now()
+ for i in range(1000):
+ bulk_size = 1000 # rows
+ rows = [BulkUpsertRow((now + datetime.timedelta(microseconds=dt))) for dt in range(i * bulk_size, (i + 1) * bulk_size)]
+ try:
+ driver.table_client.bulk_upsert(path, rows, column_types)
+ except ydb.issues.Overloaded:
+ described = ydb_cluster.client.describe(database, '')
+ logger.debug('database state when oveloaded: %s', described)
+ assert described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded, 'database did not move into DiskQuotaExceeded state'
break
else:
- assert False, 'database did not move into DiskQuotaExceeded state'
+ assert False, 'database did not move into Overloaded state'
- # Writes should be denied when database moves into DiskQuotaExceeded state
- time.sleep(1)
- logger.debug("start insert")
+ logger.info("Upsert data whith SQL")
with pytest.raises(ydb.issues.Overloaded, match=r'.*overload data error.*'):
qpool.execute_with_retries(
- "UPSERT INTO `{}`(id, value_string) VALUES({}, 'xxx')".format(path, int(datetime.datetime.now().timestamp()) + 100),
+ "UPSERT INTO `{}` (ts, value_string) VALUES(Timestamp('2020-01-01T00:00:00.000000Z'), 'xxx')".format(path),
retry_settings=RetrySettings(max_retries=0))
- logger.debug("finish insert")
- with pytest.raises(ydb.issues.Overloaded, match=r'.*System overloaded.*'):
- IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(0, 'test')]))
- for _ in range(300):
- time.sleep(1)
+ logger.info("Waiting for data deleted by TTL")
+
+ for _ in range(30):
+ time.sleep(10)
described = ydb_cluster.client.describe(database, '')
- logger.debug('database state after erase_keys: %s', described)
+ logger.debug('database state when waiting for TTL: %s', described)
if not described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded:
break
else: