diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-02 00:01:09 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-02 00:09:17 +0300 |
commit | b5c4ec42ac2cc59dc3b104277ce2e85f5f77c88e (patch) | |
tree | 9b37605b78a3d2398da4addca3ee37899621c38e /contrib/python/Automat/py3/automat/_methodical.py | |
parent | 88fb7b5c334c3afdffacd104ee20efda4400d1bc (diff) | |
download | ydb-b5c4ec42ac2cc59dc3b104277ce2e85f5f77c88e.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/python/Automat/py3/automat/_methodical.py')
-rw-r--r-- | contrib/python/Automat/py3/automat/_methodical.py | 311 |
1 files changed, 192 insertions, 119 deletions
diff --git a/contrib/python/Automat/py3/automat/_methodical.py b/contrib/python/Automat/py3/automat/_methodical.py index 6c9060cbb04..6c46c11e89e 100644 --- a/contrib/python/Automat/py3/automat/_methodical.py +++ b/contrib/python/Automat/py3/automat/_methodical.py @@ -1,20 +1,34 @@ # -*- test-case-name: automat._test.test_methodical -*- +from __future__ import annotations import collections +import sys +from dataclasses import dataclass, field from functools import wraps -from itertools import count - from inspect import getfullargspec as getArgsSpec +from itertools import count +from typing import Any, Callable, Hashable, Iterable, TypeVar -import attr +if sys.version_info < (3, 10): + from typing_extensions import TypeAlias +else: + from typing import TypeAlias -from ._core import Transitioner, Automaton +from ._core import Automaton, OutputTracer, Tracer, Transitioner from ._introspection import preserveName - -ArgSpec = collections.namedtuple('ArgSpec', ['args', 'varargs', 'varkw', - 'defaults', 'kwonlyargs', - 'kwonlydefaults', 'annotations']) +ArgSpec = collections.namedtuple( + "ArgSpec", + [ + "args", + "varargs", + "varkw", + "defaults", + "kwonlyargs", + "kwonlydefaults", + "annotations", + ], +) def _getArgSpec(func): @@ -34,8 +48,7 @@ def _getArgSpec(func): defaults=spec.defaults if spec.defaults else (), kwonlyargs=tuple(spec.kwonlyargs), kwonlydefaults=( - tuple(spec.kwonlydefaults.items()) - if spec.kwonlydefaults else () + tuple(spec.kwonlydefaults.items()) if spec.kwonlydefaults else () ), annotations=tuple(spec.annotations.items()), ) @@ -54,8 +67,8 @@ def _getArgNames(spec): return set( spec.args + spec.kwonlyargs - + (('*args',) if spec.varargs else ()) - + (('**kwargs',) if spec.varkw else ()) + + (("*args",) if spec.varargs else ()) + + (("**kwargs",) if spec.varkw else ()) + spec.annotations ) @@ -70,39 +83,51 @@ def _keywords_only(f): Only works for methods right now. """ + @wraps(f) def g(self, **kw): return f(self, **kw) + return g -@attr.s(frozen=True) +@dataclass(frozen=True) class MethodicalState(object): """ A state for a L{MethodicalMachine}. """ - machine = attr.ib(repr=False) - method = attr.ib() - serialized = attr.ib(repr=False) - def upon(self, input, enter=None, outputs=None, collector=list): + machine: MethodicalMachine = field(repr=False) + method: Callable[..., Any] = field() + serialized: bool = field(repr=False) + + def upon( + self, + input: MethodicalInput, + enter: MethodicalState | None = None, + outputs: Iterable[MethodicalOutput] | None = None, + collector: Callable[[Iterable[T]], object] = list, + ) -> None: """ - Declare a state transition within the :class:`automat.MethodicalMachine` - associated with this :class:`automat.MethodicalState`: - upon the receipt of the `input`, enter the `state`, - emitting each output in `outputs`. - - :param MethodicalInput input: The input triggering a state transition. - :param MethodicalState enter: The resulting state. - :param Iterable[MethodicalOutput] outputs: The outputs to be triggered - as a result of the declared state transition. - :param Callable collector: The function to be used when collecting - output return values. - - :raises TypeError: if any of the `outputs` signatures do not match - the `inputs` signature. - :raises ValueError: if the state transition from `self` via `input` - has already been defined. + Declare a state transition within the L{MethodicalMachine} associated + with this L{MethodicalState}: upon the receipt of the `input`, enter + the `state`, emitting each output in `outputs`. + + @param input: The input triggering a state transition. + + @param enter: The resulting state. + + @param outputs: The outputs to be triggered as a result of the declared + state transition. + + @param collector: The function to be used when collecting output return + values. + + @raises TypeError: if any of the `outputs` signatures do not match the + `inputs` signature. + + @raises ValueError: if the state transition from `self` via `input` has + already been defined. """ if enter is None: enter = self @@ -120,14 +145,19 @@ class MethodicalState(object): output=output.method.__name__, inputSignature=getArgsSpec(input.method), outputSignature=getArgsSpec(output.method), - )) + ) + ) self.machine._oneTransition(self, input, enter, outputs, collector) - def _name(self): + def _name(self) -> str: return self.method.__name__ -def _transitionerFromInstance(oself, symbol, automaton): +def _transitionerFromInstance( + oself: object, + symbol: str, + automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput], +) -> Transitioner[MethodicalState, MethodicalInput, MethodicalOutput]: """ Get a L{Transitioner} """ @@ -144,10 +174,12 @@ def _transitionerFromInstance(oself, symbol, automaton): def _empty(): pass + def _docstring(): """docstring""" -def assertNoCode(inst, attribute, f): + +def assertNoCode(f: Callable[..., Any]) -> None: # The function body must be empty, i.e. "pass" or "return None", which # both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also # accept functions with only a docstring, which yields slightly different @@ -159,8 +191,7 @@ def assertNoCode(inst, attribute, f): # checking that would require us to parse the bytecode, find the index # being returned, then making sure the table has a None at that index. - if f.__code__.co_code not in (_empty.__code__.co_code, - _docstring.__code__.co_code): + if f.__code__.co_code not in (_empty.__code__.co_code, _docstring.__code__.co_code): raise ValueError("function body must be empty") @@ -198,68 +229,80 @@ def _filterArgs(args, kwargs, inputSpec, outputSpec): else: # Filter out names that the output method does not accept. all_accepted_names = outputSpec.args[1:] + outputSpec.kwonlyargs - return_kwargs = {n: v for n, v in full_kwargs.items() - if n in all_accepted_names} + return_kwargs = { + n: v for n, v in full_kwargs.items() if n in all_accepted_names + } return return_args, return_kwargs -@attr.s(eq=False, hash=False) +T = TypeVar("T") +R = TypeVar("R") + + +@dataclass(eq=False) class MethodicalInput(object): """ An input for a L{MethodicalMachine}. """ - automaton = attr.ib(repr=False) - method = attr.ib(validator=assertNoCode) - symbol = attr.ib(repr=False) - collectors = attr.ib(default=attr.Factory(dict), repr=False) - argSpec = attr.ib(init=False, repr=False) - @argSpec.default - def _buildArgSpec(self): - return _getArgSpec(self.method) + automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput] = field( + repr=False + ) + method: Callable[..., Any] = field() + symbol: str = field(repr=False) + collectors: dict[MethodicalState, Callable[[Iterable[T]], R]] = field( + default_factory=dict, repr=False + ) - def __get__(self, oself, type=None): + argSpec: ArgSpec = field(init=False, repr=False) + + def __post_init__(self) -> None: + self.argSpec = _getArgSpec(self.method) + assertNoCode(self.method) + + def __get__(self, oself: object, type: None = None) -> object: """ Return a function that takes no arguments and returns values returned by output functions produced by the given L{MethodicalInput} in C{oself}'s current state. """ - transitioner = _transitionerFromInstance(oself, self.symbol, - self.automaton) + transitioner = _transitionerFromInstance(oself, self.symbol, self.automaton) + @preserveName(self.method) @wraps(self.method) - def doInput(*args, **kwargs): + def doInput(*args: object, **kwargs: object) -> object: self.method(oself, *args, **kwargs) previousState = transitioner._state (outputs, outTracer) = transitioner.transition(self) collector = self.collectors[previousState] values = [] for output in outputs: - if outTracer: - outTracer(output._name()) + if outTracer is not None: + outTracer(output) a, k = _filterArgs(args, kwargs, self.argSpec, output.argSpec) value = output(oself, *a, **k) values.append(value) return collector(values) + return doInput - def _name(self): + def _name(self) -> str: return self.method.__name__ -@attr.s(frozen=True) +@dataclass(frozen=True) class MethodicalOutput(object): """ An output for a L{MethodicalMachine}. """ - machine = attr.ib(repr=False) - method = attr.ib() - argSpec = attr.ib(init=False, repr=False) - @argSpec.default - def _buildArgSpec(self): - return _getArgSpec(self.method) + machine: MethodicalMachine = field(repr=False) + method: Callable[..., Any] + argSpec: ArgSpec = field(init=False, repr=False, compare=False) + + def __post_init__(self) -> None: + self.__dict__["argSpec"] = _getArgSpec(self.method) def __get__(self, oself, type=None): """ @@ -268,37 +311,64 @@ class MethodicalOutput(object): raise AttributeError( "{cls}.{method} is a state-machine output method; " "to produce this output, call an input method instead.".format( - cls=type.__name__, - method=self.method.__name__ + cls=type.__name__, method=self.method.__name__ ) ) - def __call__(self, oself, *args, **kwargs): """ Call the underlying method. """ return self.method(oself, *args, **kwargs) - def _name(self): + def _name(self) -> str: return self.method.__name__ -@attr.s(eq=False, hash=False) + +StringOutputTracer = Callable[[str], None] +StringTracer: TypeAlias = "Callable[[str, str, str], StringOutputTracer | None]" + + +def wrapTracer( + wrapped: StringTracer | None, +) -> Tracer[MethodicalState, MethodicalInput, MethodicalOutput] | None: + if wrapped is None: + return None + + def tracer( + state: MethodicalState, + input: MethodicalInput, + output: MethodicalState, + ) -> OutputTracer[MethodicalOutput] | None: + result = wrapped(state._name(), input._name(), output._name()) + if result is not None: + return lambda out: result(out._name()) + return None + + return tracer + + +@dataclass(eq=False) class MethodicalTracer(object): - automaton = attr.ib(repr=False) - symbol = attr.ib(repr=False) + automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput] = field( + repr=False + ) + symbol: str = field(repr=False) + def __get__( + self, oself: object, type: object = None + ) -> Callable[[StringTracer], None]: + transitioner = _transitionerFromInstance(oself, self.symbol, self.automaton) - def __get__(self, oself, type=None): - transitioner = _transitionerFromInstance(oself, self.symbol, - self.automaton) - def setTrace(tracer): - transitioner.setTrace(tracer) - return setTrace + def setTrace(tracer: StringTracer | None) -> None: + transitioner.setTrace(wrapTracer(tracer)) + return setTrace counter = count() + + def gensym(): """ Create a unique Python identifier. @@ -306,11 +376,10 @@ def gensym(): return "_symbol_" + str(next(counter)) - class MethodicalMachine(object): """ - A :class:`MethodicalMachine` is an interface to an `Automaton` - that uses methods on a class. + A L{MethodicalMachine} is an interface to an L{Automaton} that uses methods + on a class. """ def __init__(self): @@ -318,7 +387,6 @@ class MethodicalMachine(object): self._reducers = {} self._symbol = gensym() - def __get__(self, oself, type=None): """ L{MethodicalMachine} is an implementation detail for setting up @@ -326,42 +394,41 @@ class MethodicalMachine(object): instance. """ if oself is not None: - raise AttributeError( - "MethodicalMachine is an implementation detail.") + raise AttributeError("MethodicalMachine is an implementation detail.") return self - @_keywords_only - def state(self, initial=False, terminal=False, - serialized=None): + def state( + self, initial: bool = False, terminal: bool = False, serialized: Hashable = None + ): """ Declare a state, possibly an initial state or a terminal state. This is a decorator for methods, but it will modify the method so as not to be callable any more. - :param bool initial: is this state the initial state? - Only one state on this :class:`automat.MethodicalMachine` - may be an initial state; more than one is an error. + @param initial: is this state the initial state? Only one state on + this L{automat.MethodicalMachine} may be an initial state; more + than one is an error. - :param bool terminal: Is this state a terminal state? - i.e. a state that the machine can end up in? - (This is purely informational at this point.) + @param terminal: Is this state a terminal state? i.e. a state that the + machine can end up in? (This is purely informational at this + point.) - :param Hashable serialized: a serializable value - to be used to represent this state to external systems. - This value should be hashable; - :py:func:`unicode` is a good type to use. + @param serialized: a serializable value to be used to represent this + state to external systems. This value should be hashable; L{str} + is a good type to use. """ + def decorator(stateMethod): - state = MethodicalState(machine=self, - method=stateMethod, - serialized=serialized) + state = MethodicalState( + machine=self, method=stateMethod, serialized=serialized + ) if initial: self._automaton.initialState = state return state - return decorator + return decorator @_keywords_only def input(self): @@ -370,12 +437,13 @@ class MethodicalMachine(object): This is a decorator for methods. """ + def decorator(inputMethod): - return MethodicalInput(automaton=self._automaton, - method=inputMethod, - symbol=self._symbol) - return decorator + return MethodicalInput( + automaton=self._automaton, method=inputMethod, symbol=self._symbol + ) + return decorator @_keywords_only def output(self): @@ -387,13 +455,13 @@ class MethodicalMachine(object): This method will be called when the state machine transitions to this state as specified in the decorated `output` method. """ + def decorator(outputMethod): return MethodicalOutput(machine=self, method=outputMethod) - return decorator + return decorator - def _oneTransition(self, startState, inputToken, endState, outputTokens, - collector): + def _oneTransition(self, startState, inputToken, endState, outputTokens, collector): """ See L{MethodicalState.upon}. """ @@ -411,30 +479,31 @@ class MethodicalMachine(object): # if not isinstance(endState, MethodicalState): # raise NotImplementedError("output state {} isn't a state" # .format(endState)) - self._automaton.addTransition(startState, inputToken, endState, - tuple(outputTokens)) + self._automaton.addTransition( + startState, inputToken, endState, tuple(outputTokens) + ) inputToken.collectors[startState] = collector - @_keywords_only def serializer(self): - """ + """ """ - """ def decorator(decoratee): @wraps(decoratee) def serialize(oself): - transitioner = _transitionerFromInstance(oself, self._symbol, - self._automaton) + transitioner = _transitionerFromInstance( + oself, self._symbol, self._automaton + ) return decoratee(oself, transitioner._state.serialized) + return serialize + return decorator @_keywords_only def unserializer(self): - """ + """ """ - """ def decorator(decoratee): @wraps(decoratee) def unserialize(oself, *args, **kwargs): @@ -443,14 +512,17 @@ class MethodicalMachine(object): for eachState in self._automaton.states(): mapping[eachState.serialized] = eachState transitioner = _transitionerFromInstance( - oself, self._symbol, self._automaton) + oself, self._symbol, self._automaton + ) transitioner._state = mapping[state] - return None # it's on purpose + return None # it's on purpose + return unserialize + return decorator @property - def _setTrace(self): + def _setTrace(self) -> MethodicalTracer: return MethodicalTracer(self._automaton, self._symbol) def asDigraph(self): @@ -464,6 +536,7 @@ class MethodicalMachine(object): """ from ._visualize import makeDigraph + return makeDigraph( self._automaton, stateAsString=lambda state: state.method.__name__, |