diff options
author | robot-piglet <[email protected]> | 2025-07-13 23:05:26 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-07-13 23:15:55 +0300 |
commit | 8cb77383dfe491d72dd3820518a3c1b49c8487a8 (patch) | |
tree | a5f7ff010010986024367dd7755b55f723775ad0 /contrib/python/multidict/tests/test_mutable_multidict.py | |
parent | e3196ab85e3299b286ebe0f8c2d37b6841f7e5e5 (diff) |
Intermediate changes
commit_hash:78e65545069362958586e25e4c5fd36d5ac3a4a2
Diffstat (limited to 'contrib/python/multidict/tests/test_mutable_multidict.py')
-rw-r--r-- | contrib/python/multidict/tests/test_mutable_multidict.py | 104 |
1 files changed, 103 insertions, 1 deletions
diff --git a/contrib/python/multidict/tests/test_mutable_multidict.py b/contrib/python/multidict/tests/test_mutable_multidict.py index d6fe42fc255..f97646a12bd 100644 --- a/contrib/python/multidict/tests/test_mutable_multidict.py +++ b/contrib/python/multidict/tests/test_mutable_multidict.py @@ -3,7 +3,6 @@ import sys from typing import Union import pytest - from multidict import ( CIMultiDict, CIMultiDictProxy, @@ -429,6 +428,51 @@ class TestMutableMultiDict: d2 = case_sensitive_multidict_class(p) assert d2 == d + def test_merge( + self, + case_sensitive_multidict_class: type[MultiDict[Union[str, int]]], + ) -> None: + d = case_sensitive_multidict_class({"key": "one"}) + assert d == {"key": "one"} + + d.merge([("key", "other"), ("key2", "two")], key2=3, foo="bar") + assert 4 == len(d) + itms = d.items() + # we can't guarantee order of kwargs + assert ("key", "one") in itms + assert ("key2", "two") in itms + assert ("key2", 3) in itms + assert ("foo", "bar") in itms + + other = case_sensitive_multidict_class({"key": "other"}, bar="baz") + + d.merge(other) + assert ("bar", "baz") in d.items() + + d.merge({"key": "other", "boo": "moo"}) + assert ("boo", "moo") in d.items() + + d.merge() + assert 6 == len(d) + + assert ("key", "other") not in d.items() + + with pytest.raises(TypeError): + d.merge("foo", "bar") # type: ignore[arg-type, call-arg] + + def test_merge_from_proxy( + self, + case_sensitive_multidict_class: type[MultiDict[str]], + case_sensitive_multidict_proxy_class: type[MultiDictProxy[str]], + ) -> None: + d = case_sensitive_multidict_class([("a", "a"), ("b", "b")]) + proxy = case_sensitive_multidict_proxy_class(d) + + d2 = case_sensitive_multidict_class() + d2.merge(proxy) + + assert [("a", "a"), ("b", "b")] == list(d2.items()) + class TestCIMutableMultiDict: def test_getall( @@ -813,3 +857,61 @@ class TestCIMutableMultiDict: assert md.keys() == md2.keys() - {"User-Agent"} md.update([("User-Agent", b"Bacon/1.0")]) assert md.keys() == md2.keys() + + def test_update_with_crash_in_the_middle( + self, case_insensitive_multidict_class: type[CIMultiDict[str]] + ) -> None: + class Hack(str): + def lower(self) -> str: + raise RuntimeError + + d = case_insensitive_multidict_class([("a", "a"), ("b", "b")]) + with pytest.raises(RuntimeError): + lst = [("c", "c"), ("a", "a2"), (Hack("b"), "b2")] + d.update(lst) + + assert [("a", "a2"), ("b", "b"), ("c", "c")] == list(d.items()) + + +def test_multidict_shrink_regression() -> None: + """ + Regression test for _md_shrink pointer increment bug in 6.6.0. + + The bug was introduced in PR #1200 which added _md_shrink to optimize + memory usage. The bug occurs when new_ep == old_ep (first non-deleted + entry), causing new_ep to not be incremented. This results in the first + entry being overwritten and memory corruption. + + See: https://github.com/aio-libs/multidict/issues/1221 + """ + # Test case that reproduces the corruption + md: MultiDict[str] = MultiDict() + + # Create pattern: [kept, deleted, kept, kept, ...] + # This triggers new_ep == old_ep on first iteration of _md_shrink + for i in range(10): + md[f"k{i}"] = f"v{i}" + + # Delete some entries but keep the first one + # This creates the exact condition for the bug + for i in range(1, 10, 2): + del md[f"k{i}"] + + # Trigger shrink by adding many entries + # When the internal array needs to resize, it will call _md_shrink + # because md->used < md->keys->nentries + for i in range(50): + md[f"new{i}"] = f"val{i}" + + # The bug would cause k0 to be lost due to memory corruption! + assert "k0" in md, "First entry k0 was lost due to memory corruption!" + assert md["k0"] == "v0", "First entry value was corrupted!" + + # Verify all other kept entries survived + for i in range(0, 10, 2): + assert f"k{i}" in md, f"Entry k{i} missing!" + assert md[f"k{i}"] == f"v{i}", f"Entry k{i} has wrong value!" + + # Verify new entries + for i in range(50): + assert md[f"new{i}"] == f"val{i}" |