diff options
| author | robot-piglet <[email protected]> | 2025-09-17 23:38:38 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-09-17 23:51:08 +0300 |
| commit | 0aadd2b8b6f8a556cd5e17ed31adfaae1b4b456c (patch) | |
| tree | c14edb738750e4eb120c35fc2eace6a8afce3937 /contrib/python/more-itertools/py3/tests | |
| parent | 28b8afacb9cf086be40ad51d270a6aa09a3dc2d4 (diff) | |
Intermediate changes
commit_hash:46fb8895a86e7e705f3f141ab4e54f33d325f394
Diffstat (limited to 'contrib/python/more-itertools/py3/tests')
| -rw-r--r-- | contrib/python/more-itertools/py3/tests/test_more.py | 451 | ||||
| -rw-r--r-- | contrib/python/more-itertools/py3/tests/test_recipes.py | 134 |
2 files changed, 583 insertions, 2 deletions
diff --git a/contrib/python/more-itertools/py3/tests/test_more.py b/contrib/python/more-itertools/py3/tests/test_more.py index f30e747d190..d608334b11f 100644 --- a/contrib/python/more-itertools/py3/tests/test_more.py +++ b/contrib/python/more-itertools/py3/tests/test_more.py @@ -1,6 +1,11 @@ +from __future__ import annotations + import cmath +import gc +import platform +import weakref -from collections import Counter, abc +from collections import Counter, abc, deque from collections.abc import Set from datetime import datetime, timedelta from decimal import Decimal @@ -28,6 +33,7 @@ from statistics import mean from string import ascii_letters from sys import version_info from time import sleep +from typing import Iterable, Iterator, NamedTuple from unittest import skipIf, TestCase import more_itertools as mi @@ -174,6 +180,16 @@ class LastTests(TestCase): with self.assertRaises(ValueError): mi.last(iterable) + def test_reversed_is_none(self): + # See https://github.com/more-itertools/more-itertools/issues/1001 + class ReversedIsNone: + __reversed__ = None + + def __iter__(self): + return iter([1]) + + self.assertEqual(mi.last(ReversedIsNone()), 1) + class NthOrLastTests(TestCase): """Tests for ``nth_or_last()``""" @@ -528,6 +544,88 @@ class DistinctPermutationsTests(TestCase): self.assertCountEqual(actual, expected) +class DerangementsTests(TestCase): + def test_unique_values(self): + n = 8 + expected = set( + x + for x in permutations(range(n)) + if not any(x[i] == i for i in range(n)) + ) + for i, iterable in enumerate( + [ + range(n), + list(range(n)), + set(range(n)), + ] + ): + actual = set(mi.derangements(iterable)) + self.assertEqual(actual, expected) + + def test_repeated_values(self): + self.assertEqual( + [''.join(x) for x in mi.derangements('AACD')], + [ + 'AADC', + 'ACDA', + 'ADAC', + 'CADA', + 'CDAA', + 'CDAA', + 'DAAC', + 'DCAA', + 'DCAA', + ], + ) + + def test_unsortable_unhashable(self): + iterable = (0, True, ['Carol']) + actual = list(mi.derangements(iterable)) + expected = [(True, ['Carol'], 0), (['Carol'], 0, True)] + self.assertListEqual(actual, expected) + + def test_r(self): + s = 'ABCD' + for r, expected in [ + (0, ['']), + (1, ['B', 'C', 'D']), + (2, ['BA', 'BC', 'BD', 'CA', 'CD', 'DA', 'DC']), + ( + 3, + [ + 'BAD', + 'BCA', + 'BCD', + 'BDA', + 'CAB', + 'CAD', + 'CDA', + 'CDB', + 'DAB', + 'DCA', + 'DCB', + ], + ), + ( + 4, + [ + 'BADC', + 'BCDA', + 'BDAC', + 'CADB', + 'CDAB', + 'CDBA', + 'DABC', + 'DCAB', + 'DCBA', + ], + ), + ]: + with self.subTest(r=r): + actual = [''.join(x) for x in mi.derangements(s, r=r)] + self.assertEqual(actual, expected) + + class IlenTests(TestCase): def test_ilen(self): """Sanity-checks for ``ilen()``.""" @@ -1116,6 +1214,42 @@ class InterleaveEvenlyTests(TestCase): list(mi.interleave_evenly(iterables, lengths=lengths)) +class InterleaveRandomlyTests(TestCase): + def test_basic(self): + seed(0) # For reproducibility + iterables = [1, 2, 3], 'abc', (True, False, None) + self.assertEqual( + list(mi.interleave_randomly(*iterables)), + ['a', 'b', 1, 'c', True, False, None, 2, 3], + ) + + def test_some_empty(self): + self.assertEqual( + list(mi.interleave_randomly([1, 2, 3], [], [])), + [1, 2, 3], + ) + self.assertEqual( + list(mi.interleave_randomly([], [1, 2, 3], [])), + [1, 2, 3], + ) + self.assertEqual( + list(mi.interleave_randomly([], [], [1, 2, 3])), + [1, 2, 3], + ) + + def test_all_empty(self): + iterables = [], [], [] + self.assertEqual(list(mi.interleave_randomly(*iterables)), []) + + def test_no_args(self): + self.assertEqual(list(mi.interleave_randomly()), []) + + def test_bad_type(self): + # Should raise TypeError if not all arguments are iterable + with self.assertRaises(TypeError): + list(mi.interleave_randomly(1, [2, 3], 'abc')) + + class TestCollapse(TestCase): """Tests for ``collapse()``""" @@ -3161,6 +3295,30 @@ class StripFunctionTests(TestCase): self.assertEqual(list(mi.strip(iterable, pred)), iterable[3:-3]) +class IteratorWithWeakReferences: + class _AnObj: + pass + + @classmethod + def FROM_SIZE(cls, size: int) -> IteratorWithWeakReferences: + return cls([IteratorWithWeakReferences._AnObj() for _ in range(size)]) + + def __init__(self, iterable: Iterable): + self._data = deque(element for element in iterable) + self._weakReferences = [weakref.ref(a) for a in self._data] + + def __iter__(self) -> Iterator: + return self + + def __next__(self) -> object: + if len(self._data) == 0: + raise StopIteration + return self._data.popleft() + + def weakReferencesState(self) -> list[bool]: + return [wr() is not None for wr in self._weakReferences] + + class IsliceExtendedTests(TestCase): def test_all(self): iterable = ['0', '1', '2', '3', '4', '5'] @@ -3202,6 +3360,134 @@ class IsliceExtendedTests(TestCase): with self.assertRaises(TypeError): mi.islice_extended(count())[13] + def test_elements_lifecycle(self): + # CPython does reference counting. + # GC is not required when ref counting is supported. + refCountSupported = platform.python_implementation() == 'CPython' + + class TestCase(NamedTuple): + initialSize: int + slice: int + # list of expected intermediate elements states (alive or not) + # during a complete iteration + expectedAliveStates: list[list[int]] + + # fmt: off + testCases = [ + # testcases for: start>0, stop>0, step>0 + TestCase(initialSize=3, slice=(None, None, 1), expectedAliveStates=[ # noqa: E501 + [1, 1, 1], [0, 1, 1], [0, 0, 1], [0, 0, 0], [0, 0, 0]]), + TestCase(initialSize=3, slice=(0, None, 1), expectedAliveStates=[ + [1, 1, 1], [0, 1, 1], [0, 0, 1], [0, 0, 0], [0, 0, 0]]), + TestCase(initialSize=3, slice=(1, 2, 1), expectedAliveStates=[ + [1, 1, 1], [0, 0, 1], [0, 0, 1]]), + TestCase(initialSize=4, slice=(0, None, 2), expectedAliveStates=[ + [1, 1, 1, 1], [0, 1, 1, 1], [0, 0, 0, 1], [0, 0, 0, 0]]), + TestCase(initialSize=5, slice=(1, 4, 2), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 0, 1, 1, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 1]]), # noqa: E501 + TestCase(initialSize=5, slice=(4, 1, 1), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 0, 0, 0, 1]]), + + # FYI: to process a negative start/stop index, we need to iterate + # on the whole iterator. All the elements will be consumed + # and will ALWAYS be released on full iteration completion. + + # testcases for: start<0, stop>0, step>0 + TestCase(initialSize=3, slice=(-3, None, 1), expectedAliveStates=[ + [1, 1, 1], [0, 1, 1], [0, 0, 1], [0, 0, 0], [0, 0, 0]]), + TestCase(initialSize=3, slice=(-2, 2, 1), expectedAliveStates=[ + [1, 1, 1], [0, 0, 1], [0, 0, 0]]), + TestCase(initialSize=4, slice=(-4, None, 2), expectedAliveStates=[ + [1, 1, 1, 1], [0, 1, 1, 1], [0, 0, 0, 1], [0, 0, 0, 0]]), + TestCase(initialSize=5, slice=(-4, 4, 2), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 0, 1, 1, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 0]]), # noqa: E501 + TestCase(initialSize=3, slice=(-2, 0, 1), expectedAliveStates=[ + [1, 1, 1], [0, 0, 0]]), + + # testcases for: start>0, stop<0, step>0 + TestCase(initialSize=3, slice=(None, -1, 1), expectedAliveStates=[ + [1, 1, 1], [0, 1, 1], [0, 0, 1], [0, 0, 0]]), + TestCase(initialSize=4, slice=(1, -1, 1), expectedAliveStates=[ + [1, 1, 1, 1], [0, 0, 1, 1], [0, 0, 0, 1], [0, 0, 0, 0]]), + TestCase(initialSize=5, slice=(None, -2, 2), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 1, 1, 1, 1], [0, 0, 0, 1, 1], [0, 0, 0, 0, 0]]), # noqa: E501 + TestCase(initialSize=5, slice=(1, -1, 2), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 0, 1, 1, 1], [0, 0, 0, 0, 1], [0, 0, 0, 0, 0]]), # noqa: E501 + TestCase(initialSize=5, slice=(4, -5, 2), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 0, 0, 0, 0]]), + + # testcases for: start>0, stop>0, step<0 + TestCase(initialSize=3, slice=(None, None, -1), expectedAliveStates=[ # noqa: E501 + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1], [0, 0, 0]]), + TestCase(initialSize=3, slice=(2, None, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1], [0, 0, 0]]), + TestCase(initialSize=3, slice=(None, 0, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [0, 1, 1], [0, 1, 1], [0, 0, 0]]), + TestCase(initialSize=6, slice=(3, 1, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1, 1, 1, 1], [0, 0, 1, 1, 1, 1], [0, 0, 1, 1, 1, 1], [0, 0, 0, 0, 1, 1]]), # noqa: E501 + TestCase(initialSize=5, slice=(1, 3, -1), expectedAliveStates=[ + # ⚠️could be improved. Final state could be [0, 0, 1, 1, 1] + [1, 1, 1, 1, 1], [0, 0, 0, 0, 1]]), + + # testcases for: start<0, stop>0, step<0 + TestCase(initialSize=3, slice=(-1, None, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1], [0, 0, 0]]), + TestCase(initialSize=3, slice=(-1, 0, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [0, 1, 1], [0, 1, 1], [0, 0, 0]]), + TestCase(initialSize=6, slice=(-2, None, -2), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]]), # noqa: E501 + TestCase(initialSize=6, slice=(-2, 1, -2), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1, 1, 1, 1], [0, 0, 1, 1, 1, 1], [0, 0, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]]), # noqa: E501 + TestCase(initialSize=6, slice=(-4, 4, -2), expectedAliveStates=[ + [1, 1, 1, 1, 1, 1], [0, 0, 0, 0, 0, 0]]), + + # testcases for: start>0, stop<0, step<0 + TestCase(initialSize=3, slice=(None, -3, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [0, 1, 1], [0, 1, 1], [0, 0, 0]]), + TestCase(initialSize=3, slice=(None, -4, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1], [1, 1, 1], [1, 1, 1], [1, 1, 1], [0, 0, 0]]), + TestCase(initialSize=5, slice=(3, -4, -1), expectedAliveStates=[ + # ⚠️could be improved, elements are only released on final step + [1, 1, 1, 1, 1], [0, 0, 1, 1, 1], [0, 0, 1, 1, 1], [0, 0, 0, 0, 0]]), # noqa: E501 + TestCase(initialSize=5, slice=(1, -1, -1), expectedAliveStates=[ + [1, 1, 1, 1, 1], [0, 0, 0, 0, 0]]), + ] + # fmt: on + + for index, testCase in enumerate(testCases): + with self.subTest(f"{index:02d}", testCase=testCase): + iterator = IteratorWithWeakReferences.FROM_SIZE( + testCase.initialSize + ) + islice_iterator = mi.islice_extended(iterator, *testCase.slice) + + aliveStates = [] + refCountSupported or gc.collect() + # initial alive states + aliveStates.append(iterator.weakReferencesState()) + while True: + try: + next(islice_iterator) + refCountSupported or gc.collect() + # intermediate alive states + aliveStates.append(iterator.weakReferencesState()) + except StopIteration: + refCountSupported or gc.collect() + # final alive states + aliveStates.append(iterator.weakReferencesState()) + break + self.assertEqual(aliveStates, testCase.expectedAliveStates) + class ConsecutiveGroupsTest(TestCase): def test_numbers(self): @@ -3939,6 +4225,12 @@ class SetPartitionsTests(TestCase): self._normalize_partitions(actual), ) + def test_min_max(self): + it = 'abcdefg' + self.assertEqual( + list(mi.set_partitions(it, min_size=4, max_size=3)), [] + ) + class TimeLimitedTests(TestCase): def test_basic(self): @@ -4186,6 +4478,32 @@ class MapIfTests(TestCase): class SampleTests(TestCase): + def test_specific_sample(self): + """Verify reproducibility.""" + + # Note, this test is surprisingly robust. Although it depends on the quality + # of the underlying libmath implementations for log, exp, and log1p, the + # number of samples and population size are small enough that small errors + # in those underlying functions won't affect the sample. + + seed(8675309) + self.assertEqual( + list(mi.sample(range(10**5), k=5)), + [16845, 79805, 76057, 58302, 40472], + ) + + seed(8675309) + self.assertEqual( + list(mi.sample(range(10**5), counts=[1, 2] * (10**5 // 2), k=5)), + [87899, 53203, 38868, 11230, 50705], + ) + + seed(8675309) + self.assertEqual( + list(mi.sample(range(10**5), weights=range(1, 10**5 + 1), k=5)), + [50915, 33816, 32250, 98284, 43517], + ) + def test_unit_case(self): """Test against a fixed case by seeding the random module.""" # Beware that this test really just verifies random.random() behavior. @@ -5969,3 +6287,134 @@ class DoubleStarMapTests(TestCase): actual = list(mi.doublestarmap(lambda x: x, [])) expected = [] self.assertEqual(actual, expected) + + +class ArgMinArgMaxTests(TestCase): + def test_basic(self): + for i, iterable, expected_min, expected_max in ( + (1, [10, 2, 20, 5, 17, 4], 1, 2), + (2, [10, -2, -20, 5, 17, 4], 2, 4), + (3, [10, 10, 20, 10], 0, 2), + (4, [30, 30, 20, 30], 2, 0), + ): + with self.subTest(i=i): + self.assertEqual(mi.argmin(iterable), expected_min) + self.assertEqual(mi.argmax(iterable), expected_max) + + def test_key(self): + for i, iterable, key, expected_min, expected_max in ( + (1, [10, -2, -20, 5, 17, 4], abs, 1, 2), + ( + 2, + [[0] * 10, [0] * 5, [0] * 3, [0] * 12, [0] * 2, [0] * 3], + len, + 4, + 3, + ), + ): + with self.subTest(i=i): + self.assertEqual(mi.argmin(iterable, key=key), expected_min) + self.assertEqual(mi.argmax(iterable, key=key), expected_max) + + +class ExtractTests(TestCase): + def test_basics(self): + extract = mi.extract + data = 'abcdefghijklmnopqrstuvwxyz' + + # Test iterator inputs, increasing and decreasing indices, and repeats. + self.assertEqual( + list(extract(iter(data), iter([7, 4, 11, 11, 14]))), + ['h', 'e', 'l', 'l', 'o'], + ) + + # Empty indices + self.assertEqual(list(extract(iter(data), iter([]))), []) + + # Result is an iterator + iterator = extract('abc', [0, 1, 2]) + self.assertTrue(hasattr(iterator, '__next__')) + + # Error cases + + with self.assertRaises(TypeError): + list(extract(None, [])) # Non-iterable data source + with self.assertRaises(TypeError): + list(extract(data, None)) # Non-iterable indices + with self.assertRaises(ValueError): + list(extract(data, [0.0, 1.0, 2.0])) # Non-integer indices + with self.assertRaises(ValueError): + list(extract(data, [1, 2, -3])) # Negative indices + with self.assertRaises(IndexError): + list(extract(data, [1, 2, len(data)])) # Indices out of range + + def test_negative_one_bug(self): + # When the lowest index was exactly -1, it matched the initial + # iterator_position of -1 giving a zero advance step. + extract = mi.extract + + with self.assertRaises(ValueError): + list(extract('abcdefg', [1, 2, -1])) + + def test_none_value_bug(self): + # The buffer used to be a list with unused slots marked with None. + # The mark got conflated with None values in the data stream. + extract = mi.extract + data = ['a', 'b', 'None', 'c', 'd'] + self.assertEqual(list(extract(data, range(5))), data) + + def test_all_orderings(self): + # Thorough test for all cases of five indices to detect + # obscure corner case bugs. + extract = mi.extract + + data = 'abcdefg' + for indices in product(range(6), repeat=5): + with self.subTest(indices=indices): + actual = tuple(extract(data, indices)) + expected = itemgetter(*indices)(data) + self.assertEqual(actual, expected) + + def test_early_free(self): + # No references are held for skipped values or for previously + # emitted values regardless of how long they were in the buffer. + + extract = mi.extract + + class TrackDels(str): + def __del__(self): + dead.add(str(self)) + + dead = set() + iterator = extract(map(TrackDels, 'ABCDEF'), [3, 2, 4, 5]) + + value = next(iterator) + gc.collect() # Force collection on PyPy. + self.assertEqual(value, 'D') # Returns D. Buffered C is alive. + self.assertEqual(dead, {'A', 'B'}) # A and B are dead. + + value = next(iterator) + gc.collect() # Force collection on PyPy + self.assertEqual(value, 'C') # Returns C. + + value = next(iterator) + gc.collect() # Force collection on PyPy + self.assertEqual(value, 'E') # Returns E. + self.assertEqual(dead, {'A', 'B', 'D', 'C'}) # D and C are now dead. + + def test_lazy_consumption(self): + extract = mi.extract + + input_stream = mi.peekable(iter('ABCDEFGHIJKLM')) + iterator = extract(input_stream, [4, 2, 10]) + + self.assertEqual(next(iterator), 'E') # C is still buffered + self.assertEqual(input_stream.peek(), 'F') + + self.assertEqual(next(iterator), 'C') + self.assertEqual(input_stream.peek(), 'F') + + # Infinite input + self.assertEqual( + list(extract(count(), [5, 7, 3, 9, 4])), [5, 7, 3, 9, 4] + ) diff --git a/contrib/python/more-itertools/py3/tests/test_recipes.py b/contrib/python/more-itertools/py3/tests/test_recipes.py index 63be39426eb..2ee5db6f855 100644 --- a/contrib/python/more-itertools/py3/tests/test_recipes.py +++ b/contrib/python/more-itertools/py3/tests/test_recipes.py @@ -3,14 +3,17 @@ from decimal import Decimal from doctest import DocTestSuite from fractions import Fraction from functools import reduce -from itertools import combinations, count, groupby, permutations +from itertools import combinations, count, groupby, permutations, islice from operator import mul from math import comb, prod, factorial +from statistics import mean from sys import version_info from unittest import TestCase, skipIf from unittest.mock import patch import more_itertools as mi +import statistics +import random def load_tests(loader, tests, ignore): @@ -1124,6 +1127,59 @@ class ReshapeTests(TestCase): actual = list(mi.reshape(matrix, cols)) self.assertEqual(actual, expected) + def test_multidimensional(self): + reshape = mi.reshape + + def shape(tensor): + if not hasattr(tensor, '__iter__'): + return () + seq = list(tensor) + return (len(seq),) + shape(seq[0]) + + matrix = [(0, 1), (2, 3), (4, 5)] + self.assertEqual(shape(matrix), (3, 2)) + + for new_shape in [ + (2, 3), + (6,), + (6, 1), + (1, 6), + (2, 1, 3, 1), + (1, 1, 3, 1, 2), + ]: + with self.subTest(new_shape=new_shape): + new_matrix = reshape(matrix, new_shape) + self.assertEqual(shape(new_matrix), new_shape) + + # Truncation: Input larger than the requested shape + self.assertEqual(list(reshape(matrix, [3])), [0, 1, 2]) + + # Incomplete structure: Input smaller than the requested shape + self.assertEqual(list(reshape(matrix, [8])), [0, 1, 2, 3, 4, 5]) + + # Str and bytes treated as scalars + word_matrix = [[['ab', b'de', 'gh', b'jk']]] # Shape: 1 x 1 x 4 + self.assertEqual( + list(reshape(word_matrix, (2, 2))), + [('ab', b'de'), ('gh', b'jk')], + ) + + # Empty input + self.assertEqual(list(reshape([[]], shape=(1,))), []) + + # Non-uniform input: scalar where a tensor is expected + with self.assertRaises(TypeError): + list(mi.reshape([[10, 20, 30], 40], shape=(4,))) + + # Non-integer indices + with self.assertRaises((TypeError, ValueError)): + matrix = [(0, 1), (2, 3), (4, 5)] + list(reshape(matrix, ('a', 'b', 'c'))) + + # Indices smaller than one + with self.assertRaises(ValueError): + list(reshape(matrix, (6, 0, 1))) + class MatMulTests(TestCase): def test_n_by_n(self): @@ -1438,3 +1494,79 @@ class MultinomialTests(TestCase): multinomial(5, 'x') # No non-numeric inputs with self.assertRaises(TypeError): multinomial([5, 7]) # No sequence inputs + + +class RunningMedianTests(TestCase): + def test_vs_statistics_median(self): + running_median = mi.running_median + + for data in [ + random.choices(range(-500, 500), k=500), + # Apply unary plus to force context rounding. + [+Decimal(random.uniform(-500, 500)) for _ in range(500)], + [ + Fraction(random.randrange(-500, 500), random.randrange(1, 500)) + for _ in range(500) + ], + ]: + with self.subTest(data=data): + for k, rm in enumerate(running_median(iter(data)), start=1): + expected = statistics.median(data[:k]) + self.assertEqual(rm, expected) + self.assertEqual(type(rm), type(expected)) + + self.assertEqual(list(running_median([])), []) # Empty input + + def test_vs_statistics_median_windowed(self): + running_median = mi.running_median + size = 10 + + for data in [ + random.choices(range(-500, 500), k=500), + # Apply unary plus to force context rounding. + [+Decimal(random.uniform(-500, 500)) for _ in range(500)], + [ + Fraction(random.randrange(-500, 500), random.randrange(1, 500)) + for _ in range(500) + ], + ]: + with self.subTest(data=data): + iterator = running_median(iter(data), maxlen=size) + for k, rm in enumerate(iterator, start=1): + expected = statistics.median(data[max(0, k - size) : k]) + self.assertEqual(rm, expected) + self.assertEqual(type(rm), type(expected)) + + self.assertEqual(list(running_median([], maxlen=1)), []) # Empty input + + # Window size of 1 should return the original dataset unchanged + data = random.choices(range(-500, 500), k=500) + self.assertEqual(list(running_median(data, maxlen=1)), data) + + # Window size of 2 is a moving average of pairs + data = random.choices(range(-500, 500), k=500) + expected = list(map(mean, mi.pairwise(data))) + actual = list(islice(running_median(data, maxlen=2), 1, None)) + self.assertEqual(actual, expected) + + # A window larger than the dataset should give the same + # result as an unbounded running median. + data = random.choices(range(-500, 500), k=500) + self.assertEqual( + list(running_median(data, maxlen=600)), list(running_median(data)) + ) + + def test_error_cases(self): + running_median = mi.running_median + with self.assertRaises(TypeError): + running_median(1234) # Non-iterable input + with self.assertRaises(TypeError): + running_median([], maxlen=3.0) # Non-integer type for window size + with self.assertRaises(ValueError): + running_median([], maxlen=0) # Invalid window size + with self.assertRaises(TypeError): + list(running_median([3 + 4j, 5 - 7j])) # Unorderable input type + with self.assertRaises(TypeError): + list( + running_median(['abc', 'def', 'ghi']) + ) # Input type that doesn't support division |
