aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py
diff options
context:
space:
mode:
authormaxim-yurchuk <maxim-yurchuk@yandex-team.com>2024-10-09 12:29:46 +0300
committermaxim-yurchuk <maxim-yurchuk@yandex-team.com>2024-10-09 13:14:22 +0300
commit9731d8a4bb7ee2cc8554eaf133bb85498a4c7d80 (patch)
treea8fb3181d5947c0d78cf402aa56e686130179049 /contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py
parenta44b779cd359f06c3ebbef4ec98c6b38609d9d85 (diff)
downloadydb-9731d8a4bb7ee2cc8554eaf133bb85498a4c7d80.tar.gz
publishFullContrib: true for ydb
<HIDDEN_URL> commit_hash:c82a80ac4594723cebf2c7387dec9c60217f603e
Diffstat (limited to 'contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py')
-rw-r--r--contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py2109
1 files changed, 2109 insertions, 0 deletions
diff --git a/contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py b/contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py
new file mode 100644
index 0000000000..3a2df58326
--- /dev/null
+++ b/contrib/python/zope.interface/py2/zope/interface/tests/test_adapter.py
@@ -0,0 +1,2109 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Adapter registry tests
+"""
+import unittest
+
+from __tests__.tests import OptimizationTestMixin
+
+# pylint:disable=inherit-non-class,protected-access,too-many-lines
+# pylint:disable=attribute-defined-outside-init,blacklisted-name
+
+def _makeInterfaces():
+ from zope.interface import Interface
+
+ class IB0(Interface):
+ pass
+ class IB1(IB0):
+ pass
+ class IB2(IB0):
+ pass
+ class IB3(IB2, IB1):
+ pass
+ class IB4(IB1, IB2):
+ pass
+
+ class IF0(Interface):
+ pass
+ class IF1(IF0):
+ pass
+
+ class IR0(Interface):
+ pass
+ class IR1(IR0):
+ pass
+
+ return IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1
+
+
+# Custom types to use as part of the AdapterRegistry data structures.
+# Our custom types do strict type checking to make sure
+# types propagate through the data tree as expected.
+class CustomDataTypeBase(object):
+ _data = None
+ def __getitem__(self, name):
+ return self._data[name]
+
+ def __setitem__(self, name, value):
+ self._data[name] = value
+
+ def __delitem__(self, name):
+ del self._data[name]
+
+ def __len__(self):
+ return len(self._data)
+
+ def __contains__(self, name):
+ return name in self._data
+
+ def __eq__(self, other):
+ if other is self:
+ return True
+ # pylint:disable=unidiomatic-typecheck
+ if type(other) != type(self):
+ return False
+ return other._data == self._data
+
+ def __repr__(self):
+ return repr(self._data)
+
+class CustomMapping(CustomDataTypeBase):
+ def __init__(self, other=None):
+ self._data = {}
+ if other:
+ self._data.update(other)
+ self.get = self._data.get
+ self.items = self._data.items
+
+
+class CustomSequence(CustomDataTypeBase):
+ def __init__(self, other=None):
+ self._data = []
+ if other:
+ self._data.extend(other)
+ self.append = self._data.append
+
+class CustomLeafSequence(CustomSequence):
+ pass
+
+class CustomProvided(CustomMapping):
+ pass
+
+
+class BaseAdapterRegistryTests(unittest.TestCase):
+
+ maxDiff = None
+
+ def _getBaseAdapterRegistry(self):
+ from zope.interface.adapter import BaseAdapterRegistry
+ return BaseAdapterRegistry
+
+ def _getTargetClass(self):
+ BaseAdapterRegistry = self._getBaseAdapterRegistry()
+ class _CUT(BaseAdapterRegistry):
+ class LookupClass(object):
+ _changed = _extendors = ()
+ def __init__(self, reg):
+ pass
+ def changed(self, orig):
+ self._changed += (orig,)
+ def add_extendor(self, provided):
+ self._extendors += (provided,)
+ def remove_extendor(self, provided):
+ self._extendors = tuple([x for x in self._extendors
+ if x != provided])
+ for name in BaseAdapterRegistry._delegated:
+ setattr(_CUT.LookupClass, name, object())
+ return _CUT
+
+ def _makeOne(self):
+ return self._getTargetClass()()
+
+ def _getMappingType(self):
+ return dict
+
+ def _getProvidedType(self):
+ return dict
+
+ def _getMutableListType(self):
+ return list
+
+ def _getLeafSequenceType(self):
+ return tuple
+
+ def test_lookup_delegation(self):
+ CUT = self._getTargetClass()
+ registry = CUT()
+ for name in CUT._delegated:
+ self.assertIs(getattr(registry, name), getattr(registry._v_lookup, name))
+
+ def test__generation_on_first_creation(self):
+ registry = self._makeOne()
+ # Bumped to 1 in BaseAdapterRegistry.__init__
+ self.assertEqual(registry._generation, 1)
+
+ def test__generation_after_calling_changed(self):
+ registry = self._makeOne()
+ orig = object()
+ registry.changed(orig)
+ # Bumped to 1 in BaseAdapterRegistry.__init__
+ self.assertEqual(registry._generation, 2)
+ self.assertEqual(registry._v_lookup._changed, (registry, orig,))
+
+ def test__generation_after_changing___bases__(self):
+ class _Base(object):
+ pass
+ registry = self._makeOne()
+ registry.__bases__ = (_Base,)
+ self.assertEqual(registry._generation, 2)
+
+ def _check_basic_types_of_adapters(self, registry, expected_order=2):
+ self.assertEqual(len(registry._adapters), expected_order) # order 0 and order 1
+ self.assertIsInstance(registry._adapters, self._getMutableListType())
+ MT = self._getMappingType()
+ for mapping in registry._adapters:
+ self.assertIsInstance(mapping, MT)
+ self.assertEqual(registry._adapters[0], MT())
+ self.assertIsInstance(registry._adapters[1], MT)
+ self.assertEqual(len(registry._adapters[expected_order - 1]), 1)
+
+ def _check_basic_types_of_subscribers(self, registry, expected_order=2):
+ self.assertEqual(len(registry._subscribers), expected_order) # order 0 and order 1
+ self.assertIsInstance(registry._subscribers, self._getMutableListType())
+ MT = self._getMappingType()
+ for mapping in registry._subscribers:
+ self.assertIsInstance(mapping, MT)
+ if expected_order:
+ self.assertEqual(registry._subscribers[0], MT())
+ self.assertIsInstance(registry._subscribers[1], MT)
+ self.assertEqual(len(registry._subscribers[expected_order - 1]), 1)
+
+ def test_register(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.register([IB0], IR0, '', 'A1')
+ self.assertEqual(registry.registered([IB0], IR0, ''), 'A1')
+ self.assertEqual(registry._generation, 2)
+ self._check_basic_types_of_adapters(registry)
+ MT = self._getMappingType()
+ self.assertEqual(registry._adapters[1], MT({
+ IB0: MT({
+ IR0: MT({'': 'A1'})
+ })
+ }))
+ PT = self._getProvidedType()
+ self.assertEqual(registry._provided, PT({
+ IR0: 1
+ }))
+
+ registered = list(registry.allRegistrations())
+ self.assertEqual(registered, [(
+ (IB0,), # required
+ IR0, # provided
+ '', # name
+ 'A1' # value
+ )])
+
+ def test_register_multiple_allRegistrations(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ # Use several different depths and several different names
+ registry.register([], IR0, '', 'A1')
+ registry.register([], IR0, 'name1', 'A2')
+
+ registry.register([IB0], IR0, '', 'A1')
+ registry.register([IB0], IR0, 'name2', 'A2')
+ registry.register([IB0], IR1, '', 'A3')
+ registry.register([IB0], IR1, 'name3', 'A4')
+
+ registry.register([IB0, IB1], IR0, '', 'A1')
+ registry.register([IB0, IB2], IR0, 'name2', 'A2')
+ registry.register([IB0, IB2], IR1, 'name4', 'A4')
+ registry.register([IB0, IB3], IR1, '', 'A3')
+
+ def build_adapters(L, MT):
+ return L([
+ # 0
+ MT({
+ IR0: MT({
+ '': 'A1',
+ 'name1': 'A2'
+ })
+ }),
+ # 1
+ MT({
+ IB0: MT({
+ IR0: MT({
+ '': 'A1',
+ 'name2': 'A2'
+ }),
+ IR1: MT({
+ '': 'A3',
+ 'name3': 'A4'
+ })
+ })
+ }),
+ # 3
+ MT({
+ IB0: MT({
+ IB1: MT({
+ IR0: MT({'': 'A1'})
+ }),
+ IB2: MT({
+ IR0: MT({'name2': 'A2'}),
+ IR1: MT({'name4': 'A4'}),
+ }),
+ IB3: MT({
+ IR1: MT({'': 'A3'})
+ })
+ }),
+ }),
+ ])
+
+ self.assertEqual(registry._adapters,
+ build_adapters(L=self._getMutableListType(),
+ MT=self._getMappingType()))
+
+ registered = sorted(registry.allRegistrations())
+ self.assertEqual(registered, [
+ ((), IR0, '', 'A1'),
+ ((), IR0, 'name1', 'A2'),
+ ((IB0,), IR0, '', 'A1'),
+ ((IB0,), IR0, 'name2', 'A2'),
+ ((IB0,), IR1, '', 'A3'),
+ ((IB0,), IR1, 'name3', 'A4'),
+ ((IB0, IB1), IR0, '', 'A1'),
+ ((IB0, IB2), IR0, 'name2', 'A2'),
+ ((IB0, IB2), IR1, 'name4', 'A4'),
+ ((IB0, IB3), IR1, '', 'A3')
+ ])
+
+ # We can duplicate to another object.
+ registry2 = self._makeOne()
+ for args in registered:
+ registry2.register(*args)
+
+ self.assertEqual(registry2._adapters, registry._adapters)
+ self.assertEqual(registry2._provided, registry._provided)
+
+ # We can change the types and rebuild the data structures.
+ registry._mappingType = CustomMapping
+ registry._leafSequenceType = CustomLeafSequence
+ registry._sequenceType = CustomSequence
+ registry._providedType = CustomProvided
+ def addValue(existing, new):
+ existing = existing if existing is not None else CustomLeafSequence()
+ existing.append(new)
+ return existing
+ registry._addValueToLeaf = addValue
+
+ registry.rebuild()
+
+ self.assertEqual(registry._adapters,
+ build_adapters(
+ L=CustomSequence,
+ MT=CustomMapping
+ ))
+
+ def test_register_with_invalid_name(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ with self.assertRaises(ValueError):
+ registry.register([IB0], IR0, object(), 'A1')
+
+ def test_register_with_value_None_unregisters(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.register([None], IR0, '', 'A1')
+ registry.register([None], IR0, '', None)
+ self.assertEqual(len(registry._adapters), 0)
+ self.assertIsInstance(registry._adapters, self._getMutableListType())
+ registered = list(registry.allRegistrations())
+ self.assertEqual(registered, [])
+
+ def test_register_with_same_value(self):
+ from zope.interface import Interface
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ _value = object()
+ registry.register([None], IR0, '', _value)
+ _before = registry._generation
+ registry.register([None], IR0, '', _value)
+ self.assertEqual(registry._generation, _before) # skipped changed()
+ self._check_basic_types_of_adapters(registry)
+ MT = self._getMappingType()
+ self.assertEqual(registry._adapters[1], MT(
+ {
+ Interface: MT(
+ {
+ IR0: MT({'': _value})
+ }
+ )
+ }
+ ))
+ registered = list(registry.allRegistrations())
+ self.assertEqual(registered, [(
+ (Interface,), # required
+ IR0, # provided
+ '', # name
+ _value # value
+ )])
+
+
+ def test_registered_empty(self):
+ registry = self._makeOne()
+ self.assertEqual(registry.registered([None], None, ''), None)
+ registered = list(registry.allRegistrations())
+ self.assertEqual(registered, [])
+
+ def test_registered_non_empty_miss(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.register([IB1], None, '', 'A1')
+ self.assertEqual(registry.registered([IB2], None, ''), None)
+
+ def test_registered_non_empty_hit(self):
+ registry = self._makeOne()
+ registry.register([None], None, '', 'A1')
+ self.assertEqual(registry.registered([None], None, ''), 'A1')
+
+ def test_unregister_empty(self):
+ registry = self._makeOne()
+ registry.unregister([None], None, '') # doesn't raise
+ self.assertEqual(registry.registered([None], None, ''), None)
+ self.assertEqual(len(registry._provided), 0)
+
+ def test_unregister_non_empty_miss_on_required(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.register([IB1], None, '', 'A1')
+ registry.unregister([IB2], None, '') # doesn't raise
+ self.assertEqual(registry.registered([IB1], None, ''), 'A1')
+ self._check_basic_types_of_adapters(registry)
+ MT = self._getMappingType()
+ self.assertEqual(registry._adapters[1], MT(
+ {
+ IB1: MT(
+ {
+ None: MT({'': 'A1'})
+ }
+ )
+ }
+ ))
+ PT = self._getProvidedType()
+ self.assertEqual(registry._provided, PT({
+ None: 1
+ }))
+
+ def test_unregister_non_empty_miss_on_name(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.register([IB1], None, '', 'A1')
+ registry.unregister([IB1], None, 'nonesuch') # doesn't raise
+ self.assertEqual(registry.registered([IB1], None, ''), 'A1')
+ self._check_basic_types_of_adapters(registry)
+ MT = self._getMappingType()
+ self.assertEqual(registry._adapters[1], MT(
+ {
+ IB1: MT(
+ {
+ None: MT({'': 'A1'})
+ }
+ )
+ }
+ ))
+ PT = self._getProvidedType()
+ self.assertEqual(registry._provided, PT({
+ None: 1
+ }))
+
+ def test_unregister_with_value_not_None_miss(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ orig = object()
+ nomatch = object()
+ registry.register([IB1], None, '', orig)
+ registry.unregister([IB1], None, '', nomatch) #doesn't raise
+ self.assertIs(registry.registered([IB1], None, ''), orig)
+
+ def test_unregister_hit_clears_empty_subcomponents(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ one = object()
+ another = object()
+ registry.register([IB1, IB2], None, '', one)
+ registry.register([IB1, IB3], None, '', another)
+ self._check_basic_types_of_adapters(registry, expected_order=3)
+ self.assertIn(IB2, registry._adapters[2][IB1])
+ self.assertIn(IB3, registry._adapters[2][IB1])
+ MT = self._getMappingType()
+ self.assertEqual(registry._adapters[2], MT(
+ {
+ IB1: MT(
+ {
+ IB2: MT({None: MT({'': one})}),
+ IB3: MT({None: MT({'': another})})
+ }
+ )
+ }
+ ))
+ PT = self._getProvidedType()
+ self.assertEqual(registry._provided, PT({
+ None: 2
+ }))
+
+ registry.unregister([IB1, IB3], None, '', another)
+ self.assertIn(IB2, registry._adapters[2][IB1])
+ self.assertNotIn(IB3, registry._adapters[2][IB1])
+ self.assertEqual(registry._adapters[2], MT(
+ {
+ IB1: MT(
+ {
+ IB2: MT({None: MT({'': one})}),
+ }
+ )
+ }
+ ))
+ self.assertEqual(registry._provided, PT({
+ None: 1
+ }))
+
+ def test_unsubscribe_empty(self):
+ registry = self._makeOne()
+ registry.unsubscribe([None], None, '') #doesn't raise
+ self.assertEqual(registry.registered([None], None, ''), None)
+ self._check_basic_types_of_subscribers(registry, expected_order=0)
+
+ def test_unsubscribe_hit(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ orig = object()
+ registry.subscribe([IB1], None, orig)
+ MT = self._getMappingType()
+ L = self._getLeafSequenceType()
+ PT = self._getProvidedType()
+ self._check_basic_types_of_subscribers(registry)
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ None: MT({
+ '': L((orig,))
+ })
+ })
+ }))
+ self.assertEqual(registry._provided, PT({}))
+ registry.unsubscribe([IB1], None, orig) #doesn't raise
+ self.assertEqual(len(registry._subscribers), 0)
+ self.assertEqual(registry._provided, PT({}))
+
+ def assertLeafIdentity(self, leaf1, leaf2):
+ """
+ Implementations may choose to use new, immutable objects
+ instead of mutating existing subscriber leaf objects, or vice versa.
+
+ The default implementation uses immutable tuples, so they are never
+ the same. Other implementations may use persistent lists so they should be
+ the same and mutated in place. Subclasses testing this behaviour need to
+ override this method.
+ """
+ self.assertIsNot(leaf1, leaf2)
+
+ def test_unsubscribe_after_multiple(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ first = object()
+ second = object()
+ third = object()
+ fourth = object()
+ registry.subscribe([IB1], None, first)
+ registry.subscribe([IB1], None, second)
+ registry.subscribe([IB1], IR0, third)
+ registry.subscribe([IB1], IR0, fourth)
+ self._check_basic_types_of_subscribers(registry, expected_order=2)
+ MT = self._getMappingType()
+ L = self._getLeafSequenceType()
+ PT = self._getProvidedType()
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ None: MT({'': L((first, second))}),
+ IR0: MT({'': L((third, fourth))}),
+ })
+ }))
+ self.assertEqual(registry._provided, PT({
+ IR0: 2
+ }))
+ # The leaf objects may or may not stay the same as they are unsubscribed,
+ # depending on the implementation
+ IR0_leaf_orig = registry._subscribers[1][IB1][IR0]['']
+ Non_leaf_orig = registry._subscribers[1][IB1][None]['']
+
+ registry.unsubscribe([IB1], None, first)
+ registry.unsubscribe([IB1], IR0, third)
+
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ None: MT({'': L((second,))}),
+ IR0: MT({'': L((fourth,))}),
+ })
+ }))
+ self.assertEqual(registry._provided, PT({
+ IR0: 1
+ }))
+ IR0_leaf_new = registry._subscribers[1][IB1][IR0]['']
+ Non_leaf_new = registry._subscribers[1][IB1][None]['']
+
+ self.assertLeafIdentity(IR0_leaf_orig, IR0_leaf_new)
+ self.assertLeafIdentity(Non_leaf_orig, Non_leaf_new)
+
+ registry.unsubscribe([IB1], None, second)
+ registry.unsubscribe([IB1], IR0, fourth)
+ self.assertEqual(len(registry._subscribers), 0)
+ self.assertEqual(len(registry._provided), 0)
+
+ def test_subscribe_unsubscribe_identical_objects_provided(self):
+ # https://github.com/zopefoundation/zope.interface/issues/227
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ first = object()
+ registry.subscribe([IB1], IR0, first)
+ registry.subscribe([IB1], IR0, first)
+
+ MT = self._getMappingType()
+ L = self._getLeafSequenceType()
+ PT = self._getProvidedType()
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ IR0: MT({'': L((first, first))}),
+ })
+ }))
+ self.assertEqual(registry._provided, PT({
+ IR0: 2
+ }))
+
+ registry.unsubscribe([IB1], IR0, first)
+ registry.unsubscribe([IB1], IR0, first)
+ self.assertEqual(len(registry._subscribers), 0)
+ self.assertEqual(registry._provided, PT())
+
+ def test_subscribe_unsubscribe_nonequal_objects_provided(self):
+ # https://github.com/zopefoundation/zope.interface/issues/227
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ first = object()
+ second = object()
+ registry.subscribe([IB1], IR0, first)
+ registry.subscribe([IB1], IR0, second)
+
+ MT = self._getMappingType()
+ L = self._getLeafSequenceType()
+ PT = self._getProvidedType()
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ IR0: MT({'': L((first, second))}),
+ })
+ }))
+ self.assertEqual(registry._provided, PT({
+ IR0: 2
+ }))
+
+ registry.unsubscribe([IB1], IR0, first)
+ registry.unsubscribe([IB1], IR0, second)
+ self.assertEqual(len(registry._subscribers), 0)
+ self.assertEqual(registry._provided, PT())
+
+ def test_subscribed_empty(self):
+ registry = self._makeOne()
+ self.assertIsNone(registry.subscribed([None], None, ''))
+ subscribed = list(registry.allSubscriptions())
+ self.assertEqual(subscribed, [])
+
+ def test_subscribed_non_empty_miss(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.subscribe([IB1], IF0, 'A1')
+ # Mismatch required
+ self.assertIsNone(registry.subscribed([IB2], IF0, ''))
+ # Mismatch provided
+ self.assertIsNone(registry.subscribed([IB1], IF1, ''))
+ # Mismatch value
+ self.assertIsNone(registry.subscribed([IB1], IF0, ''))
+
+ def test_subscribed_non_empty_hit(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.subscribe([IB0], IF0, 'A1')
+ self.assertEqual(registry.subscribed([IB0], IF0, 'A1'), 'A1')
+
+ def test_unsubscribe_w_None_after_multiple(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ first = object()
+ second = object()
+
+ registry.subscribe([IB1], None, first)
+ registry.subscribe([IB1], None, second)
+ self._check_basic_types_of_subscribers(registry, expected_order=2)
+ registry.unsubscribe([IB1], None)
+ self.assertEqual(len(registry._subscribers), 0)
+
+ def test_unsubscribe_non_empty_miss_on_required(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.subscribe([IB1], None, 'A1')
+ self._check_basic_types_of_subscribers(registry, expected_order=2)
+ registry.unsubscribe([IB2], None, '') # doesn't raise
+ self.assertEqual(len(registry._subscribers), 2)
+ MT = self._getMappingType()
+ L = self._getLeafSequenceType()
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ None: MT({'': L(('A1',))}),
+ })
+ }))
+
+ def test_unsubscribe_non_empty_miss_on_value(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ registry.subscribe([IB1], None, 'A1')
+ self._check_basic_types_of_subscribers(registry, expected_order=2)
+ registry.unsubscribe([IB1], None, 'A2') # doesn't raise
+ self.assertEqual(len(registry._subscribers), 2)
+ MT = self._getMappingType()
+ L = self._getLeafSequenceType()
+ self.assertEqual(registry._subscribers[1], MT({
+ IB1: MT({
+ None: MT({'': L(('A1',))}),
+ })
+ }))
+
+ def test_unsubscribe_with_value_not_None_miss(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ orig = object()
+ nomatch = object()
+ registry.subscribe([IB1], None, orig)
+ registry.unsubscribe([IB1], None, nomatch) #doesn't raise
+ self.assertEqual(len(registry._subscribers), 2)
+
+ def _instance_method_notify_target(self):
+ self.fail("Example method, not intended to be called.")
+
+ def test_unsubscribe_instance_method(self):
+ # Checking that the values are compared by equality, not identity
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ self.assertEqual(len(registry._subscribers), 0)
+ registry.subscribe([IB1], None, self._instance_method_notify_target)
+ registry.unsubscribe([IB1], None, self._instance_method_notify_target)
+ self.assertEqual(len(registry._subscribers), 0)
+
+ def test_subscribe_multiple_allRegistrations(self):
+ IB0, IB1, IB2, IB3, IB4, IF0, IF1, IR0, IR1 = _makeInterfaces() # pylint:disable=unused-variable
+ registry = self._makeOne()
+ # Use several different depths and several different values
+ registry.subscribe([], IR0, 'A1')
+ registry.subscribe([], IR0, 'A2')
+
+ registry.subscribe([IB0], IR0, 'A1')
+ registry.subscribe([IB0], IR0, 'A2')
+ registry.subscribe([IB0], IR1, 'A3')
+ registry.subscribe([IB0], IR1, 'A4')
+
+ registry.subscribe([IB0, IB1], IR0, 'A1')
+ registry.subscribe([IB0, IB2], IR0, 'A2')
+ registry.subscribe([IB0, IB2], IR1, 'A4')
+ registry.subscribe([IB0, IB3], IR1, 'A3')
+
+
+ def build_subscribers(L, F, MT):
+ return L([
+ # 0
+ MT({
+ IR0: MT({
+ '': F(['A1', 'A2'])
+ })
+ }),
+ # 1
+ MT({
+ IB0: MT({
+ IR0: MT({
+ '': F(['A1', 'A2'])
+ }),
+ IR1: MT({
+ '': F(['A3', 'A4'])
+ })
+ })
+ }),
+ # 3
+ MT({
+ IB0: MT({
+ IB1: MT({
+ IR0: MT({'': F(['A1'])})
+ }),
+ IB2: MT({
+ IR0: MT({'': F(['A2'])}),
+ IR1: MT({'': F(['A4'])}),
+ }),
+ IB3: MT({
+ IR1: MT({'': F(['A3'])})
+ })
+ }),
+ }),
+ ])
+
+ self.assertEqual(registry._subscribers,
+ build_subscribers(
+ L=self._getMutableListType(),
+ F=self._getLeafSequenceType(),
+ MT=self._getMappingType()
+ ))
+
+ def build_provided(P):
+ return P({
+ IR0: 6,
+ IR1: 4,
+ })
+
+
+ self.assertEqual(registry._provided,
+ build_provided(P=self._getProvidedType()))
+
+ registered = sorted(registry.allSubscriptions())
+ self.assertEqual(registered, [
+ ((), IR0, 'A1'),
+ ((), IR0, 'A2'),
+ ((IB0,), IR0, 'A1'),
+ ((IB0,), IR0, 'A2'),
+ ((IB0,), IR1, 'A3'),
+ ((IB0,), IR1, 'A4'),
+ ((IB0, IB1), IR0, 'A1'),
+ ((IB0, IB2), IR0, 'A2'),
+ ((IB0, IB2), IR1, 'A4'),
+ ((IB0, IB3), IR1, 'A3')
+ ])
+
+ # We can duplicate this to another object
+ registry2 = self._makeOne()
+ for args in registered:
+ registry2.subscribe(*args)
+
+ self.assertEqual(registry2._subscribers, registry._subscribers)
+ self.assertEqual(registry2._provided, registry._provided)
+
+ # We can change the types and rebuild the data structures.
+ registry._mappingType = CustomMapping
+ registry._leafSequenceType = CustomLeafSequence
+ registry._sequenceType = CustomSequence
+ registry._providedType = CustomProvided
+ def addValue(existing, new):
+ existing = existing if existing is not None else CustomLeafSequence()
+ existing.append(new)
+ return existing
+ registry._addValueToLeaf = addValue
+
+ registry.rebuild()
+
+ self.assertEqual(registry._subscribers,
+ build_subscribers(
+ L=CustomSequence,
+ F=CustomLeafSequence,
+ MT=CustomMapping
+ ))
+
+
+class CustomTypesBaseAdapterRegistryTests(BaseAdapterRegistryTests):
+ """
+ This class may be extended by other packages to test their own
+ adapter registries that use custom types. (So be cautious about
+ breaking changes.)
+
+ One known user is ``zope.component.persistentregistry``.
+ """
+
+ def _getMappingType(self):
+ return CustomMapping
+
+ def _getProvidedType(self):
+ return CustomProvided
+
+ def _getMutableListType(self):
+ return CustomSequence
+
+ def _getLeafSequenceType(self):
+ return CustomLeafSequence
+
+ def _getBaseAdapterRegistry(self):
+ from zope.interface.adapter import BaseAdapterRegistry
+ class CustomAdapterRegistry(BaseAdapterRegistry):
+ _mappingType = self._getMappingType()
+ _sequenceType = self._getMutableListType()
+ _leafSequenceType = self._getLeafSequenceType()
+ _providedType = self._getProvidedType()
+
+ def _addValueToLeaf(self, existing_leaf_sequence, new_item):
+ if not existing_leaf_sequence:
+ existing_leaf_sequence = self._leafSequenceType()
+ existing_leaf_sequence.append(new_item)
+ return existing_leaf_sequence
+
+ def _removeValueFromLeaf(self, existing_leaf_sequence, to_remove):
+ without_removed = BaseAdapterRegistry._removeValueFromLeaf(
+ self,
+ existing_leaf_sequence,
+ to_remove)
+ existing_leaf_sequence[:] = without_removed
+ assert to_remove not in existing_leaf_sequence
+ return existing_leaf_sequence
+
+ return CustomAdapterRegistry
+
+ def assertLeafIdentity(self, leaf1, leaf2):
+ self.assertIs(leaf1, leaf2)
+
+
+class LookupBaseFallbackTests(unittest.TestCase):
+
+ def _getFallbackClass(self):
+ from zope.interface.adapter import LookupBaseFallback # pylint:disable=no-name-in-module
+ return LookupBaseFallback
+
+ _getTargetClass = _getFallbackClass
+
+ def _makeOne(self, uc_lookup=None, uc_lookupAll=None,
+ uc_subscriptions=None):
+ # pylint:disable=function-redefined
+ if uc_lookup is None:
+ def uc_lookup(self, required, provided, name):
+ pass
+ if uc_lookupAll is None:
+ def uc_lookupAll(self, required, provided):
+ raise NotImplementedError()
+ if uc_subscriptions is None:
+ def uc_subscriptions(self, required, provided):
+ raise NotImplementedError()
+ class Derived(self._getTargetClass()):
+ _uncached_lookup = uc_lookup
+ _uncached_lookupAll = uc_lookupAll
+ _uncached_subscriptions = uc_subscriptions
+ return Derived()
+
+ def test_lookup_w_invalid_name(self):
+ def _lookup(self, required, provided, name):
+ self.fail("This should never be called")
+ lb = self._makeOne(uc_lookup=_lookup)
+ with self.assertRaises(ValueError):
+ lb.lookup(('A',), 'B', object())
+
+ def test_lookup_miss_no_default(self):
+ _called_with = []
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A',), 'B', 'C')
+ self.assertIsNone(found)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+
+ def test_lookup_miss_w_default(self):
+ _called_with = []
+ _default = object()
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A',), 'B', 'C', _default)
+ self.assertIs(found, _default)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+
+ def test_lookup_not_cached(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A',), 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+
+ def test_lookup_cached(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A',), 'B', 'C')
+ found = lb.lookup(('A',), 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+
+ def test_lookup_not_cached_multi_required(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A', 'D'), 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A', 'D'), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+
+ def test_lookup_cached_multi_required(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A', 'D'), 'B', 'C')
+ found = lb.lookup(('A', 'D'), 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A', 'D'), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+
+ def test_lookup_not_cached_after_changed(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup(('A',), 'B', 'C')
+ lb.changed(lb)
+ found = lb.lookup(('A',), 'B', 'C')
+ self.assertIs(found, b)
+ self.assertEqual(_called_with,
+ [(('A',), 'B', 'C'), (('A',), 'B', 'C')])
+ self.assertEqual(_results, [c])
+
+ def test_lookup1_w_invalid_name(self):
+ def _lookup(self, required, provided, name):
+ self.fail("This should never be called")
+
+ lb = self._makeOne(uc_lookup=_lookup)
+ with self.assertRaises(ValueError):
+ lb.lookup1('A', 'B', object())
+
+ def test_lookup1_miss_no_default(self):
+ _called_with = []
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C')
+ self.assertIsNone(found)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+
+ def test_lookup1_miss_w_default(self):
+ _called_with = []
+ _default = object()
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C', _default)
+ self.assertIs(found, _default)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+
+ def test_lookup1_miss_w_default_negative_cache(self):
+ _called_with = []
+ _default = object()
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C', _default)
+ self.assertIs(found, _default)
+ found = lb.lookup1('A', 'B', 'C', _default)
+ self.assertIs(found, _default)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+
+ def test_lookup1_not_cached(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+
+ def test_lookup1_cached(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C')
+ found = lb.lookup1('A', 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+
+ def test_lookup1_not_cached_after_changed(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ lb = self._makeOne(uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C')
+ lb.changed(lb)
+ found = lb.lookup1('A', 'B', 'C')
+ self.assertIs(found, b)
+ self.assertEqual(_called_with,
+ [(('A',), 'B', 'C'), (('A',), 'B', 'C')])
+ self.assertEqual(_results, [c])
+
+ def test_adapter_hook_w_invalid_name(self):
+ req, prv = object(), object()
+ lb = self._makeOne()
+ with self.assertRaises(ValueError):
+ lb.adapter_hook(prv, req, object())
+
+ def test_adapter_hook_miss_no_default(self):
+ req, prv = object(), object()
+ lb = self._makeOne()
+ found = lb.adapter_hook(prv, req, '')
+ self.assertIsNone(found)
+
+ def test_adapter_hook_miss_w_default(self):
+ req, prv, _default = object(), object(), object()
+ lb = self._makeOne()
+ found = lb.adapter_hook(prv, req, '', _default)
+ self.assertIs(found, _default)
+
+ def test_adapter_hook_hit_factory_returns_None(self):
+ _f_called_with = []
+ def _factory(context):
+ _f_called_with.append(context)
+
+ def _lookup(self, required, provided, name):
+ return _factory
+ req, prv, _default = object(), object(), object()
+ lb = self._makeOne(uc_lookup=_lookup)
+ adapted = lb.adapter_hook(prv, req, 'C', _default)
+ self.assertIs(adapted, _default)
+ self.assertEqual(_f_called_with, [req])
+
+ def test_adapter_hook_hit_factory_returns_adapter(self):
+ _f_called_with = []
+ _adapter = object()
+ def _factory(context):
+ _f_called_with.append(context)
+ return _adapter
+ def _lookup(self, required, provided, name):
+ return _factory
+ req, prv, _default = object(), object(), object()
+ lb = self._makeOne(uc_lookup=_lookup)
+ adapted = lb.adapter_hook(prv, req, 'C', _default)
+ self.assertIs(adapted, _adapter)
+ self.assertEqual(_f_called_with, [req])
+
+ def test_adapter_hook_super_unwraps(self):
+ _f_called_with = []
+ def _factory(context):
+ _f_called_with.append(context)
+ return context
+ def _lookup(self, required, provided, name=''):
+ return _factory
+ required = super(LookupBaseFallbackTests, self)
+ provided = object()
+ lb = self._makeOne(uc_lookup=_lookup)
+ adapted = lb.adapter_hook(provided, required)
+ self.assertIs(adapted, self)
+ self.assertEqual(_f_called_with, [self])
+
+ def test_queryAdapter(self):
+ _f_called_with = []
+ _adapter = object()
+ def _factory(context):
+ _f_called_with.append(context)
+ return _adapter
+ def _lookup(self, required, provided, name):
+ return _factory
+ req, prv, _default = object(), object(), object()
+ lb = self._makeOne(uc_lookup=_lookup)
+ adapted = lb.queryAdapter(req, prv, 'C', _default)
+ self.assertIs(adapted, _adapter)
+ self.assertEqual(_f_called_with, [req])
+
+ def test_lookupAll_uncached(self):
+ _called_with = []
+ _results = [object(), object(), object()]
+ def _lookupAll(self, required, provided):
+ _called_with.append((required, provided))
+ return tuple(_results)
+ lb = self._makeOne(uc_lookupAll=_lookupAll)
+ found = lb.lookupAll('A', 'B')
+ self.assertEqual(found, tuple(_results))
+ self.assertEqual(_called_with, [(('A',), 'B')])
+
+ def test_lookupAll_cached(self):
+ _called_with = []
+ _results = [object(), object(), object()]
+ def _lookupAll(self, required, provided):
+ _called_with.append((required, provided))
+ return tuple(_results)
+ lb = self._makeOne(uc_lookupAll=_lookupAll)
+ found = lb.lookupAll('A', 'B')
+ found = lb.lookupAll('A', 'B')
+ self.assertEqual(found, tuple(_results))
+ self.assertEqual(_called_with, [(('A',), 'B')])
+
+ def test_subscriptions_uncached(self):
+ _called_with = []
+ _results = [object(), object(), object()]
+ def _subscriptions(self, required, provided):
+ _called_with.append((required, provided))
+ return tuple(_results)
+ lb = self._makeOne(uc_subscriptions=_subscriptions)
+ found = lb.subscriptions('A', 'B')
+ self.assertEqual(found, tuple(_results))
+ self.assertEqual(_called_with, [(('A',), 'B')])
+
+ def test_subscriptions_cached(self):
+ _called_with = []
+ _results = [object(), object(), object()]
+ def _subscriptions(self, required, provided):
+ _called_with.append((required, provided))
+ return tuple(_results)
+ lb = self._makeOne(uc_subscriptions=_subscriptions)
+ found = lb.subscriptions('A', 'B')
+ found = lb.subscriptions('A', 'B')
+ self.assertEqual(found, tuple(_results))
+ self.assertEqual(_called_with, [(('A',), 'B')])
+
+
+class LookupBaseTests(LookupBaseFallbackTests,
+ OptimizationTestMixin):
+
+ def _getTargetClass(self):
+ from zope.interface.adapter import LookupBase
+ return LookupBase
+
+
+class VerifyingBaseFallbackTests(unittest.TestCase):
+
+ def _getFallbackClass(self):
+ from zope.interface.adapter import VerifyingBaseFallback # pylint:disable=no-name-in-module
+ return VerifyingBaseFallback
+
+ _getTargetClass = _getFallbackClass
+
+ def _makeOne(self, registry, uc_lookup=None, uc_lookupAll=None,
+ uc_subscriptions=None):
+ # pylint:disable=function-redefined
+ if uc_lookup is None:
+ def uc_lookup(self, required, provided, name):
+ raise NotImplementedError()
+ if uc_lookupAll is None:
+ def uc_lookupAll(self, required, provided):
+ raise NotImplementedError()
+ if uc_subscriptions is None:
+ def uc_subscriptions(self, required, provided):
+ raise NotImplementedError()
+ class Derived(self._getTargetClass()):
+ _uncached_lookup = uc_lookup
+ _uncached_lookupAll = uc_lookupAll
+ _uncached_subscriptions = uc_subscriptions
+ def __init__(self, registry):
+ super(Derived, self).__init__()
+ self._registry = registry
+ derived = Derived(registry)
+ derived.changed(derived) # init. '_verify_ro' / '_verify_generations'
+ return derived
+
+ def _makeRegistry(self, depth):
+ class WithGeneration(object):
+ _generation = 1
+ class Registry:
+ def __init__(self, depth):
+ self.ro = [WithGeneration() for i in range(depth)]
+ return Registry(depth)
+
+ def test_lookup(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ reg = self._makeRegistry(3)
+ lb = self._makeOne(reg, uc_lookup=_lookup)
+ found = lb.lookup(('A',), 'B', 'C')
+ found = lb.lookup(('A',), 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+ reg.ro[1]._generation += 1
+ found = lb.lookup(('A',), 'B', 'C')
+ self.assertIs(found, b)
+ self.assertEqual(_called_with,
+ [(('A',), 'B', 'C'), (('A',), 'B', 'C')])
+ self.assertEqual(_results, [c])
+
+ def test_lookup1(self):
+ _called_with = []
+ a, b, c = object(), object(), object()
+ _results = [a, b, c]
+ def _lookup(self, required, provided, name):
+ _called_with.append((required, provided, name))
+ return _results.pop(0)
+ reg = self._makeRegistry(3)
+ lb = self._makeOne(reg, uc_lookup=_lookup)
+ found = lb.lookup1('A', 'B', 'C')
+ found = lb.lookup1('A', 'B', 'C')
+ self.assertIs(found, a)
+ self.assertEqual(_called_with, [(('A',), 'B', 'C')])
+ self.assertEqual(_results, [b, c])
+ reg.ro[1]._generation += 1
+ found = lb.lookup1('A', 'B', 'C')
+ self.assertIs(found, b)
+ self.assertEqual(_called_with,
+ [(('A',), 'B', 'C'), (('A',), 'B', 'C')])
+ self.assertEqual(_results, [c])
+
+ def test_adapter_hook(self):
+ a, b, _c = [object(), object(), object()]
+ def _factory1(context):
+ return a
+ def _factory2(context):
+ return b
+ def _factory3(context):
+ self.fail("This should never be called")
+ _factories = [_factory1, _factory2, _factory3]
+ def _lookup(self, required, provided, name):
+ return _factories.pop(0)
+ req, prv, _default = object(), object(), object()
+ reg = self._makeRegistry(3)
+ lb = self._makeOne(reg, uc_lookup=_lookup)
+ adapted = lb.adapter_hook(prv, req, 'C', _default)
+ self.assertIs(adapted, a)
+ adapted = lb.adapter_hook(prv, req, 'C', _default)
+ self.assertIs(adapted, a)
+ reg.ro[1]._generation += 1
+ adapted = lb.adapter_hook(prv, req, 'C', _default)
+ self.assertIs(adapted, b)
+
+ def test_queryAdapter(self):
+ a, b, _c = [object(), object(), object()]
+ def _factory1(context):
+ return a
+ def _factory2(context):
+ return b
+ def _factory3(context):
+ self.fail("This should never be called")
+ _factories = [_factory1, _factory2, _factory3]
+ def _lookup(self, required, provided, name):
+ return _factories.pop(0)
+ req, prv, _default = object(), object(), object()
+ reg = self._makeRegistry(3)
+ lb = self._makeOne(reg, uc_lookup=_lookup)
+ adapted = lb.queryAdapter(req, prv, 'C', _default)
+ self.assertIs(adapted, a)
+ adapted = lb.queryAdapter(req, prv, 'C', _default)
+ self.assertIs(adapted, a)
+ reg.ro[1]._generation += 1
+ adapted = lb.adapter_hook(prv, req, 'C', _default)
+ self.assertIs(adapted, b)
+
+ def test_lookupAll(self):
+ _results_1 = [object(), object(), object()]
+ _results_2 = [object(), object(), object()]
+ _results = [_results_1, _results_2]
+ def _lookupAll(self, required, provided):
+ return tuple(_results.pop(0))
+ reg = self._makeRegistry(3)
+ lb = self._makeOne(reg, uc_lookupAll=_lookupAll)
+ found = lb.lookupAll('A', 'B')
+ self.assertEqual(found, tuple(_results_1))
+ found = lb.lookupAll('A', 'B')
+ self.assertEqual(found, tuple(_results_1))
+ reg.ro[1]._generation += 1
+ found = lb.lookupAll('A', 'B')
+ self.assertEqual(found, tuple(_results_2))
+
+ def test_subscriptions(self):
+ _results_1 = [object(), object(), object()]
+ _results_2 = [object(), object(), object()]
+ _results = [_results_1, _results_2]
+ def _subscriptions(self, required, provided):
+ return tuple(_results.pop(0))
+ reg = self._makeRegistry(3)
+ lb = self._makeOne(reg, uc_subscriptions=_subscriptions)
+ found = lb.subscriptions('A', 'B')
+ self.assertEqual(found, tuple(_results_1))
+ found = lb.subscriptions('A', 'B')
+ self.assertEqual(found, tuple(_results_1))
+ reg.ro[1]._generation += 1
+ found = lb.subscriptions('A', 'B')
+ self.assertEqual(found, tuple(_results_2))
+
+
+class VerifyingBaseTests(VerifyingBaseFallbackTests,
+ OptimizationTestMixin):
+
+ def _getTargetClass(self):
+ from zope.interface.adapter import VerifyingBase
+ return VerifyingBase
+
+
+class AdapterLookupBaseTests(unittest.TestCase):
+
+ def _getTargetClass(self):
+ from zope.interface.adapter import AdapterLookupBase
+ return AdapterLookupBase
+
+ def _makeOne(self, registry):
+ return self._getTargetClass()(registry)
+
+ def _makeSubregistry(self, *provided):
+ class Subregistry:
+ def __init__(self):
+ self._adapters = []
+ self._subscribers = []
+ return Subregistry()
+
+ def _makeRegistry(self, *provided):
+ class Registry:
+ def __init__(self, provided):
+ self._provided = provided
+ self.ro = []
+ return Registry(provided)
+
+ def test_ctor_empty_registry(self):
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ self.assertEqual(alb._extendors, {})
+
+ def test_ctor_w_registry_provided(self):
+ from zope.interface import Interface
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ alb = self._makeOne(registry)
+ self.assertEqual(sorted(alb._extendors.keys()),
+ sorted([IBar, IFoo, Interface]))
+ self.assertEqual(alb._extendors[IFoo], [IFoo, IBar])
+ self.assertEqual(alb._extendors[IBar], [IBar])
+ self.assertEqual(sorted(alb._extendors[Interface]),
+ sorted([IFoo, IBar]))
+
+ def test_changed_empty_required(self):
+ # ALB.changed expects to call a mixed in changed.
+ class Mixin(object):
+ def changed(self, *other):
+ pass
+ class Derived(self._getTargetClass(), Mixin):
+ pass
+ registry = self._makeRegistry()
+ alb = Derived(registry)
+ alb.changed(alb)
+
+ def test_changed_w_required(self):
+ # ALB.changed expects to call a mixed in changed.
+ class Mixin(object):
+ def changed(self, *other):
+ pass
+ class Derived(self._getTargetClass(), Mixin):
+ pass
+ class FauxWeakref(object):
+ _unsub = None
+ def __init__(self, here):
+ self._here = here
+ def __call__(self):
+ return self if self._here else None
+ def unsubscribe(self, target):
+ self._unsub = target
+ gone = FauxWeakref(False)
+ here = FauxWeakref(True)
+ registry = self._makeRegistry()
+ alb = Derived(registry)
+ alb._required[gone] = 1
+ alb._required[here] = 1
+ alb.changed(alb)
+ self.assertEqual(len(alb._required), 0)
+ self.assertEqual(gone._unsub, None)
+ self.assertEqual(here._unsub, alb)
+
+ def test_init_extendors_after_registry_update(self):
+ from zope.interface import Interface
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ registry._provided = [IFoo, IBar]
+ alb.init_extendors()
+ self.assertEqual(sorted(alb._extendors.keys()),
+ sorted([IBar, IFoo, Interface]))
+ self.assertEqual(alb._extendors[IFoo], [IFoo, IBar])
+ self.assertEqual(alb._extendors[IBar], [IBar])
+ self.assertEqual(sorted(alb._extendors[Interface]),
+ sorted([IFoo, IBar]))
+
+ def test_add_extendor(self):
+ from zope.interface import Interface
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ alb.add_extendor(IFoo)
+ alb.add_extendor(IBar)
+ self.assertEqual(sorted(alb._extendors.keys()),
+ sorted([IBar, IFoo, Interface]))
+ self.assertEqual(alb._extendors[IFoo], [IFoo, IBar])
+ self.assertEqual(alb._extendors[IBar], [IBar])
+ self.assertEqual(sorted(alb._extendors[Interface]),
+ sorted([IFoo, IBar]))
+
+ def test_remove_extendor(self):
+ from zope.interface import Interface
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ alb = self._makeOne(registry)
+ alb.remove_extendor(IFoo)
+ self.assertEqual(sorted(alb._extendors.keys()),
+ sorted([IFoo, IBar, Interface]))
+ self.assertEqual(alb._extendors[IFoo], [IBar])
+ self.assertEqual(alb._extendors[IBar], [IBar])
+ self.assertEqual(sorted(alb._extendors[Interface]),
+ sorted([IBar]))
+
+ # test '_subscribe' via its callers, '_uncached_lookup', etc.
+
+ def test__uncached_lookup_empty_ro(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ result = alb._uncached_lookup((IFoo,), IBar)
+ self.assertEqual(result, None)
+ self.assertEqual(len(alb._required), 1)
+ self.assertIn(IFoo.weakref(), alb._required)
+
+ def test__uncached_lookup_order_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ result = alb._uncached_lookup((IFoo,), IBar)
+ self.assertEqual(result, None)
+
+ def test__uncached_lookup_extendors_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ subr = self._makeSubregistry()
+ subr._adapters = [{}, {}] #utilities, single adapters
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookup((IFoo,), IBar)
+ self.assertEqual(result, None)
+
+ def test__uncached_lookup_components_miss_wrong_iface(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ IQux = InterfaceClass('IQux')
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ irrelevant = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IQux: {'': irrelevant},
+ }},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookup((IFoo,), IBar)
+ self.assertEqual(result, None)
+
+ def test__uncached_lookup_components_miss_wrong_name(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+
+ wrongname = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'wrongname': wrongname},
+ }},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookup((IFoo,), IBar)
+ self.assertEqual(result, None)
+
+ def test__uncached_lookup_simple_hit(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _expected = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': _expected}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookup((IFoo,), IBar)
+ self.assertIs(result, _expected)
+
+ def test__uncached_lookup_repeated_hit(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _expected = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': _expected}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookup((IFoo,), IBar)
+ result2 = alb._uncached_lookup((IFoo,), IBar)
+ self.assertIs(result, _expected)
+ self.assertIs(result2, _expected)
+
+ def test_queryMultiAdaptor_lookup_miss(self):
+ from zope.interface.declarations import implementer
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ @implementer(IFoo)
+ class Foo(object):
+ pass
+ foo = Foo()
+ registry = self._makeRegistry()
+ subr = self._makeSubregistry()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ alb.lookup = alb._uncached_lookup # provided by derived
+ subr._v_lookup = alb
+ _default = object()
+ result = alb.queryMultiAdapter((foo,), IBar, default=_default)
+ self.assertIs(result, _default)
+
+ def test_queryMultiAdapter_errors_on_attribute_access(self):
+ # Any error on attribute access previously lead to using the _empty singleton as "requires"
+ # argument (See https://github.com/zopefoundation/zope.interface/issues/162)
+ # but after https://github.com/zopefoundation/zope.interface/issues/200
+ # they get propagated.
+ from zope.interface.interface import InterfaceClass
+ from __tests__.tests import MissingSomeAttrs
+
+ IFoo = InterfaceClass('IFoo')
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ alb.lookup = alb._uncached_lookup
+
+ def test(ob):
+ return alb.queryMultiAdapter(
+ (ob,),
+ IFoo,
+ )
+
+ PY3 = str is not bytes
+ MissingSomeAttrs.test_raises(self, test,
+ expected_missing='__class__' if PY3 else '__providedBy__')
+
+ def test_queryMultiAdaptor_factory_miss(self):
+ from zope.interface.declarations import implementer
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ @implementer(IFoo)
+ class Foo(object):
+ pass
+ foo = Foo()
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _expected = object()
+ _called_with = []
+ def _factory(context):
+ _called_with.append(context)
+
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': _factory}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ alb.lookup = alb._uncached_lookup # provided by derived
+ subr._v_lookup = alb
+ _default = object()
+ result = alb.queryMultiAdapter((foo,), IBar, default=_default)
+ self.assertIs(result, _default)
+ self.assertEqual(_called_with, [foo])
+
+ def test_queryMultiAdaptor_factory_hit(self):
+ from zope.interface.declarations import implementer
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ @implementer(IFoo)
+ class Foo(object):
+ pass
+ foo = Foo()
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _expected = object()
+ _called_with = []
+ def _factory(context):
+ _called_with.append(context)
+ return _expected
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': _factory}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ alb.lookup = alb._uncached_lookup # provided by derived
+ subr._v_lookup = alb
+ _default = object()
+ result = alb.queryMultiAdapter((foo,), IBar, default=_default)
+ self.assertIs(result, _expected)
+ self.assertEqual(_called_with, [foo])
+
+ def test_queryMultiAdapter_super_unwraps(self):
+ alb = self._makeOne(self._makeRegistry())
+ def lookup(*args):
+ return factory
+ def factory(*args):
+ return args
+ alb.lookup = lookup
+
+ objects = [
+ super(AdapterLookupBaseTests, self),
+ 42,
+ "abc",
+ super(AdapterLookupBaseTests, self),
+ ]
+
+ result = alb.queryMultiAdapter(objects, None)
+ self.assertEqual(result, (
+ self,
+ 42,
+ "abc",
+ self,
+ ))
+
+ def test__uncached_lookupAll_empty_ro(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ result = alb._uncached_lookupAll((IFoo,), IBar)
+ self.assertEqual(result, ())
+ self.assertEqual(len(alb._required), 1)
+ self.assertIn(IFoo.weakref(), alb._required)
+
+ def test__uncached_lookupAll_order_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookupAll((IFoo,), IBar)
+ self.assertEqual(result, ())
+
+ def test__uncached_lookupAll_extendors_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ subr = self._makeSubregistry()
+ subr._adapters = [{}, {}] #utilities, single adapters
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookupAll((IFoo,), IBar)
+ self.assertEqual(result, ())
+
+ def test__uncached_lookupAll_components_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ IQux = InterfaceClass('IQux')
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ irrelevant = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IQux: {'': irrelevant}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookupAll((IFoo,), IBar)
+ self.assertEqual(result, ())
+
+ def test__uncached_lookupAll_simple_hit(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _expected = object()
+ _named = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': _expected, 'named': _named}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_lookupAll((IFoo,), IBar)
+ self.assertEqual(sorted(result), [('', _expected), ('named', _named)])
+
+ def test_names(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _expected = object()
+ _named = object()
+ subr._adapters = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': _expected, 'named': _named}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ alb.lookupAll = alb._uncached_lookupAll
+ subr._v_lookup = alb
+ result = alb.names((IFoo,), IBar)
+ self.assertEqual(sorted(result), ['', 'named'])
+
+ def test__uncached_subscriptions_empty_ro(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ alb = self._makeOne(registry)
+ result = alb._uncached_subscriptions((IFoo,), IBar)
+ self.assertEqual(result, [])
+ self.assertEqual(len(alb._required), 1)
+ self.assertIn(IFoo.weakref(), alb._required)
+
+ def test__uncached_subscriptions_order_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_subscriptions((IFoo,), IBar)
+ self.assertEqual(result, [])
+
+ def test__uncached_subscriptions_extendors_miss(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry()
+ subr = self._makeSubregistry()
+ subr._subscribers = [{}, {}] #utilities, single adapters
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_subscriptions((IFoo,), IBar)
+ self.assertEqual(result, [])
+
+ def test__uncached_subscriptions_components_miss_wrong_iface(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ IQux = InterfaceClass('IQux')
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ irrelevant = object()
+ subr._subscribers = [ #utilities, single adapters
+ {},
+ {IFoo: {IQux: {'': irrelevant}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_subscriptions((IFoo,), IBar)
+ self.assertEqual(result, [])
+
+ def test__uncached_subscriptions_components_miss_wrong_name(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ wrongname = object()
+ subr._subscribers = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'wrongname': wrongname}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_subscriptions((IFoo,), IBar)
+ self.assertEqual(result, [])
+
+ def test__uncached_subscriptions_simple_hit(self):
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ class Foo(object):
+ def __lt__(self, other):
+ return True
+ _exp1, _exp2 = Foo(), Foo()
+ subr._subscribers = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': (_exp1, _exp2)}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ subr._v_lookup = alb
+ result = alb._uncached_subscriptions((IFoo,), IBar)
+ self.assertEqual(sorted(result), sorted([_exp1, _exp2]))
+
+ def test_subscribers_wo_provided(self):
+ from zope.interface.declarations import implementer
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ @implementer(IFoo)
+ class Foo(object):
+ pass
+ foo = Foo()
+ registry = self._makeRegistry(IFoo, IBar)
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _called = {}
+ def _factory1(context):
+ _called.setdefault('_factory1', []).append(context)
+ def _factory2(context):
+ _called.setdefault('_factory2', []).append(context)
+ subr._subscribers = [ #utilities, single adapters
+ {},
+ {IFoo: {None: {'': (_factory1, _factory2)}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ alb.subscriptions = alb._uncached_subscriptions
+ subr._v_lookup = alb
+ result = alb.subscribers((foo,), None)
+ self.assertEqual(result, ())
+ self.assertEqual(_called, {'_factory1': [foo], '_factory2': [foo]})
+
+ def test_subscribers_w_provided(self):
+ from zope.interface.declarations import implementer
+ from zope.interface.interface import InterfaceClass
+ IFoo = InterfaceClass('IFoo')
+ IBar = InterfaceClass('IBar', (IFoo,))
+ @implementer(IFoo)
+ class Foo(object):
+ pass
+ foo = Foo()
+ registry = self._makeRegistry(IFoo, IBar)
+ registry = self._makeRegistry(IFoo, IBar)
+ subr = self._makeSubregistry()
+ _called = {}
+ _exp1, _exp2 = object(), object()
+ def _factory1(context):
+ _called.setdefault('_factory1', []).append(context)
+ return _exp1
+ def _factory2(context):
+ _called.setdefault('_factory2', []).append(context)
+ return _exp2
+ def _side_effect_only(context):
+ _called.setdefault('_side_effect_only', []).append(context)
+
+ subr._subscribers = [ #utilities, single adapters
+ {},
+ {IFoo: {IBar: {'': (_factory1, _factory2, _side_effect_only)}}},
+ ]
+ registry.ro.append(subr)
+ alb = self._makeOne(registry)
+ alb.subscriptions = alb._uncached_subscriptions
+ subr._v_lookup = alb
+ result = alb.subscribers((foo,), IBar)
+ self.assertEqual(result, [_exp1, _exp2])
+ self.assertEqual(_called,
+ {'_factory1': [foo],
+ '_factory2': [foo],
+ '_side_effect_only': [foo],
+ })
+
+
+class VerifyingAdapterRegistryTests(unittest.TestCase):
+ # This is also the base for AdapterRegistryTests. That makes the
+ # inheritance seems backwards, but even though they implement the
+ # same interfaces, VAR and AR each only extend BAR; and neither
+ # one will pass the test cases for BAR (it uses a special
+ # LookupClass just for the tests).
+
+ def _getTargetClass(self):
+ from zope.interface.adapter import VerifyingAdapterRegistry
+ return VerifyingAdapterRegistry
+
+ def _makeOne(self, *args, **kw):
+ return self._getTargetClass()(*args, **kw)
+
+ def test_verify_object_provides_IAdapterRegistry(self):
+ from zope.interface.verify import verifyObject
+ from zope.interface.interfaces import IAdapterRegistry
+ registry = self._makeOne()
+ verifyObject(IAdapterRegistry, registry)
+
+
+class AdapterRegistryTests(VerifyingAdapterRegistryTests):
+
+ def _getTargetClass(self):
+ from zope.interface.adapter import AdapterRegistry
+ return AdapterRegistry
+
+ def test_ctor_no_bases(self):
+ ar = self._makeOne()
+ self.assertEqual(len(ar._v_subregistries), 0)
+
+ def test_ctor_w_bases(self):
+ base = self._makeOne()
+ sub = self._makeOne([base])
+ self.assertEqual(len(sub._v_subregistries), 0)
+ self.assertEqual(len(base._v_subregistries), 1)
+ self.assertIn(sub, base._v_subregistries)
+
+ # test _addSubregistry / _removeSubregistry via only caller, _setBases
+
+ def test__setBases_removing_existing_subregistry(self):
+ before = self._makeOne()
+ after = self._makeOne()
+ sub = self._makeOne([before])
+ sub.__bases__ = [after]
+ self.assertEqual(len(before._v_subregistries), 0)
+ self.assertEqual(len(after._v_subregistries), 1)
+ self.assertIn(sub, after._v_subregistries)
+
+ def test__setBases_wo_stray_entry(self):
+ before = self._makeOne()
+ stray = self._makeOne()
+ after = self._makeOne()
+ sub = self._makeOne([before])
+ sub.__dict__['__bases__'].append(stray)
+ sub.__bases__ = [after]
+ self.assertEqual(len(before._v_subregistries), 0)
+ self.assertEqual(len(after._v_subregistries), 1)
+ self.assertIn(sub, after._v_subregistries)
+
+ def test__setBases_w_existing_entry_continuing(self):
+ before = self._makeOne()
+ after = self._makeOne()
+ sub = self._makeOne([before])
+ sub.__bases__ = [before, after]
+ self.assertEqual(len(before._v_subregistries), 1)
+ self.assertEqual(len(after._v_subregistries), 1)
+ self.assertIn(sub, before._v_subregistries)
+ self.assertIn(sub, after._v_subregistries)
+
+ def test_changed_w_subregistries(self):
+ base = self._makeOne()
+ class Derived(object):
+ _changed = None
+ def changed(self, originally_changed):
+ self._changed = originally_changed
+ derived1, derived2 = Derived(), Derived()
+ base._addSubregistry(derived1)
+ base._addSubregistry(derived2)
+ orig = object()
+ base.changed(orig)
+ self.assertIs(derived1._changed, orig)
+ self.assertIs(derived2._changed, orig)
+
+
+class Test_utils(unittest.TestCase):
+
+ def test__convert_None_to_Interface_w_None(self):
+ from zope.interface.adapter import _convert_None_to_Interface
+ from zope.interface.interface import Interface
+ self.assertIs(_convert_None_to_Interface(None), Interface)
+
+ def test__convert_None_to_Interface_w_other(self):
+ from zope.interface.adapter import _convert_None_to_Interface
+ other = object()
+ self.assertIs(_convert_None_to_Interface(other), other)
+
+ def test__normalize_name_str(self):
+ from zope.interface.adapter import _normalize_name
+ STR = b'str'
+ UNICODE = u'str'
+ norm = _normalize_name(STR)
+ self.assertEqual(norm, UNICODE)
+ self.assertIsInstance(norm, type(UNICODE))
+
+ def test__normalize_name_unicode(self):
+ from zope.interface.adapter import _normalize_name
+
+ USTR = u'ustr'
+ self.assertEqual(_normalize_name(USTR), USTR)
+
+ def test__normalize_name_other(self):
+ from zope.interface.adapter import _normalize_name
+ for other in 1, 1.0, (), [], {}, object():
+ self.assertRaises(TypeError, _normalize_name, other)
+
+ # _lookup, _lookupAll, and _subscriptions tested via their callers
+ # (AdapterLookupBase.{lookup,lookupAll,subscriptions}).