summaryrefslogtreecommitdiffstats
path: root/contrib/python/pluggy/py3/tests/test_pluginmanager.py
diff options
context:
space:
mode:
authornkozlovskiy <[email protected]>2023-09-29 12:24:06 +0300
committernkozlovskiy <[email protected]>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/pluggy/py3/tests/test_pluginmanager.py
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
add ydb deps
Diffstat (limited to 'contrib/python/pluggy/py3/tests/test_pluginmanager.py')
-rw-r--r--contrib/python/pluggy/py3/tests/test_pluginmanager.py682
1 files changed, 682 insertions, 0 deletions
diff --git a/contrib/python/pluggy/py3/tests/test_pluginmanager.py b/contrib/python/pluggy/py3/tests/test_pluginmanager.py
new file mode 100644
index 00000000000..f6d606b4525
--- /dev/null
+++ b/contrib/python/pluggy/py3/tests/test_pluginmanager.py
@@ -0,0 +1,682 @@
+"""
+``PluginManager`` unit and public API testing.
+"""
+import sys
+from typing import Any
+from typing import List
+
+import pytest
+
+from pluggy import HookCallError
+from pluggy import HookimplMarker
+from pluggy import HookspecMarker
+from pluggy import PluginManager
+from pluggy import PluginValidationError
+
+if sys.version_info >= (3, 8):
+ from importlib import metadata as importlib_metadata
+else:
+ import importlib_metadata
+
+
+hookspec = HookspecMarker("example")
+hookimpl = HookimplMarker("example")
+
+
+def test_plugin_double_register(pm: PluginManager) -> None:
+ """Registering the same plugin more then once isn't allowed"""
+ pm.register(42, name="abc")
+ with pytest.raises(ValueError):
+ pm.register(42, name="abc")
+ with pytest.raises(ValueError):
+ pm.register(42, name="def")
+
+
+def test_pm(pm: PluginManager) -> None:
+ """Basic registration with objects"""
+
+ class A:
+ pass
+
+ a1, a2 = A(), A()
+ pm.register(a1)
+ assert pm.is_registered(a1)
+ pm.register(a2, "hello")
+ assert pm.is_registered(a2)
+ out = pm.get_plugins()
+ assert a1 in out
+ assert a2 in out
+ assert pm.get_plugin("hello") == a2
+ assert pm.unregister(a1) == a1
+ assert not pm.is_registered(a1)
+
+ out2 = pm.list_name_plugin()
+ assert len(out2) == 1
+ assert out2 == [("hello", a2)]
+
+
+def test_has_plugin(pm: PluginManager) -> None:
+ class A:
+ pass
+
+ a1 = A()
+ pm.register(a1, "hello")
+ assert pm.is_registered(a1)
+ assert pm.has_plugin("hello")
+
+
+def test_register_dynamic_attr(he_pm: PluginManager) -> None:
+ class A:
+ def __getattr__(self, name):
+ if name[0] != "_":
+ return 42
+ raise AttributeError()
+
+ a = A()
+ he_pm.register(a)
+ assert not he_pm.get_hookcallers(a)
+
+
+def test_pm_name(pm: PluginManager) -> None:
+ class A:
+ pass
+
+ a1 = A()
+ name = pm.register(a1, name="hello")
+ assert name == "hello"
+ pm.unregister(a1)
+ assert pm.get_plugin("hello") is None
+ assert not pm.is_registered(a1)
+ assert not pm.get_plugins()
+ name2 = pm.register(a1, name="hello")
+ assert name2 == name
+ pm.unregister(name="hello")
+ assert pm.get_plugin("hello") is None
+ assert not pm.is_registered(a1)
+ assert not pm.get_plugins()
+
+
+def test_set_blocked(pm: PluginManager) -> None:
+ class A:
+ pass
+
+ a1 = A()
+ name = pm.register(a1)
+ assert name is not None
+ assert pm.is_registered(a1)
+ assert not pm.is_blocked(name)
+ pm.set_blocked(name)
+ assert pm.is_blocked(name)
+ assert not pm.is_registered(a1)
+
+ pm.set_blocked("somename")
+ assert pm.is_blocked("somename")
+ assert not pm.register(A(), "somename")
+ pm.unregister(name="somename")
+ assert pm.is_blocked("somename")
+
+
+def test_register_mismatch_method(he_pm: PluginManager) -> None:
+ class hello:
+ @hookimpl
+ def he_method_notexists(self):
+ pass
+
+ plugin = hello()
+
+ he_pm.register(plugin)
+ with pytest.raises(PluginValidationError) as excinfo:
+ he_pm.check_pending()
+ assert excinfo.value.plugin is plugin
+
+
+def test_register_mismatch_arg(he_pm: PluginManager) -> None:
+ class hello:
+ @hookimpl
+ def he_method1(self, qlwkje):
+ pass
+
+ plugin = hello()
+
+ with pytest.raises(PluginValidationError) as excinfo:
+ he_pm.register(plugin)
+ assert excinfo.value.plugin is plugin
+
+
+def test_register_hookwrapper_not_a_generator_function(he_pm: PluginManager) -> None:
+ class hello:
+ @hookimpl(hookwrapper=True)
+ def he_method1(self):
+ pass # pragma: no cover
+
+ plugin = hello()
+
+ with pytest.raises(PluginValidationError, match="generator function") as excinfo:
+ he_pm.register(plugin)
+ assert excinfo.value.plugin is plugin
+
+
+def test_register_both_wrapper_and_hookwrapper(he_pm: PluginManager) -> None:
+ class hello:
+ @hookimpl(wrapper=True, hookwrapper=True)
+ def he_method1(self):
+ yield # pragma: no cover
+
+ plugin = hello()
+
+ with pytest.raises(
+ PluginValidationError, match="wrapper.*hookwrapper.*mutually exclusive"
+ ) as excinfo:
+ he_pm.register(plugin)
+ assert excinfo.value.plugin is plugin
+
+
+def test_register(pm: PluginManager) -> None:
+ class MyPlugin:
+ @hookimpl
+ def he_method1(self):
+ ...
+
+ my = MyPlugin()
+ pm.register(my)
+ assert pm.get_plugins() == {my}
+ my2 = MyPlugin()
+ pm.register(my2)
+ assert pm.get_plugins() == {my, my2}
+
+ assert pm.is_registered(my)
+ assert pm.is_registered(my2)
+ pm.unregister(my)
+ assert not pm.is_registered(my)
+ assert pm.get_plugins() == {my2}
+
+ with pytest.raises(AssertionError, match=r"not registered"):
+ pm.unregister(my)
+
+
+def test_register_unknown_hooks(pm: PluginManager) -> None:
+ class Plugin1:
+ @hookimpl
+ def he_method1(self, arg):
+ return arg + 1
+
+ pname = pm.register(Plugin1())
+ assert pname is not None
+
+ class Hooks:
+ @hookspec
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+ # assert not pm._unverified_hooks
+ assert pm.hook.he_method1(arg=1) == [2]
+ hookcallers = pm.get_hookcallers(pm.get_plugin(pname))
+ assert hookcallers is not None
+ assert len(hookcallers) == 1
+
+
+def test_register_historic(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec(historic=True)
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ pm.hook.he_method1.call_historic(kwargs=dict(arg=1))
+ out = []
+
+ class Plugin:
+ @hookimpl
+ def he_method1(self, arg):
+ out.append(arg)
+
+ pm.register(Plugin())
+ assert out == [1]
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self, arg):
+ out.append(arg * 10)
+
+ pm.register(Plugin2())
+ assert out == [1, 10]
+ pm.hook.he_method1.call_historic(kwargs=dict(arg=12))
+ assert out == [1, 10, 120, 12]
+
+
+def test_historic_with_subset_hook_caller(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec(historic=True)
+ def he_method1(self, arg):
+ ...
+
+ pm.add_hookspecs(Hooks)
+
+ out = []
+
+ class Plugin:
+ @hookimpl
+ def he_method1(self, arg):
+ out.append(arg)
+
+ plugin = Plugin()
+ pm.register(plugin)
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self, arg):
+ out.append(arg * 10)
+
+ shc = pm.subset_hook_caller("he_method1", remove_plugins=[plugin])
+ shc.call_historic(kwargs=dict(arg=1))
+
+ pm.register(Plugin2())
+ assert out == [10]
+
+ pm.register(Plugin())
+ assert out == [10, 1]
+
+
[email protected]("result_callback", [True, False])
+def test_with_result_memorized(pm: PluginManager, result_callback: bool) -> None:
+ """Verify that ``_HookCaller._maybe_apply_history()`
+ correctly applies the ``result_callback`` function, when provided,
+ to the result from calling each newly registered hook.
+ """
+ out = []
+ if not result_callback:
+ callback = None
+ else:
+
+ def callback(res) -> None:
+ out.append(res)
+
+ class Hooks:
+ @hookspec(historic=True)
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ class Plugin1:
+ @hookimpl
+ def he_method1(self, arg):
+ return arg * 10
+
+ pm.register(Plugin1())
+
+ he_method1 = pm.hook.he_method1
+ he_method1.call_historic(result_callback=callback, kwargs=dict(arg=1))
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self, arg):
+ return arg * 10
+
+ pm.register(Plugin2())
+ if result_callback:
+ assert out == [10, 10]
+ else:
+ assert out == []
+
+
+def test_with_callbacks_immediately_executed(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec(historic=True)
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ class Plugin1:
+ @hookimpl
+ def he_method1(self, arg):
+ return arg * 10
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self, arg):
+ return arg * 20
+
+ class Plugin3:
+ @hookimpl
+ def he_method1(self, arg):
+ return arg * 30
+
+ out = []
+ pm.register(Plugin1())
+ pm.register(Plugin2())
+
+ he_method1 = pm.hook.he_method1
+ he_method1.call_historic(lambda res: out.append(res), dict(arg=1))
+ assert out == [20, 10]
+ pm.register(Plugin3())
+ assert out == [20, 10, 30]
+
+
+def test_register_historic_incompat_hookwrapper(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec(historic=True)
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ out = []
+
+ class Plugin:
+ @hookimpl(hookwrapper=True)
+ def he_method1(self, arg):
+ out.append(arg)
+
+ with pytest.raises(PluginValidationError):
+ pm.register(Plugin())
+
+
+def test_register_historic_incompat_wrapper(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec(historic=True)
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ class Plugin:
+ @hookimpl(wrapper=True)
+ def he_method1(self, arg):
+ yield
+
+ with pytest.raises(PluginValidationError):
+ pm.register(Plugin())
+
+
+def test_call_extra(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ def he_method1(arg):
+ return arg * 10
+
+ out = pm.hook.he_method1.call_extra([he_method1], dict(arg=1))
+ assert out == [10]
+
+
+def test_call_with_too_few_args(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ class Plugin1:
+ @hookimpl
+ def he_method1(self, arg):
+ 0 / 0
+
+ pm.register(Plugin1())
+ with pytest.raises(HookCallError):
+ with pytest.warns(UserWarning):
+ pm.hook.he_method1()
+
+
+def test_subset_hook_caller(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+
+ out = []
+
+ class Plugin1:
+ @hookimpl
+ def he_method1(self, arg):
+ out.append(arg)
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self, arg):
+ out.append(arg * 10)
+
+ class PluginNo:
+ pass
+
+ plugin1, plugin2, plugin3 = Plugin1(), Plugin2(), PluginNo()
+ pm.register(plugin1)
+ pm.register(plugin2)
+ pm.register(plugin3)
+ pm.hook.he_method1(arg=1)
+ assert out == [10, 1]
+ out[:] = []
+
+ hc = pm.subset_hook_caller("he_method1", [plugin1])
+ hc(arg=2)
+ assert out == [20]
+ out[:] = []
+
+ hc = pm.subset_hook_caller("he_method1", [plugin2])
+ hc(arg=2)
+ assert out == [2]
+ out[:] = []
+
+ pm.unregister(plugin1)
+ hc(arg=2)
+ assert out == []
+ out[:] = []
+
+ pm.hook.he_method1(arg=1)
+ assert out == [10]
+
+ assert repr(hc) == "<_SubsetHookCaller 'he_method1'>"
+
+
+def test_get_hookimpls(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec
+ def he_method1(self, arg):
+ pass
+
+ pm.add_hookspecs(Hooks)
+ assert pm.hook.he_method1.get_hookimpls() == []
+
+ class Plugin1:
+ @hookimpl
+ def he_method1(self, arg):
+ pass
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self, arg):
+ pass
+
+ class PluginNo:
+ pass
+
+ plugin1, plugin2, plugin3 = Plugin1(), Plugin2(), PluginNo()
+ pm.register(plugin1)
+ pm.register(plugin2)
+ pm.register(plugin3)
+
+ hookimpls = pm.hook.he_method1.get_hookimpls()
+ hook_plugins = [item.plugin for item in hookimpls]
+ assert hook_plugins == [plugin1, plugin2]
+
+
+def test_get_hookcallers(pm: PluginManager) -> None:
+ class Hooks:
+ @hookspec
+ def he_method1(self):
+ ...
+
+ @hookspec
+ def he_method2(self):
+ ...
+
+ pm.add_hookspecs(Hooks)
+
+ class Plugin1:
+ @hookimpl
+ def he_method1(self):
+ ...
+
+ @hookimpl
+ def he_method2(self):
+ ...
+
+ class Plugin2:
+ @hookimpl
+ def he_method1(self):
+ ...
+
+ class Plugin3:
+ @hookimpl
+ def he_method2(self):
+ ...
+
+ plugin1 = Plugin1()
+ pm.register(plugin1)
+ plugin2 = Plugin2()
+ pm.register(plugin2)
+ plugin3 = Plugin3()
+ pm.register(plugin3)
+
+ hookcallers1 = pm.get_hookcallers(plugin1)
+ assert hookcallers1 is not None
+ assert len(hookcallers1) == 2
+ hookcallers2 = pm.get_hookcallers(plugin2)
+ assert hookcallers2 is not None
+ assert len(hookcallers2) == 1
+ hookcallers3 = pm.get_hookcallers(plugin3)
+ assert hookcallers3 is not None
+ assert len(hookcallers3) == 1
+ assert hookcallers1 == hookcallers2 + hookcallers3
+
+ assert pm.get_hookcallers(object()) is None
+
+
+def test_add_hookspecs_nohooks(pm: PluginManager) -> None:
+ class NoHooks:
+ pass
+
+ with pytest.raises(ValueError):
+ pm.add_hookspecs(NoHooks)
+
+
+def test_load_setuptools_instantiation(monkeypatch, pm: PluginManager) -> None:
+ class EntryPoint:
+ name = "myname"
+ group = "hello"
+ value = "myname:foo"
+
+ def load(self):
+ class PseudoPlugin:
+ x = 42
+
+ return PseudoPlugin()
+
+ class Distribution:
+ entry_points = (EntryPoint(),)
+
+ dist = Distribution()
+
+ def my_distributions():
+ return (dist,)
+
+ monkeypatch.setattr(importlib_metadata, "distributions", my_distributions)
+ num = pm.load_setuptools_entrypoints("hello")
+ assert num == 1
+ plugin = pm.get_plugin("myname")
+ assert plugin is not None
+ assert plugin.x == 42
+ ret = pm.list_plugin_distinfo()
+ # poor man's `assert ret == [(plugin, mock.ANY)]`
+ assert len(ret) == 1
+ assert len(ret[0]) == 2
+ assert ret[0][0] == plugin
+ assert ret[0][1]._dist == dist # type: ignore[comparison-overlap]
+ num = pm.load_setuptools_entrypoints("hello")
+ assert num == 0 # no plugin loaded by this call
+
+
+def test_add_tracefuncs(he_pm: PluginManager) -> None:
+ out: List[Any] = []
+
+ class api1:
+ @hookimpl
+ def he_method1(self):
+ out.append("he_method1-api1")
+
+ class api2:
+ @hookimpl
+ def he_method1(self):
+ out.append("he_method1-api2")
+
+ he_pm.register(api1())
+ he_pm.register(api2())
+
+ def before(hook_name, hook_impls, kwargs):
+ out.append((hook_name, list(hook_impls), kwargs))
+
+ def after(outcome, hook_name, hook_impls, kwargs):
+ out.append((outcome, hook_name, list(hook_impls), kwargs))
+
+ undo = he_pm.add_hookcall_monitoring(before, after)
+
+ he_pm.hook.he_method1(arg=1)
+ assert len(out) == 4
+ assert out[0][0] == "he_method1"
+ assert len(out[0][1]) == 2
+ assert isinstance(out[0][2], dict)
+ assert out[1] == "he_method1-api2"
+ assert out[2] == "he_method1-api1"
+ assert len(out[3]) == 4
+ assert out[3][1] == out[0][0]
+
+ undo()
+ he_pm.hook.he_method1(arg=1)
+ assert len(out) == 4 + 2
+
+
+def test_hook_tracing(he_pm: PluginManager) -> None:
+ saveindent = []
+
+ class api1:
+ @hookimpl
+ def he_method1(self):
+ saveindent.append(he_pm.trace.root.indent)
+
+ class api2:
+ @hookimpl
+ def he_method1(self):
+ saveindent.append(he_pm.trace.root.indent)
+ raise ValueError()
+
+ he_pm.register(api1())
+ out: List[Any] = []
+ he_pm.trace.root.setwriter(out.append)
+ undo = he_pm.enable_tracing()
+ try:
+ indent = he_pm.trace.root.indent
+ he_pm.hook.he_method1(arg=1)
+ assert indent == he_pm.trace.root.indent
+ assert len(out) == 2
+ assert "he_method1" in out[0]
+ assert "finish" in out[1]
+
+ out[:] = []
+ he_pm.register(api2())
+
+ with pytest.raises(ValueError):
+ he_pm.hook.he_method1(arg=1)
+ assert he_pm.trace.root.indent == indent
+ assert saveindent[0] > indent
+ finally:
+ undo()