diff options
author | maxim-yurchuk <maxim-yurchuk@yandex-team.com> | 2024-10-09 12:29:46 +0300 |
---|---|---|
committer | maxim-yurchuk <maxim-yurchuk@yandex-team.com> | 2024-10-09 13:14:22 +0300 |
commit | 9731d8a4bb7ee2cc8554eaf133bb85498a4c7d80 (patch) | |
tree | a8fb3181d5947c0d78cf402aa56e686130179049 /contrib/python/zope.interface/py2/zope/interface/tests/test_registry.py | |
parent | a44b779cd359f06c3ebbef4ec98c6b38609d9d85 (diff) | |
download | ydb-9731d8a4bb7ee2cc8554eaf133bb85498a4c7d80.tar.gz |
publishFullContrib: true for ydb
<HIDDEN_URL>
commit_hash:c82a80ac4594723cebf2c7387dec9c60217f603e
Diffstat (limited to 'contrib/python/zope.interface/py2/zope/interface/tests/test_registry.py')
-rw-r--r-- | contrib/python/zope.interface/py2/zope/interface/tests/test_registry.py | 3057 |
1 files changed, 3057 insertions, 0 deletions
diff --git a/contrib/python/zope.interface/py2/zope/interface/tests/test_registry.py b/contrib/python/zope.interface/py2/zope/interface/tests/test_registry.py new file mode 100644 index 0000000000..81bb58a8f2 --- /dev/null +++ b/contrib/python/zope.interface/py2/zope/interface/tests/test_registry.py @@ -0,0 +1,3057 @@ +############################################################################## +# +# Copyright (c) 2001, 2002, 2009 Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## +"""Component Registry Tests""" +# pylint:disable=protected-access +import unittest + +from zope.interface import Interface +from zope.interface.adapter import VerifyingAdapterRegistry + +from zope.interface.registry import Components + +class ComponentsTests(unittest.TestCase): + + def _getTargetClass(self): + return Components + + def _makeOne(self, name='test', *args, **kw): + return self._getTargetClass()(name, *args, **kw) + + def _wrapEvents(self): + from zope.interface import registry + _events = [] + def _notify(*args, **kw): + _events.append((args, kw)) + _monkey = _Monkey(registry, notify=_notify) + return _monkey, _events + + def test_ctor_no_bases(self): + from zope.interface.adapter import AdapterRegistry + comp = self._makeOne('testing') + self.assertEqual(comp.__name__, 'testing') + self.assertEqual(comp.__bases__, ()) + self.assertTrue(isinstance(comp.adapters, AdapterRegistry)) + self.assertTrue(isinstance(comp.utilities, AdapterRegistry)) + self.assertEqual(comp.adapters.__bases__, ()) + self.assertEqual(comp.utilities.__bases__, ()) + self.assertEqual(comp._utility_registrations, {}) + self.assertEqual(comp._adapter_registrations, {}) + self.assertEqual(comp._subscription_registrations, []) + self.assertEqual(comp._handler_registrations, []) + + def test_ctor_w_base(self): + base = self._makeOne('base') + comp = self._makeOne('testing', (base,)) + self.assertEqual(comp.__name__, 'testing') + self.assertEqual(comp.__bases__, (base,)) + self.assertEqual(comp.adapters.__bases__, (base.adapters,)) + self.assertEqual(comp.utilities.__bases__, (base.utilities,)) + + def test___repr__(self): + comp = self._makeOne('testing') + self.assertEqual(repr(comp), '<Components testing>') + + # test _init_registries / _init_registrations via only caller, __init__. + + def test_assign_to___bases__(self): + base1 = self._makeOne('base1') + base2 = self._makeOne('base2') + comp = self._makeOne() + comp.__bases__ = (base1, base2) + self.assertEqual(comp.__bases__, (base1, base2)) + self.assertEqual(comp.adapters.__bases__, + (base1.adapters, base2.adapters)) + self.assertEqual(comp.utilities.__bases__, + (base1.utilities, base2.utilities)) + + def test_registerUtility_with_component_name(self): + from zope.interface.declarations import named, InterfaceClass + + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + + @named(u'foo') + class Foo(object): + pass + foo = Foo() + _info = u'info' + + comp = self._makeOne() + comp.registerUtility(foo, ifoo, info=_info) + self.assertEqual( + comp._utility_registrations[ifoo, u'foo'], + (foo, _info, None)) + + def test_registerUtility_both_factory_and_component(self): + def _factory(): + raise NotImplementedError() + _to_reg = object() + comp = self._makeOne() + self.assertRaises(TypeError, comp.registerUtility, + component=_to_reg, factory=_factory) + + def test_registerUtility_w_component(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = object() + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, ifoo, _name, _info) + self.assertTrue(comp.utilities._adapters[0][ifoo][_name] is _to_reg) + self.assertEqual(comp._utility_registrations[ifoo, _name], + (_to_reg, _info, None)) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], (_to_reg,)) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is None) + + def test_registerUtility_w_factory(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = object() + def _factory(): + return _to_reg + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(None, ifoo, _name, _info, factory=_factory) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _factory) + + def test_registerUtility_no_provided_available(self): + class Foo(object): + pass + + _info = u'info' + _name = u'name' + _to_reg = Foo() + comp = self._makeOne() + self.assertRaises(TypeError, + comp.registerUtility, _to_reg, None, _name, _info) + + def test_registerUtility_wo_provided(self): + from zope.interface.declarations import directlyProvides + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + class Foo(object): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = Foo() + directlyProvides(_to_reg, ifoo) + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, None, _name, _info) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is None) + + def test_registerUtility_duplicates_existing_reg(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, ifoo, _name, _info) + self.assertEqual(len(_events), 0) + + def test_registerUtility_w_different_info(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info1 = u'info1' + _info2 = u'info2' + _name = u'name' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name, _info1) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, ifoo, _name, _info2) + self.assertEqual(len(_events), 2) # unreg, reg + self.assertEqual(comp._utility_registrations[(ifoo, _name)], + (_to_reg, _info2, None)) # replaced + self.assertEqual(comp.utilities._subscribers[0][ifoo][u''], + (_to_reg,)) + + def test_registerUtility_w_different_names_same_component(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _other_reg = object() + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_other_reg, ifoo, _name1, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, ifoo, _name2, _info) + self.assertEqual(len(_events), 1) # reg + self.assertEqual(comp._utility_registrations[(ifoo, _name1)], + (_other_reg, _info, None)) + self.assertEqual(comp._utility_registrations[(ifoo, _name2)], + (_to_reg, _info, None)) + self.assertEqual(comp.utilities._subscribers[0][ifoo][u''], + (_other_reg, _to_reg,)) + + def test_registerUtility_replaces_existing_reg(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.interfaces import Registered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _before, _after = object(), object() + comp = self._makeOne() + comp.registerUtility(_before, ifoo, _name, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_after, ifoo, _name, _info) + self.assertEqual(len(_events), 2) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _before) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is None) + args, kw = _events[1] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _after) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is None) + + def test_registerUtility_w_existing_subscr(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name1, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, ifoo, _name2, _info) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], (_to_reg,)) + + def test_registerUtility_wo_event(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = object() + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerUtility(_to_reg, ifoo, _name, _info, False) + self.assertEqual(len(_events), 0) + + def test_registerUtility_changes_object_identity_after(self): + # If a subclass changes the identity of the _utility_registrations, + # the cache is updated and the right thing still happens. + class CompThatChangesAfter1Reg(self._getTargetClass()): + reg_count = 0 + def registerUtility(self, *args): + self.reg_count += 1 + super(CompThatChangesAfter1Reg, self).registerUtility(*args) + if self.reg_count == 1: + self._utility_registrations = dict(self._utility_registrations) + + comp = CompThatChangesAfter1Reg() + comp.registerUtility(object(), Interface) + + self.assertEqual(len(list(comp.registeredUtilities())), 1) + + class IFoo(Interface): + pass + + comp.registerUtility(object(), IFoo) + self.assertEqual(len(list(comp.registeredUtilities())), 2) + + def test_registerUtility_changes_object_identity_before(self): + # If a subclass changes the identity of the _utility_registrations, + # the cache is updated and the right thing still happens. + class CompThatChangesAfter2Reg(self._getTargetClass()): + reg_count = 0 + def registerUtility(self, *args): + self.reg_count += 1 + if self.reg_count == 2: + self._utility_registrations = dict(self._utility_registrations) + + super(CompThatChangesAfter2Reg, self).registerUtility(*args) + + comp = CompThatChangesAfter2Reg() + comp.registerUtility(object(), Interface) + + self.assertEqual(len(list(comp.registeredUtilities())), 1) + + class IFoo(Interface): + pass + + comp.registerUtility(object(), IFoo) + self.assertEqual(len(list(comp.registeredUtilities())), 2) + + + class IBar(Interface): + pass + + comp.registerUtility(object(), IBar) + self.assertEqual(len(list(comp.registeredUtilities())), 3) + + + def test_unregisterUtility_neither_factory_nor_component_nor_provided(self): + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterUtility, + component=None, provided=None, factory=None) + + def test_unregisterUtility_both_factory_and_component(self): + def _factory(): + raise NotImplementedError() + _to_reg = object() + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterUtility, + component=_to_reg, factory=_factory) + + def test_unregisterUtility_w_component_miss(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _name = u'name' + _to_reg = object() + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterUtility(_to_reg, ifoo, _name) + self.assertFalse(unreg) + self.assertFalse(_events) + + def test_unregisterUtility_w_component(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _name = u'name' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterUtility(_to_reg, ifoo, _name) + self.assertTrue(unreg) + self.assertFalse(comp.utilities._adapters) # all erased + self.assertFalse((ifoo, _name) in comp._utility_registrations) + self.assertFalse(comp.utilities._subscribers) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.factory is None) + + def test_unregisterUtility_w_factory(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = object() + def _factory(): + return _to_reg + comp = self._makeOne() + comp.registerUtility(None, ifoo, _name, _info, factory=_factory) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterUtility(None, ifoo, _name, factory=_factory) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.factory is _factory) + + def test_unregisterUtility_wo_explicit_provided(self): + from zope.interface.declarations import directlyProvides + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + class Foo(object): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = Foo() + directlyProvides(_to_reg, ifoo) + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterUtility(_to_reg, None, _name) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is None) + + def test_unregisterUtility_wo_component_or_factory(self): + from zope.interface.declarations import directlyProvides + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import UtilityRegistration + + class IFoo(InterfaceClass): + pass + class Foo(object): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + _to_reg = Foo() + directlyProvides(_to_reg, ifoo) + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + # Just pass the interface / name + unreg = comp.unregisterUtility(provided=ifoo, name=_name) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, UtilityRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.component is _to_reg) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is None) + + def test_unregisterUtility_w_existing_subscr(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.unregisterUtility(_to_reg, ifoo, _name2) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], (_to_reg,)) + + def test_unregisterUtility_w_existing_subscr_non_hashable(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _to_reg = dict() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.unregisterUtility(_to_reg, ifoo, _name2) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], (_to_reg,)) + + def test_unregisterUtility_w_existing_subscr_non_hashable_fresh_cache(self): + # We correctly populate the cache of registrations if it has gone away + # (for example, the Components was unpickled) + from zope.interface.declarations import InterfaceClass + from zope.interface.registry import _UtilityRegistrations + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _to_reg = dict() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + + _monkey, _events = self._wrapEvents() + with _monkey: + comp.unregisterUtility(_to_reg, ifoo, _name2) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], (_to_reg,)) + + def test_unregisterUtility_w_existing_subscr_non_hashable_reinitted(self): + # We correctly populate the cache of registrations if the base objects change + # out from under us + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _to_reg = dict() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + + # zope.component.testing does this + comp.__init__('base') + + comp.registerUtility(_to_reg, ifoo, _name2, _info) + + _monkey, _events = self._wrapEvents() + with _monkey: + # Nothing to do, but we don't break either + comp.unregisterUtility(_to_reg, ifoo, _name2) + self.assertEqual(0, len(comp.utilities._subscribers)) + + def test_unregisterUtility_w_existing_subscr_other_component(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _other_reg = object() + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_other_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.unregisterUtility(_to_reg, ifoo, _name2) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], + (_other_reg,)) + + def test_unregisterUtility_w_existing_subscr_other_component_mixed_hash(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + # First register something hashable + _other_reg = object() + # Then it transfers to something unhashable + _to_reg = dict() + comp = self._makeOne() + comp.registerUtility(_other_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + _monkey, _events = self._wrapEvents() + with _monkey: + comp.unregisterUtility(_to_reg, ifoo, _name2) + self.assertEqual(comp.utilities._subscribers[0][ifoo][''], + (_other_reg,)) + + def test_registeredUtilities_empty(self): + comp = self._makeOne() + self.assertEqual(list(comp.registeredUtilities()), []) + + def test_registeredUtilities_notempty(self): + from zope.interface.declarations import InterfaceClass + + from zope.interface.registry import UtilityRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, _name1, _info) + comp.registerUtility(_to_reg, ifoo, _name2, _info) + reg = sorted(comp.registeredUtilities(), key=lambda r: r.name) + self.assertEqual(len(reg), 2) + self.assertTrue(isinstance(reg[0], UtilityRegistration)) + self.assertTrue(reg[0].registry is comp) + self.assertTrue(reg[0].provided is ifoo) + self.assertTrue(reg[0].name is _name1) + self.assertTrue(reg[0].component is _to_reg) + self.assertTrue(reg[0].info is _info) + self.assertTrue(reg[0].factory is None) + self.assertTrue(isinstance(reg[1], UtilityRegistration)) + self.assertTrue(reg[1].registry is comp) + self.assertTrue(reg[1].provided is ifoo) + self.assertTrue(reg[1].name is _name2) + self.assertTrue(reg[1].component is _to_reg) + self.assertTrue(reg[1].info is _info) + self.assertTrue(reg[1].factory is None) + + def test_queryUtility_miss_no_default(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + self.assertTrue(comp.queryUtility(ifoo) is None) + + def test_queryUtility_miss_w_default(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + _default = object() + self.assertTrue(comp.queryUtility(ifoo, default=_default) is _default) + + def test_queryUtility_hit(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo) + self.assertTrue(comp.queryUtility(ifoo) is _to_reg) + + def test_getUtility_miss(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import ComponentLookupError + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + self.assertRaises(ComponentLookupError, comp.getUtility, ifoo) + + def test_getUtility_hit(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo) + self.assertTrue(comp.getUtility(ifoo) is _to_reg) + + def test_getUtilitiesFor_miss(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + self.assertEqual(list(comp.getUtilitiesFor(ifoo)), []) + + def test_getUtilitiesFor_hit(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _name1 = u'name1' + _name2 = u'name2' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, name=_name1) + comp.registerUtility(_to_reg, ifoo, name=_name2) + self.assertEqual(sorted(comp.getUtilitiesFor(ifoo)), + [(_name1, _to_reg), (_name2, _to_reg)]) + + def test_getAllUtilitiesRegisteredFor_miss(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + self.assertEqual(list(comp.getAllUtilitiesRegisteredFor(ifoo)), []) + + def test_getAllUtilitiesRegisteredFor_hit(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _name1 = u'name1' + _name2 = u'name2' + _to_reg = object() + comp = self._makeOne() + comp.registerUtility(_to_reg, ifoo, name=_name1) + comp.registerUtility(_to_reg, ifoo, name=_name2) + self.assertEqual(list(comp.getAllUtilitiesRegisteredFor(ifoo)), + [_to_reg]) + + def test_registerAdapter_with_component_name(self): + from zope.interface.declarations import named, InterfaceClass + + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + + @named(u'foo') + class Foo(object): + pass + _info = u'info' + + comp = self._makeOne() + comp.registerAdapter(Foo, (ibar,), ifoo, info=_info) + + self.assertEqual( + comp._adapter_registrations[(ibar,), ifoo, u'foo'], + (Foo, _info)) + + def test_registerAdapter_w_explicit_provided_and_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import AdapterRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + + def _factory(context): + raise NotImplementedError() + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerAdapter(_factory, (ibar,), ifoo, _name, _info) + self.assertTrue(comp.adapters._adapters[1][ibar][ifoo][_name] + is _factory) + self.assertEqual(comp._adapter_registrations[(ibar,), ifoo, _name], + (_factory, _info)) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _factory) + + def test_registerAdapter_no_provided_available(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + + class _Factory(object): + pass + + comp = self._makeOne() + self.assertRaises(TypeError, comp.registerAdapter, _Factory, (ibar,), + name=_name, info=_info) + + def test_registerAdapter_wo_explicit_provided(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.interfaces import Registered + from zope.interface.registry import AdapterRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + _to_reg = object() + + @implementer(ifoo) + class _Factory(object): + pass + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerAdapter(_Factory, (ibar,), name=_name, info=_info) + self.assertTrue(comp.adapters._adapters[1][ibar][ifoo][_name] + is _Factory) + self.assertEqual(comp._adapter_registrations[(ibar,), ifoo, _name], + (_Factory, _info)) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _Factory) + + def test_registerAdapter_no_required_available(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + + _info = u'info' + _name = u'name' + class _Factory(object): + pass + + comp = self._makeOne() + self.assertRaises(TypeError, comp.registerAdapter, _Factory, + provided=ifoo, name=_name, info=_info) + + def test_registerAdapter_w_invalid_required(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + class _Factory(object): + pass + comp = self._makeOne() + self.assertRaises(TypeError, comp.registerAdapter, _Factory, + ibar, provided=ifoo, name=_name, info=_info) + + def test_registerAdapter_w_required_containing_None(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interface import Interface + from zope.interface.interfaces import Registered + from zope.interface.registry import AdapterRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _name = u'name' + class _Factory(object): + pass + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerAdapter(_Factory, [None], provided=ifoo, + name=_name, info=_info) + self.assertTrue(comp.adapters._adapters[1][Interface][ifoo][_name] + is _Factory) + self.assertEqual(comp._adapter_registrations[(Interface,), ifoo, _name], + (_Factory, _info)) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (Interface,)) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _Factory) + + def test_registerAdapter_w_required_containing_class(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.declarations import implementedBy + from zope.interface.interfaces import Registered + from zope.interface.registry import AdapterRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + class _Factory(object): + pass + + @implementer(ibar) + class _Context(object): + pass + _ctx_impl = implementedBy(_Context) + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerAdapter(_Factory, [_Context], provided=ifoo, + name=_name, info=_info) + self.assertTrue(comp.adapters._adapters[1][_ctx_impl][ifoo][_name] + is _Factory) + self.assertEqual(comp._adapter_registrations[(_ctx_impl,), ifoo, _name], + (_Factory, _info)) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (_ctx_impl,)) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _Factory) + + def test_registerAdapter_w_required_containing_junk(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + + _info = u'info' + _name = u'name' + class _Factory(object): + pass + comp = self._makeOne() + self.assertRaises(TypeError, comp.registerAdapter, _Factory, [object()], + provided=ifoo, name=_name, info=_info) + + def test_registerAdapter_wo_explicit_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import AdapterRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + class _Factory(object): + __component_adapts__ = (ibar,) + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerAdapter(_Factory, provided=ifoo, name=_name, + info=_info) + self.assertTrue(comp.adapters._adapters[1][ibar][ifoo][_name] + is _Factory) + self.assertEqual(comp._adapter_registrations[(ibar,), ifoo, _name], + (_Factory, _info)) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertTrue(event.object.name is _name) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _Factory) + + def test_registerAdapter_wo_event(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _name = u'name' + + def _factory(context): + raise NotImplementedError() + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerAdapter(_factory, (ibar,), ifoo, _name, _info, + event=False) + self.assertEqual(len(_events), 0) + + def test_unregisterAdapter_neither_factory_nor_provided(self): + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterAdapter, + factory=None, provided=None) + + def test_unregisterAdapter_neither_factory_nor_required(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterAdapter, + factory=None, provided=ifoo, required=None) + + def test_unregisterAdapter_miss(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + pass + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterAdapter(_Factory, (ibar,), ifoo) + self.assertFalse(unreg) + + def test_unregisterAdapter_hit_w_explicit_provided_and_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import AdapterRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterAdapter(_Factory, (ibar,), ifoo) + self.assertTrue(unreg) + self.assertFalse(comp.adapters._adapters) + self.assertFalse(comp._adapter_registrations) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_unregisterAdapter_wo_explicit_provided(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.interfaces import Unregistered + from zope.interface.registry import AdapterRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + @implementer(ifoo) + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterAdapter(_Factory, (ibar,)) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_unregisterAdapter_wo_explicit_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import AdapterRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + __component_adapts__ = (ibar,) + + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterAdapter(_Factory, provided=ifoo) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, AdapterRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_registeredAdapters_empty(self): + comp = self._makeOne() + self.assertEqual(list(comp.registeredAdapters()), []) + + def test_registeredAdapters_notempty(self): + from zope.interface.declarations import InterfaceClass + + from zope.interface.registry import AdapterRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IFoo') + _info = u'info' + _name1 = u'name1' + _name2 = u'name2' + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar,), ifoo, _name1, _info) + comp.registerAdapter(_Factory, (ibar,), ifoo, _name2, _info) + reg = sorted(comp.registeredAdapters(), key=lambda r: r.name) + self.assertEqual(len(reg), 2) + self.assertTrue(isinstance(reg[0], AdapterRegistration)) + self.assertTrue(reg[0].registry is comp) + self.assertTrue(reg[0].provided is ifoo) + self.assertEqual(reg[0].required, (ibar,)) + self.assertTrue(reg[0].name is _name1) + self.assertTrue(reg[0].info is _info) + self.assertTrue(reg[0].factory is _Factory) + self.assertTrue(isinstance(reg[1], AdapterRegistration)) + self.assertTrue(reg[1].registry is comp) + self.assertTrue(reg[1].provided is ifoo) + self.assertEqual(reg[1].required, (ibar,)) + self.assertTrue(reg[1].name is _name2) + self.assertTrue(reg[1].info is _info) + self.assertTrue(reg[1].factory is _Factory) + + def test_queryAdapter_miss_no_default(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + _context = object() + self.assertTrue(comp.queryAdapter(_context, ifoo) is None) + + def test_queryAdapter_miss_w_default(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + _context = object() + _default = object() + self.assertTrue( + comp.queryAdapter(_context, ifoo, default=_default) is _default) + + def test_queryAdapter_hit(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + def __init__(self, context): + self.context = context + @implementer(ibar) + class _Context(object): + pass + _context = _Context() + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar,), ifoo) + adapter = comp.queryAdapter(_context, ifoo) + self.assertTrue(isinstance(adapter, _Factory)) + self.assertTrue(adapter.context is _context) + + def test_getAdapter_miss(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.interfaces import ComponentLookupError + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + @implementer(ibar) + class _Context(object): + pass + _context = _Context() + comp = self._makeOne() + self.assertRaises(ComponentLookupError, + comp.getAdapter, _context, ifoo) + + def test_getAdapter_hit(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + def __init__(self, context): + self.context = context + @implementer(ibar) + class _Context(object): + pass + _context = _Context() + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar,), ifoo) + adapter = comp.getAdapter(_context, ifoo) + self.assertIsInstance(adapter, _Factory) + self.assertIs(adapter.context, _context) + + def test_getAdapter_hit_super(self): + from zope.interface import Interface + from zope.interface.declarations import implementer + + class IBase(Interface): + pass + + class IDerived(IBase): + pass + + class IFoo(Interface): + pass + + @implementer(IBase) + class Base(object): + pass + + @implementer(IDerived) + class Derived(Base): + pass + + class AdapterBase(object): + def __init__(self, context): + self.context = context + + class AdapterDerived(object): + def __init__(self, context): + self.context = context + + comp = self._makeOne() + comp.registerAdapter(AdapterDerived, (IDerived,), IFoo) + comp.registerAdapter(AdapterBase, (IBase,), IFoo) + self._should_not_change(comp) + + derived = Derived() + adapter = comp.getAdapter(derived, IFoo) + self.assertIsInstance(adapter, AdapterDerived) + self.assertIs(adapter.context, derived) + + supe = super(Derived, derived) + adapter = comp.getAdapter(supe, IFoo) + self.assertIsInstance(adapter, AdapterBase) + self.assertIs(adapter.context, derived) + + def test_getAdapter_hit_super_when_parent_implements_interface_diamond(self): + from zope.interface import Interface + from zope.interface.declarations import implementer + + class IBase(Interface): + pass + + class IDerived(IBase): + pass + + class IFoo(Interface): + pass + + class Base(object): + pass + + class Child1(Base): + pass + + @implementer(IBase) + class Child2(Base): + pass + + @implementer(IDerived) + class Derived(Child1, Child2): + pass + + class AdapterBase(object): + def __init__(self, context): + self.context = context + + class AdapterDerived(object): + def __init__(self, context): + self.context = context + + comp = self._makeOne() + comp.registerAdapter(AdapterDerived, (IDerived,), IFoo) + comp.registerAdapter(AdapterBase, (IBase,), IFoo) + self._should_not_change(comp) + + derived = Derived() + adapter = comp.getAdapter(derived, IFoo) + self.assertIsInstance(adapter, AdapterDerived) + self.assertIs(adapter.context, derived) + + supe = super(Derived, derived) + adapter = comp.getAdapter(supe, IFoo) + self.assertIsInstance(adapter, AdapterBase) + self.assertIs(adapter.context, derived) + + def test_queryMultiAdapter_miss(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + comp = self._makeOne() + self.assertEqual(comp.queryMultiAdapter((_context1, _context2), ifoo), + None) + + def test_queryMultiAdapter_miss_w_default(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + _default = object() + comp = self._makeOne() + self.assertTrue( + comp.queryMultiAdapter((_context1, _context2), ifoo, + default=_default) is _default) + + def test_queryMultiAdapter_hit(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + class _Factory(object): + def __init__(self, context1, context2): + self.context = context1, context2 + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar, ibaz), ifoo) + adapter = comp.queryMultiAdapter((_context1, _context2), ifoo) + self.assertTrue(isinstance(adapter, _Factory)) + self.assertEqual(adapter.context, (_context1, _context2)) + + def test_getMultiAdapter_miss(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.interfaces import ComponentLookupError + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + comp = self._makeOne() + self.assertRaises(ComponentLookupError, + comp.getMultiAdapter, (_context1, _context2), ifoo) + + def test_getMultiAdapter_hit(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + class _Factory(object): + def __init__(self, context1, context2): + self.context = context1, context2 + comp = self._makeOne() + comp.registerAdapter(_Factory, (ibar, ibaz), ifoo) + adapter = comp.getMultiAdapter((_context1, _context2), ifoo) + self.assertTrue(isinstance(adapter, _Factory)) + self.assertEqual(adapter.context, (_context1, _context2)) + + def _should_not_change(self, comp): + # Be sure that none of the underlying structures + # get told that they have changed during this process + # because that invalidates caches. + def no_changes(*args): + self.fail("Nothing should get changed") + comp.changed = no_changes + comp.adapters.changed = no_changes + comp.adapters._v_lookup.changed = no_changes + + def test_getMultiAdapter_hit_super(self): + from zope.interface import Interface + from zope.interface.declarations import implementer + + class IBase(Interface): + pass + + class IDerived(IBase): + pass + + class IFoo(Interface): + pass + + @implementer(IBase) + class Base(object): + pass + + @implementer(IDerived) + class Derived(Base): + pass + + class AdapterBase(object): + def __init__(self, context1, context2): + self.context1 = context1 + self.context2 = context2 + + class AdapterDerived(AdapterBase): + pass + + comp = self._makeOne() + comp.registerAdapter(AdapterDerived, (IDerived, IDerived), IFoo) + comp.registerAdapter(AdapterBase, (IBase, IDerived), IFoo) + self._should_not_change(comp) + + derived = Derived() + adapter = comp.getMultiAdapter((derived, derived), IFoo) + self.assertIsInstance(adapter, AdapterDerived) + self.assertIs(adapter.context1, derived) + self.assertIs(adapter.context2, derived) + + supe = super(Derived, derived) + adapter = comp.getMultiAdapter((supe, derived), IFoo) + self.assertIsInstance(adapter, AdapterBase) + self.assertNotIsInstance(adapter, AdapterDerived) + self.assertIs(adapter.context1, derived) + self.assertIs(adapter.context2, derived) + + def test_getAdapters_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + comp = self._makeOne() + self.assertEqual( + list(comp.getAdapters((_context1, _context2), ifoo)), []) + + def test_getAdapters_factory_returns_None(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + comp = self._makeOne() + _called_with = [] + def _side_effect_only(context1, context2): + _called_with.append((context1, context2)) + return None + comp.registerAdapter(_side_effect_only, (ibar, ibaz), ifoo) + self.assertEqual( + list(comp.getAdapters((_context1, _context2), ifoo)), []) + self.assertEqual(_called_with, [(_context1, _context2)]) + + def test_getAdapters_non_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + ibaz = IFoo('IBaz') + @implementer(ibar) + class _Context1(object): + pass + @implementer(ibaz) + class _Context2(object): + pass + _context1 = _Context1() + _context2 = _Context2() + class _Factory1(object): + def __init__(self, context1, context2): + self.context = context1, context2 + class _Factory2(object): + def __init__(self, context1, context2): + self.context = context1, context2 + _name1 = u'name1' + _name2 = u'name2' + comp = self._makeOne() + comp.registerAdapter(_Factory1, (ibar, ibaz), ifoo, name=_name1) + comp.registerAdapter(_Factory2, (ibar, ibaz), ifoo, name=_name2) + found = sorted(comp.getAdapters((_context1, _context2), ifoo)) + self.assertEqual(len(found), 2) + self.assertEqual(found[0][0], _name1) + self.assertTrue(isinstance(found[0][1], _Factory1)) + self.assertEqual(found[1][0], _name2) + self.assertTrue(isinstance(found[1][1], _Factory2)) + + def test_registerSubscriptionAdapter_w_nonblank_name(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _name = u'name' + _info = u'info' + def _factory(context): + raise NotImplementedError() + + comp = self._makeOne() + self.assertRaises(TypeError, comp.registerSubscriptionAdapter, + _factory, (ibar,), ifoo, _name, _info) + + def test_registerSubscriptionAdapter_w_explicit_provided_and_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import SubscriptionRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _blank = u'' + _info = u'info' + def _factory(context): + raise NotImplementedError() + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerSubscriptionAdapter(_factory, (ibar,), ifoo, + info=_info) + reg = comp.adapters._subscribers[1][ibar][ifoo][_blank] + self.assertEqual(len(reg), 1) + self.assertTrue(reg[0] is _factory) + self.assertEqual(comp._subscription_registrations, + [((ibar,), ifoo, _blank, _factory, _info)]) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, _blank) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _factory) + + def test_registerSubscriptionAdapter_wo_explicit_provided(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.interfaces import Registered + from zope.interface.registry import SubscriptionRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _blank = u'' + + @implementer(ifoo) + class _Factory(object): + pass + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerSubscriptionAdapter(_Factory, (ibar,), info=_info) + reg = comp.adapters._subscribers[1][ibar][ifoo][_blank] + self.assertEqual(len(reg), 1) + self.assertTrue(reg[0] is _Factory) + self.assertEqual(comp._subscription_registrations, + [((ibar,), ifoo, _blank, _Factory, _info)]) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, _blank) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _Factory) + + def test_registerSubscriptionAdapter_wo_explicit_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import SubscriptionRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _info = u'info' + _blank = u'' + class _Factory(object): + __component_adapts__ = (ibar,) + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerSubscriptionAdapter( + _Factory, provided=ifoo, info=_info) + reg = comp.adapters._subscribers[1][ibar][ifoo][_blank] + self.assertEqual(len(reg), 1) + self.assertTrue(reg[0] is _Factory) + self.assertEqual(comp._subscription_registrations, + [((ibar,), ifoo, _blank, _Factory, _info)]) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, _blank) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _Factory) + + def test_registerSubscriptionAdapter_wo_event(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _blank = u'' + _info = u'info' + + def _factory(context): + raise NotImplementedError() + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerSubscriptionAdapter(_factory, (ibar,), ifoo, + info=_info, event=False) + self.assertEqual(len(_events), 0) + + def test_registeredSubscriptionAdapters_empty(self): + comp = self._makeOne() + self.assertEqual(list(comp.registeredSubscriptionAdapters()), []) + + def test_registeredSubscriptionAdapters_notempty(self): + from zope.interface.declarations import InterfaceClass + + from zope.interface.registry import SubscriptionRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IFoo') + _info = u'info' + _blank = u'' + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo, info=_info) + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo, info=_info) + reg = list(comp.registeredSubscriptionAdapters()) + self.assertEqual(len(reg), 2) + self.assertTrue(isinstance(reg[0], SubscriptionRegistration)) + self.assertTrue(reg[0].registry is comp) + self.assertTrue(reg[0].provided is ifoo) + self.assertEqual(reg[0].required, (ibar,)) + self.assertEqual(reg[0].name, _blank) + self.assertTrue(reg[0].info is _info) + self.assertTrue(reg[0].factory is _Factory) + self.assertTrue(isinstance(reg[1], SubscriptionRegistration)) + self.assertTrue(reg[1].registry is comp) + self.assertTrue(reg[1].provided is ifoo) + self.assertEqual(reg[1].required, (ibar,)) + self.assertEqual(reg[1].name, _blank) + self.assertTrue(reg[1].info is _info) + self.assertTrue(reg[1].factory is _Factory) + + def test_unregisterSubscriptionAdapter_w_nonblank_name(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + _nonblank = u'nonblank' + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterSubscriptionAdapter, + required=ifoo, provided=ibar, name=_nonblank) + + def test_unregisterSubscriptionAdapter_neither_factory_nor_provided(self): + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterSubscriptionAdapter, + factory=None, provided=None) + + def test_unregisterSubscriptionAdapter_neither_factory_nor_required(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterSubscriptionAdapter, + factory=None, provided=ifoo, required=None) + + def test_unregisterSubscriptionAdapter_miss(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + pass + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterSubscriptionAdapter(_Factory, (ibar,), ifoo) + self.assertFalse(unreg) + self.assertFalse(_events) + + def test_unregisterSubscriptionAdapter_hit_wo_factory(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import SubscriptionRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterSubscriptionAdapter(None, (ibar,), ifoo) + self.assertTrue(unreg) + self.assertFalse(comp.adapters._subscribers) + self.assertFalse(comp._subscription_registrations) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is None) + + def test_unregisterSubscriptionAdapter_hit_w_factory(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import SubscriptionRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterSubscriptionAdapter(_Factory, (ibar,), ifoo) + self.assertTrue(unreg) + self.assertFalse(comp.adapters._subscribers) + self.assertFalse(comp._subscription_registrations) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_unregisterSubscriptionAdapter_wo_explicit_provided(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + from zope.interface.interfaces import Unregistered + from zope.interface.registry import SubscriptionRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + @implementer(ifoo) + class _Factory(object): + pass + + comp = self._makeOne() + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterSubscriptionAdapter(_Factory, (ibar,)) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_unregisterSubscriptionAdapter_wo_explicit_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import SubscriptionRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + __component_adapts__ = (ibar,) + + comp = self._makeOne() + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterSubscriptionAdapter(_Factory, provided=ifoo) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, SubscriptionRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertTrue(event.object.provided is ifoo) + self.assertEqual(event.object.required, (ibar,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_subscribers_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + comp = self._makeOne() + @implementer(ibar) + class Bar(object): + pass + bar = Bar() + self.assertEqual(list(comp.subscribers((bar,), ifoo)), []) + + def test_subscribers_non_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Factory(object): + __component_adapts__ = (ibar,) + def __init__(self, context): + self._context = context + class _Derived(_Factory): + pass + comp = self._makeOne() + comp.registerSubscriptionAdapter(_Factory, (ibar,), ifoo) + comp.registerSubscriptionAdapter(_Derived, (ibar,), ifoo) + @implementer(ibar) + class Bar(object): + pass + bar = Bar() + subscribers = comp.subscribers((bar,), ifoo) + def _klassname(x): + return x.__class__.__name__ + subscribers = sorted(subscribers, key=_klassname) + self.assertEqual(len(subscribers), 2) + self.assertTrue(isinstance(subscribers[0], _Derived)) + self.assertTrue(isinstance(subscribers[1], _Factory)) + + def test_registerHandler_w_nonblank_name(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _nonblank = u'nonblank' + comp = self._makeOne() + def _factory(context): + raise NotImplementedError() + + self.assertRaises(TypeError, comp.registerHandler, _factory, + required=ifoo, name=_nonblank) + + def test_registerHandler_w_explicit_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Registered + from zope.interface.registry import HandlerRegistration + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _blank = u'' + _info = u'info' + def _factory(context): + raise NotImplementedError() + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerHandler(_factory, (ifoo,), info=_info) + reg = comp.adapters._subscribers[1][ifoo][None][_blank] + self.assertEqual(len(reg), 1) + self.assertTrue(reg[0] is _factory) + self.assertEqual(comp._handler_registrations, + [((ifoo,), _blank, _factory, _info)]) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Registered)) + self.assertTrue(isinstance(event.object, HandlerRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertEqual(event.object.required, (ifoo,)) + self.assertEqual(event.object.name, _blank) + self.assertTrue(event.object.info is _info) + self.assertTrue(event.object.factory is _factory) + + def test_registerHandler_wo_explicit_required_no_event(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _info = u'info' + _blank = u'' + class _Factory(object): + __component_adapts__ = (ifoo,) + pass + + comp = self._makeOne() + _monkey, _events = self._wrapEvents() + with _monkey: + comp.registerHandler(_Factory, info=_info, event=False) + reg = comp.adapters._subscribers[1][ifoo][None][_blank] + self.assertEqual(len(reg), 1) + self.assertTrue(reg[0] is _Factory) + self.assertEqual(comp._handler_registrations, + [((ifoo,), _blank, _Factory, _info)]) + self.assertEqual(len(_events), 0) + + def test_registeredHandlers_empty(self): + comp = self._makeOne() + self.assertFalse(list(comp.registeredHandlers())) + + def test_registeredHandlers_non_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.registry import HandlerRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + def _factory1(context): + raise NotImplementedError() + def _factory2(context): + raise NotImplementedError() + comp = self._makeOne() + comp.registerHandler(_factory1, (ifoo,)) + comp.registerHandler(_factory2, (ifoo,)) + def _factory_name(x): + return x.factory.__code__.co_name + subscribers = sorted(comp.registeredHandlers(), key=_factory_name) + self.assertEqual(len(subscribers), 2) + self.assertTrue(isinstance(subscribers[0], HandlerRegistration)) + self.assertEqual(subscribers[0].required, (ifoo,)) + self.assertEqual(subscribers[0].name, '') + self.assertEqual(subscribers[0].factory, _factory1) + self.assertEqual(subscribers[0].info, '') + self.assertTrue(isinstance(subscribers[1], HandlerRegistration)) + self.assertEqual(subscribers[1].required, (ifoo,)) + self.assertEqual(subscribers[1].name, '') + self.assertEqual(subscribers[1].factory, _factory2) + self.assertEqual(subscribers[1].info, '') + + def test_unregisterHandler_w_nonblank_name(self): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _nonblank = u'nonblank' + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterHandler, + required=(ifoo,), name=_nonblank) + + def test_unregisterHandler_neither_factory_nor_required(self): + comp = self._makeOne() + self.assertRaises(TypeError, comp.unregisterHandler) + + def test_unregisterHandler_miss(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + unreg = comp.unregisterHandler(required=(ifoo,)) + self.assertFalse(unreg) + + def test_unregisterHandler_hit_w_factory_and_explicit_provided(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import HandlerRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + def _factory(context): + raise NotImplementedError() + comp = self._makeOne() + comp.registerHandler(_factory, (ifoo,)) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterHandler(_factory, (ifoo,)) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, HandlerRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertEqual(event.object.required, (ifoo,)) + self.assertEqual(event.object.name, '') + self.assertTrue(event.object.factory is _factory) + + def test_unregisterHandler_hit_w_only_explicit_provided(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import HandlerRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + def _factory(context): + raise NotImplementedError() + comp = self._makeOne() + comp.registerHandler(_factory, (ifoo,)) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterHandler(required=(ifoo,)) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, HandlerRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertEqual(event.object.required, (ifoo,)) + self.assertEqual(event.object.name, '') + self.assertTrue(event.object.factory is None) + + def test_unregisterHandler_wo_explicit_required(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.interfaces import Unregistered + from zope.interface.registry import HandlerRegistration + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + class _Factory(object): + __component_adapts__ = (ifoo,) + + comp = self._makeOne() + comp.registerHandler(_Factory) + _monkey, _events = self._wrapEvents() + with _monkey: + unreg = comp.unregisterHandler(_Factory) + self.assertTrue(unreg) + self.assertEqual(len(_events), 1) + args, kw = _events[0] + event, = args + self.assertEqual(kw, {}) + self.assertTrue(isinstance(event, Unregistered)) + self.assertTrue(isinstance(event.object, HandlerRegistration)) + self.assertTrue(event.object.registry is comp) + self.assertEqual(event.object.required, (ifoo,)) + self.assertEqual(event.object.name, '') + self.assertEqual(event.object.info, '') + self.assertTrue(event.object.factory is _Factory) + + def test_handle_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + comp = self._makeOne() + @implementer(ifoo) + class Bar(object): + pass + bar = Bar() + comp.handle((bar,)) # doesn't raise + + def test_handle_non_empty(self): + from zope.interface.declarations import InterfaceClass + from zope.interface.declarations import implementer + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + _called_1 = [] + def _factory_1(context): + _called_1.append(context) + _called_2 = [] + def _factory_2(context): + _called_2.append(context) + comp = self._makeOne() + comp.registerHandler(_factory_1, (ifoo,)) + comp.registerHandler(_factory_2, (ifoo,)) + @implementer(ifoo) + class Bar(object): + pass + bar = Bar() + comp.handle(bar) + self.assertEqual(_called_1, [bar]) + self.assertEqual(_called_2, [bar]) + + def test_register_unregister_identical_objects_provided(self, identical=True): + # https://github.com/zopefoundation/zope.interface/issues/227 + class IFoo(Interface): + pass + + comp = self._makeOne() + first = object() + second = first if identical else object() + + comp.registerUtility(first, provided=IFoo) + comp.registerUtility(second, provided=IFoo, name='bar') + + self.assertEqual(len(comp.utilities._subscribers), 1) + self.assertEqual(comp.utilities._subscribers, [{ + IFoo: {'': (first, ) if identical else (first, second)} + }]) + self.assertEqual(comp.utilities._provided, { + IFoo: 3 if identical else 4 + }) + + res = comp.unregisterUtility(first, provided=IFoo) + self.assertTrue(res) + res = comp.unregisterUtility(second, provided=IFoo, name='bar') + self.assertTrue(res) + + self.assertEqual(comp.utilities._provided, {}) + self.assertEqual(len(comp.utilities._subscribers), 0) + + def test_register_unregister_nonequal_objects_provided(self): + self.test_register_unregister_identical_objects_provided(identical=False) + + def test_rebuildUtilityRegistryFromLocalCache(self): + class IFoo(Interface): + "Does nothing" + + class UtilityImplementingFoo(object): + "Does nothing" + + comps = self._makeOne() + + for i in range(30): + comps.registerUtility(UtilityImplementingFoo(), IFoo, name=u'%s' % (i,)) + + orig_generation = comps.utilities._generation + + orig_adapters = comps.utilities._adapters + self.assertEqual(len(orig_adapters), 1) + self.assertEqual(len(orig_adapters[0]), 1) + self.assertEqual(len(orig_adapters[0][IFoo]), 30) + + orig_subscribers = comps.utilities._subscribers + self.assertEqual(len(orig_subscribers), 1) + self.assertEqual(len(orig_subscribers[0]), 1) + self.assertEqual(len(orig_subscribers[0][IFoo]), 1) + self.assertEqual(len(orig_subscribers[0][IFoo][u'']), 30) + + # Blow a bunch of them away, creating artificial corruption + new_adapters = comps.utilities._adapters = type(orig_adapters)() + new_adapters.append({}) + d = new_adapters[0][IFoo] = {} + for name in range(10): + name = type(u'')(str(name)) + d[name] = orig_adapters[0][IFoo][name] + + self.assertNotEqual(orig_adapters, new_adapters) + + new_subscribers = comps.utilities._subscribers = type(orig_subscribers)() + new_subscribers.append({}) + d = new_subscribers[0][IFoo] = {} + d[u''] = () + + for name in range(5, 12): # 12 - 5 = 7 + name = type(u'')(str(name)) + comp = orig_adapters[0][IFoo][name] + d[u''] += (comp,) + + # We can preflight (by default) and nothing changes + rebuild_results_preflight = comps.rebuildUtilityRegistryFromLocalCache() + + self.assertEqual(comps.utilities._generation, orig_generation) + self.assertEqual(rebuild_results_preflight, { + 'did_not_register': 10, + 'needed_registered': 20, + + 'did_not_subscribe': 7, + 'needed_subscribed': 23, + }) + + # Now for real + rebuild_results = comps.rebuildUtilityRegistryFromLocalCache(rebuild=True) + + # The generation only got incremented once + self.assertEqual(comps.utilities._generation, orig_generation + 1) + # The result was the same + self.assertEqual(rebuild_results_preflight, rebuild_results) + self.assertEqual(new_adapters, orig_adapters) + self.assertEqual( + len(new_subscribers[0][IFoo][u'']), + len(orig_subscribers[0][IFoo][u''])) + + for orig_subscriber in orig_subscribers[0][IFoo][u'']: + self.assertIn(orig_subscriber, new_subscribers[0][IFoo][u'']) + + # Preflighting, rebuilding again produce no changes. + preflight_after = comps.rebuildUtilityRegistryFromLocalCache() + self.assertEqual(preflight_after, { + 'did_not_register': 30, + 'needed_registered': 0, + + 'did_not_subscribe': 30, + 'needed_subscribed': 0, + }) + + rebuild_after = comps.rebuildUtilityRegistryFromLocalCache(rebuild=True) + self.assertEqual(rebuild_after, preflight_after) + self.assertEqual(comps.utilities._generation, orig_generation + 1) + + +class UnhashableComponentsTests(ComponentsTests): + + def _getTargetClass(self): + # Mimic what pyramid does to create an unhashable + # registry + class Components(super(UnhashableComponentsTests, self)._getTargetClass(), dict): + pass + return Components + +# Test _getUtilityProvided, _getAdapterProvided, _getAdapterRequired via their +# callers (Component.registerUtility, Component.registerAdapter). + + +class UtilityRegistrationTests(unittest.TestCase): + + def _getTargetClass(self): + from zope.interface.registry import UtilityRegistration + return UtilityRegistration + + def _makeOne(self, component=None, factory=None): + from zope.interface.declarations import InterfaceClass + + class InterfaceClassSubclass(InterfaceClass): + pass + + ifoo = InterfaceClassSubclass('IFoo') + class _Registry(object): + def __repr__(self): + return '_REGISTRY' + registry = _Registry() + name = u'name' + doc = 'DOCSTRING' + klass = self._getTargetClass() + return (klass(registry, ifoo, name, component, doc, factory), + registry, + name, + ) + + def test_class_conforms_to_IUtilityRegistration(self): + from zope.interface.verify import verifyClass + from zope.interface.interfaces import IUtilityRegistration + verifyClass(IUtilityRegistration, self._getTargetClass()) + + def test_instance_conforms_to_IUtilityRegistration(self): + from zope.interface.verify import verifyObject + from zope.interface.interfaces import IUtilityRegistration + ur, _, _ = self._makeOne() + verifyObject(IUtilityRegistration, ur) + + def test___repr__(self): + class _Component(object): + __name__ = 'TEST' + _component = _Component() + ur, _registry, _name = self._makeOne(_component) + self.assertEqual(repr(ur), + "UtilityRegistration(_REGISTRY, IFoo, %r, TEST, None, 'DOCSTRING')" + % (_name)) + + def test___repr___provided_wo_name(self): + class _Component(object): + def __repr__(self): + return 'TEST' + _component = _Component() + ur, _registry, _name = self._makeOne(_component) + ur.provided = object() + self.assertEqual(repr(ur), + "UtilityRegistration(_REGISTRY, None, %r, TEST, None, 'DOCSTRING')" + % (_name)) + + def test___repr___component_wo_name(self): + class _Component(object): + def __repr__(self): + return 'TEST' + _component = _Component() + ur, _registry, _name = self._makeOne(_component) + ur.provided = object() + self.assertEqual(repr(ur), + "UtilityRegistration(_REGISTRY, None, %r, TEST, None, 'DOCSTRING')" + % (_name)) + + def test___hash__(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertEqual(ur.__hash__(), id(ur)) + + def test___eq___identity(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertTrue(ur == ur) + + def test___eq___hit(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component) + self.assertTrue(ur == ur2) + + def test___eq___miss(self): + _component = object() + _component2 = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component2) + self.assertFalse(ur == ur2) + + def test___ne___identity(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertFalse(ur != ur) + + def test___ne___hit(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component) + self.assertFalse(ur != ur2) + + def test___ne___miss(self): + _component = object() + _component2 = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component2) + self.assertTrue(ur != ur2) + + def test___lt___identity(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertFalse(ur < ur) + + def test___lt___hit(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component) + self.assertFalse(ur < ur2) + + def test___lt___miss(self): + _component = object() + _component2 = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component2) + ur2.name = _name + '2' + self.assertTrue(ur < ur2) + + def test___le___identity(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertTrue(ur <= ur) + + def test___le___hit(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component) + self.assertTrue(ur <= ur2) + + def test___le___miss(self): + _component = object() + _component2 = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component2) + ur2.name = _name + '2' + self.assertTrue(ur <= ur2) + + def test___gt___identity(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertFalse(ur > ur) + + def test___gt___hit(self): + _component = object() + _component2 = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component2) + ur2.name = _name + '2' + self.assertTrue(ur2 > ur) + + def test___gt___miss(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component) + self.assertFalse(ur2 > ur) + + def test___ge___identity(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + self.assertTrue(ur >= ur) + + def test___ge___miss(self): + _component = object() + _component2 = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component2) + ur2.name = _name + '2' + self.assertFalse(ur >= ur2) + + def test___ge___hit(self): + _component = object() + ur, _registry, _name = self._makeOne(_component) + ur2, _, _ = self._makeOne(_component) + ur2.name = _name + '2' + self.assertTrue(ur2 >= ur) + + +class AdapterRegistrationTests(unittest.TestCase): + + def _getTargetClass(self): + from zope.interface.registry import AdapterRegistration + return AdapterRegistration + + def _makeOne(self, component=None): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Registry(object): + def __repr__(self): + return '_REGISTRY' + registry = _Registry() + name = u'name' + doc = 'DOCSTRING' + klass = self._getTargetClass() + return (klass(registry, (ibar,), ifoo, name, component, doc), + registry, + name, + ) + + def test_class_conforms_to_IAdapterRegistration(self): + from zope.interface.verify import verifyClass + from zope.interface.interfaces import IAdapterRegistration + verifyClass(IAdapterRegistration, self._getTargetClass()) + + def test_instance_conforms_to_IAdapterRegistration(self): + from zope.interface.verify import verifyObject + from zope.interface.interfaces import IAdapterRegistration + ar, _, _ = self._makeOne() + verifyObject(IAdapterRegistration, ar) + + def test___repr__(self): + class _Component(object): + __name__ = 'TEST' + _component = _Component() + ar, _registry, _name = self._makeOne(_component) + self.assertEqual(repr(ar), + ("AdapterRegistration(_REGISTRY, [IBar], IFoo, %r, TEST, " + + "'DOCSTRING')") % (_name)) + + def test___repr___provided_wo_name(self): + class _Component(object): + def __repr__(self): + return 'TEST' + _component = _Component() + ar, _registry, _name = self._makeOne(_component) + ar.provided = object() + self.assertEqual(repr(ar), + ("AdapterRegistration(_REGISTRY, [IBar], None, %r, TEST, " + + "'DOCSTRING')") % (_name)) + + def test___repr___component_wo_name(self): + class _Component(object): + def __repr__(self): + return 'TEST' + _component = _Component() + ar, _registry, _name = self._makeOne(_component) + ar.provided = object() + self.assertEqual(repr(ar), + ("AdapterRegistration(_REGISTRY, [IBar], None, %r, TEST, " + + "'DOCSTRING')") % (_name)) + + def test___hash__(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertEqual(ar.__hash__(), id(ar)) + + def test___eq___identity(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertTrue(ar == ar) + + def test___eq___hit(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + self.assertTrue(ar == ar2) + + def test___eq___miss(self): + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + self.assertFalse(ar == ar2) + + def test___ne___identity(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertFalse(ar != ar) + + def test___ne___miss(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + self.assertFalse(ar != ar2) + + def test___ne___hit_component(self): + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + self.assertTrue(ar != ar2) + + def test___ne___hit_provided(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ibaz = IFoo('IBaz') + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + ar2.provided = ibaz + self.assertTrue(ar != ar2) + + def test___ne___hit_required(self): + from zope.interface.declarations import InterfaceClass + class IFoo(InterfaceClass): + pass + ibaz = IFoo('IBaz') + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + ar2.required = (ibaz,) + self.assertTrue(ar != ar2) + + def test___lt___identity(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertFalse(ar < ar) + + def test___lt___hit(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + self.assertFalse(ar < ar2) + + def test___lt___miss(self): + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + ar2.name = _name + '2' + self.assertTrue(ar < ar2) + + def test___le___identity(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertTrue(ar <= ar) + + def test___le___hit(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + self.assertTrue(ar <= ar2) + + def test___le___miss(self): + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + ar2.name = _name + '2' + self.assertTrue(ar <= ar2) + + def test___gt___identity(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertFalse(ar > ar) + + def test___gt___hit(self): + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + ar2.name = _name + '2' + self.assertTrue(ar2 > ar) + + def test___gt___miss(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + self.assertFalse(ar2 > ar) + + def test___ge___identity(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + self.assertTrue(ar >= ar) + + def test___ge___miss(self): + _component = object() + _component2 = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component2) + ar2.name = _name + '2' + self.assertFalse(ar >= ar2) + + def test___ge___hit(self): + _component = object() + ar, _registry, _name = self._makeOne(_component) + ar2, _, _ = self._makeOne(_component) + ar2.name = _name + '2' + self.assertTrue(ar2 >= ar) + + +class SubscriptionRegistrationTests(unittest.TestCase): + + def _getTargetClass(self): + from zope.interface.registry import SubscriptionRegistration + return SubscriptionRegistration + + def _makeOne(self, component=None): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + ibar = IFoo('IBar') + class _Registry(object): + def __repr__(self): # pragma: no cover + return '_REGISTRY' + registry = _Registry() + name = u'name' + doc = 'DOCSTRING' + klass = self._getTargetClass() + return (klass(registry, (ibar,), ifoo, name, component, doc), + registry, + name, + ) + + def test_class_conforms_to_ISubscriptionAdapterRegistration(self): + from zope.interface.verify import verifyClass + from zope.interface.interfaces import ISubscriptionAdapterRegistration + verifyClass(ISubscriptionAdapterRegistration, self._getTargetClass()) + + def test_instance_conforms_to_ISubscriptionAdapterRegistration(self): + from zope.interface.verify import verifyObject + from zope.interface.interfaces import ISubscriptionAdapterRegistration + sar, _, _ = self._makeOne() + verifyObject(ISubscriptionAdapterRegistration, sar) + + +class HandlerRegistrationTests(unittest.TestCase): + + def _getTargetClass(self): + from zope.interface.registry import HandlerRegistration + return HandlerRegistration + + def _makeOne(self, component=None): + from zope.interface.declarations import InterfaceClass + + class IFoo(InterfaceClass): + pass + ifoo = IFoo('IFoo') + class _Registry(object): + def __repr__(self): + return '_REGISTRY' + registry = _Registry() + name = u'name' + doc = 'DOCSTRING' + klass = self._getTargetClass() + return (klass(registry, (ifoo,), name, component, doc), + registry, + name, + ) + + def test_class_conforms_to_IHandlerRegistration(self): + from zope.interface.verify import verifyClass + from zope.interface.interfaces import IHandlerRegistration + verifyClass(IHandlerRegistration, self._getTargetClass()) + + def test_instance_conforms_to_IHandlerRegistration(self): + from zope.interface.verify import verifyObject + from zope.interface.interfaces import IHandlerRegistration + hr, _, _ = self._makeOne() + verifyObject(IHandlerRegistration, hr) + + def test_properties(self): + def _factory(context): + raise NotImplementedError() + hr, _, _ = self._makeOne(_factory) + self.assertTrue(hr.handler is _factory) + self.assertTrue(hr.factory is hr.handler) + self.assertTrue(hr.provided is None) + + def test___repr___factory_w_name(self): + class _Factory(object): + __name__ = 'TEST' + hr, _registry, _name = self._makeOne(_Factory()) + self.assertEqual(repr(hr), + ("HandlerRegistration(_REGISTRY, [IFoo], %r, TEST, " + + "'DOCSTRING')") % (_name)) + + def test___repr___factory_wo_name(self): + class _Factory(object): + def __repr__(self): + return 'TEST' + hr, _registry, _name = self._makeOne(_Factory()) + self.assertEqual(repr(hr), + ("HandlerRegistration(_REGISTRY, [IFoo], %r, TEST, " + + "'DOCSTRING')") % (_name)) + +class PersistentAdapterRegistry(VerifyingAdapterRegistry): + + def __getstate__(self): + state = self.__dict__.copy() + for k in list(state): + if k in self._delegated or k.startswith('_v'): + state.pop(k) + state.pop('ro', None) + return state + + def __setstate__(self, state): + bases = state.pop('__bases__', ()) + self.__dict__.update(state) + self._createLookup() + self.__bases__ = bases + self._v_lookup.changed(self) + +class PersistentComponents(Components): + # Mimic zope.component.persistentregistry.PersistentComponents: + # we should be picklalable, but not persistent.Persistent ourself. + + def _init_registries(self): + self.adapters = PersistentAdapterRegistry() + self.utilities = PersistentAdapterRegistry() + +class PersistentDictComponents(PersistentComponents, dict): + # Like Pyramid's Registry, we subclass Components and dict + pass + + +class PersistentComponentsDict(dict, PersistentComponents): + # Like the above, but inheritance is flipped + def __init__(self, name): + dict.__init__(self) + PersistentComponents.__init__(self, name) + +class TestPersistentComponents(unittest.TestCase): + + def _makeOne(self): + return PersistentComponents('test') + + def _check_equality_after_pickle(self, made): + pass + + def test_pickles_empty(self): + import pickle + comp = self._makeOne() + pickle.dumps(comp) + comp2 = pickle.loads(pickle.dumps(comp)) + + self.assertEqual(comp2.__name__, 'test') + + def test_pickles_with_utility_registration(self): + import pickle + comp = self._makeOne() + utility = object() + comp.registerUtility( + utility, + Interface) + + self.assertIs(utility, + comp.getUtility(Interface)) + + comp2 = pickle.loads(pickle.dumps(comp)) + self.assertEqual(comp2.__name__, 'test') + + # The utility is still registered + self.assertIsNotNone(comp2.getUtility(Interface)) + + # We can register another one + comp2.registerUtility( + utility, + Interface) + self.assertIs(utility, + comp2.getUtility(Interface)) + + self._check_equality_after_pickle(comp2) + + +class TestPersistentDictComponents(TestPersistentComponents): + + def _getTargetClass(self): + return PersistentDictComponents + + def _makeOne(self): + comp = self._getTargetClass()(name='test') + comp['key'] = 42 + return comp + + def _check_equality_after_pickle(self, made): + self.assertIn('key', made) + self.assertEqual(made['key'], 42) + +class TestPersistentComponentsDict(TestPersistentDictComponents): + + def _getTargetClass(self): + return PersistentComponentsDict + +class _Monkey(object): + # context-manager for replacing module names in the scope of a test. + def __init__(self, module, **kw): + self.module = module + self.to_restore = dict([(key, getattr(module, key)) for key in kw]) + for key, value in kw.items(): + setattr(module, key, value) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + for key, value in self.to_restore.items(): + setattr(self.module, key, value) |