# coding: utf-8
from yatest_lib import test_splitter
def get_chunks(tests, modulo, mode):
    """Build every chunk of ``tests`` for the requested splitting strategy.

    Args:
        tests: collection of tests, passed to the splitter unchanged.
        modulo: number of chunks; the splitter is called once for each index
            in ``range(modulo)``.
        mode: ``"MODULO"`` uses ``test_splitter.get_shuffled_chunk``,
            ``"SEQUENTIAL"`` uses ``test_splitter.get_sequential_chunk``.

    Returns:
        A list holding the splitter result for each chunk index, in index
        order.

    Raises:
        ValueError: if ``mode`` is not one of the two supported names.
    """
    if mode == "MODULO":
        return [
            test_splitter.get_shuffled_chunk(tests, modulo, index)
            for index in range(modulo)
        ]
    if mode == "SEQUENTIAL":
        return [
            test_splitter.get_sequential_chunk(tests, modulo, index)
            for index in range(modulo)
        ]
    raise ValueError("no such mode")
def check_not_intersect(chunk_list):
    """Return True when no element appears in more than one place.

    The summed chunk lengths are compared with the number of distinct
    elements across all chunks; they are equal only if nothing is repeated
    (either between chunks or inside a single chunk).
    """
    all_chunks = list(chunk_list)
    distinct = set().union(*all_chunks)
    return sum(len(chunk) for chunk in all_chunks) == len(distinct)
def check_max_diff(chunk_list):
    """Return the difference between the largest and smallest chunk sizes.

    Args:
        chunk_list: iterable of sized chunks. It is traversed exactly once,
            so one-shot iterables such as generators are supported.

    Returns:
        ``max(len(chunk)) - min(len(chunk))`` over all chunks.

    Raises:
        ValueError: if ``chunk_list`` contains no chunks.
    """
    # Collect the sizes once: iterating ``chunk_list`` separately for max()
    # and min() would leave nothing for min() when the input is a generator.
    sizes = [len(chunk) for chunk in chunk_list]
    return max(sizes) - min(sizes)
def test_lot_of_chunks():
    """Check splitting when there are more chunks than tests.

    For 10..19 chunks and every smaller test count, both modes must yield
    disjoint chunks whose sizes differ by at most one, with exactly
    ``chunk_count - tests_count`` empty chunks and ``chunk_count`` chunks
    overall.
    """
    for n_chunks in range(10, 20):
        for n_tests in range(n_chunks):
            expected_empty = n_chunks - n_tests
            for mode in ("SEQUENTIAL", "MODULO"):
                chunks = get_chunks(range(n_tests), n_chunks, mode)
                assert check_not_intersect(chunks)
                assert check_max_diff(chunks) <= 1
                assert chunks.count([]) == expected_empty
                assert len(chunks) == n_chunks
def test_lot_of_tests():
    """Check splitting when there are more tests than chunks.

    For 10..19 tests and every chunk count from 2 up to one less than the
    test count, both modes must yield ``chunk_count`` disjoint chunks whose
    sizes differ by at most one.
    """
    for n_tests in range(10, 20):
        for n_chunks in range(2, n_tests):
            for mode in ("SEQUENTIAL", "MODULO"):
                chunks = get_chunks(range(n_tests), n_chunks, mode)
                assert check_not_intersect(chunks)
                assert check_max_diff(chunks) <= 1
                assert len(chunks) == n_chunks
def prime_chunk_count():
    """Check splitting into a prime number of chunks.

    For each listed prime chunk count and every smaller test count, both
    modes must yield ``chunk_count`` disjoint chunks whose sizes differ by
    at most one.
    """
    for chunk_count in [7, 11, 13, 17, 23, 29]:
        for tests_count in range(chunk_count):
            for mode in ("SEQUENTIAL", "MODULO"):
                chunks = get_chunks(range(tests_count), chunk_count, mode)
                assert check_not_intersect(chunks)
                assert check_max_diff(chunks) <= 1
                assert len(chunks) == chunk_count


# Without a ``test_`` prefix pytest never collects the function above, so the
# check silently never ran. The original name is kept for compatibility and
# exposed under a collectable alias.
test_prime_chunk_count = prime_chunk_count
def get_divisors(number):
    """Return all positive divisors of ``number`` in ascending order.

    Candidates are taken from ``1`` to ``number`` inclusive, so the result is
    empty when ``number`` is less than 1.
    """
    return [
        candidate
        for candidate in range(1, number + 1)
        if number % candidate == 0
    ]
def equal_chunks():
    """Check that tests are split into equally sized chunks when possible.

    For 12..30 tests and every chunk count that divides the test count, both
    modes must yield ``chunk_count`` disjoint chunks of identical size.

    The loops previously ran the other way round (``tests_count`` taken from
    the divisors of ``chunk_count``), which gives fewer tests than chunks.
    ``test_lot_of_chunks`` expects such inputs to produce a mix of empty and
    non-empty chunks, so a size difference of zero could not hold there.
    """
    for tests_count in range(12, 31):
        # The chunk count must divide the number of tests for an even split.
        for chunk_count in get_divisors(tests_count):
            for mode in ("SEQUENTIAL", "MODULO"):
                chunks = get_chunks(range(tests_count), chunk_count, mode)
                assert check_not_intersect(chunks)
                assert check_max_diff(chunks) == 0
                assert len(chunks) == chunk_count


# Without a ``test_`` prefix pytest never collects the function above, so the
# check silently never ran. The original name is kept for compatibility and
# exposed under a collectable alias.
test_equal_chunks = equal_chunks
def chunk_count_equal_tests_count():
    """Check splitting when the number of chunks equals the number of tests.

    For 10..19 tests split into the same number of chunks, both modes must
    yield ``chunk_count`` disjoint chunks whose sizes differ by at most one.
    """
    for chunk_count in range(10, 20):
        tests_count = chunk_count
        for mode in ("SEQUENTIAL", "MODULO"):
            chunks = get_chunks(range(tests_count), chunk_count, mode)
            assert check_not_intersect(chunks)
            assert check_max_diff(chunks) <= 1
            assert len(chunks) == chunk_count


# Without a ``test_`` prefix pytest never collects the function above, so the
# check silently never ran. The original name is kept for compatibility and
# exposed under a collectable alias.
test_chunk_count_equal_tests_count = chunk_count_equal_tests_count