1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# coding: utf-8
from yatest_lib import test_splitter
def get_chunks(tests, modulo, mode):
chunks = []
if mode == "MODULO":
for modulo_index in range(modulo):
chunks.append(test_splitter.get_shuffled_chunk(tests, modulo, modulo_index))
elif mode == "SEQUENTIAL":
for modulo_index in range(modulo):
chunks.append(test_splitter.get_sequential_chunk(tests, modulo, modulo_index))
else:
raise ValueError("no such mode")
return chunks
def check_not_intersect(chunk_list):
test_set = set()
total_size = 0
for tests in chunk_list:
total_size += len(tests)
test_set.update(tests)
return total_size == len(test_set)
def check_max_diff(chunk_list):
return max(map(len, chunk_list)) - min(map(len, chunk_list))
def test_lot_of_chunks():
for chunk_count in range(10, 20):
for tests_count in range(chunk_count):
chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert chunks.count([]) == chunk_count - tests_count
assert len(chunks) == chunk_count
chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert chunks.count([]) == chunk_count - tests_count
assert len(chunks) == chunk_count
def test_lot_of_tests():
for tests_count in range(10, 20):
for chunk_count in range(2, tests_count):
chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert len(chunks) == chunk_count
chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert len(chunks) == chunk_count
def prime_chunk_count():
for chunk_count in [7, 11, 13, 17, 23, 29]:
for tests_count in range(chunk_count):
chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert len(chunks) == chunk_count
chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert len(chunks) == chunk_count
def get_divisors(number):
divisors = []
for d in range(1, number + 1):
if number % d == 0:
divisors.append(d)
return divisors
def equal_chunks():
for chunk_count in range(12, 31):
for tests_count in get_divisors(chunk_count):
chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) == 0
assert len(chunks) == chunk_count
chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) == 0
assert len(chunks) == chunk_count
def chunk_count_equal_tests_count():
for chunk_count in range(10, 20):
tests_count = chunk_count
chunks = get_chunks(range(tests_count), chunk_count, "SEQUENTIAL")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert len(chunks) == chunk_count
chunks = get_chunks(range(tests_count), chunk_count, "MODULO")
assert check_not_intersect(chunks)
assert check_max_diff(chunks) <= 1
assert len(chunks) == chunk_count
|