aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Automat/py2/automat
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Automat/py2/automat
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Automat/py2/automat')
-rw-r--r--contrib/python/Automat/py2/automat/__init__.py8
-rw-r--r--contrib/python/Automat/py2/automat/_core.py165
-rw-r--r--contrib/python/Automat/py2/automat/_discover.py144
-rw-r--r--contrib/python/Automat/py2/automat/_introspection.py45
-rw-r--r--contrib/python/Automat/py2/automat/_methodical.py474
-rw-r--r--contrib/python/Automat/py2/automat/_visualize.py182
6 files changed, 1018 insertions, 0 deletions
diff --git a/contrib/python/Automat/py2/automat/__init__.py b/contrib/python/Automat/py2/automat/__init__.py
new file mode 100644
index 0000000000..570b84f995
--- /dev/null
+++ b/contrib/python/Automat/py2/automat/__init__.py
@@ -0,0 +1,8 @@
+# -*- test-case-name: automat -*-
+from ._methodical import MethodicalMachine
+from ._core import NoTransition
+
+__all__ = [
+ 'MethodicalMachine',
+ 'NoTransition',
+]
diff --git a/contrib/python/Automat/py2/automat/_core.py b/contrib/python/Automat/py2/automat/_core.py
new file mode 100644
index 0000000000..4118a4b070
--- /dev/null
+++ b/contrib/python/Automat/py2/automat/_core.py
@@ -0,0 +1,165 @@
+# -*- test-case-name: automat._test.test_core -*-
+
+"""
+A core state-machine abstraction.
+
+Perhaps something that could be replaced with or integrated into machinist.
+"""
+
+from itertools import chain
+
+_NO_STATE = "<no state>"
+
+
+class NoTransition(Exception):
+ """
+ A finite state machine in C{state} has no transition for C{symbol}.
+
+ @param state: the finite state machine's state at the time of the
+ illegal transition.
+
+ @param symbol: the input symbol for which no transition exists.
+ """
+
+ def __init__(self, state, symbol):
+ self.state = state
+ self.symbol = symbol
+ super(Exception, self).__init__(
+ "no transition for {} in {}".format(symbol, state)
+ )
+
+
+class Automaton(object):
+ """
+ A declaration of a finite state machine.
+
+ Note that this is not the machine itself; it is immutable.
+ """
+
+ def __init__(self):
+ """
+ Initialize the set of transitions and the initial state.
+ """
+ self._initialState = _NO_STATE
+ self._transitions = set()
+
+
+ @property
+ def initialState(self):
+ """
+ Return this automaton's initial state.
+ """
+ return self._initialState
+
+
+ @initialState.setter
+ def initialState(self, state):
+ """
+ Set this automaton's initial state. Raises a ValueError if
+ this automaton already has an initial state.
+ """
+
+ if self._initialState is not _NO_STATE:
+ raise ValueError(
+ "initial state already set to {}".format(self._initialState))
+
+ self._initialState = state
+
+
+ def addTransition(self, inState, inputSymbol, outState, outputSymbols):
+ """
+ Add the given transition to the outputSymbol. Raise ValueError if
+ there is already a transition with the same inState and inputSymbol.
+ """
+ # keeping self._transitions in a flat list makes addTransition
+ # O(n^2), but state machines don't tend to have hundreds of
+ # transitions.
+ for (anInState, anInputSymbol, anOutState, _) in self._transitions:
+ if (anInState == inState and anInputSymbol == inputSymbol):
+ raise ValueError(
+ "already have transition from {} via {}".format(inState, inputSymbol))
+ self._transitions.add(
+ (inState, inputSymbol, outState, tuple(outputSymbols))
+ )
+
+
+ def allTransitions(self):
+ """
+ All transitions.
+ """
+ return frozenset(self._transitions)
+
+
+ def inputAlphabet(self):
+ """
+ The full set of symbols acceptable to this automaton.
+ """
+ return {inputSymbol for (inState, inputSymbol, outState,
+ outputSymbol) in self._transitions}
+
+
+ def outputAlphabet(self):
+ """
+ The full set of symbols which can be produced by this automaton.
+ """
+ return set(
+ chain.from_iterable(
+ outputSymbols for
+ (inState, inputSymbol, outState, outputSymbols)
+ in self._transitions
+ )
+ )
+
+
+ def states(self):
+ """
+ All valid states; "Q" in the mathematical description of a state
+ machine.
+ """
+ return frozenset(
+ chain.from_iterable(
+ (inState, outState)
+ for
+ (inState, inputSymbol, outState, outputSymbol)
+ in self._transitions
+ )
+ )
+
+
+ def outputForInput(self, inState, inputSymbol):
+ """
+ A 2-tuple of (outState, outputSymbols) for inputSymbol.
+ """
+ for (anInState, anInputSymbol,
+ outState, outputSymbols) in self._transitions:
+ if (inState, inputSymbol) == (anInState, anInputSymbol):
+ return (outState, list(outputSymbols))
+ raise NoTransition(state=inState, symbol=inputSymbol)
+
+
+class Transitioner(object):
+ """
+ The combination of a current state and an L{Automaton}.
+ """
+
+ def __init__(self, automaton, initialState):
+ self._automaton = automaton
+ self._state = initialState
+ self._tracer = None
+
+ def setTrace(self, tracer):
+ self._tracer = tracer
+
+ def transition(self, inputSymbol):
+ """
+ Transition between states, returning any outputs.
+ """
+ outState, outputSymbols = self._automaton.outputForInput(self._state,
+ inputSymbol)
+ outTracer = None
+ if self._tracer:
+ outTracer = self._tracer(self._state._name(),
+ inputSymbol._name(),
+ outState._name())
+ self._state = outState
+ return (outputSymbols, outTracer)
diff --git a/contrib/python/Automat/py2/automat/_discover.py b/contrib/python/Automat/py2/automat/_discover.py
new file mode 100644
index 0000000000..c0d88baea4
--- /dev/null
+++ b/contrib/python/Automat/py2/automat/_discover.py
@@ -0,0 +1,144 @@
+import collections
+import inspect
+from automat import MethodicalMachine
+from twisted.python.modules import PythonModule, getModule
+
+
+def isOriginalLocation(attr):
+ """
+ Attempt to discover if this appearance of a PythonAttribute
+ representing a class refers to the module where that class was
+ defined.
+ """
+ sourceModule = inspect.getmodule(attr.load())
+ if sourceModule is None:
+ return False
+
+ currentModule = attr
+ while not isinstance(currentModule, PythonModule):
+ currentModule = currentModule.onObject
+
+ return currentModule.name == sourceModule.__name__
+
+
+def findMachinesViaWrapper(within):
+ """
+ Recursively yield L{MethodicalMachine}s and their FQPNs within a
+ L{PythonModule} or a L{twisted.python.modules.PythonAttribute}
+ wrapper object.
+
+ Note that L{PythonModule}s may refer to packages, as well.
+
+ The discovery heuristic considers L{MethodicalMachine} instances
+ that are module-level attributes or class-level attributes
+ accessible from module scope. Machines inside nested classes will
+ be discovered, but those returned from functions or methods will not be.
+
+ @type within: L{PythonModule} or L{twisted.python.modules.PythonAttribute}
+ @param within: Where to start the search.
+
+ @return: a generator which yields FQPN, L{MethodicalMachine} pairs.
+ """
+ queue = collections.deque([within])
+ visited = set()
+
+ while queue:
+ attr = queue.pop()
+ value = attr.load()
+
+ if isinstance(value, MethodicalMachine) and value not in visited:
+ visited.add(value)
+ yield attr.name, value
+ elif (inspect.isclass(value) and isOriginalLocation(attr) and
+ value not in visited):
+ visited.add(value)
+ queue.extendleft(attr.iterAttributes())
+ elif isinstance(attr, PythonModule) and value not in visited:
+ visited.add(value)
+ queue.extendleft(attr.iterAttributes())
+ queue.extendleft(attr.iterModules())
+
+
+class InvalidFQPN(Exception):
+ """
+ The given FQPN was not a dot-separated list of Python objects.
+ """
+
+
+class NoModule(InvalidFQPN):
+ """
+ A prefix of the FQPN was not an importable module or package.
+ """
+
+
+class NoObject(InvalidFQPN):
+ """
+ A suffix of the FQPN was not an accessible object
+ """
+
+
+def wrapFQPN(fqpn):
+ """
+ Given an FQPN, retrieve the object via the global Python module
+ namespace and wrap it with a L{PythonModule} or a
+ L{twisted.python.modules.PythonAttribute}.
+ """
+ # largely cribbed from t.p.reflect.namedAny
+
+ if not fqpn:
+ raise InvalidFQPN("FQPN was empty")
+
+ components = collections.deque(fqpn.split('.'))
+
+ if '' in components:
+ raise InvalidFQPN(
+ "name must be a string giving a '.'-separated list of Python "
+ "identifiers, not %r" % (fqpn,))
+
+ component = components.popleft()
+ try:
+ module = getModule(component)
+ except KeyError:
+ raise NoModule(component)
+
+ # find the bottom-most module
+ while components:
+ component = components.popleft()
+ try:
+ module = module[component]
+ except KeyError:
+ components.appendleft(component)
+ break
+ else:
+ module.load()
+ else:
+ return module
+
+ # find the bottom-most attribute
+ attribute = module
+ for component in components:
+ try:
+ attribute = next(child for child in attribute.iterAttributes()
+ if child.name.rsplit('.', 1)[-1] == component)
+ except StopIteration:
+ raise NoObject('{}.{}'.format(attribute.name, component))
+
+ return attribute
+
+
+def findMachines(fqpn):
+ """
+ Recursively yield L{MethodicalMachine}s and their FQPNs in and
+ under the a Python object specified by an FQPN.
+
+ The discovery heuristic considers L{MethodicalMachine} instances
+ that are module-level attributes or class-level attributes
+ accessible from module scope. Machines inside nested classes will
+ be discovered, but those returned from functions or methods will not be.
+
+ @type within: an FQPN
+ @param within: Where to start the search.
+
+ @return: a generator which yields FQPN, L{MethodicalMachine} pairs.
+ """
+ return findMachinesViaWrapper(wrapFQPN(fqpn))
diff --git a/contrib/python/Automat/py2/automat/_introspection.py b/contrib/python/Automat/py2/automat/_introspection.py
new file mode 100644
index 0000000000..3f7307d8df
--- /dev/null
+++ b/contrib/python/Automat/py2/automat/_introspection.py
@@ -0,0 +1,45 @@
+"""
+Python introspection helpers.
+"""
+
+from types import CodeType as code, FunctionType as function
+
+
+def copycode(template, changes):
+ names = [
+ "argcount", "nlocals", "stacksize", "flags", "code", "consts",
+ "names", "varnames", "filename", "name", "firstlineno", "lnotab",
+ "freevars", "cellvars"
+ ]
+ if hasattr(code, "co_kwonlyargcount"):
+ names.insert(1, "kwonlyargcount")
+ if hasattr(code, "co_posonlyargcount"):
+ # PEP 570 added "positional only arguments"
+ names.insert(1, "posonlyargcount")
+ values = [
+ changes.get(name, getattr(template, "co_" + name))
+ for name in names
+ ]
+ return code(*values)
+
+
+
+def copyfunction(template, funcchanges, codechanges):
+ names = [
+ "globals", "name", "defaults", "closure",
+ ]
+ values = [
+ funcchanges.get(name, getattr(template, "__" + name + "__"))
+ for name in names
+ ]
+ return function(copycode(template.__code__, codechanges), *values)
+
+
+def preserveName(f):
+ """
+ Preserve the name of the given function on the decorated function.
+ """
+ def decorator(decorated):
+ return copyfunction(decorated,
+ dict(name=f.__name__), dict(name=f.__name__))
+ return decorator
diff --git a/contrib/python/Automat/py2/automat/_methodical.py b/contrib/python/Automat/py2/automat/_methodical.py
new file mode 100644
index 0000000000..84fcd362a6
--- /dev/null
+++ b/contrib/python/Automat/py2/automat/_methodical.py
@@ -0,0 +1,474 @@
+# -*- test-case-name: automat._test.test_methodical -*-
+
+import collections
+from functools import wraps
+from itertools import count
+
+try:
+ # Python 3
+ from inspect import getfullargspec as getArgsSpec
+except ImportError:
+ # Python 2
+ from inspect import getargspec as getArgsSpec
+
+import attr
+import six
+
+from ._core import Transitioner, Automaton
+from ._introspection import preserveName
+
+
+ArgSpec = collections.namedtuple('ArgSpec', ['args', 'varargs', 'varkw',
+ 'defaults', 'kwonlyargs',
+ 'kwonlydefaults', 'annotations'])
+
+
+def _getArgSpec(func):
+ """
+ Normalize inspect.ArgSpec across python versions
+ and convert mutable attributes to immutable types.
+
+ :param Callable func: A function.
+ :return: The function's ArgSpec.
+ :rtype: ArgSpec
+ """
+ spec = getArgsSpec(func)
+ return ArgSpec(
+ args=tuple(spec.args),
+ varargs=spec.varargs,
+ varkw=spec.varkw if six.PY3 else spec.keywords,
+ defaults=spec.defaults if spec.defaults else (),
+ kwonlyargs=tuple(spec.kwonlyargs) if six.PY3 else (),
+ kwonlydefaults=(
+ tuple(spec.kwonlydefaults.items())
+ if spec.kwonlydefaults else ()
+ ) if six.PY3 else (),
+ annotations=tuple(spec.annotations.items()) if six.PY3 else (),
+ )
+
+
+def _getArgNames(spec):
+ """
+ Get the name of all arguments defined in a function signature.
+
+ The name of * and ** arguments is normalized to "*args" and "**kwargs".
+
+ :param ArgSpec spec: A function to interrogate for a signature.
+ :return: The set of all argument names in `func`s signature.
+ :rtype: Set[str]
+ """
+ return set(
+ spec.args
+ + spec.kwonlyargs
+ + (('*args',) if spec.varargs else ())
+ + (('**kwargs',) if spec.varkw else ())
+ + spec.annotations
+ )
+
+
+def _keywords_only(f):
+ """
+ Decorate a function so all its arguments must be passed by keyword.
+
+ A useful utility for decorators that take arguments so that they don't
+ accidentally get passed the thing they're decorating as their first
+ argument.
+
+ Only works for methods right now.
+ """
+ @wraps(f)
+ def g(self, **kw):
+ return f(self, **kw)
+ return g
+
+
+@attr.s(frozen=True)
+class MethodicalState(object):
+ """
+ A state for a L{MethodicalMachine}.
+ """
+ machine = attr.ib(repr=False)
+ method = attr.ib()
+ serialized = attr.ib(repr=False)
+
+ def upon(self, input, enter, outputs, collector=list):
+ """
+ Declare a state transition within the :class:`automat.MethodicalMachine`
+ associated with this :class:`automat.MethodicalState`:
+ upon the receipt of the `input`, enter the `state`,
+ emitting each output in `outputs`.
+
+ :param MethodicalInput input: The input triggering a state transition.
+ :param MethodicalState enter: The resulting state.
+ :param Iterable[MethodicalOutput] outputs: The outputs to be triggered
+ as a result of the declared state transition.
+ :param Callable collector: The function to be used when collecting
+ output return values.
+
+ :raises TypeError: if any of the `outputs` signatures do not match
+ the `inputs` signature.
+ :raises ValueError: if the state transition from `self` via `input`
+ has already been defined.
+ """
+ inputArgs = _getArgNames(input.argSpec)
+ for output in outputs:
+ outputArgs = _getArgNames(output.argSpec)
+ if not outputArgs.issubset(inputArgs):
+ raise TypeError(
+ "method {input} signature {inputSignature} "
+ "does not match output {output} "
+ "signature {outputSignature}".format(
+ input=input.method.__name__,
+ output=output.method.__name__,
+ inputSignature=getArgsSpec(input.method),
+ outputSignature=getArgsSpec(output.method),
+ ))
+ self.machine._oneTransition(self, input, enter, outputs, collector)
+
+ def _name(self):
+ return self.method.__name__
+
+
+def _transitionerFromInstance(oself, symbol, automaton):
+ """
+ Get a L{Transitioner}
+ """
+ transitioner = getattr(oself, symbol, None)
+ if transitioner is None:
+ transitioner = Transitioner(
+ automaton,
+ automaton.initialState,
+ )
+ setattr(oself, symbol, transitioner)
+ return transitioner
+
+
+def _empty():
+ pass
+
+def _docstring():
+ """docstring"""
+
+def assertNoCode(inst, attribute, f):
+ # The function body must be empty, i.e. "pass" or "return None", which
+ # both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also
+ # accept functions with only a docstring, which yields slightly different
+ # bytecode, because the "None" is put in a different constant slot.
+
+ # Unfortunately, this does not catch function bodies that return a
+ # constant value, e.g. "return 1", because their code is identical to a
+ # "return None". They differ in the contents of their constant table, but
+ # checking that would require us to parse the bytecode, find the index
+ # being returned, then making sure the table has a None at that index.
+
+ if f.__code__.co_code not in (_empty.__code__.co_code,
+ _docstring.__code__.co_code):
+ raise ValueError("function body must be empty")
+
+
+def _filterArgs(args, kwargs, inputSpec, outputSpec):
+ """
+ Filter out arguments that were passed to input that output won't accept.
+
+ :param tuple args: The *args that input received.
+ :param dict kwargs: The **kwargs that input received.
+ :param ArgSpec inputSpec: The input's arg spec.
+ :param ArgSpec outputSpec: The output's arg spec.
+ :return: The args and kwargs that output will accept.
+ :rtype: Tuple[tuple, dict]
+ """
+ named_args = tuple(zip(inputSpec.args[1:], args))
+ if outputSpec.varargs:
+ # Only return all args if the output accepts *args.
+ return_args = args
+ else:
+ # Filter out arguments that don't appear
+ # in the output's method signature.
+ return_args = [v for n, v in named_args if n in outputSpec.args]
+
+ # Get any of input's default arguments that were not passed.
+ passed_arg_names = tuple(kwargs)
+ for name, value in named_args:
+ passed_arg_names += (name, value)
+ defaults = zip(inputSpec.args[::-1], inputSpec.defaults[::-1])
+ full_kwargs = {n: v for n, v in defaults if n not in passed_arg_names}
+ full_kwargs.update(kwargs)
+
+ if outputSpec.varkw:
+ # Only pass all kwargs if the output method accepts **kwargs.
+ return_kwargs = full_kwargs
+ else:
+ # Filter out names that the output method does not accept.
+ all_accepted_names = outputSpec.args[1:] + outputSpec.kwonlyargs
+ return_kwargs = {n: v for n, v in full_kwargs.items()
+ if n in all_accepted_names}
+
+ return return_args, return_kwargs
+
+
+@attr.s(eq=False, hash=False)
+class MethodicalInput(object):
+ """
+ An input for a L{MethodicalMachine}.
+ """
+ automaton = attr.ib(repr=False)
+ method = attr.ib(validator=assertNoCode)
+ symbol = attr.ib(repr=False)
+ collectors = attr.ib(default=attr.Factory(dict), repr=False)
+ argSpec = attr.ib(init=False, repr=False)
+
+ @argSpec.default
+ def _buildArgSpec(self):
+ return _getArgSpec(self.method)
+
+ def __get__(self, oself, type=None):
+ """
+ Return a function that takes no arguments and returns values returned
+ by output functions produced by the given L{MethodicalInput} in
+ C{oself}'s current state.
+ """
+ transitioner = _transitionerFromInstance(oself, self.symbol,
+ self.automaton)
+ @preserveName(self.method)
+ @wraps(self.method)
+ def doInput(*args, **kwargs):
+ self.method(oself, *args, **kwargs)
+ previousState = transitioner._state
+ (outputs, outTracer) = transitioner.transition(self)
+ collector = self.collectors[previousState]
+ values = []
+ for output in outputs:
+ if outTracer:
+ outTracer(output._name())
+ a, k = _filterArgs(args, kwargs, self.argSpec, output.argSpec)
+ value = output(oself, *a, **k)
+ values.append(value)
+ return collector(values)
+ return doInput
+
+ def _name(self):
+ return self.method.__name__
+
+
+@attr.s(frozen=True)
+class MethodicalOutput(object):
+ """
+ An output for a L{MethodicalMachine}.
+ """
+ machine = attr.ib(repr=False)
+ method = attr.ib()
+ argSpec = attr.ib(init=False, repr=False)
+
+ @argSpec.default
+ def _buildArgSpec(self):
+ return _getArgSpec(self.method)
+
+ def __get__(self, oself, type=None):
+ """
+ Outputs are private, so raise an exception when we attempt to get one.
+ """
+ raise AttributeError(
+ "{cls}.{method} is a state-machine output method; "
+ "to produce this output, call an input method instead.".format(
+ cls=type.__name__,
+ method=self.method.__name__
+ )
+ )
+
+
+ def __call__(self, oself, *args, **kwargs):
+ """
+ Call the underlying method.
+ """
+ return self.method(oself, *args, **kwargs)
+
+ def _name(self):
+ return self.method.__name__
+
+@attr.s(eq=False, hash=False)
+class MethodicalTracer(object):
+ automaton = attr.ib(repr=False)
+ symbol = attr.ib(repr=False)
+
+
+ def __get__(self, oself, type=None):
+ transitioner = _transitionerFromInstance(oself, self.symbol,
+ self.automaton)
+ def setTrace(tracer):
+ transitioner.setTrace(tracer)
+ return setTrace
+
+
+
+counter = count()
+def gensym():
+ """
+ Create a unique Python identifier.
+ """
+ return "_symbol_" + str(next(counter))
+
+
+
+class MethodicalMachine(object):
+ """
+ A :class:`MethodicalMachine` is an interface to an `Automaton`
+ that uses methods on a class.
+ """
+
+ def __init__(self):
+ self._automaton = Automaton()
+ self._reducers = {}
+ self._symbol = gensym()
+
+
+ def __get__(self, oself, type=None):
+ """
+ L{MethodicalMachine} is an implementation detail for setting up
+ class-level state; applications should never need to access it on an
+ instance.
+ """
+ if oself is not None:
+ raise AttributeError(
+ "MethodicalMachine is an implementation detail.")
+ return self
+
+
+ @_keywords_only
+ def state(self, initial=False, terminal=False,
+ serialized=None):
+ """
+ Declare a state, possibly an initial state or a terminal state.
+
+ This is a decorator for methods, but it will modify the method so as
+ not to be callable any more.
+
+ :param bool initial: is this state the initial state?
+ Only one state on this :class:`automat.MethodicalMachine`
+ may be an initial state; more than one is an error.
+
+ :param bool terminal: Is this state a terminal state?
+ i.e. a state that the machine can end up in?
+ (This is purely informational at this point.)
+
+ :param Hashable serialized: a serializable value
+ to be used to represent this state to external systems.
+ This value should be hashable;
+ :py:func:`unicode` is a good type to use.
+ """
+ def decorator(stateMethod):
+ state = MethodicalState(machine=self,
+ method=stateMethod,
+ serialized=serialized)
+ if initial:
+ self._automaton.initialState = state
+ return state
+ return decorator
+
+
+ @_keywords_only
+ def input(self):
+ """
+ Declare an input.
+
+ This is a decorator for methods.
+ """
+ def decorator(inputMethod):
+ return MethodicalInput(automaton=self._automaton,
+ method=inputMethod,
+ symbol=self._symbol)
+ return decorator
+
+
+ @_keywords_only
+ def output(self):
+ """
+ Declare an output.
+
+ This is a decorator for methods.
+
+ This method will be called when the state machine transitions to this
+ state as specified in the decorated `output` method.
+ """
+ def decorator(outputMethod):
+ return MethodicalOutput(machine=self, method=outputMethod)
+ return decorator
+
+
+ def _oneTransition(self, startState, inputToken, endState, outputTokens,
+ collector):
+ """
+ See L{MethodicalState.upon}.
+ """
+ # FIXME: tests for all of this (some of it is wrong)
+ # if not isinstance(startState, MethodicalState):
+ # raise NotImplementedError("start state {} isn't a state"
+ # .format(startState))
+ # if not isinstance(inputToken, MethodicalInput):
+ # raise NotImplementedError("start state {} isn't an input"
+ # .format(inputToken))
+ # if not isinstance(endState, MethodicalState):
+ # raise NotImplementedError("end state {} isn't a state"
+ # .format(startState))
+ # for output in outputTokens:
+ # if not isinstance(endState, MethodicalState):
+ # raise NotImplementedError("output state {} isn't a state"
+ # .format(endState))
+ self._automaton.addTransition(startState, inputToken, endState,
+ tuple(outputTokens))
+ inputToken.collectors[startState] = collector
+
+
+ @_keywords_only
+ def serializer(self):
+ """
+
+ """
+ def decorator(decoratee):
+ @wraps(decoratee)
+ def serialize(oself):
+ transitioner = _transitionerFromInstance(oself, self._symbol,
+ self._automaton)
+ return decoratee(oself, transitioner._state.serialized)
+ return serialize
+ return decorator
+
+ @_keywords_only
+ def unserializer(self):
+ """
+
+ """
+ def decorator(decoratee):
+ @wraps(decoratee)
+ def unserialize(oself, *args, **kwargs):
+ state = decoratee(oself, *args, **kwargs)
+ mapping = {}
+ for eachState in self._automaton.states():
+ mapping[eachState.serialized] = eachState
+ transitioner = _transitionerFromInstance(
+ oself, self._symbol, self._automaton)
+ transitioner._state = mapping[state]
+ return None # it's on purpose
+ return unserialize
+ return decorator
+
+ @property
+ def _setTrace(self):
+ return MethodicalTracer(self._automaton, self._symbol)
+
+ def asDigraph(self):
+ """
+ Generate a L{graphviz.Digraph} that represents this machine's
+ states and transitions.
+
+ @return: L{graphviz.Digraph} object; for more information, please
+ see the documentation for
+ U{graphviz<https://graphviz.readthedocs.io/>}
+
+ """
+ from ._visualize import makeDigraph
+ return makeDigraph(
+ self._automaton,
+ stateAsString=lambda state: state.method.__name__,
+ inputAsString=lambda input: input.method.__name__,
+ outputAsString=lambda output: output.method.__name__,
+ )
diff --git a/contrib/python/Automat/py2/automat/_visualize.py b/contrib/python/Automat/py2/automat/_visualize.py
new file mode 100644
index 0000000000..7a9c8c6eb5
--- /dev/null
+++ b/contrib/python/Automat/py2/automat/_visualize.py
@@ -0,0 +1,182 @@
+from __future__ import print_function
+import argparse
+import sys
+
+import graphviz
+
+from ._discover import findMachines
+
+
+def _gvquote(s):
+ return '"{}"'.format(s.replace('"', r'\"'))
+
+
+def _gvhtml(s):
+ return '<{}>'.format(s)
+
+
+def elementMaker(name, *children, **attrs):
+ """
+ Construct a string from the HTML element description.
+ """
+ formattedAttrs = ' '.join('{}={}'.format(key, _gvquote(str(value)))
+ for key, value in sorted(attrs.items()))
+ formattedChildren = ''.join(children)
+ return u'<{name} {attrs}>{children}</{name}>'.format(
+ name=name,
+ attrs=formattedAttrs,
+ children=formattedChildren)
+
+
+def tableMaker(inputLabel, outputLabels, port, _E=elementMaker):
+ """
+ Construct an HTML table to label a state transition.
+ """
+ colspan = {}
+ if outputLabels:
+ colspan['colspan'] = str(len(outputLabels))
+
+ inputLabelCell = _E("td",
+ _E("font",
+ inputLabel,
+ face="menlo-italic"),
+ color="purple",
+ port=port,
+ **colspan)
+
+ pointSize = {"point-size": "9"}
+ outputLabelCells = [_E("td",
+ _E("font",
+ outputLabel,
+ **pointSize),
+ color="pink")
+ for outputLabel in outputLabels]
+
+ rows = [_E("tr", inputLabelCell)]
+
+ if outputLabels:
+ rows.append(_E("tr", *outputLabelCells))
+
+ return _E("table", *rows)
+
+
+def makeDigraph(automaton, inputAsString=repr,
+ outputAsString=repr,
+ stateAsString=repr):
+ """
+ Produce a L{graphviz.Digraph} object from an automaton.
+ """
+ digraph = graphviz.Digraph(graph_attr={'pack': 'true',
+ 'dpi': '100'},
+ node_attr={'fontname': 'Menlo'},
+ edge_attr={'fontname': 'Menlo'})
+
+ for state in automaton.states():
+ if state is automaton.initialState:
+ stateShape = "bold"
+ fontName = "Menlo-Bold"
+ else:
+ stateShape = ""
+ fontName = "Menlo"
+ digraph.node(stateAsString(state),
+ fontame=fontName,
+ shape="ellipse",
+ style=stateShape,
+ color="blue")
+ for n, eachTransition in enumerate(automaton.allTransitions()):
+ inState, inputSymbol, outState, outputSymbols = eachTransition
+ thisTransition = "t{}".format(n)
+ inputLabel = inputAsString(inputSymbol)
+
+ port = "tableport"
+ table = tableMaker(inputLabel, [outputAsString(outputSymbol)
+ for outputSymbol in outputSymbols],
+ port=port)
+
+ digraph.node(thisTransition,
+ label=_gvhtml(table), margin="0.2", shape="none")
+
+ digraph.edge(stateAsString(inState),
+ '{}:{}:w'.format(thisTransition, port),
+ arrowhead="none")
+ digraph.edge('{}:{}:e'.format(thisTransition, port),
+ stateAsString(outState))
+
+ return digraph
+
+
+def tool(_progname=sys.argv[0],
+ _argv=sys.argv[1:],
+ _syspath=sys.path,
+ _findMachines=findMachines,
+ _print=print):
+ """
+ Entry point for command line utility.
+ """
+
+ DESCRIPTION = """
+ Visualize automat.MethodicalMachines as graphviz graphs.
+ """
+ EPILOG = """
+ You must have the graphviz tool suite installed. Please visit
+ http://www.graphviz.org for more information.
+ """
+ if _syspath[0]:
+ _syspath.insert(0, '')
+ argumentParser = argparse.ArgumentParser(
+ prog=_progname,
+ description=DESCRIPTION,
+ epilog=EPILOG)
+ argumentParser.add_argument('fqpn',
+ help="A Fully Qualified Path name"
+ " representing where to find machines.")
+ argumentParser.add_argument('--quiet', '-q',
+ help="suppress output",
+ default=False,
+ action="store_true")
+ argumentParser.add_argument('--dot-directory', '-d',
+ help="Where to write out .dot files.",
+ default=".automat_visualize")
+ argumentParser.add_argument('--image-directory', '-i',
+ help="Where to write out image files.",
+ default=".automat_visualize")
+ argumentParser.add_argument('--image-type', '-t',
+ help="The image format.",
+ choices=graphviz.FORMATS,
+ default='png')
+ argumentParser.add_argument('--view', '-v',
+ help="View rendered graphs with"
+ " default image viewer",
+ default=False,
+ action="store_true")
+ args = argumentParser.parse_args(_argv)
+
+ explicitlySaveDot = (args.dot_directory
+ and (not args.image_directory
+ or args.image_directory != args.dot_directory))
+ if args.quiet:
+ def _print(*args):
+ pass
+
+ for fqpn, machine in _findMachines(args.fqpn):
+ _print(fqpn, '...discovered')
+
+ digraph = machine.asDigraph()
+
+ if explicitlySaveDot:
+ digraph.save(filename="{}.dot".format(fqpn),
+ directory=args.dot_directory)
+ _print(fqpn, "...wrote dot into", args.dot_directory)
+
+ if args.image_directory:
+ deleteDot = not args.dot_directory or explicitlySaveDot
+ digraph.format = args.image_type
+ digraph.render(filename="{}.dot".format(fqpn),
+ directory=args.image_directory,
+ view=args.view,
+ cleanup=deleteDot)
+ if deleteDot:
+ msg = "...wrote image into"
+ else:
+ msg = "...wrote image and dot into"
+ _print(fqpn, msg, args.image_directory)