diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Automat/py2/automat | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Automat/py2/automat')
-rw-r--r-- | contrib/python/Automat/py2/automat/__init__.py | 8 | ||||
-rw-r--r-- | contrib/python/Automat/py2/automat/_core.py | 165 | ||||
-rw-r--r-- | contrib/python/Automat/py2/automat/_discover.py | 144 | ||||
-rw-r--r-- | contrib/python/Automat/py2/automat/_introspection.py | 45 | ||||
-rw-r--r-- | contrib/python/Automat/py2/automat/_methodical.py | 474 | ||||
-rw-r--r-- | contrib/python/Automat/py2/automat/_visualize.py | 182 |
6 files changed, 1018 insertions, 0 deletions
diff --git a/contrib/python/Automat/py2/automat/__init__.py b/contrib/python/Automat/py2/automat/__init__.py new file mode 100644 index 0000000000..570b84f995 --- /dev/null +++ b/contrib/python/Automat/py2/automat/__init__.py @@ -0,0 +1,8 @@ +# -*- test-case-name: automat -*- +from ._methodical import MethodicalMachine +from ._core import NoTransition + +__all__ = [ + 'MethodicalMachine', + 'NoTransition', +] diff --git a/contrib/python/Automat/py2/automat/_core.py b/contrib/python/Automat/py2/automat/_core.py new file mode 100644 index 0000000000..4118a4b070 --- /dev/null +++ b/contrib/python/Automat/py2/automat/_core.py @@ -0,0 +1,165 @@ +# -*- test-case-name: automat._test.test_core -*- + +""" +A core state-machine abstraction. + +Perhaps something that could be replaced with or integrated into machinist. +""" + +from itertools import chain + +_NO_STATE = "<no state>" + + +class NoTransition(Exception): + """ + A finite state machine in C{state} has no transition for C{symbol}. + + @param state: the finite state machine's state at the time of the + illegal transition. + + @param symbol: the input symbol for which no transition exists. + """ + + def __init__(self, state, symbol): + self.state = state + self.symbol = symbol + super(Exception, self).__init__( + "no transition for {} in {}".format(symbol, state) + ) + + +class Automaton(object): + """ + A declaration of a finite state machine. + + Note that this is not the machine itself; it is immutable. + """ + + def __init__(self): + """ + Initialize the set of transitions and the initial state. + """ + self._initialState = _NO_STATE + self._transitions = set() + + + @property + def initialState(self): + """ + Return this automaton's initial state. + """ + return self._initialState + + + @initialState.setter + def initialState(self, state): + """ + Set this automaton's initial state. Raises a ValueError if + this automaton already has an initial state. + """ + + if self._initialState is not _NO_STATE: + raise ValueError( + "initial state already set to {}".format(self._initialState)) + + self._initialState = state + + + def addTransition(self, inState, inputSymbol, outState, outputSymbols): + """ + Add the given transition to the outputSymbol. Raise ValueError if + there is already a transition with the same inState and inputSymbol. + """ + # keeping self._transitions in a flat list makes addTransition + # O(n^2), but state machines don't tend to have hundreds of + # transitions. + for (anInState, anInputSymbol, anOutState, _) in self._transitions: + if (anInState == inState and anInputSymbol == inputSymbol): + raise ValueError( + "already have transition from {} via {}".format(inState, inputSymbol)) + self._transitions.add( + (inState, inputSymbol, outState, tuple(outputSymbols)) + ) + + + def allTransitions(self): + """ + All transitions. + """ + return frozenset(self._transitions) + + + def inputAlphabet(self): + """ + The full set of symbols acceptable to this automaton. + """ + return {inputSymbol for (inState, inputSymbol, outState, + outputSymbol) in self._transitions} + + + def outputAlphabet(self): + """ + The full set of symbols which can be produced by this automaton. + """ + return set( + chain.from_iterable( + outputSymbols for + (inState, inputSymbol, outState, outputSymbols) + in self._transitions + ) + ) + + + def states(self): + """ + All valid states; "Q" in the mathematical description of a state + machine. + """ + return frozenset( + chain.from_iterable( + (inState, outState) + for + (inState, inputSymbol, outState, outputSymbol) + in self._transitions + ) + ) + + + def outputForInput(self, inState, inputSymbol): + """ + A 2-tuple of (outState, outputSymbols) for inputSymbol. + """ + for (anInState, anInputSymbol, + outState, outputSymbols) in self._transitions: + if (inState, inputSymbol) == (anInState, anInputSymbol): + return (outState, list(outputSymbols)) + raise NoTransition(state=inState, symbol=inputSymbol) + + +class Transitioner(object): + """ + The combination of a current state and an L{Automaton}. + """ + + def __init__(self, automaton, initialState): + self._automaton = automaton + self._state = initialState + self._tracer = None + + def setTrace(self, tracer): + self._tracer = tracer + + def transition(self, inputSymbol): + """ + Transition between states, returning any outputs. + """ + outState, outputSymbols = self._automaton.outputForInput(self._state, + inputSymbol) + outTracer = None + if self._tracer: + outTracer = self._tracer(self._state._name(), + inputSymbol._name(), + outState._name()) + self._state = outState + return (outputSymbols, outTracer) diff --git a/contrib/python/Automat/py2/automat/_discover.py b/contrib/python/Automat/py2/automat/_discover.py new file mode 100644 index 0000000000..c0d88baea4 --- /dev/null +++ b/contrib/python/Automat/py2/automat/_discover.py @@ -0,0 +1,144 @@ +import collections +import inspect +from automat import MethodicalMachine +from twisted.python.modules import PythonModule, getModule + + +def isOriginalLocation(attr): + """ + Attempt to discover if this appearance of a PythonAttribute + representing a class refers to the module where that class was + defined. + """ + sourceModule = inspect.getmodule(attr.load()) + if sourceModule is None: + return False + + currentModule = attr + while not isinstance(currentModule, PythonModule): + currentModule = currentModule.onObject + + return currentModule.name == sourceModule.__name__ + + +def findMachinesViaWrapper(within): + """ + Recursively yield L{MethodicalMachine}s and their FQPNs within a + L{PythonModule} or a L{twisted.python.modules.PythonAttribute} + wrapper object. + + Note that L{PythonModule}s may refer to packages, as well. + + The discovery heuristic considers L{MethodicalMachine} instances + that are module-level attributes or class-level attributes + accessible from module scope. Machines inside nested classes will + be discovered, but those returned from functions or methods will not be. + + @type within: L{PythonModule} or L{twisted.python.modules.PythonAttribute} + @param within: Where to start the search. + + @return: a generator which yields FQPN, L{MethodicalMachine} pairs. + """ + queue = collections.deque([within]) + visited = set() + + while queue: + attr = queue.pop() + value = attr.load() + + if isinstance(value, MethodicalMachine) and value not in visited: + visited.add(value) + yield attr.name, value + elif (inspect.isclass(value) and isOriginalLocation(attr) and + value not in visited): + visited.add(value) + queue.extendleft(attr.iterAttributes()) + elif isinstance(attr, PythonModule) and value not in visited: + visited.add(value) + queue.extendleft(attr.iterAttributes()) + queue.extendleft(attr.iterModules()) + + +class InvalidFQPN(Exception): + """ + The given FQPN was not a dot-separated list of Python objects. + """ + + +class NoModule(InvalidFQPN): + """ + A prefix of the FQPN was not an importable module or package. + """ + + +class NoObject(InvalidFQPN): + """ + A suffix of the FQPN was not an accessible object + """ + + +def wrapFQPN(fqpn): + """ + Given an FQPN, retrieve the object via the global Python module + namespace and wrap it with a L{PythonModule} or a + L{twisted.python.modules.PythonAttribute}. + """ + # largely cribbed from t.p.reflect.namedAny + + if not fqpn: + raise InvalidFQPN("FQPN was empty") + + components = collections.deque(fqpn.split('.')) + + if '' in components: + raise InvalidFQPN( + "name must be a string giving a '.'-separated list of Python " + "identifiers, not %r" % (fqpn,)) + + component = components.popleft() + try: + module = getModule(component) + except KeyError: + raise NoModule(component) + + # find the bottom-most module + while components: + component = components.popleft() + try: + module = module[component] + except KeyError: + components.appendleft(component) + break + else: + module.load() + else: + return module + + # find the bottom-most attribute + attribute = module + for component in components: + try: + attribute = next(child for child in attribute.iterAttributes() + if child.name.rsplit('.', 1)[-1] == component) + except StopIteration: + raise NoObject('{}.{}'.format(attribute.name, component)) + + return attribute + + +def findMachines(fqpn): + """ + Recursively yield L{MethodicalMachine}s and their FQPNs in and + under the a Python object specified by an FQPN. + + The discovery heuristic considers L{MethodicalMachine} instances + that are module-level attributes or class-level attributes + accessible from module scope. Machines inside nested classes will + be discovered, but those returned from functions or methods will not be. + + @type within: an FQPN + @param within: Where to start the search. + + @return: a generator which yields FQPN, L{MethodicalMachine} pairs. + """ + return findMachinesViaWrapper(wrapFQPN(fqpn)) diff --git a/contrib/python/Automat/py2/automat/_introspection.py b/contrib/python/Automat/py2/automat/_introspection.py new file mode 100644 index 0000000000..3f7307d8df --- /dev/null +++ b/contrib/python/Automat/py2/automat/_introspection.py @@ -0,0 +1,45 @@ +""" +Python introspection helpers. +""" + +from types import CodeType as code, FunctionType as function + + +def copycode(template, changes): + names = [ + "argcount", "nlocals", "stacksize", "flags", "code", "consts", + "names", "varnames", "filename", "name", "firstlineno", "lnotab", + "freevars", "cellvars" + ] + if hasattr(code, "co_kwonlyargcount"): + names.insert(1, "kwonlyargcount") + if hasattr(code, "co_posonlyargcount"): + # PEP 570 added "positional only arguments" + names.insert(1, "posonlyargcount") + values = [ + changes.get(name, getattr(template, "co_" + name)) + for name in names + ] + return code(*values) + + + +def copyfunction(template, funcchanges, codechanges): + names = [ + "globals", "name", "defaults", "closure", + ] + values = [ + funcchanges.get(name, getattr(template, "__" + name + "__")) + for name in names + ] + return function(copycode(template.__code__, codechanges), *values) + + +def preserveName(f): + """ + Preserve the name of the given function on the decorated function. + """ + def decorator(decorated): + return copyfunction(decorated, + dict(name=f.__name__), dict(name=f.__name__)) + return decorator diff --git a/contrib/python/Automat/py2/automat/_methodical.py b/contrib/python/Automat/py2/automat/_methodical.py new file mode 100644 index 0000000000..84fcd362a6 --- /dev/null +++ b/contrib/python/Automat/py2/automat/_methodical.py @@ -0,0 +1,474 @@ +# -*- test-case-name: automat._test.test_methodical -*- + +import collections +from functools import wraps +from itertools import count + +try: + # Python 3 + from inspect import getfullargspec as getArgsSpec +except ImportError: + # Python 2 + from inspect import getargspec as getArgsSpec + +import attr +import six + +from ._core import Transitioner, Automaton +from ._introspection import preserveName + + +ArgSpec = collections.namedtuple('ArgSpec', ['args', 'varargs', 'varkw', + 'defaults', 'kwonlyargs', + 'kwonlydefaults', 'annotations']) + + +def _getArgSpec(func): + """ + Normalize inspect.ArgSpec across python versions + and convert mutable attributes to immutable types. + + :param Callable func: A function. + :return: The function's ArgSpec. + :rtype: ArgSpec + """ + spec = getArgsSpec(func) + return ArgSpec( + args=tuple(spec.args), + varargs=spec.varargs, + varkw=spec.varkw if six.PY3 else spec.keywords, + defaults=spec.defaults if spec.defaults else (), + kwonlyargs=tuple(spec.kwonlyargs) if six.PY3 else (), + kwonlydefaults=( + tuple(spec.kwonlydefaults.items()) + if spec.kwonlydefaults else () + ) if six.PY3 else (), + annotations=tuple(spec.annotations.items()) if six.PY3 else (), + ) + + +def _getArgNames(spec): + """ + Get the name of all arguments defined in a function signature. + + The name of * and ** arguments is normalized to "*args" and "**kwargs". + + :param ArgSpec spec: A function to interrogate for a signature. + :return: The set of all argument names in `func`s signature. + :rtype: Set[str] + """ + return set( + spec.args + + spec.kwonlyargs + + (('*args',) if spec.varargs else ()) + + (('**kwargs',) if spec.varkw else ()) + + spec.annotations + ) + + +def _keywords_only(f): + """ + Decorate a function so all its arguments must be passed by keyword. + + A useful utility for decorators that take arguments so that they don't + accidentally get passed the thing they're decorating as their first + argument. + + Only works for methods right now. + """ + @wraps(f) + def g(self, **kw): + return f(self, **kw) + return g + + +@attr.s(frozen=True) +class MethodicalState(object): + """ + A state for a L{MethodicalMachine}. + """ + machine = attr.ib(repr=False) + method = attr.ib() + serialized = attr.ib(repr=False) + + def upon(self, input, enter, outputs, collector=list): + """ + Declare a state transition within the :class:`automat.MethodicalMachine` + associated with this :class:`automat.MethodicalState`: + upon the receipt of the `input`, enter the `state`, + emitting each output in `outputs`. + + :param MethodicalInput input: The input triggering a state transition. + :param MethodicalState enter: The resulting state. + :param Iterable[MethodicalOutput] outputs: The outputs to be triggered + as a result of the declared state transition. + :param Callable collector: The function to be used when collecting + output return values. + + :raises TypeError: if any of the `outputs` signatures do not match + the `inputs` signature. + :raises ValueError: if the state transition from `self` via `input` + has already been defined. + """ + inputArgs = _getArgNames(input.argSpec) + for output in outputs: + outputArgs = _getArgNames(output.argSpec) + if not outputArgs.issubset(inputArgs): + raise TypeError( + "method {input} signature {inputSignature} " + "does not match output {output} " + "signature {outputSignature}".format( + input=input.method.__name__, + output=output.method.__name__, + inputSignature=getArgsSpec(input.method), + outputSignature=getArgsSpec(output.method), + )) + self.machine._oneTransition(self, input, enter, outputs, collector) + + def _name(self): + return self.method.__name__ + + +def _transitionerFromInstance(oself, symbol, automaton): + """ + Get a L{Transitioner} + """ + transitioner = getattr(oself, symbol, None) + if transitioner is None: + transitioner = Transitioner( + automaton, + automaton.initialState, + ) + setattr(oself, symbol, transitioner) + return transitioner + + +def _empty(): + pass + +def _docstring(): + """docstring""" + +def assertNoCode(inst, attribute, f): + # The function body must be empty, i.e. "pass" or "return None", which + # both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also + # accept functions with only a docstring, which yields slightly different + # bytecode, because the "None" is put in a different constant slot. + + # Unfortunately, this does not catch function bodies that return a + # constant value, e.g. "return 1", because their code is identical to a + # "return None". They differ in the contents of their constant table, but + # checking that would require us to parse the bytecode, find the index + # being returned, then making sure the table has a None at that index. + + if f.__code__.co_code not in (_empty.__code__.co_code, + _docstring.__code__.co_code): + raise ValueError("function body must be empty") + + +def _filterArgs(args, kwargs, inputSpec, outputSpec): + """ + Filter out arguments that were passed to input that output won't accept. + + :param tuple args: The *args that input received. + :param dict kwargs: The **kwargs that input received. + :param ArgSpec inputSpec: The input's arg spec. + :param ArgSpec outputSpec: The output's arg spec. + :return: The args and kwargs that output will accept. + :rtype: Tuple[tuple, dict] + """ + named_args = tuple(zip(inputSpec.args[1:], args)) + if outputSpec.varargs: + # Only return all args if the output accepts *args. + return_args = args + else: + # Filter out arguments that don't appear + # in the output's method signature. + return_args = [v for n, v in named_args if n in outputSpec.args] + + # Get any of input's default arguments that were not passed. + passed_arg_names = tuple(kwargs) + for name, value in named_args: + passed_arg_names += (name, value) + defaults = zip(inputSpec.args[::-1], inputSpec.defaults[::-1]) + full_kwargs = {n: v for n, v in defaults if n not in passed_arg_names} + full_kwargs.update(kwargs) + + if outputSpec.varkw: + # Only pass all kwargs if the output method accepts **kwargs. + return_kwargs = full_kwargs + else: + # Filter out names that the output method does not accept. + all_accepted_names = outputSpec.args[1:] + outputSpec.kwonlyargs + return_kwargs = {n: v for n, v in full_kwargs.items() + if n in all_accepted_names} + + return return_args, return_kwargs + + +@attr.s(eq=False, hash=False) +class MethodicalInput(object): + """ + An input for a L{MethodicalMachine}. + """ + automaton = attr.ib(repr=False) + method = attr.ib(validator=assertNoCode) + symbol = attr.ib(repr=False) + collectors = attr.ib(default=attr.Factory(dict), repr=False) + argSpec = attr.ib(init=False, repr=False) + + @argSpec.default + def _buildArgSpec(self): + return _getArgSpec(self.method) + + def __get__(self, oself, type=None): + """ + Return a function that takes no arguments and returns values returned + by output functions produced by the given L{MethodicalInput} in + C{oself}'s current state. + """ + transitioner = _transitionerFromInstance(oself, self.symbol, + self.automaton) + @preserveName(self.method) + @wraps(self.method) + def doInput(*args, **kwargs): + self.method(oself, *args, **kwargs) + previousState = transitioner._state + (outputs, outTracer) = transitioner.transition(self) + collector = self.collectors[previousState] + values = [] + for output in outputs: + if outTracer: + outTracer(output._name()) + a, k = _filterArgs(args, kwargs, self.argSpec, output.argSpec) + value = output(oself, *a, **k) + values.append(value) + return collector(values) + return doInput + + def _name(self): + return self.method.__name__ + + +@attr.s(frozen=True) +class MethodicalOutput(object): + """ + An output for a L{MethodicalMachine}. + """ + machine = attr.ib(repr=False) + method = attr.ib() + argSpec = attr.ib(init=False, repr=False) + + @argSpec.default + def _buildArgSpec(self): + return _getArgSpec(self.method) + + def __get__(self, oself, type=None): + """ + Outputs are private, so raise an exception when we attempt to get one. + """ + raise AttributeError( + "{cls}.{method} is a state-machine output method; " + "to produce this output, call an input method instead.".format( + cls=type.__name__, + method=self.method.__name__ + ) + ) + + + def __call__(self, oself, *args, **kwargs): + """ + Call the underlying method. + """ + return self.method(oself, *args, **kwargs) + + def _name(self): + return self.method.__name__ + +@attr.s(eq=False, hash=False) +class MethodicalTracer(object): + automaton = attr.ib(repr=False) + symbol = attr.ib(repr=False) + + + def __get__(self, oself, type=None): + transitioner = _transitionerFromInstance(oself, self.symbol, + self.automaton) + def setTrace(tracer): + transitioner.setTrace(tracer) + return setTrace + + + +counter = count() +def gensym(): + """ + Create a unique Python identifier. + """ + return "_symbol_" + str(next(counter)) + + + +class MethodicalMachine(object): + """ + A :class:`MethodicalMachine` is an interface to an `Automaton` + that uses methods on a class. + """ + + def __init__(self): + self._automaton = Automaton() + self._reducers = {} + self._symbol = gensym() + + + def __get__(self, oself, type=None): + """ + L{MethodicalMachine} is an implementation detail for setting up + class-level state; applications should never need to access it on an + instance. + """ + if oself is not None: + raise AttributeError( + "MethodicalMachine is an implementation detail.") + return self + + + @_keywords_only + def state(self, initial=False, terminal=False, + serialized=None): + """ + Declare a state, possibly an initial state or a terminal state. + + This is a decorator for methods, but it will modify the method so as + not to be callable any more. + + :param bool initial: is this state the initial state? + Only one state on this :class:`automat.MethodicalMachine` + may be an initial state; more than one is an error. + + :param bool terminal: Is this state a terminal state? + i.e. a state that the machine can end up in? + (This is purely informational at this point.) + + :param Hashable serialized: a serializable value + to be used to represent this state to external systems. + This value should be hashable; + :py:func:`unicode` is a good type to use. + """ + def decorator(stateMethod): + state = MethodicalState(machine=self, + method=stateMethod, + serialized=serialized) + if initial: + self._automaton.initialState = state + return state + return decorator + + + @_keywords_only + def input(self): + """ + Declare an input. + + This is a decorator for methods. + """ + def decorator(inputMethod): + return MethodicalInput(automaton=self._automaton, + method=inputMethod, + symbol=self._symbol) + return decorator + + + @_keywords_only + def output(self): + """ + Declare an output. + + This is a decorator for methods. + + This method will be called when the state machine transitions to this + state as specified in the decorated `output` method. + """ + def decorator(outputMethod): + return MethodicalOutput(machine=self, method=outputMethod) + return decorator + + + def _oneTransition(self, startState, inputToken, endState, outputTokens, + collector): + """ + See L{MethodicalState.upon}. + """ + # FIXME: tests for all of this (some of it is wrong) + # if not isinstance(startState, MethodicalState): + # raise NotImplementedError("start state {} isn't a state" + # .format(startState)) + # if not isinstance(inputToken, MethodicalInput): + # raise NotImplementedError("start state {} isn't an input" + # .format(inputToken)) + # if not isinstance(endState, MethodicalState): + # raise NotImplementedError("end state {} isn't a state" + # .format(startState)) + # for output in outputTokens: + # if not isinstance(endState, MethodicalState): + # raise NotImplementedError("output state {} isn't a state" + # .format(endState)) + self._automaton.addTransition(startState, inputToken, endState, + tuple(outputTokens)) + inputToken.collectors[startState] = collector + + + @_keywords_only + def serializer(self): + """ + + """ + def decorator(decoratee): + @wraps(decoratee) + def serialize(oself): + transitioner = _transitionerFromInstance(oself, self._symbol, + self._automaton) + return decoratee(oself, transitioner._state.serialized) + return serialize + return decorator + + @_keywords_only + def unserializer(self): + """ + + """ + def decorator(decoratee): + @wraps(decoratee) + def unserialize(oself, *args, **kwargs): + state = decoratee(oself, *args, **kwargs) + mapping = {} + for eachState in self._automaton.states(): + mapping[eachState.serialized] = eachState + transitioner = _transitionerFromInstance( + oself, self._symbol, self._automaton) + transitioner._state = mapping[state] + return None # it's on purpose + return unserialize + return decorator + + @property + def _setTrace(self): + return MethodicalTracer(self._automaton, self._symbol) + + def asDigraph(self): + """ + Generate a L{graphviz.Digraph} that represents this machine's + states and transitions. + + @return: L{graphviz.Digraph} object; for more information, please + see the documentation for + U{graphviz<https://graphviz.readthedocs.io/>} + + """ + from ._visualize import makeDigraph + return makeDigraph( + self._automaton, + stateAsString=lambda state: state.method.__name__, + inputAsString=lambda input: input.method.__name__, + outputAsString=lambda output: output.method.__name__, + ) diff --git a/contrib/python/Automat/py2/automat/_visualize.py b/contrib/python/Automat/py2/automat/_visualize.py new file mode 100644 index 0000000000..7a9c8c6eb5 --- /dev/null +++ b/contrib/python/Automat/py2/automat/_visualize.py @@ -0,0 +1,182 @@ +from __future__ import print_function +import argparse +import sys + +import graphviz + +from ._discover import findMachines + + +def _gvquote(s): + return '"{}"'.format(s.replace('"', r'\"')) + + +def _gvhtml(s): + return '<{}>'.format(s) + + +def elementMaker(name, *children, **attrs): + """ + Construct a string from the HTML element description. + """ + formattedAttrs = ' '.join('{}={}'.format(key, _gvquote(str(value))) + for key, value in sorted(attrs.items())) + formattedChildren = ''.join(children) + return u'<{name} {attrs}>{children}</{name}>'.format( + name=name, + attrs=formattedAttrs, + children=formattedChildren) + + +def tableMaker(inputLabel, outputLabels, port, _E=elementMaker): + """ + Construct an HTML table to label a state transition. + """ + colspan = {} + if outputLabels: + colspan['colspan'] = str(len(outputLabels)) + + inputLabelCell = _E("td", + _E("font", + inputLabel, + face="menlo-italic"), + color="purple", + port=port, + **colspan) + + pointSize = {"point-size": "9"} + outputLabelCells = [_E("td", + _E("font", + outputLabel, + **pointSize), + color="pink") + for outputLabel in outputLabels] + + rows = [_E("tr", inputLabelCell)] + + if outputLabels: + rows.append(_E("tr", *outputLabelCells)) + + return _E("table", *rows) + + +def makeDigraph(automaton, inputAsString=repr, + outputAsString=repr, + stateAsString=repr): + """ + Produce a L{graphviz.Digraph} object from an automaton. + """ + digraph = graphviz.Digraph(graph_attr={'pack': 'true', + 'dpi': '100'}, + node_attr={'fontname': 'Menlo'}, + edge_attr={'fontname': 'Menlo'}) + + for state in automaton.states(): + if state is automaton.initialState: + stateShape = "bold" + fontName = "Menlo-Bold" + else: + stateShape = "" + fontName = "Menlo" + digraph.node(stateAsString(state), + fontame=fontName, + shape="ellipse", + style=stateShape, + color="blue") + for n, eachTransition in enumerate(automaton.allTransitions()): + inState, inputSymbol, outState, outputSymbols = eachTransition + thisTransition = "t{}".format(n) + inputLabel = inputAsString(inputSymbol) + + port = "tableport" + table = tableMaker(inputLabel, [outputAsString(outputSymbol) + for outputSymbol in outputSymbols], + port=port) + + digraph.node(thisTransition, + label=_gvhtml(table), margin="0.2", shape="none") + + digraph.edge(stateAsString(inState), + '{}:{}:w'.format(thisTransition, port), + arrowhead="none") + digraph.edge('{}:{}:e'.format(thisTransition, port), + stateAsString(outState)) + + return digraph + + +def tool(_progname=sys.argv[0], + _argv=sys.argv[1:], + _syspath=sys.path, + _findMachines=findMachines, + _print=print): + """ + Entry point for command line utility. + """ + + DESCRIPTION = """ + Visualize automat.MethodicalMachines as graphviz graphs. + """ + EPILOG = """ + You must have the graphviz tool suite installed. Please visit + http://www.graphviz.org for more information. + """ + if _syspath[0]: + _syspath.insert(0, '') + argumentParser = argparse.ArgumentParser( + prog=_progname, + description=DESCRIPTION, + epilog=EPILOG) + argumentParser.add_argument('fqpn', + help="A Fully Qualified Path name" + " representing where to find machines.") + argumentParser.add_argument('--quiet', '-q', + help="suppress output", + default=False, + action="store_true") + argumentParser.add_argument('--dot-directory', '-d', + help="Where to write out .dot files.", + default=".automat_visualize") + argumentParser.add_argument('--image-directory', '-i', + help="Where to write out image files.", + default=".automat_visualize") + argumentParser.add_argument('--image-type', '-t', + help="The image format.", + choices=graphviz.FORMATS, + default='png') + argumentParser.add_argument('--view', '-v', + help="View rendered graphs with" + " default image viewer", + default=False, + action="store_true") + args = argumentParser.parse_args(_argv) + + explicitlySaveDot = (args.dot_directory + and (not args.image_directory + or args.image_directory != args.dot_directory)) + if args.quiet: + def _print(*args): + pass + + for fqpn, machine in _findMachines(args.fqpn): + _print(fqpn, '...discovered') + + digraph = machine.asDigraph() + + if explicitlySaveDot: + digraph.save(filename="{}.dot".format(fqpn), + directory=args.dot_directory) + _print(fqpn, "...wrote dot into", args.dot_directory) + + if args.image_directory: + deleteDot = not args.dot_directory or explicitlySaveDot + digraph.format = args.image_type + digraph.render(filename="{}.dot".format(fqpn), + directory=args.image_directory, + view=args.view, + cleanup=deleteDot) + if deleteDot: + msg = "...wrote image into" + else: + msg = "...wrote image and dot into" + _print(fqpn, msg, args.image_directory) |