aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Automat/py3/automat/_methodical.py
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-09-02 00:01:09 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-09-02 00:09:17 +0300
commitb5c4ec42ac2cc59dc3b104277ce2e85f5f77c88e (patch)
tree9b37605b78a3d2398da4addca3ee37899621c38e /contrib/python/Automat/py3/automat/_methodical.py
parent88fb7b5c334c3afdffacd104ee20efda4400d1bc (diff)
downloadydb-b5c4ec42ac2cc59dc3b104277ce2e85f5f77c88e.tar.gz
Intermediate changes
Diffstat (limited to 'contrib/python/Automat/py3/automat/_methodical.py')
-rw-r--r--contrib/python/Automat/py3/automat/_methodical.py311
1 files changed, 192 insertions, 119 deletions
diff --git a/contrib/python/Automat/py3/automat/_methodical.py b/contrib/python/Automat/py3/automat/_methodical.py
index 6c9060cbb04..6c46c11e89e 100644
--- a/contrib/python/Automat/py3/automat/_methodical.py
+++ b/contrib/python/Automat/py3/automat/_methodical.py
@@ -1,20 +1,34 @@
# -*- test-case-name: automat._test.test_methodical -*-
+from __future__ import annotations
import collections
+import sys
+from dataclasses import dataclass, field
from functools import wraps
-from itertools import count
-
from inspect import getfullargspec as getArgsSpec
+from itertools import count
+from typing import Any, Callable, Hashable, Iterable, TypeVar
-import attr
+if sys.version_info < (3, 10):
+ from typing_extensions import TypeAlias
+else:
+ from typing import TypeAlias
-from ._core import Transitioner, Automaton
+from ._core import Automaton, OutputTracer, Tracer, Transitioner
from ._introspection import preserveName
-
-ArgSpec = collections.namedtuple('ArgSpec', ['args', 'varargs', 'varkw',
- 'defaults', 'kwonlyargs',
- 'kwonlydefaults', 'annotations'])
+ArgSpec = collections.namedtuple(
+ "ArgSpec",
+ [
+ "args",
+ "varargs",
+ "varkw",
+ "defaults",
+ "kwonlyargs",
+ "kwonlydefaults",
+ "annotations",
+ ],
+)
def _getArgSpec(func):
@@ -34,8 +48,7 @@ def _getArgSpec(func):
defaults=spec.defaults if spec.defaults else (),
kwonlyargs=tuple(spec.kwonlyargs),
kwonlydefaults=(
- tuple(spec.kwonlydefaults.items())
- if spec.kwonlydefaults else ()
+ tuple(spec.kwonlydefaults.items()) if spec.kwonlydefaults else ()
),
annotations=tuple(spec.annotations.items()),
)
@@ -54,8 +67,8 @@ def _getArgNames(spec):
return set(
spec.args
+ spec.kwonlyargs
- + (('*args',) if spec.varargs else ())
- + (('**kwargs',) if spec.varkw else ())
+ + (("*args",) if spec.varargs else ())
+ + (("**kwargs",) if spec.varkw else ())
+ spec.annotations
)
@@ -70,39 +83,51 @@ def _keywords_only(f):
Only works for methods right now.
"""
+
@wraps(f)
def g(self, **kw):
return f(self, **kw)
+
return g
-@attr.s(frozen=True)
+@dataclass(frozen=True)
class MethodicalState(object):
"""
A state for a L{MethodicalMachine}.
"""
- machine = attr.ib(repr=False)
- method = attr.ib()
- serialized = attr.ib(repr=False)
- def upon(self, input, enter=None, outputs=None, collector=list):
+ machine: MethodicalMachine = field(repr=False)
+ method: Callable[..., Any] = field()
+ serialized: bool = field(repr=False)
+
+ def upon(
+ self,
+ input: MethodicalInput,
+ enter: MethodicalState | None = None,
+ outputs: Iterable[MethodicalOutput] | None = None,
+ collector: Callable[[Iterable[T]], object] = list,
+ ) -> None:
"""
- Declare a state transition within the :class:`automat.MethodicalMachine`
- associated with this :class:`automat.MethodicalState`:
- upon the receipt of the `input`, enter the `state`,
- emitting each output in `outputs`.
-
- :param MethodicalInput input: The input triggering a state transition.
- :param MethodicalState enter: The resulting state.
- :param Iterable[MethodicalOutput] outputs: The outputs to be triggered
- as a result of the declared state transition.
- :param Callable collector: The function to be used when collecting
- output return values.
-
- :raises TypeError: if any of the `outputs` signatures do not match
- the `inputs` signature.
- :raises ValueError: if the state transition from `self` via `input`
- has already been defined.
+ Declare a state transition within the L{MethodicalMachine} associated
+ with this L{MethodicalState}: upon the receipt of the `input`, enter
+ the `state`, emitting each output in `outputs`.
+
+ @param input: The input triggering a state transition.
+
+ @param enter: The resulting state.
+
+ @param outputs: The outputs to be triggered as a result of the declared
+ state transition.
+
+ @param collector: The function to be used when collecting output return
+ values.
+
+ @raises TypeError: if any of the `outputs` signatures do not match the
+ `inputs` signature.
+
+ @raises ValueError: if the state transition from `self` via `input` has
+ already been defined.
"""
if enter is None:
enter = self
@@ -120,14 +145,19 @@ class MethodicalState(object):
output=output.method.__name__,
inputSignature=getArgsSpec(input.method),
outputSignature=getArgsSpec(output.method),
- ))
+ )
+ )
self.machine._oneTransition(self, input, enter, outputs, collector)
- def _name(self):
+ def _name(self) -> str:
return self.method.__name__
-def _transitionerFromInstance(oself, symbol, automaton):
+def _transitionerFromInstance(
+ oself: object,
+ symbol: str,
+ automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput],
+) -> Transitioner[MethodicalState, MethodicalInput, MethodicalOutput]:
"""
Get a L{Transitioner}
"""
@@ -144,10 +174,12 @@ def _transitionerFromInstance(oself, symbol, automaton):
def _empty():
pass
+
def _docstring():
"""docstring"""
-def assertNoCode(inst, attribute, f):
+
+def assertNoCode(f: Callable[..., Any]) -> None:
# The function body must be empty, i.e. "pass" or "return None", which
# both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also
# accept functions with only a docstring, which yields slightly different
@@ -159,8 +191,7 @@ def assertNoCode(inst, attribute, f):
# checking that would require us to parse the bytecode, find the index
# being returned, then making sure the table has a None at that index.
- if f.__code__.co_code not in (_empty.__code__.co_code,
- _docstring.__code__.co_code):
+ if f.__code__.co_code not in (_empty.__code__.co_code, _docstring.__code__.co_code):
raise ValueError("function body must be empty")
@@ -198,68 +229,80 @@ def _filterArgs(args, kwargs, inputSpec, outputSpec):
else:
# Filter out names that the output method does not accept.
all_accepted_names = outputSpec.args[1:] + outputSpec.kwonlyargs
- return_kwargs = {n: v for n, v in full_kwargs.items()
- if n in all_accepted_names}
+ return_kwargs = {
+ n: v for n, v in full_kwargs.items() if n in all_accepted_names
+ }
return return_args, return_kwargs
-@attr.s(eq=False, hash=False)
+T = TypeVar("T")
+R = TypeVar("R")
+
+
+@dataclass(eq=False)
class MethodicalInput(object):
"""
An input for a L{MethodicalMachine}.
"""
- automaton = attr.ib(repr=False)
- method = attr.ib(validator=assertNoCode)
- symbol = attr.ib(repr=False)
- collectors = attr.ib(default=attr.Factory(dict), repr=False)
- argSpec = attr.ib(init=False, repr=False)
- @argSpec.default
- def _buildArgSpec(self):
- return _getArgSpec(self.method)
+ automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput] = field(
+ repr=False
+ )
+ method: Callable[..., Any] = field()
+ symbol: str = field(repr=False)
+ collectors: dict[MethodicalState, Callable[[Iterable[T]], R]] = field(
+ default_factory=dict, repr=False
+ )
- def __get__(self, oself, type=None):
+ argSpec: ArgSpec = field(init=False, repr=False)
+
+ def __post_init__(self) -> None:
+ self.argSpec = _getArgSpec(self.method)
+ assertNoCode(self.method)
+
+ def __get__(self, oself: object, type: None = None) -> object:
"""
Return a function that takes no arguments and returns values returned
by output functions produced by the given L{MethodicalInput} in
C{oself}'s current state.
"""
- transitioner = _transitionerFromInstance(oself, self.symbol,
- self.automaton)
+ transitioner = _transitionerFromInstance(oself, self.symbol, self.automaton)
+
@preserveName(self.method)
@wraps(self.method)
- def doInput(*args, **kwargs):
+ def doInput(*args: object, **kwargs: object) -> object:
self.method(oself, *args, **kwargs)
previousState = transitioner._state
(outputs, outTracer) = transitioner.transition(self)
collector = self.collectors[previousState]
values = []
for output in outputs:
- if outTracer:
- outTracer(output._name())
+ if outTracer is not None:
+ outTracer(output)
a, k = _filterArgs(args, kwargs, self.argSpec, output.argSpec)
value = output(oself, *a, **k)
values.append(value)
return collector(values)
+
return doInput
- def _name(self):
+ def _name(self) -> str:
return self.method.__name__
-@attr.s(frozen=True)
+@dataclass(frozen=True)
class MethodicalOutput(object):
"""
An output for a L{MethodicalMachine}.
"""
- machine = attr.ib(repr=False)
- method = attr.ib()
- argSpec = attr.ib(init=False, repr=False)
- @argSpec.default
- def _buildArgSpec(self):
- return _getArgSpec(self.method)
+ machine: MethodicalMachine = field(repr=False)
+ method: Callable[..., Any]
+ argSpec: ArgSpec = field(init=False, repr=False, compare=False)
+
+ def __post_init__(self) -> None:
+ self.__dict__["argSpec"] = _getArgSpec(self.method)
def __get__(self, oself, type=None):
"""
@@ -268,37 +311,64 @@ class MethodicalOutput(object):
raise AttributeError(
"{cls}.{method} is a state-machine output method; "
"to produce this output, call an input method instead.".format(
- cls=type.__name__,
- method=self.method.__name__
+ cls=type.__name__, method=self.method.__name__
)
)
-
def __call__(self, oself, *args, **kwargs):
"""
Call the underlying method.
"""
return self.method(oself, *args, **kwargs)
- def _name(self):
+ def _name(self) -> str:
return self.method.__name__
-@attr.s(eq=False, hash=False)
+
+StringOutputTracer = Callable[[str], None]
+StringTracer: TypeAlias = "Callable[[str, str, str], StringOutputTracer | None]"
+
+
+def wrapTracer(
+ wrapped: StringTracer | None,
+) -> Tracer[MethodicalState, MethodicalInput, MethodicalOutput] | None:
+ if wrapped is None:
+ return None
+
+ def tracer(
+ state: MethodicalState,
+ input: MethodicalInput,
+ output: MethodicalState,
+ ) -> OutputTracer[MethodicalOutput] | None:
+ result = wrapped(state._name(), input._name(), output._name())
+ if result is not None:
+ return lambda out: result(out._name())
+ return None
+
+ return tracer
+
+
+@dataclass(eq=False)
class MethodicalTracer(object):
- automaton = attr.ib(repr=False)
- symbol = attr.ib(repr=False)
+ automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput] = field(
+ repr=False
+ )
+ symbol: str = field(repr=False)
+ def __get__(
+ self, oself: object, type: object = None
+ ) -> Callable[[StringTracer], None]:
+ transitioner = _transitionerFromInstance(oself, self.symbol, self.automaton)
- def __get__(self, oself, type=None):
- transitioner = _transitionerFromInstance(oself, self.symbol,
- self.automaton)
- def setTrace(tracer):
- transitioner.setTrace(tracer)
- return setTrace
+ def setTrace(tracer: StringTracer | None) -> None:
+ transitioner.setTrace(wrapTracer(tracer))
+ return setTrace
counter = count()
+
+
def gensym():
"""
Create a unique Python identifier.
@@ -306,11 +376,10 @@ def gensym():
return "_symbol_" + str(next(counter))
-
class MethodicalMachine(object):
"""
- A :class:`MethodicalMachine` is an interface to an `Automaton`
- that uses methods on a class.
+ A L{MethodicalMachine} is an interface to an L{Automaton} that uses methods
+ on a class.
"""
def __init__(self):
@@ -318,7 +387,6 @@ class MethodicalMachine(object):
self._reducers = {}
self._symbol = gensym()
-
def __get__(self, oself, type=None):
"""
L{MethodicalMachine} is an implementation detail for setting up
@@ -326,42 +394,41 @@ class MethodicalMachine(object):
instance.
"""
if oself is not None:
- raise AttributeError(
- "MethodicalMachine is an implementation detail.")
+ raise AttributeError("MethodicalMachine is an implementation detail.")
return self
-
@_keywords_only
- def state(self, initial=False, terminal=False,
- serialized=None):
+ def state(
+ self, initial: bool = False, terminal: bool = False, serialized: Hashable = None
+ ):
"""
Declare a state, possibly an initial state or a terminal state.
This is a decorator for methods, but it will modify the method so as
not to be callable any more.
- :param bool initial: is this state the initial state?
- Only one state on this :class:`automat.MethodicalMachine`
- may be an initial state; more than one is an error.
+ @param initial: is this state the initial state? Only one state on
+ this L{automat.MethodicalMachine} may be an initial state; more
+ than one is an error.
- :param bool terminal: Is this state a terminal state?
- i.e. a state that the machine can end up in?
- (This is purely informational at this point.)
+ @param terminal: Is this state a terminal state? i.e. a state that the
+ machine can end up in? (This is purely informational at this
+ point.)
- :param Hashable serialized: a serializable value
- to be used to represent this state to external systems.
- This value should be hashable;
- :py:func:`unicode` is a good type to use.
+ @param serialized: a serializable value to be used to represent this
+ state to external systems. This value should be hashable; L{str}
+ is a good type to use.
"""
+
def decorator(stateMethod):
- state = MethodicalState(machine=self,
- method=stateMethod,
- serialized=serialized)
+ state = MethodicalState(
+ machine=self, method=stateMethod, serialized=serialized
+ )
if initial:
self._automaton.initialState = state
return state
- return decorator
+ return decorator
@_keywords_only
def input(self):
@@ -370,12 +437,13 @@ class MethodicalMachine(object):
This is a decorator for methods.
"""
+
def decorator(inputMethod):
- return MethodicalInput(automaton=self._automaton,
- method=inputMethod,
- symbol=self._symbol)
- return decorator
+ return MethodicalInput(
+ automaton=self._automaton, method=inputMethod, symbol=self._symbol
+ )
+ return decorator
@_keywords_only
def output(self):
@@ -387,13 +455,13 @@ class MethodicalMachine(object):
This method will be called when the state machine transitions to this
state as specified in the decorated `output` method.
"""
+
def decorator(outputMethod):
return MethodicalOutput(machine=self, method=outputMethod)
- return decorator
+ return decorator
- def _oneTransition(self, startState, inputToken, endState, outputTokens,
- collector):
+ def _oneTransition(self, startState, inputToken, endState, outputTokens, collector):
"""
See L{MethodicalState.upon}.
"""
@@ -411,30 +479,31 @@ class MethodicalMachine(object):
# if not isinstance(endState, MethodicalState):
# raise NotImplementedError("output state {} isn't a state"
# .format(endState))
- self._automaton.addTransition(startState, inputToken, endState,
- tuple(outputTokens))
+ self._automaton.addTransition(
+ startState, inputToken, endState, tuple(outputTokens)
+ )
inputToken.collectors[startState] = collector
-
@_keywords_only
def serializer(self):
- """
+ """ """
- """
def decorator(decoratee):
@wraps(decoratee)
def serialize(oself):
- transitioner = _transitionerFromInstance(oself, self._symbol,
- self._automaton)
+ transitioner = _transitionerFromInstance(
+ oself, self._symbol, self._automaton
+ )
return decoratee(oself, transitioner._state.serialized)
+
return serialize
+
return decorator
@_keywords_only
def unserializer(self):
- """
+ """ """
- """
def decorator(decoratee):
@wraps(decoratee)
def unserialize(oself, *args, **kwargs):
@@ -443,14 +512,17 @@ class MethodicalMachine(object):
for eachState in self._automaton.states():
mapping[eachState.serialized] = eachState
transitioner = _transitionerFromInstance(
- oself, self._symbol, self._automaton)
+ oself, self._symbol, self._automaton
+ )
transitioner._state = mapping[state]
- return None # it's on purpose
+ return None # it's on purpose
+
return unserialize
+
return decorator
@property
- def _setTrace(self):
+ def _setTrace(self) -> MethodicalTracer:
return MethodicalTracer(self._automaton, self._symbol)
def asDigraph(self):
@@ -464,6 +536,7 @@ class MethodicalMachine(object):
"""
from ._visualize import makeDigraph
+
return makeDigraph(
self._automaton,
stateAsString=lambda state: state.method.__name__,