diff options
author | alexbogo <[email protected]> | 2022-08-30 11:25:54 +0300 |
---|---|---|
committer | alexbogo <[email protected]> | 2022-08-30 11:25:54 +0300 |
commit | 9f9a20af2885daf80e5e7d16c7a3597aeefdb5c8 (patch) | |
tree | 66ea605dc01aa3ed4c3bc736dbded9304b3cbfd9 | |
parent | 7ca4cd1adec5e5a04f4b873151b11392453f787d (diff) |
[ymq] making the balancing test faster
init
-rw-r--r-- | ydb/tests/functional/sqs/merge_split_common_table/test.py | 44 |
1 files changed, 28 insertions, 16 deletions
diff --git a/ydb/tests/functional/sqs/merge_split_common_table/test.py b/ydb/tests/functional/sqs/merge_split_common_table/test.py index fd7dd064780..0fe0beefc79 100644 --- a/ydb/tests/functional/sqs/merge_split_common_table/test.py +++ b/ydb/tests/functional/sqs/merge_split_common_table/test.py @@ -81,16 +81,29 @@ class TestSqsSplitMergeTables(KikimrSqsTestBase): return len(list(filter(bool, leaders_per_node)))
def send_messages(self, is_fifo, queue_urls, messages_count=1, message_length=16):
+ logging.info('starting to send messages...')
group_id = 'group' if is_fifo else None
for i in range(messages_count):
for queue_url in queue_urls:
self.seq_no += 1
self._send_message_and_assert(queue_url, random_string(message_length), seq_no=self.seq_no if is_fifo else None, group_id=group_id)
+ logging.info('messages have been sent.')
+
+ def get_partitions(self, table_path):
+ session = ydb.retry_operation_sync(lambda: self._driver.table_client.session().create())
+ response = session.describe_table(
+ table_path,
+ ydb.DescribeTableSettings().with_include_table_stats(True)
+ )
+ return response.table_stats.partitions
def run_test(self, is_fifo):
self._init_with_params(is_fifo, tables_format=1)
- queues_count = 25
+ balancing_table_path = '/Root/SQS/.' + ('FIFO' if is_fifo else 'STD') + '/Messages'
+ queues_count = 10
queue_urls = []
+ partitions = self.get_partitions(balancing_table_path)
+ assert partitions > 1, 'incorrect initial partitions count'
for index in range(queues_count):
queue_name = f'q_{index}_{self.queue_name}'
@@ -102,31 +115,30 @@ class TestSqsSplitMergeTables(KikimrSqsTestBase): self.send_messages(is_fifo, queue_urls)
logging.info('messages have been sent #1')
- balancing_table_path = '/Root/SQS/.' + ('FIFO' if is_fifo else 'STD') + '/Messages'
self.alter_table(balancing_table_path)
- self.send_messages(is_fifo, queue_urls)
- logging.info('messages have been sent #2')
-
while True:
- time.sleep(1)
+ self.send_messages(is_fifo, queue_urls)
leaders_per_node = self.get_leaders_per_nodes()
nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
- logging.debug(f'wait merge... nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
- if nodes_with_leaders == 1 and sum(leaders_per_node) == queues_count:
+ cur_partitions = self.get_partitions(balancing_table_path)
+ logging.debug(f'wait merge... partitions={cur_partitions}, nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
+ if cur_partitions < partitions and nodes_with_leaders <= cur_partitions and sum(leaders_per_node) == queues_count:
break
+ time.sleep(5)
- logging.info('all leaders on 1 node')
+ partitions = self.get_partitions(balancing_table_path)
+ nodes_with_leaders = self.get_nodes_with_leaders()
+ logging.info(f'all leaders on {nodes_with_leaders} node, partitions {partitions}')
self.force_split(balancing_table_path)
- self.send_messages(is_fifo, queue_urls, messages_count=2)
- logging.info('messages have been sent #3')
while True:
- time.sleep(1)
+ self.send_messages(is_fifo, queue_urls)
leaders_per_node = self.get_leaders_per_nodes()
- nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
- logging.info(f'wait split... nodes_with_leaders={nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
- if nodes_with_leaders > 1 and sum(leaders_per_node) == queues_count:
+ cur_nodes_with_leaders = len(list(filter(bool, leaders_per_node)))
+ cur_partitions = self.get_partitions(balancing_table_path)
+ logging.info(f'wait split... partitions={cur_partitions}, nodes_with_leaders={cur_nodes_with_leaders} all_leaders={sum(leaders_per_node)} : {leaders_per_node}')
+ if cur_partitions > partitions and cur_nodes_with_leaders > nodes_with_leaders and sum(leaders_per_node) == queues_count:
break
- logging.info(f'test finished. Leaders pre node : {leaders_per_node}')
+ logging.info(f'test finished. Leaders per node : {leaders_per_node}')
|