diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-02 00:01:09 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-02 00:09:17 +0300 |
commit | b5c4ec42ac2cc59dc3b104277ce2e85f5f77c88e (patch) | |
tree | 9b37605b78a3d2398da4addca3ee37899621c38e | |
parent | 88fb7b5c334c3afdffacd104ee20efda4400d1bc (diff) | |
download | ydb-b5c4ec42ac2cc59dc3b104277ce2e85f5f77c88e.tar.gz |
Intermediate changes
-rw-r--r-- | contrib/libs/cxxsupp/libcxxmsvc/include/__support/win32/atomic_win32.h | 17 | ||||
-rw-r--r-- | contrib/python/Automat/py3/.dist-info/METADATA | 201 | ||||
-rw-r--r-- | contrib/python/Automat/py3/README.md | 397 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/__init__.py | 14 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_core.py | 146 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_discover.py | 74 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_introspection.py | 37 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_methodical.py | 311 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_runtimeproto.py | 62 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_typed.py | 736 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/_visualize.py | 234 | ||||
-rw-r--r-- | contrib/python/Automat/py3/automat/py.typed | 0 | ||||
-rw-r--r-- | contrib/python/Automat/py3/ya.make | 10 |
13 files changed, 1564 insertions, 675 deletions
diff --git a/contrib/libs/cxxsupp/libcxxmsvc/include/__support/win32/atomic_win32.h b/contrib/libs/cxxsupp/libcxxmsvc/include/__support/win32/atomic_win32.h index e32bcf9073..1862997f17 100644 --- a/contrib/libs/cxxsupp/libcxxmsvc/include/__support/win32/atomic_win32.h +++ b/contrib/libs/cxxsupp/libcxxmsvc/include/__support/win32/atomic_win32.h @@ -86,14 +86,14 @@ void __msvc_unlock(void* p); template<class _Out, class _Tp> static inline _Out __msvc_cast(_Tp __val) { - _Out __result; + alignas(_Out) char __result[sizeof(_Out)]; volatile char* to = reinterpret_cast<volatile char*>(&__result); volatile char* end = to + sizeof(_Tp); char* from = reinterpret_cast<char*>(&__val); while (to != end) { *to++ = *from++; } - return __result; + return *reinterpret_cast<_Out*>(&__result); } @@ -368,21 +368,20 @@ static inline __int64 __msvc_atomic_load64(volatile __int64* __a, memory_order _ template<typename _Tp> static inline _Tp __c11_atomic_load(volatile _Atomic(_Tp)* __a, int __order) { - _Tp __result; if (sizeof(_Tp) == 1) { - __result = __msvc_cast<_Tp>(__msvc_atomic_load8((volatile char*)__a, (memory_order)__order)); + return __msvc_cast<_Tp>(__msvc_atomic_load8((volatile char*)__a, (memory_order)__order)); } else if (sizeof(_Tp) == 2 && alignof(_Tp) % 2 == 0) { - __result = __msvc_cast<_Tp>(__msvc_atomic_load16((volatile short*)__a, (memory_order)__order)); + return __msvc_cast<_Tp>(__msvc_atomic_load16((volatile short*)__a, (memory_order)__order)); } else if (sizeof(_Tp) == 4 && alignof(_Tp) % 4 == 0) { - __result = __msvc_cast<_Tp>(__msvc_atomic_load32((volatile long*)__a, (memory_order)__order)); + return __msvc_cast<_Tp>(__msvc_atomic_load32((volatile long*)__a, (memory_order)__order)); } else if (sizeof(_Tp) == 8 && alignof(_Tp) % 8 == 0) { - __result = __msvc_cast<_Tp>(__msvc_atomic_load64((volatile __int64*)__a, (memory_order)__order)); + return __msvc_cast<_Tp>(__msvc_atomic_load64((volatile __int64*)__a, (memory_order)__order)); } else { __msvc_lock((void*)__a); - __result = *(_Atomic(_Tp)*)__a; + _Tp __result = *(_Atomic(_Tp)*)__a; __msvc_unlock((void*)__a); + return __result; } - return __result; } template<typename _Tp> diff --git a/contrib/python/Automat/py3/.dist-info/METADATA b/contrib/python/Automat/py3/.dist-info/METADATA index 1df3dba6c8..0f70edae69 100644 --- a/contrib/python/Automat/py3/.dist-info/METADATA +++ b/contrib/python/Automat/py3/.dist-info/METADATA @@ -1,27 +1,198 @@ Metadata-Version: 2.1 Name: Automat -Version: 22.10.0 +Version: 24.8.0 Summary: Self-service finite-state machines for the programmer on the go. -Home-page: https://github.com/glyph/Automat -Author: Glyph -Author-email: glyph@twistedmatrix.com -License: MIT -Keywords: fsm finite state machine automata +Author-email: Glyph <code@glyph.im> +License: Copyright (c) 2014 + Rackspace + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +Project-URL: Documentation, https://automat.readthedocs.io/ +Project-URL: Source, https://github.com/glyph/automat/ +Keywords: fsm,state machine,automata Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: MIT License Classifier: Operating System :: OS Independent Classifier: Programming Language :: Python -Classifier: Programming Language :: Python :: 2 -Classifier: Programming Language :: Python :: 2.7 Classifier: Programming Language :: Python :: 3 -Classifier: Programming Language :: Python :: 3.5 -Classifier: Programming Language :: Python :: 3.6 -Classifier: Programming Language :: Python :: 3.7 Classifier: Programming Language :: Python :: 3.8 +Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 +Classifier: Programming Language :: Python :: 3.11 +Classifier: Programming Language :: Python :: 3.12 +Description-Content-Type: text/markdown License-File: LICENSE -Requires-Dist: attrs (>=19.2.0) -Requires-Dist: six +Requires-Dist: typing-extensions; python_version < "3.10" Provides-Extra: visualize -Requires-Dist: graphviz (>0.5.1) ; extra == 'visualize' -Requires-Dist: Twisted (>=16.1.1) ; extra == 'visualize' +Requires-Dist: graphviz>0.5.1; extra == "visualize" +Requires-Dist: Twisted>=16.1.1; extra == "visualize" +# Automat # + +[![Documentation Status](https://readthedocs.org/projects/automat/badge/?version=latest)](http://automat.readthedocs.io/en/latest/) +[![Build Status](https://github.com/glyph/automat/actions/workflows/ci.yml/badge.svg?branch=trunk)](https://github.com/glyph/automat/actions/workflows/ci.yml?query=branch%3Atrunk) +[![Coverage Status](http://codecov.io/github/glyph/automat/coverage.svg?branch=trunk)](http://codecov.io/github/glyph/automat?branch=trunk) + +## Self-service finite-state machines for the programmer on the go. ## + +Automat is a library for concise, idiomatic Python expression of finite-state +automata (particularly deterministic finite-state transducers). + +Read more here, or on [Read the Docs](https://automat.readthedocs.io/), or watch the following videos for an overview and presentation + +### Why use state machines? ### + +Sometimes you have to create an object whose behavior varies with its state, +but still wishes to present a consistent interface to its callers. + +For example, let's say you're writing the software for a coffee machine. It +has a lid that can be opened or closed, a chamber for water, a chamber for +coffee beans, and a button for "brew". + +There are a number of possible states for the coffee machine. It might or +might not have water. It might or might not have beans. The lid might be open +or closed. The "brew" button should only actually attempt to brew coffee in +one of these configurations, and the "open lid" button should only work if the +coffee is not, in fact, brewing. + +With diligence and attention to detail, you can implement this correctly using +a collection of attributes on an object; `hasWater`, `hasBeans`, `isLidOpen` +and so on. However, you have to keep all these attributes consistent. As the +coffee maker becomes more complex - perhaps you add an additional chamber for +flavorings so you can make hazelnut coffee, for example - you have to keep +adding more and more checks and more and more reasoning about which +combinations of states are allowed. + +Rather than adding tedious `if` checks to every single method to make sure that +each of these flags are exactly what you expect, you can use a state machine to +ensure that if your code runs at all, it will be run with all the required +values initialized, because they have to be called in the order you declare +them. + +You can read about state machines and their advantages for Python programmers +in more detail [in this excellent article by Jean-Paul +Calderone](https://web.archive.org/web/20160507053658/https://clusterhq.com/2013/12/05/what-is-a-state-machine/). + +### What makes Automat different? ### + +There are +[dozens of libraries on PyPI implementing state machines](https://pypi.org/search/?q=finite+state+machine). +So it behooves me to say why yet another one would be a good idea. + +Automat is designed around this principle: while organizing your code around +state machines is a good idea, your callers don't, and shouldn't have to, care +that you've done so. In Python, the "input" to a stateful system is a method +call; the "output" may be a method call, if you need to invoke a side effect, +or a return value, if you are just performing a computation in memory. Most +other state-machine libraries require you to explicitly create an input object, +provide that object to a generic "input" method, and then receive results, +sometimes in terms of that library's interfaces and sometimes in terms of +classes you define yourself. + +For example, a snippet of the coffee-machine example above might be implemented +as follows in naive Python: + +```python +class CoffeeMachine(object): + def brewButton(self) -> None: + if self.hasWater and self.hasBeans and not self.isLidOpen: + self.heatTheHeatingElement() + # ... +``` + +With Automat, you'd begin with a `typing.Protocol` that describes all of your +inputs: + +```python +from typing import Protocol + +class CoffeeBrewer(Protocol): + def brewButton(self) -> None: + "The user pressed the 'brew' button." + def putInBeans(self) -> None: + "The user put in some beans." +``` + +We'll then need a concrete class to contain the shared core of state shared +among the different states: + +```python +from dataclasses import dataclass + +@dataclass +class BrewerCore: + heatingElement: HeatingElement +``` + +Next, we need to describe our state machine, including all of our states. For +simplicity's sake let's say that the only two states are `noBeans` and +`haveBeans`: + +```python +from automat import TypeMachineBuilder + +builder = TypeMachineBuilder(CoffeeBrewer, BrewerCore) +noBeans = builder.state("noBeans") +haveBeans = builder.state("haveBeans") +``` + +Next we can describe a simple transition; when we put in beans, we move to the +`haveBeans` state, with no other behavior. + +```python +# When we don't have beans, upon putting in beans, we will then have beans +noBeans.upon(CoffeeBrewer.putInBeans).to(haveBeans).returns(None) +``` + +And then another transition that we describe with a decorator, one that *does* +have some behavior, that needs to heat up the heating element to brew the +coffee: + +```python +@haveBeans.upon(CoffeeBrewer.brewButton).to(noBeans) +def heatUp(inputs: CoffeeBrewer, core: BrewerCore) -> None: + """ + When we have beans, upon pressing the brew button, we will then not have + beans any more (as they have been entered into the brewing chamber) and + our output will be heating the heating element. + """ + print("Brewing the coffee...") + core.heatingElement.turnOn() +``` + +Then we finalize the state machine by building it, which gives us a callable +that takes a `BrewerCore` and returns a synthetic `CoffeeBrewer` + +```python +newCoffeeMachine = builder.build() +``` + +```python +>>> coffee = newCoffeeMachine(BrewerCore(HeatingElement())) +>>> machine.putInBeans() +>>> machine.brewButton() +Brewing the coffee... +``` + +All of the *inputs* are provided by calling them like methods, all of the +*output behaviors* are automatically invoked when they are produced according +to the outputs specified to `upon` and all of the states are simply opaque +tokens. diff --git a/contrib/python/Automat/py3/README.md b/contrib/python/Automat/py3/README.md index 488a6c4043..a95e125345 100644 --- a/contrib/python/Automat/py3/README.md +++ b/contrib/python/Automat/py3/README.md @@ -1,8 +1,8 @@ # Automat # [![Documentation Status](https://readthedocs.org/projects/automat/badge/?version=latest)](http://automat.readthedocs.io/en/latest/) -[![Build Status](https://travis-ci.org/glyph/automat.svg?branch=master)](https://travis-ci.org/glyph/automat) -[![Coverage Status](https://coveralls.io/repos/glyph/automat/badge.png)](https://coveralls.io/r/glyph/automat) +[![Build Status](https://github.com/glyph/automat/actions/workflows/ci.yml/badge.svg?branch=trunk)](https://github.com/glyph/automat/actions/workflows/ci.yml?query=branch%3Atrunk) +[![Coverage Status](http://codecov.io/github/glyph/automat/coverage.svg?branch=trunk)](http://codecov.io/github/glyph/automat?branch=trunk) ## Self-service finite-state machines for the programmer on the go. ## @@ -11,12 +11,6 @@ automata (particularly deterministic finite-state transducers). Read more here, or on [Read the Docs](https://automat.readthedocs.io/), or watch the following videos for an overview and presentation -Overview and presentation by **Glyph Lefkowitz** at the first talk of the first Pyninsula meetup, on February 21st, 2017: -[![Glyph Lefkowitz - Automat - Pyninsula #0](https://img.youtube.com/vi/0wOZBpD1VVk/0.jpg)](https://www.youtube.com/watch?v=0wOZBpD1VVk) - -Presentation by **Clinton Roy** at PyCon Australia, on August 6th 2017: -[![Clinton Roy - State Machines - Pycon Australia 2017](https://img.youtube.com/vi/TedUKXhu9kE/0.jpg)](https://www.youtube.com/watch?v=TedUKXhu9kE) - ### Why use state machines? ### Sometimes you have to create an object whose behavior varies with its state, @@ -33,14 +27,14 @@ one of these configurations, and the "open lid" button should only work if the coffee is not, in fact, brewing. With diligence and attention to detail, you can implement this correctly using -a collection of attributes on an object; `has_water`, `has_beans`, -`is_lid_open` and so on. However, you have to keep all these attributes -consistent. As the coffee maker becomes more complex - perhaps you add an -additional chamber for flavorings so you can make hazelnut coffee, for -example - you have to keep adding more and more checks and more and more -reasoning about which combinations of states are allowed. - -Rather than adding tedious 'if' checks to every single method to make sure that +a collection of attributes on an object; `hasWater`, `hasBeans`, `isLidOpen` +and so on. However, you have to keep all these attributes consistent. As the +coffee maker becomes more complex - perhaps you add an additional chamber for +flavorings so you can make hazelnut coffee, for example - you have to keep +adding more and more checks and more and more reasoning about which +combinations of states are allowed. + +Rather than adding tedious `if` checks to every single method to make sure that each of these flags are exactly what you expect, you can use a state machine to ensure that if your code runs at all, it will be run with all the required values initialized, because they have to be called in the order you declare @@ -71,360 +65,87 @@ as follows in naive Python: ```python class CoffeeMachine(object): - def brew_button(self): - if self.has_water and self.has_beans and not self.is_lid_open: - self.heat_the_heating_element() + def brewButton(self) -> None: + if self.hasWater and self.hasBeans and not self.isLidOpen: + self.heatTheHeatingElement() # ... ``` -With Automat, you'd create a class with a `MethodicalMachine` attribute: +With Automat, you'd begin with a `typing.Protocol` that describes all of your +inputs: ```python -from automat import MethodicalMachine - -class CoffeeBrewer(object): - _machine = MethodicalMachine() -``` - -and then you would break the above logic into two pieces - the `brew_button` -*input*, declared like so: +from typing import Protocol -```python - @_machine.input() - def brew_button(self): +class CoffeeBrewer(Protocol): + def brewButton(self) -> None: "The user pressed the 'brew' button." -``` - -It wouldn't do any good to declare a method *body* on this, however, because -input methods don't actually execute their bodies when called; doing actual -work is the *output*'s job: - -```python - @_machine.output() - def _heat_the_heating_element(self): - "Heat up the heating element, which should cause coffee to happen." - self._heating_element.turn_on() -``` - -As well as a couple of *states* - and for simplicity's sake let's say that the -only two states are `have_beans` and `dont_have_beans`: - -```python - @_machine.state() - def have_beans(self): - "In this state, you have some beans." - @_machine.state(initial=True) - def dont_have_beans(self): - "In this state, you don't have any beans." -``` - -`dont_have_beans` is the `initial` state because `CoffeeBrewer` starts without beans -in it. - -(And another input to put some beans in:) - -```python - @_machine.input() - def put_in_beans(self): - "The user put in some beans." -``` - -Finally, you hook everything together with the `upon` method of the functions -decorated with `_machine.state`: - -```python - - # When we don't have beans, upon putting in beans, we will then have beans - # (and produce no output) - dont_have_beans.upon(put_in_beans, enter=have_beans, outputs=[]) - - # When we have beans, upon pressing the brew button, we will then not have - # beans any more (as they have been entered into the brewing chamber) and - # our output will be heating the heating element. - have_beans.upon(brew_button, enter=dont_have_beans, - outputs=[_heat_the_heating_element]) -``` - -To *users* of this coffee machine class though, it still looks like a POPO -(Plain Old Python Object): - -```python ->>> coffee_machine = CoffeeMachine() ->>> coffee_machine.put_in_beans() ->>> coffee_machine.brew_button() -``` - -All of the *inputs* are provided by calling them like methods, all of the -*outputs* are automatically invoked when they are produced according to the -outputs specified to `upon` and all of the states are simply opaque tokens - -although the fact that they're defined as methods like inputs and outputs -allows you to put docstrings on them easily to document them. - -## How do I get the current state of a state machine? - -Don't do that. - -One major reason for having a state machine is that you want the callers of the -state machine to just provide the appropriate input to the machine at the -appropriate time, and *not have to check themselves* what state the machine is -in. So if you are tempted to write some code like this: - -```python -if connection_state_machine.state == "CONNECTED": - connection_state_machine.send_message() -else: - print("not connected") -``` - -Instead, just make your calling code do this: - -```python -connection_state_machine.send_message() -``` - -and then change your state machine to look like this: - -```python - @_machine.state() - def connected(self): - "connected" - @_machine.state() - def not_connected(self): - "not connected" - @_machine.input() - def send_message(self): - "send a message" - @_machine.output() - def _actually_send_message(self): - self._transport.send(b"message") - @_machine.output() - def _report_sending_failure(self): - print("not connected") - connected.upon(send_message, enter=connected, [_actually_send_message]) - not_connected.upon(send_message, enter=not_connected, [_report_sending_failure]) -``` - -so that the responsibility for knowing which state the state machine is in -remains within the state machine itself. - -## Input for Inputs and Output for Outputs - -Quite often you want to be able to pass parameters to your methods, as well as -inspecting their results. For example, when you brew the coffee, you might -expect a cup of coffee to result, and you would like to see what kind of coffee -it is. And if you were to put delicious hand-roasted small-batch artisanal -beans into the machine, you would expect a *better* cup of coffee than if you -were to use mass-produced beans. You would do this in plain old Python by -adding a parameter, so that's how you do it in Automat as well. - -```python - @_machine.input() - def put_in_beans(self, beans): + def putInBeans(self) -> None: "The user put in some beans." ``` -However, one important difference here is that *we can't add any -implementation code to the input method*. Inputs are purely a declaration of -the interface; the behavior must all come from outputs. Therefore, the change -in the state of the coffee machine must be represented as an output. We can -add an output method like this: - -```python - @_machine.output() - def _save_beans(self, beans): - "The beans are now in the machine; save them." - self._beans = beans -``` - -and then connect it to the `put_in_beans` by changing the transition from -`dont_have_beans` to `have_beans` like so: - -```python - dont_have_beans.upon(put_in_beans, enter=have_beans, - outputs=[_save_beans]) -``` - -Now, when you call: - -```python -coffee_machine.put_in_beans("real good beans") -``` - -the machine will remember the beans for later. - -So how do we get the beans back out again? One of our outputs needs to have a -return value. It would make sense if our `brew_button` method returned the cup -of coffee that it made, so we should add an output. So, in addition to heating -the heating element, let's add a return value that describes the coffee. First -a new output: - -```python - @_machine.output() - def _describe_coffee(self): - return "A cup of coffee made with {}.".format(self._beans) -``` - -Note that we don't need to check first whether `self._beans` exists or not, -because we can only reach this output method if the state machine says we've -gone through a set of states that sets this attribute. - -Now, we need to hook up `_describe_coffee` to the process of brewing, so change -the brewing transition to: - -```python - have_beans.upon(brew_button, enter=dont_have_beans, - outputs=[_heat_the_heating_element, - _describe_coffee]) -``` - -Now, we can call it: - -```python ->>> coffee_machine.brew_button() -[None, 'A cup of coffee made with real good beans.'] -``` - -Except... wait a second, what's that `None` doing there? - -Since every input can produce multiple outputs, in automat, the default return -value from every input invocation is a `list`. In this case, we have both -`_heat_the_heating_element` and `_describe_coffee` outputs, so we're seeing -both of their return values. However, this can be customized, with the -`collector` argument to `upon`; the `collector` is a callable which takes an -iterable of all the outputs' return values and "collects" a single return value -to return to the caller of the state machine. - -In this case, we only care about the last output, so we can adjust the call to -`upon` like this: +We'll then need a concrete class to contain the shared core of state shared +among the different states: ```python - have_beans.upon(brew_button, enter=dont_have_beans, - outputs=[_heat_the_heating_element, - _describe_coffee], - collector=lambda iterable: list(iterable)[-1] - ) -``` +from dataclasses import dataclass -And now, we'll get just the return value we want: - -```python ->>> coffee_machine.brew_button() -'A cup of coffee made with real good beans.' +@dataclass +class BrewerCore: + heatingElement: HeatingElement ``` -## If I can't get the state of the state machine, how can I save it to (a database, an API response, a file on disk...) - -There are APIs for serializing the state machine. - -First, you have to decide on a persistent representation of each state, via the -`serialized=` argument to the `MethodicalMachine.state()` decorator. - -Let's take this very simple "light switch" state machine, which can be on or -off, and flipped to reverse its state: +Next, we need to describe our state machine, including all of our states. For +simplicity's sake let's say that the only two states are `noBeans` and +`haveBeans`: ```python -class LightSwitch(object): - _machine = MethodicalMachine() - @_machine.state(serialized="on") - def on_state(self): - "the switch is on" - @_machine.state(serialized="off", initial=True) - def off_state(self): - "the switch is off" - @_machine.input() - def flip(self): - "flip the switch" - on_state.upon(flip, enter=off_state, outputs=[]) - off_state.upon(flip, enter=on_state, outputs=[]) -``` +from automat import TypeMachineBuilder -In this case, we've chosen a serialized representation for each state via the -`serialized` argument. The on state is represented by the string `"on"`, and -the off state is represented by the string `"off"`. - -Now, let's just add an input that lets us tell if the switch is on or not. - -```python - @_machine.input() - def query_power(self): - "return True if powered, False otherwise" - @_machine.output() - def _is_powered(self): - return True - @_machine.output() - def _not_powered(self): - return False - on_state.upon(query_power, enter=on_state, outputs=[_is_powered], - collector=next) - off_state.upon(query_power, enter=off_state, outputs=[_not_powered], - collector=next) +builder = TypeMachineBuilder(CoffeeBrewer, BrewerCore) +noBeans = builder.state("noBeans") +haveBeans = builder.state("haveBeans") ``` -To save the state, we have the `MethodicalMachine.serializer()` method. A -method decorated with `@serializer()` gets an extra argument injected at the -beginning of its argument list: the serialized identifier for the state. In -this case, either `"on"` or `"off"`. Since state machine output methods can -also affect other state on the object, a serializer method is expected to -return *all* relevant state for serialization. - -For our simple light switch, such a method might look like this: +Next we can describe a simple transition; when we put in beans, we move to the +`haveBeans` state, with no other behavior. ```python - @_machine.serializer() - def save(self, state): - return {"is-it-on": state} +# When we don't have beans, upon putting in beans, we will then have beans +noBeans.upon(CoffeeBrewer.putInBeans).to(haveBeans).returns(None) ``` -Serializers can be public methods, and they can return whatever you like. If -necessary, you can have different serializers - just multiple methods decorated -with `@_machine.serializer()` - for different formats; return one data-structure -for JSON, one for XML, one for a database row, and so on. - -When it comes time to unserialize, though, you generally want a private method, -because an unserializer has to take a not-fully-initialized instance and -populate it with state. It is expected to *return* the serialized machine -state token that was passed to the serializer, but it can take whatever -arguments you like. Of course, in order to return that, it probably has to -take it somewhere in its arguments, so it will generally take whatever a paired -serializer has returned as an argument. - -So our unserializer would look like this: +And then another transition that we describe with a decorator, one that *does* +have some behavior, that needs to heat up the heating element to brew the +coffee: ```python - @_machine.unserializer() - def _restore(self, blob): - return blob["is-it-on"] +@haveBeans.upon(CoffeeBrewer.brewButton).to(noBeans) +def heatUp(inputs: CoffeeBrewer, core: BrewerCore) -> None: + """ + When we have beans, upon pressing the brew button, we will then not have + beans any more (as they have been entered into the brewing chamber) and + our output will be heating the heating element. + """ + print("Brewing the coffee...") + core.heatingElement.turnOn() ``` -Generally you will want a classmethod deserialization constructor which you -write yourself to call this, so that you know how to create an instance of your -own object, like so: +Then we finalize the state machine by building it, which gives us a callable +that takes a `BrewerCore` and returns a synthetic `CoffeeBrewer` ```python - @classmethod - def from_blob(cls, blob): - self = cls() - self._restore(blob) - return self +newCoffeeMachine = builder.build() ``` -Saving and loading our `LightSwitch` along with its state-machine state can now -be accomplished as follows: - ```python ->>> switch1 = LightSwitch() ->>> switch1.query_power() -False ->>> switch1.flip() -[] ->>> switch1.query_power() -True ->>> blob = switch1.save() ->>> switch2 = LightSwitch.from_blob(blob) ->>> switch2.query_power() -True +>>> coffee = newCoffeeMachine(BrewerCore(HeatingElement())) +>>> machine.putInBeans() +>>> machine.brewButton() +Brewing the coffee... ``` -More comprehensive (tested, working) examples are present in `docs/examples`. - -Go forth and machine all the state! +All of the *inputs* are provided by calling them like methods, all of the +*output behaviors* are automatically invoked when they are produced according +to the outputs specified to `upon` and all of the states are simply opaque +tokens. diff --git a/contrib/python/Automat/py3/automat/__init__.py b/contrib/python/Automat/py3/automat/__init__.py index 570b84f995..c4b34e565a 100644 --- a/contrib/python/Automat/py3/automat/__init__.py +++ b/contrib/python/Automat/py3/automat/__init__.py @@ -1,8 +1,16 @@ # -*- test-case-name: automat -*- -from ._methodical import MethodicalMachine +""" +State-machines. +""" +from ._typed import TypeMachineBuilder, pep614, AlreadyBuiltError, TypeMachine from ._core import NoTransition +from ._methodical import MethodicalMachine __all__ = [ - 'MethodicalMachine', - 'NoTransition', + "TypeMachineBuilder", + "TypeMachine", + "NoTransition", + "AlreadyBuiltError", + "pep614", + "MethodicalMachine", ] diff --git a/contrib/python/Automat/py3/automat/_core.py b/contrib/python/Automat/py3/automat/_core.py index 4118a4b070..fc637b3a0f 100644 --- a/contrib/python/Automat/py3/automat/_core.py +++ b/contrib/python/Automat/py3/automat/_core.py @@ -5,23 +5,41 @@ A core state-machine abstraction. Perhaps something that could be replaced with or integrated into machinist. """ +from __future__ import annotations +import sys from itertools import chain +from typing import Callable, Generic, Optional, Sequence, TypeVar, Hashable + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias _NO_STATE = "<no state>" +State = TypeVar("State", bound=Hashable) +Input = TypeVar("Input", bound=Hashable) +Output = TypeVar("Output", bound=Hashable) -class NoTransition(Exception): +class NoTransition(Exception, Generic[State, Input]): """ A finite state machine in C{state} has no transition for C{symbol}. - @param state: the finite state machine's state at the time of the - illegal transition. + @ivar state: See C{state} init parameter. - @param symbol: the input symbol for which no transition exists. + @ivar symbol: See C{symbol} init parameter. """ - def __init__(self, state, symbol): + def __init__(self, state: State, symbol: Input): + """ + Construct a L{NoTransition}. + + @param state: the finite state machine's state at the time of the + illegal transition. + + @param symbol: the input symbol for which no transition exists. + """ self.state = state self.symbol = symbol super(Exception, self).__init__( @@ -29,31 +47,33 @@ class NoTransition(Exception): ) -class Automaton(object): +class Automaton(Generic[State, Input, Output]): """ A declaration of a finite state machine. Note that this is not the machine itself; it is immutable. """ - def __init__(self): + def __init__(self, initial: State | None = None) -> None: """ Initialize the set of transitions and the initial state. """ - self._initialState = _NO_STATE - self._transitions = set() - + if initial is None: + initial = _NO_STATE # type:ignore[assignment] + assert initial is not None + self._initialState: State = initial + self._transitions: set[tuple[State, Input, State, Sequence[Output]]] = set() + self._unhandledTransition: Optional[tuple[State, Sequence[Output]]] = None @property - def initialState(self): + def initialState(self) -> State: """ Return this automaton's initial state. """ return self._initialState - @initialState.setter - def initialState(self, state): + def initialState(self, state: State) -> None: """ Set this automaton's initial state. Raises a ValueError if this automaton already has an initial state. @@ -61,12 +81,18 @@ class Automaton(object): if self._initialState is not _NO_STATE: raise ValueError( - "initial state already set to {}".format(self._initialState)) + "initial state already set to {}".format(self._initialState) + ) self._initialState = state - - def addTransition(self, inState, inputSymbol, outState, outputSymbols): + def addTransition( + self, + inState: State, + inputSymbol: Input, + outState: State, + outputSymbols: tuple[Output, ...], + ): """ Add the given transition to the outputSymbol. Raise ValueError if there is already a transition with the same inState and inputSymbol. @@ -74,44 +100,51 @@ class Automaton(object): # keeping self._transitions in a flat list makes addTransition # O(n^2), but state machines don't tend to have hundreds of # transitions. - for (anInState, anInputSymbol, anOutState, _) in self._transitions: - if (anInState == inState and anInputSymbol == inputSymbol): + for anInState, anInputSymbol, anOutState, _ in self._transitions: + if anInState == inState and anInputSymbol == inputSymbol: raise ValueError( - "already have transition from {} via {}".format(inState, inputSymbol)) - self._transitions.add( - (inState, inputSymbol, outState, tuple(outputSymbols)) - ) + "already have transition from {} to {} via {}".format( + inState, anOutState, inputSymbol + ) + ) + self._transitions.add((inState, inputSymbol, outState, tuple(outputSymbols))) + def unhandledTransition( + self, outState: State, outputSymbols: Sequence[Output] + ) -> None: + """ + All unhandled transitions will be handled by transitioning to the given + error state and error-handling output symbols. + """ + self._unhandledTransition = (outState, tuple(outputSymbols)) - def allTransitions(self): + def allTransitions(self) -> frozenset[tuple[State, Input, State, Sequence[Output]]]: """ All transitions. """ return frozenset(self._transitions) - - def inputAlphabet(self): + def inputAlphabet(self) -> set[Input]: """ The full set of symbols acceptable to this automaton. """ - return {inputSymbol for (inState, inputSymbol, outState, - outputSymbol) in self._transitions} + return { + inputSymbol + for (inState, inputSymbol, outState, outputSymbol) in self._transitions + } - - def outputAlphabet(self): + def outputAlphabet(self) -> set[Output]: """ The full set of symbols which can be produced by this automaton. """ return set( chain.from_iterable( - outputSymbols for - (inState, inputSymbol, outState, outputSymbols) - in self._transitions + outputSymbols + for (inState, inputSymbol, outState, outputSymbols) in self._transitions ) ) - - def states(self): + def states(self) -> frozenset[State]: """ All valid states; "Q" in the mathematical description of a state machine. @@ -119,47 +152,52 @@ class Automaton(object): return frozenset( chain.from_iterable( (inState, outState) - for - (inState, inputSymbol, outState, outputSymbol) - in self._transitions + for (inState, inputSymbol, outState, outputSymbol) in self._transitions ) ) - - def outputForInput(self, inState, inputSymbol): + def outputForInput( + self, inState: State, inputSymbol: Input + ) -> tuple[State, Sequence[Output]]: """ A 2-tuple of (outState, outputSymbols) for inputSymbol. """ - for (anInState, anInputSymbol, - outState, outputSymbols) in self._transitions: + for anInState, anInputSymbol, outState, outputSymbols in self._transitions: if (inState, inputSymbol) == (anInState, anInputSymbol): return (outState, list(outputSymbols)) - raise NoTransition(state=inState, symbol=inputSymbol) + if self._unhandledTransition is None: + raise NoTransition(state=inState, symbol=inputSymbol) + return self._unhandledTransition + +OutputTracer = Callable[[Output], None] +Tracer: TypeAlias = "Callable[[State, Input, State], OutputTracer[Output] | None]" -class Transitioner(object): + +class Transitioner(Generic[State, Input, Output]): """ The combination of a current state and an L{Automaton}. """ - def __init__(self, automaton, initialState): - self._automaton = automaton - self._state = initialState - self._tracer = None + def __init__(self, automaton: Automaton[State, Input, Output], initialState: State): + self._automaton: Automaton[State, Input, Output] = automaton + self._state: State = initialState + self._tracer: Tracer[State, Input, Output] | None = None - def setTrace(self, tracer): + def setTrace(self, tracer: Tracer[State, Input, Output] | None) -> None: self._tracer = tracer - def transition(self, inputSymbol): + def transition( + self, inputSymbol: Input + ) -> tuple[Sequence[Output], OutputTracer[Output] | None]: """ Transition between states, returning any outputs. """ - outState, outputSymbols = self._automaton.outputForInput(self._state, - inputSymbol) + outState, outputSymbols = self._automaton.outputForInput( + self._state, inputSymbol + ) outTracer = None if self._tracer: - outTracer = self._tracer(self._state._name(), - inputSymbol._name(), - outState._name()) + outTracer = self._tracer(self._state, inputSymbol, outState) self._state = outState return (outputSymbols, outTracer) diff --git a/contrib/python/Automat/py3/automat/_discover.py b/contrib/python/Automat/py3/automat/_discover.py index c0d88baea4..ae92f82fc0 100644 --- a/contrib/python/Automat/py3/automat/_discover.py +++ b/contrib/python/Automat/py3/automat/_discover.py @@ -1,10 +1,17 @@ +from __future__ import annotations + import collections import inspect +from typing import Any, Iterator + +from twisted.python.modules import PythonAttribute, PythonModule, getModule + from automat import MethodicalMachine -from twisted.python.modules import PythonModule, getModule +from ._typed import TypeMachine, InputProtocol, Core -def isOriginalLocation(attr): + +def isOriginalLocation(attr: PythonAttribute | PythonModule) -> bool: """ Attempt to discover if this appearance of a PythonAttribute representing a class refers to the module where that class was @@ -21,7 +28,9 @@ def isOriginalLocation(attr): return currentModule.name == sourceModule.__name__ -def findMachinesViaWrapper(within): +def findMachinesViaWrapper( + within: PythonModule | PythonAttribute, +) -> Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]]: """ Recursively yield L{MethodicalMachine}s and their FQPNs within a L{PythonModule} or a L{twisted.python.modules.PythonAttribute} @@ -40,17 +49,25 @@ def findMachinesViaWrapper(within): @return: a generator which yields FQPN, L{MethodicalMachine} pairs. """ queue = collections.deque([within]) - visited = set() + visited: set[ + PythonModule + | PythonAttribute + | MethodicalMachine + | TypeMachine[InputProtocol, Core] + | type[Any] + ] = set() while queue: attr = queue.pop() value = attr.load() - - if isinstance(value, MethodicalMachine) and value not in visited: + if ( + isinstance(value, MethodicalMachine) or isinstance(value, TypeMachine) + ) and value not in visited: visited.add(value) yield attr.name, value - elif (inspect.isclass(value) and isOriginalLocation(attr) and - value not in visited): + elif ( + inspect.isclass(value) and isOriginalLocation(attr) and value not in visited + ): visited.add(value) queue.extendleft(attr.iterAttributes()) elif isinstance(attr, PythonModule) and value not in visited: @@ -77,7 +94,7 @@ class NoObject(InvalidFQPN): """ -def wrapFQPN(fqpn): +def wrapFQPN(fqpn: str) -> PythonModule | PythonAttribute: """ Given an FQPN, retrieve the object via the global Python module namespace and wrap it with a L{PythonModule} or a @@ -88,12 +105,13 @@ def wrapFQPN(fqpn): if not fqpn: raise InvalidFQPN("FQPN was empty") - components = collections.deque(fqpn.split('.')) + components = collections.deque(fqpn.split(".")) - if '' in components: + if "" in components: raise InvalidFQPN( "name must be a string giving a '.'-separated list of Python " - "identifiers, not %r" % (fqpn,)) + "identifiers, not %r" % (fqpn,) + ) component = components.popleft() try: @@ -118,27 +136,33 @@ def wrapFQPN(fqpn): attribute = module for component in components: try: - attribute = next(child for child in attribute.iterAttributes() - if child.name.rsplit('.', 1)[-1] == component) + attribute = next( + child + for child in attribute.iterAttributes() + if child.name.rsplit(".", 1)[-1] == component + ) except StopIteration: - raise NoObject('{}.{}'.format(attribute.name, component)) + raise NoObject("{}.{}".format(attribute.name, component)) return attribute -def findMachines(fqpn): +def findMachines( + fqpn: str, +) -> Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]]: """ - Recursively yield L{MethodicalMachine}s and their FQPNs in and - under the a Python object specified by an FQPN. + Recursively yield L{MethodicalMachine}s and their FQPNs in and under the a + Python object specified by an FQPN. - The discovery heuristic considers L{MethodicalMachine} instances - that are module-level attributes or class-level attributes - accessible from module scope. Machines inside nested classes will - be discovered, but those returned from functions or methods will not be. + The discovery heuristic considers L{MethodicalMachine} instances that are + module-level attributes or class-level attributes accessible from module + scope. Machines inside nested classes will be discovered, but those + returned from functions or methods will not be. - @type within: an FQPN - @param within: Where to start the search. + @param fqpn: a fully-qualified Python identifier (i.e. the dotted + identifier of an object defined at module or class scope, including the + package and modele names); where to start the search. - @return: a generator which yields FQPN, L{MethodicalMachine} pairs. + @return: a generator which yields (C{FQPN}, L{MethodicalMachine}) pairs. """ return findMachinesViaWrapper(wrapFQPN(fqpn)) diff --git a/contrib/python/Automat/py3/automat/_introspection.py b/contrib/python/Automat/py3/automat/_introspection.py index 403cddb15e..e433b2f9f9 100644 --- a/contrib/python/Automat/py3/automat/_introspection.py +++ b/contrib/python/Automat/py3/automat/_introspection.py @@ -7,31 +7,41 @@ from types import CodeType as code, FunctionType as function def copycode(template, changes): if hasattr(code, "replace"): - return template.replace(**{"co_" + k : v for k, v in changes.items()}) + return template.replace(**{"co_" + k: v for k, v in changes.items()}) names = [ - "argcount", "nlocals", "stacksize", "flags", "code", "consts", - "names", "varnames", "filename", "name", "firstlineno", "lnotab", - "freevars", "cellvars" + "argcount", + "nlocals", + "stacksize", + "flags", + "code", + "consts", + "names", + "varnames", + "filename", + "name", + "firstlineno", + "lnotab", + "freevars", + "cellvars", ] if hasattr(code, "co_kwonlyargcount"): names.insert(1, "kwonlyargcount") if hasattr(code, "co_posonlyargcount"): # PEP 570 added "positional only arguments" names.insert(1, "posonlyargcount") - values = [ - changes.get(name, getattr(template, "co_" + name)) - for name in names - ] + values = [changes.get(name, getattr(template, "co_" + name)) for name in names] return code(*values) def copyfunction(template, funcchanges, codechanges): names = [ - "globals", "name", "defaults", "closure", + "globals", + "name", + "defaults", + "closure", ] values = [ - funcchanges.get(name, getattr(template, "__" + name + "__")) - for name in names + funcchanges.get(name, getattr(template, "__" + name + "__")) for name in names ] return function(copycode(template.__code__, codechanges), *values) @@ -40,7 +50,8 @@ def preserveName(f): """ Preserve the name of the given function on the decorated function. """ + def decorator(decorated): - return copyfunction(decorated, - dict(name=f.__name__), dict(name=f.__name__)) + return copyfunction(decorated, dict(name=f.__name__), dict(name=f.__name__)) + return decorator diff --git a/contrib/python/Automat/py3/automat/_methodical.py b/contrib/python/Automat/py3/automat/_methodical.py index 6c9060cbb0..6c46c11e89 100644 --- a/contrib/python/Automat/py3/automat/_methodical.py +++ b/contrib/python/Automat/py3/automat/_methodical.py @@ -1,20 +1,34 @@ # -*- test-case-name: automat._test.test_methodical -*- +from __future__ import annotations import collections +import sys +from dataclasses import dataclass, field from functools import wraps -from itertools import count - from inspect import getfullargspec as getArgsSpec +from itertools import count +from typing import Any, Callable, Hashable, Iterable, TypeVar -import attr +if sys.version_info < (3, 10): + from typing_extensions import TypeAlias +else: + from typing import TypeAlias -from ._core import Transitioner, Automaton +from ._core import Automaton, OutputTracer, Tracer, Transitioner from ._introspection import preserveName - -ArgSpec = collections.namedtuple('ArgSpec', ['args', 'varargs', 'varkw', - 'defaults', 'kwonlyargs', - 'kwonlydefaults', 'annotations']) +ArgSpec = collections.namedtuple( + "ArgSpec", + [ + "args", + "varargs", + "varkw", + "defaults", + "kwonlyargs", + "kwonlydefaults", + "annotations", + ], +) def _getArgSpec(func): @@ -34,8 +48,7 @@ def _getArgSpec(func): defaults=spec.defaults if spec.defaults else (), kwonlyargs=tuple(spec.kwonlyargs), kwonlydefaults=( - tuple(spec.kwonlydefaults.items()) - if spec.kwonlydefaults else () + tuple(spec.kwonlydefaults.items()) if spec.kwonlydefaults else () ), annotations=tuple(spec.annotations.items()), ) @@ -54,8 +67,8 @@ def _getArgNames(spec): return set( spec.args + spec.kwonlyargs - + (('*args',) if spec.varargs else ()) - + (('**kwargs',) if spec.varkw else ()) + + (("*args",) if spec.varargs else ()) + + (("**kwargs",) if spec.varkw else ()) + spec.annotations ) @@ -70,39 +83,51 @@ def _keywords_only(f): Only works for methods right now. """ + @wraps(f) def g(self, **kw): return f(self, **kw) + return g -@attr.s(frozen=True) +@dataclass(frozen=True) class MethodicalState(object): """ A state for a L{MethodicalMachine}. """ - machine = attr.ib(repr=False) - method = attr.ib() - serialized = attr.ib(repr=False) - def upon(self, input, enter=None, outputs=None, collector=list): + machine: MethodicalMachine = field(repr=False) + method: Callable[..., Any] = field() + serialized: bool = field(repr=False) + + def upon( + self, + input: MethodicalInput, + enter: MethodicalState | None = None, + outputs: Iterable[MethodicalOutput] | None = None, + collector: Callable[[Iterable[T]], object] = list, + ) -> None: """ - Declare a state transition within the :class:`automat.MethodicalMachine` - associated with this :class:`automat.MethodicalState`: - upon the receipt of the `input`, enter the `state`, - emitting each output in `outputs`. - - :param MethodicalInput input: The input triggering a state transition. - :param MethodicalState enter: The resulting state. - :param Iterable[MethodicalOutput] outputs: The outputs to be triggered - as a result of the declared state transition. - :param Callable collector: The function to be used when collecting - output return values. - - :raises TypeError: if any of the `outputs` signatures do not match - the `inputs` signature. - :raises ValueError: if the state transition from `self` via `input` - has already been defined. + Declare a state transition within the L{MethodicalMachine} associated + with this L{MethodicalState}: upon the receipt of the `input`, enter + the `state`, emitting each output in `outputs`. + + @param input: The input triggering a state transition. + + @param enter: The resulting state. + + @param outputs: The outputs to be triggered as a result of the declared + state transition. + + @param collector: The function to be used when collecting output return + values. + + @raises TypeError: if any of the `outputs` signatures do not match the + `inputs` signature. + + @raises ValueError: if the state transition from `self` via `input` has + already been defined. """ if enter is None: enter = self @@ -120,14 +145,19 @@ class MethodicalState(object): output=output.method.__name__, inputSignature=getArgsSpec(input.method), outputSignature=getArgsSpec(output.method), - )) + ) + ) self.machine._oneTransition(self, input, enter, outputs, collector) - def _name(self): + def _name(self) -> str: return self.method.__name__ -def _transitionerFromInstance(oself, symbol, automaton): +def _transitionerFromInstance( + oself: object, + symbol: str, + automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput], +) -> Transitioner[MethodicalState, MethodicalInput, MethodicalOutput]: """ Get a L{Transitioner} """ @@ -144,10 +174,12 @@ def _transitionerFromInstance(oself, symbol, automaton): def _empty(): pass + def _docstring(): """docstring""" -def assertNoCode(inst, attribute, f): + +def assertNoCode(f: Callable[..., Any]) -> None: # The function body must be empty, i.e. "pass" or "return None", which # both yield the same bytecode: LOAD_CONST (None), RETURN_VALUE. We also # accept functions with only a docstring, which yields slightly different @@ -159,8 +191,7 @@ def assertNoCode(inst, attribute, f): # checking that would require us to parse the bytecode, find the index # being returned, then making sure the table has a None at that index. - if f.__code__.co_code not in (_empty.__code__.co_code, - _docstring.__code__.co_code): + if f.__code__.co_code not in (_empty.__code__.co_code, _docstring.__code__.co_code): raise ValueError("function body must be empty") @@ -198,68 +229,80 @@ def _filterArgs(args, kwargs, inputSpec, outputSpec): else: # Filter out names that the output method does not accept. all_accepted_names = outputSpec.args[1:] + outputSpec.kwonlyargs - return_kwargs = {n: v for n, v in full_kwargs.items() - if n in all_accepted_names} + return_kwargs = { + n: v for n, v in full_kwargs.items() if n in all_accepted_names + } return return_args, return_kwargs -@attr.s(eq=False, hash=False) +T = TypeVar("T") +R = TypeVar("R") + + +@dataclass(eq=False) class MethodicalInput(object): """ An input for a L{MethodicalMachine}. """ - automaton = attr.ib(repr=False) - method = attr.ib(validator=assertNoCode) - symbol = attr.ib(repr=False) - collectors = attr.ib(default=attr.Factory(dict), repr=False) - argSpec = attr.ib(init=False, repr=False) - @argSpec.default - def _buildArgSpec(self): - return _getArgSpec(self.method) + automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput] = field( + repr=False + ) + method: Callable[..., Any] = field() + symbol: str = field(repr=False) + collectors: dict[MethodicalState, Callable[[Iterable[T]], R]] = field( + default_factory=dict, repr=False + ) - def __get__(self, oself, type=None): + argSpec: ArgSpec = field(init=False, repr=False) + + def __post_init__(self) -> None: + self.argSpec = _getArgSpec(self.method) + assertNoCode(self.method) + + def __get__(self, oself: object, type: None = None) -> object: """ Return a function that takes no arguments and returns values returned by output functions produced by the given L{MethodicalInput} in C{oself}'s current state. """ - transitioner = _transitionerFromInstance(oself, self.symbol, - self.automaton) + transitioner = _transitionerFromInstance(oself, self.symbol, self.automaton) + @preserveName(self.method) @wraps(self.method) - def doInput(*args, **kwargs): + def doInput(*args: object, **kwargs: object) -> object: self.method(oself, *args, **kwargs) previousState = transitioner._state (outputs, outTracer) = transitioner.transition(self) collector = self.collectors[previousState] values = [] for output in outputs: - if outTracer: - outTracer(output._name()) + if outTracer is not None: + outTracer(output) a, k = _filterArgs(args, kwargs, self.argSpec, output.argSpec) value = output(oself, *a, **k) values.append(value) return collector(values) + return doInput - def _name(self): + def _name(self) -> str: return self.method.__name__ -@attr.s(frozen=True) +@dataclass(frozen=True) class MethodicalOutput(object): """ An output for a L{MethodicalMachine}. """ - machine = attr.ib(repr=False) - method = attr.ib() - argSpec = attr.ib(init=False, repr=False) - @argSpec.default - def _buildArgSpec(self): - return _getArgSpec(self.method) + machine: MethodicalMachine = field(repr=False) + method: Callable[..., Any] + argSpec: ArgSpec = field(init=False, repr=False, compare=False) + + def __post_init__(self) -> None: + self.__dict__["argSpec"] = _getArgSpec(self.method) def __get__(self, oself, type=None): """ @@ -268,37 +311,64 @@ class MethodicalOutput(object): raise AttributeError( "{cls}.{method} is a state-machine output method; " "to produce this output, call an input method instead.".format( - cls=type.__name__, - method=self.method.__name__ + cls=type.__name__, method=self.method.__name__ ) ) - def __call__(self, oself, *args, **kwargs): """ Call the underlying method. """ return self.method(oself, *args, **kwargs) - def _name(self): + def _name(self) -> str: return self.method.__name__ -@attr.s(eq=False, hash=False) + +StringOutputTracer = Callable[[str], None] +StringTracer: TypeAlias = "Callable[[str, str, str], StringOutputTracer | None]" + + +def wrapTracer( + wrapped: StringTracer | None, +) -> Tracer[MethodicalState, MethodicalInput, MethodicalOutput] | None: + if wrapped is None: + return None + + def tracer( + state: MethodicalState, + input: MethodicalInput, + output: MethodicalState, + ) -> OutputTracer[MethodicalOutput] | None: + result = wrapped(state._name(), input._name(), output._name()) + if result is not None: + return lambda out: result(out._name()) + return None + + return tracer + + +@dataclass(eq=False) class MethodicalTracer(object): - automaton = attr.ib(repr=False) - symbol = attr.ib(repr=False) + automaton: Automaton[MethodicalState, MethodicalInput, MethodicalOutput] = field( + repr=False + ) + symbol: str = field(repr=False) + def __get__( + self, oself: object, type: object = None + ) -> Callable[[StringTracer], None]: + transitioner = _transitionerFromInstance(oself, self.symbol, self.automaton) - def __get__(self, oself, type=None): - transitioner = _transitionerFromInstance(oself, self.symbol, - self.automaton) - def setTrace(tracer): - transitioner.setTrace(tracer) - return setTrace + def setTrace(tracer: StringTracer | None) -> None: + transitioner.setTrace(wrapTracer(tracer)) + return setTrace counter = count() + + def gensym(): """ Create a unique Python identifier. @@ -306,11 +376,10 @@ def gensym(): return "_symbol_" + str(next(counter)) - class MethodicalMachine(object): """ - A :class:`MethodicalMachine` is an interface to an `Automaton` - that uses methods on a class. + A L{MethodicalMachine} is an interface to an L{Automaton} that uses methods + on a class. """ def __init__(self): @@ -318,7 +387,6 @@ class MethodicalMachine(object): self._reducers = {} self._symbol = gensym() - def __get__(self, oself, type=None): """ L{MethodicalMachine} is an implementation detail for setting up @@ -326,42 +394,41 @@ class MethodicalMachine(object): instance. """ if oself is not None: - raise AttributeError( - "MethodicalMachine is an implementation detail.") + raise AttributeError("MethodicalMachine is an implementation detail.") return self - @_keywords_only - def state(self, initial=False, terminal=False, - serialized=None): + def state( + self, initial: bool = False, terminal: bool = False, serialized: Hashable = None + ): """ Declare a state, possibly an initial state or a terminal state. This is a decorator for methods, but it will modify the method so as not to be callable any more. - :param bool initial: is this state the initial state? - Only one state on this :class:`automat.MethodicalMachine` - may be an initial state; more than one is an error. + @param initial: is this state the initial state? Only one state on + this L{automat.MethodicalMachine} may be an initial state; more + than one is an error. - :param bool terminal: Is this state a terminal state? - i.e. a state that the machine can end up in? - (This is purely informational at this point.) + @param terminal: Is this state a terminal state? i.e. a state that the + machine can end up in? (This is purely informational at this + point.) - :param Hashable serialized: a serializable value - to be used to represent this state to external systems. - This value should be hashable; - :py:func:`unicode` is a good type to use. + @param serialized: a serializable value to be used to represent this + state to external systems. This value should be hashable; L{str} + is a good type to use. """ + def decorator(stateMethod): - state = MethodicalState(machine=self, - method=stateMethod, - serialized=serialized) + state = MethodicalState( + machine=self, method=stateMethod, serialized=serialized + ) if initial: self._automaton.initialState = state return state - return decorator + return decorator @_keywords_only def input(self): @@ -370,12 +437,13 @@ class MethodicalMachine(object): This is a decorator for methods. """ + def decorator(inputMethod): - return MethodicalInput(automaton=self._automaton, - method=inputMethod, - symbol=self._symbol) - return decorator + return MethodicalInput( + automaton=self._automaton, method=inputMethod, symbol=self._symbol + ) + return decorator @_keywords_only def output(self): @@ -387,13 +455,13 @@ class MethodicalMachine(object): This method will be called when the state machine transitions to this state as specified in the decorated `output` method. """ + def decorator(outputMethod): return MethodicalOutput(machine=self, method=outputMethod) - return decorator + return decorator - def _oneTransition(self, startState, inputToken, endState, outputTokens, - collector): + def _oneTransition(self, startState, inputToken, endState, outputTokens, collector): """ See L{MethodicalState.upon}. """ @@ -411,30 +479,31 @@ class MethodicalMachine(object): # if not isinstance(endState, MethodicalState): # raise NotImplementedError("output state {} isn't a state" # .format(endState)) - self._automaton.addTransition(startState, inputToken, endState, - tuple(outputTokens)) + self._automaton.addTransition( + startState, inputToken, endState, tuple(outputTokens) + ) inputToken.collectors[startState] = collector - @_keywords_only def serializer(self): - """ + """ """ - """ def decorator(decoratee): @wraps(decoratee) def serialize(oself): - transitioner = _transitionerFromInstance(oself, self._symbol, - self._automaton) + transitioner = _transitionerFromInstance( + oself, self._symbol, self._automaton + ) return decoratee(oself, transitioner._state.serialized) + return serialize + return decorator @_keywords_only def unserializer(self): - """ + """ """ - """ def decorator(decoratee): @wraps(decoratee) def unserialize(oself, *args, **kwargs): @@ -443,14 +512,17 @@ class MethodicalMachine(object): for eachState in self._automaton.states(): mapping[eachState.serialized] = eachState transitioner = _transitionerFromInstance( - oself, self._symbol, self._automaton) + oself, self._symbol, self._automaton + ) transitioner._state = mapping[state] - return None # it's on purpose + return None # it's on purpose + return unserialize + return decorator @property - def _setTrace(self): + def _setTrace(self) -> MethodicalTracer: return MethodicalTracer(self._automaton, self._symbol) def asDigraph(self): @@ -464,6 +536,7 @@ class MethodicalMachine(object): """ from ._visualize import makeDigraph + return makeDigraph( self._automaton, stateAsString=lambda state: state.method.__name__, diff --git a/contrib/python/Automat/py3/automat/_runtimeproto.py b/contrib/python/Automat/py3/automat/_runtimeproto.py new file mode 100644 index 0000000000..c9c7409f5a --- /dev/null +++ b/contrib/python/Automat/py3/automat/_runtimeproto.py @@ -0,0 +1,62 @@ +""" +Workaround for U{the lack of TypeForm +<https://github.com/python/mypy/issues/9773>}. +""" + +from __future__ import annotations + +import sys + +from typing import TYPE_CHECKING, Callable, Protocol, TypeVar + +from inspect import signature, Signature + +T = TypeVar("T") + +ProtocolAtRuntime = Callable[[], T] + + +def runtime_name(x: ProtocolAtRuntime[T]) -> str: + return x.__name__ + + +from inspect import getmembers, isfunction + +emptyProtocolMethods: frozenset[str] +if not TYPE_CHECKING: + emptyProtocolMethods = frozenset( + name + for name, each in getmembers(type("Example", tuple([Protocol]), {}), isfunction) + ) + + +def actuallyDefinedProtocolMethods(protocol: object) -> frozenset[str]: + """ + Attempt to ignore implementation details, and get all the methods that the + protocol actually defines. + + that includes locally defined methods and also those defined in inherited + superclasses. + """ + return ( + frozenset(name for name, each in getmembers(protocol, isfunction)) + - emptyProtocolMethods + ) + + +def _fixAnnotation(method: Callable[..., object], it: object, ann: str) -> None: + annotation = getattr(it, ann) + if isinstance(annotation, str): + setattr(it, ann, eval(annotation, method.__globals__)) + + +def _liveSignature(method: Callable[..., object]) -> Signature: + """ + Get a signature with evaluated annotations. + """ + # TODO: could this be replaced with get_type_hints? + result = signature(method) + for param in result.parameters.values(): + _fixAnnotation(method, param, "_annotation") + _fixAnnotation(method, result, "_return_annotation") + return result diff --git a/contrib/python/Automat/py3/automat/_typed.py b/contrib/python/Automat/py3/automat/_typed.py new file mode 100644 index 0000000000..c5d9e128ed --- /dev/null +++ b/contrib/python/Automat/py3/automat/_typed.py @@ -0,0 +1,736 @@ +# -*- test-case-name: automat._test.test_type_based -*- +from __future__ import annotations + +import sys +from dataclasses import dataclass, field +from typing import ( + TYPE_CHECKING, + get_origin, + Any, + Callable, + Generic, + Iterable, + Literal, + Protocol, + TypeVar, + overload, +) + +if TYPE_CHECKING: + from graphviz import Digraph +try: + from zope.interface.interface import InterfaceClass # type:ignore[import-untyped] +except ImportError: + hasInterface = False +else: + hasInterface = True + +if sys.version_info < (3, 10): + from typing_extensions import Concatenate, ParamSpec, TypeAlias +else: + from typing import Concatenate, ParamSpec, TypeAlias + +from ._core import Automaton, Transitioner +from ._runtimeproto import ( + ProtocolAtRuntime, + _liveSignature, + actuallyDefinedProtocolMethods, + runtime_name, +) + + +class AlreadyBuiltError(Exception): + """ + The L{TypeMachine} is already built, and thus can no longer be + modified. + """ + + +InputProtocol = TypeVar("InputProtocol") +Core = TypeVar("Core") +Data = TypeVar("Data") +P = ParamSpec("P") +P1 = ParamSpec("P1") +R = TypeVar("R") +OtherData = TypeVar("OtherData") +Decorator = Callable[[Callable[P, R]], Callable[P, R]] +FactoryParams = ParamSpec("FactoryParams") +OtherFactoryParams = ParamSpec("OtherFactoryParams") + + +def pep614(t: R) -> R: + """ + This is a workaround for Python 3.8, which has U{some restrictions on its + grammar for decorators <https://peps.python.org/pep-0614/>}, and makes + C{@state.to(other).upon(Protocol.input)} invalid syntax; for code that + needs to run on these older Python versions, you can do + C{@pep614(state.to(other).upon(Protocol.input))} instead. + """ + return t + + +@dataclass() +class TransitionRegistrar(Generic[P, P1, R]): + """ + This is a record of a transition that need finalizing; it is the result of + calling L{TypeMachineBuilder.state} and then ``.upon(input).to(state)`` on + the result of that. + + It can be used as a decorator, like:: + + registrar = state.upon(Proto.input).to(state2) + @registrar + def inputImplementation(proto: Proto, core: Core) -> Result: ... + + Or, it can be used used to implement a constant return value with + L{TransitionRegistrar.returns}, like:: + + registrar = state.upon(Proto.input).to(state2) + registrar.returns(value) + + Type parameter P: the precise signature of the decorated implementation + callable. + + Type parameter P1: the precise signature of the input method from the + outward-facing state-machine protocol. + + Type parameter R: the return type of both the protocol method and the input + method. + """ + + _signature: Callable[P1, R] + _old: AnyState + _new: AnyState + _nodata: bool = False + _callback: Callable[P, R] | None = None + + def __post_init__(self) -> None: + self._old.builder._registrars.append(self) + + def __call__(self, impl: Callable[P, R]) -> Callable[P, R]: + """ + Finalize it with C{__call__} to indicate that there is an + implementation to the transition, which can be treated as an output. + """ + if self._callback is not None: + raise AlreadyBuiltError( + f"already registered transition from {self._old.name!r} to {self._new.name!r}" + ) + self._callback = impl + builder = self._old.builder + assert builder is self._new.builder, "states must be from the same builder" + builder._automaton.addTransition( + self._old, + self._signature.__name__, + self._new, + tuple(self._new._produceOutputs(impl, self._old, self._nodata)), + ) + return impl + + def returns(self, result: R) -> None: + """ + Finalize it with C{.returns(constant)} to indicate that there is no + method body, and the given result can just be yielded each time after + the state transition. The only output generated in this case would be + the data-construction factory for the target state. + """ + + def constant(*args: object, **kwargs: object) -> R: + return result + + constant.__name__ = f"returns({result})" + self(constant) + + def _checkComplete(self) -> None: + """ + Raise an exception if the user forgot to decorate a method + implementation or supply a return value for this transition. + """ + # TODO: point at the line where `.to`/`.loop`/`.upon` are called so the + # user can more immediately see the incomplete transition + if not self._callback: + raise ValueError( + f"incomplete transition from {self._old.name} to " + f"{self._new.name} upon {self._signature.__qualname__}: " + "remember to use the transition as a decorator or call " + "`.returns` on it." + ) + + +@dataclass +class UponFromNo(Generic[InputProtocol, Core, P, R]): + """ + Type parameter P: the signature of the input method. + """ + + old: TypedState[InputProtocol, Core] | TypedDataState[InputProtocol, Core, Any, ...] + input: Callable[Concatenate[InputProtocol, P], R] + + @overload + def to( + self, state: TypedState[InputProtocol, Core] + ) -> TransitionRegistrar[Concatenate[InputProtocol, Core, P], P, R]: ... + @overload + def to( + self, + state: TypedDataState[InputProtocol, Core, OtherData, P], + ) -> TransitionRegistrar[ + Concatenate[InputProtocol, Core, P], + Concatenate[InputProtocol, P], + R, + ]: ... + def to( + self, + state: ( + TypedState[InputProtocol, Core] + | TypedDataState[InputProtocol, Core, Any, P] + ), + ) -> ( + TransitionRegistrar[Concatenate[InputProtocol, Core, P], P, R] + | TransitionRegistrar[ + Concatenate[InputProtocol, Core, P], + Concatenate[InputProtocol, P], + R, + ] + ): + """ + Declare a state transition to a new state. + """ + return TransitionRegistrar(self.input, self.old, state, True) + + def loop(self) -> TransitionRegistrar[ + Concatenate[InputProtocol, Core, P], + Concatenate[InputProtocol, P], + R, + ]: + """ + Register a transition back to the same state. + """ + return TransitionRegistrar(self.input, self.old, self.old, True) + + +@dataclass +class UponFromData(Generic[InputProtocol, Core, P, R, Data]): + """ + Type parameter P: the signature of the input method. + """ + + old: TypedDataState[InputProtocol, Core, Data, ...] + input: Callable[Concatenate[InputProtocol, P], R] + + @overload + def to( + self, state: TypedState[InputProtocol, Core] + ) -> TransitionRegistrar[ + Concatenate[InputProtocol, Core, Data, P], Concatenate[InputProtocol, P], R + ]: ... + @overload + def to( + self, + state: TypedDataState[InputProtocol, Core, OtherData, P], + ) -> TransitionRegistrar[ + Concatenate[InputProtocol, Core, Data, P], + Concatenate[InputProtocol, P], + R, + ]: ... + def to( + self, + state: ( + TypedState[InputProtocol, Core] + | TypedDataState[InputProtocol, Core, Any, P] + ), + ) -> ( + TransitionRegistrar[Concatenate[InputProtocol, Core, P], P, R] + | TransitionRegistrar[ + Concatenate[InputProtocol, Core, Data, P], + Concatenate[InputProtocol, P], + R, + ] + ): + """ + Declare a state transition to a new state. + """ + return TransitionRegistrar(self.input, self.old, state) + + def loop(self) -> TransitionRegistrar[ + Concatenate[InputProtocol, Core, Data, P], + Concatenate[InputProtocol, P], + R, + ]: + """ + Register a transition back to the same state. + """ + return TransitionRegistrar(self.input, self.old, self.old) + + +@dataclass(frozen=True) +class TypedState(Generic[InputProtocol, Core]): + """ + The result of L{.state() <automat.TypeMachineBuilder.state>}. + """ + + name: str + builder: TypeMachineBuilder[InputProtocol, Core] = field(repr=False) + + def upon( + self, input: Callable[Concatenate[InputProtocol, P], R] + ) -> UponFromNo[InputProtocol, Core, P, R]: + ".upon()" + self.builder._checkMembership(input) + return UponFromNo(self, input) + + def _produceOutputs( + self, + impl: Callable[..., object], + old: ( + TypedDataState[InputProtocol, Core, OtherData, OtherFactoryParams] + | TypedState[InputProtocol, Core] + ), + nodata: bool = False, + ) -> Iterable[SomeOutput]: + yield MethodOutput._fromImpl(impl, isinstance(old, TypedDataState)) + + +@dataclass(frozen=True) +class TypedDataState(Generic[InputProtocol, Core, Data, FactoryParams]): + name: str + builder: TypeMachineBuilder[InputProtocol, Core] = field(repr=False) + factory: Callable[Concatenate[InputProtocol, Core, FactoryParams], Data] + + @overload + def upon( + self, input: Callable[Concatenate[InputProtocol, P], R] + ) -> UponFromData[InputProtocol, Core, P, R, Data]: ... + @overload + def upon( + self, input: Callable[Concatenate[InputProtocol, P], R], nodata: Literal[False] + ) -> UponFromData[InputProtocol, Core, P, R, Data]: ... + @overload + def upon( + self, input: Callable[Concatenate[InputProtocol, P], R], nodata: Literal[True] + ) -> UponFromNo[InputProtocol, Core, P, R]: ... + def upon( + self, + input: Callable[Concatenate[InputProtocol, P], R], + nodata: bool = False, + ) -> ( + UponFromData[InputProtocol, Core, P, R, Data] + | UponFromNo[InputProtocol, Core, P, R] + ): + self.builder._checkMembership(input) + if nodata: + return UponFromNo(self, input) + else: + return UponFromData(self, input) + + def _produceOutputs( + self, + impl: Callable[..., object], + old: ( + TypedDataState[InputProtocol, Core, OtherData, OtherFactoryParams] + | TypedState[InputProtocol, Core] + ), + nodata: bool, + ) -> Iterable[SomeOutput]: + if self is not old: + yield DataOutput(self.factory) + yield MethodOutput._fromImpl( + impl, isinstance(old, TypedDataState) and not nodata + ) + + +AnyState: TypeAlias = "TypedState[Any, Any] | TypedDataState[Any, Any, Any, Any]" + + +@dataclass +class TypedInput: + name: str + + +class SomeOutput(Protocol): + """ + A state machine output. + """ + + @property + def name(self) -> str: + "read-only name property" + + def __call__(*args: Any, **kwargs: Any) -> Any: ... + + def __hash__(self) -> int: + "must be hashable" + + +@dataclass +class InputImplementer(Generic[InputProtocol, Core]): + """ + An L{InputImplementer} implements an input protocol in terms of a + state machine. + + When the factory returned from L{TypeMachine} + """ + + __automat_core__: Core + __automat_transitioner__: Transitioner[ + TypedState[InputProtocol, Core] + | TypedDataState[InputProtocol, Core, object, ...], + str, + SomeOutput, + ] + __automat_data__: object | None = None + __automat_postponed__: list[Callable[[], None]] | None = None + + +def implementMethod( + method: Callable[..., object], +) -> Callable[..., object]: + """ + Construct a function for populating in the synthetic provider of the Input + Protocol to a L{TypeMachineBuilder}. It should have a signature matching that + of the C{method} parameter, a function from that protocol. + """ + methodInput = method.__name__ + # side-effects can be re-ordered until later. If you need to compute a + # value in your method, then obviously it can't be invoked reentrantly. + returnAnnotation = _liveSignature(method).return_annotation + returnsNone = returnAnnotation is None + + def implementation( + self: InputImplementer[InputProtocol, Core], *args: object, **kwargs: object + ) -> object: + transitioner = self.__automat_transitioner__ + dataAtStart = self.__automat_data__ + if self.__automat_postponed__ is not None: + if not returnsNone: + raise RuntimeError( + f"attempting to reentrantly run {method.__qualname__} " + f"but it wants to return {returnAnnotation!r} not None" + ) + + def rerunme() -> None: + implementation(self, *args, **kwargs) + + self.__automat_postponed__.append(rerunme) + return None + postponed = self.__automat_postponed__ = [] + try: + [outputs, tracer] = transitioner.transition(methodInput) + result: Any = None + for output in outputs: + # here's the idea: there will be a state-setup output and a + # state-teardown output. state-setup outputs are added to the + # *beginning* of any entry into a state, so that by the time you + # are running the *implementation* of a method that has entered + # that state, the protocol is in a self-consistent state and can + # run reentrant outputs. not clear that state-teardown outputs are + # necessary + result = output(self, dataAtStart, *args, **kwargs) + finally: + self.__automat_postponed__ = None + while postponed: + postponed.pop(0)() + return result + + implementation.__qualname__ = implementation.__name__ = ( + f"<implementation for {method}>" + ) + return implementation + + +@dataclass(frozen=True) +class MethodOutput(Generic[Core]): + """ + This is the thing that goes into the automaton's outputs list, and thus + (per the implementation of L{implementMethod}) takes the 'self' of the + InputImplementer instance (i.e. the synthetic protocol implementation) and the + previous result computed by the former output, which will be None + initially. + """ + + method: Callable[..., Any] + requiresData: bool + _assertion: Callable[[object], None] + + @classmethod + def _fromImpl( + cls: type[MethodOutput[Core]], method: Callable[..., Any], requiresData: bool + ) -> MethodOutput[Core]: + parameter = None + annotation: type[object] = object + + def assertion(data: object) -> None: + """ + No assertion about the data. + """ + + # Do our best to compute the declared signature, so that we caan verify + # it's the right type. We can't always do that. + try: + sig = _liveSignature(method) + except NameError: + ... + # An inner function may refer to type aliases that only appear as + # local variables, and those are just lost here; give up. + else: + if requiresData: + # 0: self, 1: self.__automat_core__, 2: self.__automat_data__ + declaredParams = list(sig.parameters.values()) + if len(declaredParams) >= 3: + parameter = declaredParams[2] + annotation = parameter.annotation + origin = get_origin(annotation) + if origin is not None: + annotation = origin + if hasInterface and isinstance(annotation, InterfaceClass): + + def assertion(data: object) -> None: + assert annotation.providedBy(data), ( + f"expected {parameter} to provide {annotation} " + f"but got {type(data)} instead" + ) + + else: + + def assertion(data: object) -> None: + assert isinstance(data, annotation), ( + f"expected {parameter} to be {annotation} " + f"but got {type(data)} instead" + ) + + return cls(method, requiresData, assertion) + + @property + def name(self) -> str: + return f"{self.method.__name__}" + + def __call__( + self, + machine: InputImplementer[InputProtocol, Core], + dataAtStart: Data, + /, + *args: object, + **kwargs: object, + ) -> object: + extraArgs = [machine, machine.__automat_core__] + if self.requiresData: + self._assertion(dataAtStart) + extraArgs += [dataAtStart] + # if anything is invoked reentrantly here, then we can't possibly have + # set __automat_data__ and the data argument to the reentrant method + # will be wrong. we *need* to split out the construction / state-enter + # hook, because it needs to run separately. + return self.method(*extraArgs, *args, **kwargs) + + +@dataclass(frozen=True) +class DataOutput(Generic[Data]): + """ + Construct an output for the given data objects. + """ + + dataFactory: Callable[..., Data] + + @property + def name(self) -> str: + return f"data:{self.dataFactory.__name__}" + + def __call__( + realself, + self: InputImplementer[InputProtocol, Core], + dataAtStart: object, + *args: object, + **kwargs: object, + ) -> Data: + newData = realself.dataFactory(self, self.__automat_core__, *args, **kwargs) + self.__automat_data__ = newData + return newData + + +INVALID_WHILE_DESERIALIZING: TypedState[Any, Any] = TypedState( + "automat:invalid-while-deserializing", + None, # type:ignore[arg-type] +) + + +@dataclass(frozen=True) +class TypeMachine(Generic[InputProtocol, Core]): + """ + A L{TypeMachine} is a factory for instances of C{InputProtocol}. + """ + + __automat_type__: type[InputImplementer[InputProtocol, Core]] + __automat_automaton__: Automaton[ + TypedState[InputProtocol, Core] | TypedDataState[InputProtocol, Core, Any, ...], + str, + SomeOutput, + ] + + @overload + def __call__(self, core: Core) -> InputProtocol: ... + @overload + def __call__( + self, core: Core, state: TypedState[InputProtocol, Core] + ) -> InputProtocol: ... + @overload + def __call__( + self, + core: Core, + state: TypedDataState[InputProtocol, Core, OtherData, ...], + dataFactory: Callable[[InputProtocol, Core], OtherData], + ) -> InputProtocol: ... + + def __call__( + self, + core: Core, + state: ( + TypedState[InputProtocol, Core] + | TypedDataState[InputProtocol, Core, OtherData, ...] + | None + ) = None, + dataFactory: Callable[[InputProtocol, Core], OtherData] | None = None, + ) -> InputProtocol: + """ + Construct an instance of C{InputProtocol} from an instance of the + C{Core} protocol. + """ + if state is None: + state = initial = self.__automat_automaton__.initialState + elif isinstance(state, TypedDataState): + assert dataFactory is not None, "data state requires a data factory" + # Ensure that the machine is in a state with *no* transitions while + # we are doing the initial construction of its state-specific data. + initial = INVALID_WHILE_DESERIALIZING + else: + initial = state + + internals: InputImplementer[InputProtocol, Core] = self.__automat_type__( + core, txnr := Transitioner(self.__automat_automaton__, initial) + ) + result: InputProtocol = internals # type:ignore[assignment] + + if dataFactory is not None: + internals.__automat_data__ = dataFactory(result, core) + txnr._state = state + return result + + def asDigraph(self) -> Digraph: + from ._visualize import makeDigraph + + return makeDigraph( + self.__automat_automaton__, + stateAsString=lambda state: state.name, + inputAsString=lambda input: input, + outputAsString=lambda output: output.name, + ) + + +@dataclass(eq=False) +class TypeMachineBuilder(Generic[InputProtocol, Core]): + """ + The main entry-point into Automat, used to construct a factory for + instances of C{InputProtocol} that take an instance of C{Core}. + + Describe the machine with L{TypeMachineBuilder.state} L{.upon + <automat._typed.TypedState.upon>} L{.to + <automat._typed.UponFromNo.to>}, then build it with + L{TypeMachineBuilder.build}, like so:: + + from typing import Protocol + class Inputs(Protocol): + def method(self) -> None: ... + class Core: ... + + from automat import TypeMachineBuilder + builder = TypeMachineBuilder(Inputs, Core) + state = builder.state("state") + state.upon(Inputs.method).loop().returns(None) + Machine = builder.build() + + machine = Machine(Core()) + machine.method() + """ + + # Public constructor parameters. + inputProtocol: ProtocolAtRuntime[InputProtocol] + coreType: type[Core] + + # Internal state, not in the constructor. + _automaton: Automaton[ + TypedState[InputProtocol, Core] | TypedDataState[InputProtocol, Core, Any, ...], + str, + SomeOutput, + ] = field(default_factory=Automaton, repr=False, init=False) + _initial: bool = field(default=True, init=False) + _registrars: list[TransitionRegistrar[..., ..., Any]] = field( + default_factory=list, init=False + ) + _built: bool = field(default=False, init=False) + + @overload + def state(self, name: str) -> TypedState[InputProtocol, Core]: ... + @overload + def state( + self, + name: str, + dataFactory: Callable[Concatenate[InputProtocol, Core, P], Data], + ) -> TypedDataState[InputProtocol, Core, Data, P]: ... + def state( + self, + name: str, + dataFactory: Callable[Concatenate[InputProtocol, Core, P], Data] | None = None, + ) -> TypedState[InputProtocol, Core] | TypedDataState[InputProtocol, Core, Data, P]: + """ + Construct a state. + """ + if self._built: + raise AlreadyBuiltError( + "Cannot add states to an already-built state machine." + ) + if dataFactory is None: + state = TypedState(name, self) + if self._initial: + self._initial = False + self._automaton.initialState = state + return state + else: + assert not self._initial, "initial state cannot require state-specific data" + return TypedDataState(name, self, dataFactory) + + def build(self) -> TypeMachine[InputProtocol, Core]: + """ + Create a L{TypeMachine}, and prevent further modification to the state + machine being built. + """ + # incompleteness check + if self._built: + raise AlreadyBuiltError("Cannot build a state machine twice.") + self._built = True + + for registrar in self._registrars: + registrar._checkComplete() + + # We were only hanging on to these for error-checking purposes, so we + # can drop them now. + del self._registrars[:] + + runtimeType: type[InputImplementer[InputProtocol, Core]] = type( + f"Typed<{runtime_name(self.inputProtocol)}>", + tuple([InputImplementer]), + { + method_name: implementMethod(getattr(self.inputProtocol, method_name)) + for method_name in actuallyDefinedProtocolMethods(self.inputProtocol) + }, + ) + + return TypeMachine(runtimeType, self._automaton) + + def _checkMembership(self, input: Callable[..., object]) -> None: + """ + Ensure that ``input`` is a valid member function of the input protocol, + not just a function that happens to take the right first argument. + """ + if (checked := getattr(self.inputProtocol, input.__name__, None)) is not input: + raise ValueError( + f"{input.__qualname__} is not a member of {self.inputProtocol.__module__}.{self.inputProtocol.__name__}" + ) diff --git a/contrib/python/Automat/py3/automat/_visualize.py b/contrib/python/Automat/py3/automat/_visualize.py index 7a9c8c6eb5..a2b35e5726 100644 --- a/contrib/python/Automat/py3/automat/_visualize.py +++ b/contrib/python/Automat/py3/automat/_visualize.py @@ -1,56 +1,66 @@ -from __future__ import print_function +from __future__ import annotations + import argparse import sys +from functools import wraps +from typing import Callable, Iterator import graphviz +from ._core import Automaton, Input, Output, State from ._discover import findMachines +from ._methodical import MethodicalMachine +from ._typed import TypeMachine, InputProtocol, Core -def _gvquote(s): - return '"{}"'.format(s.replace('"', r'\"')) +def _gvquote(s: str) -> str: + return '"{}"'.format(s.replace('"', r"\"")) -def _gvhtml(s): - return '<{}>'.format(s) +def _gvhtml(s: str) -> str: + return "<{}>".format(s) -def elementMaker(name, *children, **attrs): +def elementMaker(name: str, *children: str, **attrs: str) -> str: """ Construct a string from the HTML element description. """ - formattedAttrs = ' '.join('{}={}'.format(key, _gvquote(str(value))) - for key, value in sorted(attrs.items())) - formattedChildren = ''.join(children) - return u'<{name} {attrs}>{children}</{name}>'.format( - name=name, - attrs=formattedAttrs, - children=formattedChildren) - - -def tableMaker(inputLabel, outputLabels, port, _E=elementMaker): + formattedAttrs = " ".join( + "{}={}".format(key, _gvquote(str(value))) + for key, value in sorted(attrs.items()) + ) + formattedChildren = "".join(children) + return "<{name} {attrs}>{children}</{name}>".format( + name=name, attrs=formattedAttrs, children=formattedChildren + ) + + +def tableMaker( + inputLabel: str, + outputLabels: list[str], + port: str, + _E: Callable[..., str] = elementMaker, +) -> str: """ Construct an HTML table to label a state transition. """ colspan = {} if outputLabels: - colspan['colspan'] = str(len(outputLabels)) + colspan["colspan"] = str(len(outputLabels)) - inputLabelCell = _E("td", - _E("font", - inputLabel, - face="menlo-italic"), - color="purple", - port=port, - **colspan) + inputLabelCell = _E( + "td", + _E("font", inputLabel, face="menlo-italic"), + color="purple", + port=port, + **colspan, + ) pointSize = {"point-size": "9"} - outputLabelCells = [_E("td", - _E("font", - outputLabel, - **pointSize), - color="pink") - for outputLabel in outputLabels] + outputLabelCells = [ + _E("td", _E("font", outputLabel, **pointSize), color="pink") + for outputLabel in outputLabels + ] rows = [_E("tr", inputLabelCell)] @@ -60,16 +70,33 @@ def tableMaker(inputLabel, outputLabels, port, _E=elementMaker): return _E("table", *rows) -def makeDigraph(automaton, inputAsString=repr, - outputAsString=repr, - stateAsString=repr): +def escapify(x: Callable[[State], str]) -> Callable[[State], str]: + @wraps(x) + def impl(t: State) -> str: + return x(t).replace("<", "<").replace(">", ">") + + return impl + + +def makeDigraph( + automaton: Automaton[State, Input, Output], + inputAsString: Callable[[Input], str] = repr, + outputAsString: Callable[[Output], str] = repr, + stateAsString: Callable[[State], str] = repr, +) -> graphviz.Digraph: """ Produce a L{graphviz.Digraph} object from an automaton. """ - digraph = graphviz.Digraph(graph_attr={'pack': 'true', - 'dpi': '100'}, - node_attr={'fontname': 'Menlo'}, - edge_attr={'fontname': 'Menlo'}) + + inputAsString = escapify(inputAsString) + outputAsString = escapify(outputAsString) + stateAsString = escapify(stateAsString) + + digraph = graphviz.Digraph( + graph_attr={"pack": "true", "dpi": "100"}, + node_attr={"fontname": "Menlo"}, + edge_attr={"fontname": "Menlo"}, + ) for state in automaton.states(): if state is automaton.initialState: @@ -78,38 +105,47 @@ def makeDigraph(automaton, inputAsString=repr, else: stateShape = "" fontName = "Menlo" - digraph.node(stateAsString(state), - fontame=fontName, - shape="ellipse", - style=stateShape, - color="blue") + digraph.node( + stateAsString(state), + fontame=fontName, + shape="ellipse", + style=stateShape, + color="blue", + ) for n, eachTransition in enumerate(automaton.allTransitions()): inState, inputSymbol, outState, outputSymbols = eachTransition thisTransition = "t{}".format(n) inputLabel = inputAsString(inputSymbol) port = "tableport" - table = tableMaker(inputLabel, [outputAsString(outputSymbol) - for outputSymbol in outputSymbols], - port=port) + table = tableMaker( + inputLabel, + [outputAsString(outputSymbol) for outputSymbol in outputSymbols], + port=port, + ) - digraph.node(thisTransition, - label=_gvhtml(table), margin="0.2", shape="none") + digraph.node(thisTransition, label=_gvhtml(table), margin="0.2", shape="none") - digraph.edge(stateAsString(inState), - '{}:{}:w'.format(thisTransition, port), - arrowhead="none") - digraph.edge('{}:{}:e'.format(thisTransition, port), - stateAsString(outState)) + digraph.edge( + stateAsString(inState), + "{}:{}:w".format(thisTransition, port), + arrowhead="none", + ) + digraph.edge("{}:{}:e".format(thisTransition, port), stateAsString(outState)) return digraph -def tool(_progname=sys.argv[0], - _argv=sys.argv[1:], - _syspath=sys.path, - _findMachines=findMachines, - _print=print): +def tool( + _progname: str = sys.argv[0], + _argv: list[str] = sys.argv[1:], + _syspath: list[str] = sys.path, + _findMachines: Callable[ + [str], + Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]], + ] = findMachines, + _print: Callable[..., None] = print, +) -> None: """ Entry point for command line utility. """ @@ -122,59 +158,71 @@ def tool(_progname=sys.argv[0], http://www.graphviz.org for more information. """ if _syspath[0]: - _syspath.insert(0, '') + _syspath.insert(0, "") argumentParser = argparse.ArgumentParser( - prog=_progname, - description=DESCRIPTION, - epilog=EPILOG) - argumentParser.add_argument('fqpn', - help="A Fully Qualified Path name" - " representing where to find machines.") - argumentParser.add_argument('--quiet', '-q', - help="suppress output", - default=False, - action="store_true") - argumentParser.add_argument('--dot-directory', '-d', - help="Where to write out .dot files.", - default=".automat_visualize") - argumentParser.add_argument('--image-directory', '-i', - help="Where to write out image files.", - default=".automat_visualize") - argumentParser.add_argument('--image-type', '-t', - help="The image format.", - choices=graphviz.FORMATS, - default='png') - argumentParser.add_argument('--view', '-v', - help="View rendered graphs with" - " default image viewer", - default=False, - action="store_true") + prog=_progname, description=DESCRIPTION, epilog=EPILOG + ) + argumentParser.add_argument( + "fqpn", + help="A Fully Qualified Path name" " representing where to find machines.", + ) + argumentParser.add_argument( + "--quiet", "-q", help="suppress output", default=False, action="store_true" + ) + argumentParser.add_argument( + "--dot-directory", + "-d", + help="Where to write out .dot files.", + default=".automat_visualize", + ) + argumentParser.add_argument( + "--image-directory", + "-i", + help="Where to write out image files.", + default=".automat_visualize", + ) + argumentParser.add_argument( + "--image-type", + "-t", + help="The image format.", + choices=graphviz.FORMATS, + default="png", + ) + argumentParser.add_argument( + "--view", + "-v", + help="View rendered graphs with" " default image viewer", + default=False, + action="store_true", + ) args = argumentParser.parse_args(_argv) - explicitlySaveDot = (args.dot_directory - and (not args.image_directory - or args.image_directory != args.dot_directory)) + explicitlySaveDot = args.dot_directory and ( + not args.image_directory or args.image_directory != args.dot_directory + ) if args.quiet: + def _print(*args): pass for fqpn, machine in _findMachines(args.fqpn): - _print(fqpn, '...discovered') + _print(fqpn, "...discovered") digraph = machine.asDigraph() if explicitlySaveDot: - digraph.save(filename="{}.dot".format(fqpn), - directory=args.dot_directory) + digraph.save(filename="{}.dot".format(fqpn), directory=args.dot_directory) _print(fqpn, "...wrote dot into", args.dot_directory) if args.image_directory: deleteDot = not args.dot_directory or explicitlySaveDot digraph.format = args.image_type - digraph.render(filename="{}.dot".format(fqpn), - directory=args.image_directory, - view=args.view, - cleanup=deleteDot) + digraph.render( + filename="{}.dot".format(fqpn), + directory=args.image_directory, + view=args.view, + cleanup=deleteDot, + ) if deleteDot: msg = "...wrote image into" else: diff --git a/contrib/python/Automat/py3/automat/py.typed b/contrib/python/Automat/py3/automat/py.typed new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/Automat/py3/automat/py.typed diff --git a/contrib/python/Automat/py3/ya.make b/contrib/python/Automat/py3/ya.make index 03a66263d7..87be635c27 100644 --- a/contrib/python/Automat/py3/ya.make +++ b/contrib/python/Automat/py3/ya.make @@ -2,15 +2,10 @@ PY3_LIBRARY() -VERSION(22.10.0) +VERSION(24.8.0) LICENSE(MIT) -PEERDIR( - contrib/python/attrs - contrib/python/six -) - NO_LINT() NO_CHECK_IMPORTS( @@ -25,6 +20,8 @@ PY_SRCS( automat/_discover.py automat/_introspection.py automat/_methodical.py + automat/_runtimeproto.py + automat/_typed.py automat/_visualize.py ) @@ -33,6 +30,7 @@ RESOURCE_FILES( .dist-info/METADATA .dist-info/entry_points.txt .dist-info/top_level.txt + automat/py.typed ) END() |