summaryrefslogtreecommitdiffstats
path: root/contrib/python/executing
diff options
context:
space:
mode:
authornkozlovskiy <[email protected]>2023-09-29 12:24:06 +0300
committernkozlovskiy <[email protected]>2023-09-29 12:41:34 +0300
commite0e3e1717e3d33762ce61950504f9637a6e669ed (patch)
treebca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/executing
parent38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff)
add ydb deps
Diffstat (limited to 'contrib/python/executing')
-rw-r--r--contrib/python/executing/.dist-info/METADATA170
-rw-r--r--contrib/python/executing/.dist-info/top_level.txt1
-rw-r--r--contrib/python/executing/LICENSE.txt21
-rw-r--r--contrib/python/executing/README.md141
-rw-r--r--contrib/python/executing/executing/__init__.py25
-rw-r--r--contrib/python/executing/executing/_exceptions.py22
-rw-r--r--contrib/python/executing/executing/_position_node_finder.py573
-rw-r--r--contrib/python/executing/executing/executing.py1260
-rw-r--r--contrib/python/executing/executing/py.typed0
-rw-r--r--contrib/python/executing/executing/version.py1
-rw-r--r--contrib/python/executing/ya.make27
11 files changed, 2241 insertions, 0 deletions
diff --git a/contrib/python/executing/.dist-info/METADATA b/contrib/python/executing/.dist-info/METADATA
new file mode 100644
index 00000000000..bf103f9c955
--- /dev/null
+++ b/contrib/python/executing/.dist-info/METADATA
@@ -0,0 +1,170 @@
+Metadata-Version: 2.1
+Name: executing
+Version: 1.2.0
+Summary: Get the currently executing AST node of a frame, and other information
+Home-page: https://github.com/alexmojaki/executing
+Author: Alex Hall
+Author-email: [email protected]
+License: MIT
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Programming Language :: Python
+Classifier: Programming Language :: Python :: 2
+Classifier: Programming Language :: Python :: 2.7
+Classifier: Programming Language :: Python :: 3
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: 3.6
+Classifier: Programming Language :: Python :: 3.7
+Classifier: Programming Language :: Python :: 3.8
+Classifier: Programming Language :: Python :: 3.9
+Classifier: Programming Language :: Python :: 3.10
+Classifier: Programming Language :: Python :: 3.11
+Description-Content-Type: text/markdown
+License-File: LICENSE.txt
+Requires-Dist: typing ; python_version < "3.5"
+Provides-Extra: tests
+Requires-Dist: asttokens ; extra == 'tests'
+Requires-Dist: pytest ; extra == 'tests'
+Requires-Dist: littleutils ; extra == 'tests'
+Requires-Dist: rich ; (python_version >= "3.11") and extra == 'tests'
+
+# executing
+
+[![Build Status](https://github.com/alexmojaki/executing/workflows/Tests/badge.svg?branch=master)](https://github.com/alexmojaki/executing/actions) [![Coverage Status](https://coveralls.io/repos/github/alexmojaki/executing/badge.svg?branch=master)](https://coveralls.io/github/alexmojaki/executing?branch=master) [![Supports Python versions 2.7 and 3.5+, including PyPy](https://img.shields.io/pypi/pyversions/executing.svg)](https://pypi.python.org/pypi/executing)
+
+This mini-package lets you get information about what a frame is currently doing, particularly the AST node being executed.
+
+* [Usage](#usage)
+ * [Getting the AST node](#getting-the-ast-node)
+ * [Getting the source code of the node](#getting-the-source-code-of-the-node)
+ * [Getting the `__qualname__` of the current function](#getting-the-__qualname__-of-the-current-function)
+ * [The Source class](#the-source-class)
+* [Installation](#installation)
+* [How does it work?](#how-does-it-work)
+* [Is it reliable?](#is-it-reliable)
+* [Which nodes can it identify?](#which-nodes-can-it-identify)
+* [Libraries that use this](#libraries-that-use-this)
+
+## Usage
+
+### Getting the AST node
+
+```python
+import executing
+
+node = executing.Source.executing(frame).node
+```
+
+Then `node` will be an AST node (from the `ast` standard library module) or None if the node couldn't be identified (which may happen often and should always be checked).
+
+`node` will always be the same instance for multiple calls with frames at the same point of execution.
+
+If you have a traceback object, pass it directly to `Source.executing()` rather than the `tb_frame` attribute to get the correct node.
+
+### Getting the source code of the node
+
+For this you will need to separately install the [`asttokens`](https://github.com/gristlabs/asttokens) library, then obtain an `ASTTokens` object:
+
+```python
+executing.Source.executing(frame).source.asttokens()
+```
+
+or:
+
+```python
+executing.Source.for_frame(frame).asttokens()
+```
+
+or use one of the convenience methods:
+
+```python
+executing.Source.executing(frame).text()
+executing.Source.executing(frame).text_range()
+```
+
+### Getting the `__qualname__` of the current function
+
+```python
+executing.Source.executing(frame).code_qualname()
+```
+
+or:
+
+```python
+executing.Source.for_frame(frame).code_qualname(frame.f_code)
+```
+
+### The `Source` class
+
+Everything goes through the `Source` class. Only one instance of the class is created for each filename. Subclassing it to add more attributes on creation or methods is recommended. The classmethods such as `executing` will respect this. See the source code and docstrings for more detail.
+
+## Installation
+
+ pip install executing
+
+If you don't like that you can just copy the file `executing.py`, there are no dependencies (but of course you won't get updates).
+
+## How does it work?
+
+Suppose the frame is executing this line:
+
+```python
+self.foo(bar.x)
+```
+
+and in particular it's currently obtaining the attribute `self.foo`. Looking at the bytecode, specifically `frame.f_code.co_code[frame.f_lasti]`, we can tell that it's loading an attribute, but it's not obvious which one. We can narrow down the statement being executed using `frame.f_lineno` and find the two `ast.Attribute` nodes representing `self.foo` and `bar.x`. How do we find out which one it is, without recreating the entire compiler in Python?
+
+The trick is to modify the AST slightly for each candidate expression and observe the changes in the bytecode instructions. We change the AST to this:
+
+```python
+(self.foo ** 'longuniqueconstant')(bar.x)
+```
+
+and compile it, and the bytecode will be almost the same but there will be two new instructions:
+
+ LOAD_CONST 'longuniqueconstant'
+ BINARY_POWER
+
+and just before that will be a `LOAD_ATTR` instruction corresponding to `self.foo`. Seeing that it's in the same position as the original instruction lets us know we've found our match.
+
+## Is it reliable?
+
+Yes - if it identifies a node, you can trust that it's identified the correct one. The tests are very thorough - in addition to unit tests which check various situations directly, there are property tests against a large number of files (see the filenames printed in [this build](https://travis-ci.org/alexmojaki/executing/jobs/557970457)) with real code. Specifically, for each file, the tests:
+
+ 1. Identify as many nodes as possible from all the bytecode instructions in the file, and assert that they are all distinct
+ 2. Find all the nodes that should be identifiable, and assert that they were indeed identified somewhere
+
+In other words, it shows that there is a one-to-one mapping between the nodes and the instructions that can be handled. This leaves very little room for a bug to creep in.
+
+Furthermore, `executing` checks that the instructions compiled from the modified AST exactly match the original code save for a few small known exceptions. This accounts for all the quirks and optimisations in the interpreter.
+
+## Which nodes can it identify?
+
+Currently it works in almost all cases for the following `ast` nodes:
+
+ - `Call`, e.g. `self.foo(bar)`
+ - `Attribute`, e.g. `point.x`
+ - `Subscript`, e.g. `lst[1]`
+ - `BinOp`, e.g. `x + y` (doesn't include `and` and `or`)
+ - `UnaryOp`, e.g. `-n` (includes `not` but only works sometimes)
+ - `Compare` e.g. `a < b` (not for chains such as `0 < p < 1`)
+
+The plan is to extend to more operations in the future.
+
+## Projects that use this
+
+### My Projects
+
+- **[`stack_data`](https://github.com/alexmojaki/stack_data)**: Extracts data from stack frames and tracebacks, particularly to display more useful tracebacks than the default. Also uses another related library of mine: **[`pure_eval`](https://github.com/alexmojaki/pure_eval)**.
+- **[`futurecoder`](https://futurecoder.io/)**: Highlights the executing node in tracebacks using `executing` via `stack_data`, and provides debugging with `snoop`.
+- **[`snoop`](https://github.com/alexmojaki/snoop)**: A feature-rich and convenient debugging library. Uses `executing` to show the operation which caused an exception and to allow the `pp` function to display the source of its arguments.
+- **[`heartrate`](https://github.com/alexmojaki/heartrate)**: A simple real time visualisation of the execution of a Python program. Uses `executing` to highlight currently executing operations, particularly in each frame of the stack trace.
+- **[`sorcery`](https://github.com/alexmojaki/sorcery)**: Dark magic delights in Python. Uses `executing` to let special callables called spells know where they're being called from.
+
+### Projects I've contributed to
+
+- **[`IPython`](https://github.com/ipython/ipython/pull/12150)**: Highlights the executing node in tracebacks using `executing` via [`stack_data`](https://github.com/alexmojaki/stack_data).
+- **[`icecream`](https://github.com/gruns/icecream)**: 🍦 Sweet and creamy print debugging. Uses `executing` to identify where `ic` is called and print its arguments.
+- **[`friendly_traceback`](https://github.com/friendly-traceback/friendly-traceback)**: Uses `stack_data` and `executing` to pinpoint the cause of errors and provide helpful explanations.
+- **[`python-devtools`](https://github.com/samuelcolvin/python-devtools)**: Uses `executing` for print debugging similar to `icecream`.
+- **[`sentry_sdk`](https://github.com/getsentry/sentry-python)**: Add the integration `sentry_sdk.integrations.executingExecutingIntegration()` to show the function `__qualname__` in each frame in sentry events.
+- **[`varname`](https://github.com/pwwang/python-varname)**: Dark magics about variable names in python. Uses `executing` to find where its various magical functions like `varname` and `nameof` are called from.
diff --git a/contrib/python/executing/.dist-info/top_level.txt b/contrib/python/executing/.dist-info/top_level.txt
new file mode 100644
index 00000000000..a920f2c56c3
--- /dev/null
+++ b/contrib/python/executing/.dist-info/top_level.txt
@@ -0,0 +1 @@
+executing
diff --git a/contrib/python/executing/LICENSE.txt b/contrib/python/executing/LICENSE.txt
new file mode 100644
index 00000000000..473e36e246e
--- /dev/null
+++ b/contrib/python/executing/LICENSE.txt
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2019 Alex Hall
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/contrib/python/executing/README.md b/contrib/python/executing/README.md
new file mode 100644
index 00000000000..46bc01596a4
--- /dev/null
+++ b/contrib/python/executing/README.md
@@ -0,0 +1,141 @@
+# executing
+
+[![Build Status](https://github.com/alexmojaki/executing/workflows/Tests/badge.svg?branch=master)](https://github.com/alexmojaki/executing/actions) [![Coverage Status](https://coveralls.io/repos/github/alexmojaki/executing/badge.svg?branch=master)](https://coveralls.io/github/alexmojaki/executing?branch=master) [![Supports Python versions 2.7 and 3.5+, including PyPy](https://img.shields.io/pypi/pyversions/executing.svg)](https://pypi.python.org/pypi/executing)
+
+This mini-package lets you get information about what a frame is currently doing, particularly the AST node being executed.
+
+* [Usage](#usage)
+ * [Getting the AST node](#getting-the-ast-node)
+ * [Getting the source code of the node](#getting-the-source-code-of-the-node)
+ * [Getting the `__qualname__` of the current function](#getting-the-__qualname__-of-the-current-function)
+ * [The Source class](#the-source-class)
+* [Installation](#installation)
+* [How does it work?](#how-does-it-work)
+* [Is it reliable?](#is-it-reliable)
+* [Which nodes can it identify?](#which-nodes-can-it-identify)
+* [Libraries that use this](#libraries-that-use-this)
+
+## Usage
+
+### Getting the AST node
+
+```python
+import executing
+
+node = executing.Source.executing(frame).node
+```
+
+Then `node` will be an AST node (from the `ast` standard library module) or None if the node couldn't be identified (which may happen often and should always be checked).
+
+`node` will always be the same instance for multiple calls with frames at the same point of execution.
+
+If you have a traceback object, pass it directly to `Source.executing()` rather than the `tb_frame` attribute to get the correct node.
+
+### Getting the source code of the node
+
+For this you will need to separately install the [`asttokens`](https://github.com/gristlabs/asttokens) library, then obtain an `ASTTokens` object:
+
+```python
+executing.Source.executing(frame).source.asttokens()
+```
+
+or:
+
+```python
+executing.Source.for_frame(frame).asttokens()
+```
+
+or use one of the convenience methods:
+
+```python
+executing.Source.executing(frame).text()
+executing.Source.executing(frame).text_range()
+```
+
+### Getting the `__qualname__` of the current function
+
+```python
+executing.Source.executing(frame).code_qualname()
+```
+
+or:
+
+```python
+executing.Source.for_frame(frame).code_qualname(frame.f_code)
+```
+
+### The `Source` class
+
+Everything goes through the `Source` class. Only one instance of the class is created for each filename. Subclassing it to add more attributes on creation or methods is recommended. The classmethods such as `executing` will respect this. See the source code and docstrings for more detail.
+
+## Installation
+
+ pip install executing
+
+If you don't like that you can just copy the file `executing.py`, there are no dependencies (but of course you won't get updates).
+
+## How does it work?
+
+Suppose the frame is executing this line:
+
+```python
+self.foo(bar.x)
+```
+
+and in particular it's currently obtaining the attribute `self.foo`. Looking at the bytecode, specifically `frame.f_code.co_code[frame.f_lasti]`, we can tell that it's loading an attribute, but it's not obvious which one. We can narrow down the statement being executed using `frame.f_lineno` and find the two `ast.Attribute` nodes representing `self.foo` and `bar.x`. How do we find out which one it is, without recreating the entire compiler in Python?
+
+The trick is to modify the AST slightly for each candidate expression and observe the changes in the bytecode instructions. We change the AST to this:
+
+```python
+(self.foo ** 'longuniqueconstant')(bar.x)
+```
+
+and compile it, and the bytecode will be almost the same but there will be two new instructions:
+
+ LOAD_CONST 'longuniqueconstant'
+ BINARY_POWER
+
+and just before that will be a `LOAD_ATTR` instruction corresponding to `self.foo`. Seeing that it's in the same position as the original instruction lets us know we've found our match.
+
+## Is it reliable?
+
+Yes - if it identifies a node, you can trust that it's identified the correct one. The tests are very thorough - in addition to unit tests which check various situations directly, there are property tests against a large number of files (see the filenames printed in [this build](https://travis-ci.org/alexmojaki/executing/jobs/557970457)) with real code. Specifically, for each file, the tests:
+
+ 1. Identify as many nodes as possible from all the bytecode instructions in the file, and assert that they are all distinct
+ 2. Find all the nodes that should be identifiable, and assert that they were indeed identified somewhere
+
+In other words, it shows that there is a one-to-one mapping between the nodes and the instructions that can be handled. This leaves very little room for a bug to creep in.
+
+Furthermore, `executing` checks that the instructions compiled from the modified AST exactly match the original code save for a few small known exceptions. This accounts for all the quirks and optimisations in the interpreter.
+
+## Which nodes can it identify?
+
+Currently it works in almost all cases for the following `ast` nodes:
+
+ - `Call`, e.g. `self.foo(bar)`
+ - `Attribute`, e.g. `point.x`
+ - `Subscript`, e.g. `lst[1]`
+ - `BinOp`, e.g. `x + y` (doesn't include `and` and `or`)
+ - `UnaryOp`, e.g. `-n` (includes `not` but only works sometimes)
+ - `Compare` e.g. `a < b` (not for chains such as `0 < p < 1`)
+
+The plan is to extend to more operations in the future.
+
+## Projects that use this
+
+### My Projects
+
+- **[`stack_data`](https://github.com/alexmojaki/stack_data)**: Extracts data from stack frames and tracebacks, particularly to display more useful tracebacks than the default. Also uses another related library of mine: **[`pure_eval`](https://github.com/alexmojaki/pure_eval)**.
+- **[`futurecoder`](https://futurecoder.io/)**: Highlights the executing node in tracebacks using `executing` via `stack_data`, and provides debugging with `snoop`.
+- **[`snoop`](https://github.com/alexmojaki/snoop)**: A feature-rich and convenient debugging library. Uses `executing` to show the operation which caused an exception and to allow the `pp` function to display the source of its arguments.
+- **[`heartrate`](https://github.com/alexmojaki/heartrate)**: A simple real time visualisation of the execution of a Python program. Uses `executing` to highlight currently executing operations, particularly in each frame of the stack trace.
+- **[`sorcery`](https://github.com/alexmojaki/sorcery)**: Dark magic delights in Python. Uses `executing` to let special callables called spells know where they're being called from.
+
+### Projects I've contributed to
+
+- **[`IPython`](https://github.com/ipython/ipython/pull/12150)**: Highlights the executing node in tracebacks using `executing` via [`stack_data`](https://github.com/alexmojaki/stack_data).
+- **[`icecream`](https://github.com/gruns/icecream)**: 🍦 Sweet and creamy print debugging. Uses `executing` to identify where `ic` is called and print its arguments.
+- **[`friendly_traceback`](https://github.com/friendly-traceback/friendly-traceback)**: Uses `stack_data` and `executing` to pinpoint the cause of errors and provide helpful explanations.
+- **[`python-devtools`](https://github.com/samuelcolvin/python-devtools)**: Uses `executing` for print debugging similar to `icecream`.
+- **[`sentry_sdk`](https://github.com/getsentry/sentry-python)**: Add the integration `sentry_sdk.integrations.executingExecutingIntegration()` to show the function `__qualname__` in each frame in sentry events.
+- **[`varname`](https://github.com/pwwang/python-varname)**: Dark magics about variable names in python. Uses `executing` to find where its various magical functions like `varname` and `nameof` are called from.
diff --git a/contrib/python/executing/executing/__init__.py b/contrib/python/executing/executing/__init__.py
new file mode 100644
index 00000000000..b6451973917
--- /dev/null
+++ b/contrib/python/executing/executing/__init__.py
@@ -0,0 +1,25 @@
+"""
+Get information about what a frame is currently doing. Typical usage:
+
+ import executing
+
+ node = executing.Source.executing(frame).node
+ # node will be an AST node or None
+"""
+
+from collections import namedtuple
+_VersionInfo = namedtuple('_VersionInfo', ('major', 'minor', 'micro'))
+from .executing import Source, Executing, only, NotOneValueFound, cache, future_flags
+try:
+ from .version import __version__ # type: ignore[import]
+ if "dev" in __version__:
+ raise ValueError
+except Exception:
+ # version.py is auto-generated with the git tag when building
+ __version__ = "???"
+ __version_info__ = _VersionInfo(-1, -1, -1)
+else:
+ __version_info__ = _VersionInfo(*map(int, __version__.split('.')))
+
+
+__all__ = ["Source"]
diff --git a/contrib/python/executing/executing/_exceptions.py b/contrib/python/executing/executing/_exceptions.py
new file mode 100644
index 00000000000..1e88e3f29c9
--- /dev/null
+++ b/contrib/python/executing/executing/_exceptions.py
@@ -0,0 +1,22 @@
+
+class KnownIssue(Exception):
+ """
+ Raised in case of an known problem. Mostly because of cpython bugs.
+ Executing.node gets set to None in this case.
+ """
+
+ pass
+
+
+class VerifierFailure(Exception):
+ """
+ Thrown for an unexpected mapping from instruction to ast node
+ Executing.node gets set to None in this case.
+ """
+
+ def __init__(self, title, node, instruction):
+ # type: (object, object, object) -> None
+ self.node = node
+ self.instruction = instruction
+
+ super().__init__(title) # type: ignore[call-arg]
diff --git a/contrib/python/executing/executing/_position_node_finder.py b/contrib/python/executing/executing/_position_node_finder.py
new file mode 100644
index 00000000000..8b3aec08663
--- /dev/null
+++ b/contrib/python/executing/executing/_position_node_finder.py
@@ -0,0 +1,573 @@
+import ast
+import dis
+from types import CodeType, FrameType
+from typing import Any, Callable, Iterator, Optional, Sequence, Set, Tuple, Type, Union, cast
+from .executing import EnhancedAST, NotOneValueFound, Source, only, function_node_types, assert_
+from ._exceptions import KnownIssue, VerifierFailure
+
+from functools import lru_cache
+
+# the code in this module can use all python>=3.11 features
+
+
+def parents(node: EnhancedAST) -> Iterator[EnhancedAST]:
+ while True:
+ if hasattr(node, "parent"):
+ node = node.parent
+ yield node
+ else:
+ break
+
+def node_and_parents(node: EnhancedAST) -> Iterator[EnhancedAST]:
+ yield node
+ yield from parents(node)
+
+
+def mangled_name(node: EnhancedAST) -> str:
+ """
+
+ Parameters:
+ node: the node which should be mangled
+ name: the name of the node
+
+ Returns:
+ The mangled name of `node`
+ """
+ if isinstance(node, ast.Attribute):
+ name = node.attr
+ elif isinstance(node, ast.Name):
+ name = node.id
+ elif isinstance(node, (ast.alias)):
+ name = node.asname or node.name.split(".")[0]
+ elif isinstance(node, (ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef)):
+ name = node.name
+ elif isinstance(node, ast.ExceptHandler):
+ name = node.name or "exc"
+ else:
+ raise TypeError("no node to mangle")
+
+ if name.startswith("__") and not name.endswith("__"):
+
+ parent,child=node.parent,node
+
+ while not (isinstance(parent,ast.ClassDef) and child not in parent.bases):
+ if not hasattr(parent,"parent"):
+ break
+ parent,child=parent.parent,parent
+ else:
+ class_name=parent.name.lstrip("_")
+ if class_name!="":
+ return "_" + class_name + name
+
+
+
+ return name
+
+
+@lru_cache(128)
+def get_instructions(code: CodeType) -> list[dis.Instruction]:
+ return list(dis.get_instructions(code, show_caches=True))
+
+
+types_cmp_issue_fix = (
+ ast.IfExp,
+ ast.If,
+ ast.Assert,
+ ast.While,
+)
+
+types_cmp_issue = types_cmp_issue_fix + (
+ ast.ListComp,
+ ast.SetComp,
+ ast.DictComp,
+ ast.GeneratorExp,
+)
+
+op_type_map = {
+ "**": ast.Pow,
+ "*": ast.Mult,
+ "@": ast.MatMult,
+ "//": ast.FloorDiv,
+ "/": ast.Div,
+ "%": ast.Mod,
+ "+": ast.Add,
+ "-": ast.Sub,
+ "<<": ast.LShift,
+ ">>": ast.RShift,
+ "&": ast.BitAnd,
+ "^": ast.BitXor,
+ "|": ast.BitOr,
+}
+
+
+class PositionNodeFinder(object):
+ """
+ Mapping bytecode to ast-node based on the source positions, which where introduced in pyhon 3.11.
+ In general every ast-node can be exactly referenced by its begin/end line/col_offset, which is stored in the bytecode.
+ There are only some exceptions for methods and attributes.
+ """
+
+ def __init__(self, frame: FrameType, stmts: Set[EnhancedAST], tree: ast.Module, lasti: int, source: Source):
+ self.bc_list = get_instructions(frame.f_code)
+
+ self.source = source
+ self.decorator: Optional[EnhancedAST] = None
+
+ # work around for https://github.com/python/cpython/issues/96970
+ while self.opname(lasti) == "CACHE":
+ lasti -= 2
+
+ try:
+ # try to map with all match_positions
+ self.result = self.find_node(lasti)
+ except NotOneValueFound:
+ typ: tuple[Type]
+ # LOAD_METHOD could load "".join for long "..."%(...) BinOps
+ # this can only be associated by using all positions
+ if self.opname(lasti) in (
+ "LOAD_METHOD",
+ "LOAD_ATTR",
+ "STORE_ATTR",
+ "DELETE_ATTR",
+ ):
+ # lineno and col_offset of LOAD_METHOD and *_ATTR instructions get set to the beginning of
+ # the attribute by the python compiler to improved error messages (PEP-657)
+ # we ignore here the start position and try to find the ast-node just by end position and expected node type
+ # This is save, because there can only be one attribute ending at a specific point in the source code.
+ typ = (ast.Attribute,)
+ elif self.opname(lasti) == "CALL":
+ # A CALL instruction can be a method call, in which case the lineno and col_offset gets changed by the compiler.
+ # Therefore we ignoring here this attributes and searchnig for a Call-node only by end_col_offset and end_lineno.
+ # This is save, because there can only be one method ending at a specific point in the source code.
+ # One closing ) only belongs to one method.
+ typ = (ast.Call,)
+ else:
+ raise
+
+ self.result = self.find_node(
+ lasti,
+ match_positions=("end_col_offset", "end_lineno"),
+ typ=typ,
+ )
+
+ self.known_issues(self.result, self.instruction(lasti))
+
+ self.test_for_decorator(self.result, lasti)
+
+ if self.decorator is None:
+ self.verify(self.result, self.instruction(lasti))
+
+ def test_for_decorator(self, node: EnhancedAST, index: int) -> None:
+ if (
+ isinstance(node.parent, (ast.ClassDef, function_node_types))
+ and node in node.parent.decorator_list # type: ignore[attr-defined]
+ ):
+ node_func = node.parent
+
+ while True:
+ # the generated bytecode looks like follow:
+
+ # index opname
+ # ------------------
+ # index-4 PRECALL
+ # index-2 CACHE
+ # index CALL <- the call instruction
+ # ... CACHE some CACHE instructions
+
+ # maybe multiple other bytecode blocks for other decorators
+ # index-4 PRECALL
+ # index-2 CACHE
+ # index CALL <- index of the next loop
+ # ... CACHE some CACHE instructions
+
+ # index+x STORE_* the ast-node of this instruction points to the decorated thing
+
+ if self.opname(index - 4) != "PRECALL" or self.opname(index) != "CALL":
+ break
+
+ index += 2
+
+ while self.opname(index) in ("CACHE", "EXTENDED_ARG"):
+ index += 2
+
+ if (
+ self.opname(index).startswith("STORE_")
+ and self.find_node(index) == node_func
+ ):
+ self.result = node_func
+ self.decorator = node
+ return
+
+ index += 4
+
+ def known_issues(self, node: EnhancedAST, instruction: dis.Instruction) -> None:
+ if instruction.opname in ("COMPARE_OP", "IS_OP", "CONTAINS_OP") and isinstance(
+ node, types_cmp_issue
+ ):
+ if isinstance(node, types_cmp_issue_fix):
+ # this is a workaround for https://github.com/python/cpython/issues/95921
+ # we can fix cases with only on comparison inside the test condition
+ #
+ # we can not fix cases like:
+ # if a<b<c and d<e<f: pass
+ # if (a<b<c)!=d!=e: pass
+ # because we don't know which comparison caused the problem
+
+ comparisons = [
+ n
+ for n in ast.walk(node.test) # type: ignore[attr-defined]
+ if isinstance(n, ast.Compare) and len(n.ops) > 1
+ ]
+
+ assert_(comparisons, "expected at least one comparison")
+
+ if len(comparisons) == 1:
+ node = self.result = cast(EnhancedAST, comparisons[0])
+ else:
+ raise KnownIssue(
+ "multiple chain comparison inside %s can not be fixed" % (node)
+ )
+
+ else:
+ # Comprehension and generators get not fixed for now.
+ raise KnownIssue("chain comparison inside %s can not be fixed" % (node))
+
+ if isinstance(node, ast.Assert):
+ # pytest assigns the position of the assertion to all expressions of the rewritten assertion.
+ # All the rewritten expressions get mapped to ast.Assert, which is the wrong ast-node.
+ # We don't report this wrong result.
+ raise KnownIssue("assert")
+
+ if any(isinstance(n, ast.pattern) for n in node_and_parents(node)):
+ # TODO: investigate
+ raise KnownIssue("pattern matching ranges seems to be wrong")
+
+ if instruction.opname == "STORE_NAME" and instruction.argval == "__classcell__":
+ # handle stores to __classcell__ as KnownIssue,
+ # because they get complicated if they are used in `if` or `for` loops
+ # example:
+ #
+ # class X:
+ # # ... something
+ # if some_condition:
+ # def method(self):
+ # super()
+ #
+ # The `STORE_NAME` instruction gets mapped to the `ast.If` node,
+ # because it is the last element in the class.
+ # This last element could be anything and gets dificult to verify.
+
+ raise KnownIssue("store __classcell__")
+
+ @staticmethod
+ def is_except_cleanup(inst: dis.Instruction, node: EnhancedAST) -> bool:
+ if inst.opname not in (
+ "STORE_NAME",
+ "STORE_FAST",
+ "STORE_DEREF",
+ "STORE_GLOBAL",
+ "DELETE_NAME",
+ "DELETE_FAST",
+ "DELETE_GLOBAL",
+ ):
+ return False
+
+ # This bytecode does something exception cleanup related.
+ # The position of the instruciton seems to be something in the last ast-node of the ExceptHandler
+ # this could be a bug, but it might not be observable in normal python code.
+
+ # example:
+ # except Exception as exc:
+ # enum_member._value_ = value
+
+ # other example:
+ # STORE_FAST of e was mapped to Constant(value=False)
+ # except OSError as e:
+ # if not _ignore_error(e):
+ # raise
+ # return False
+
+ # STORE_FAST of msg was mapped to print(...)
+ # except TypeError as msg:
+ # print("Sorry:", msg, file=file)
+
+ return any(
+ isinstance(n, ast.ExceptHandler) and mangled_name(n) == inst.argval
+ for n in parents(node)
+ )
+
+ def verify(self, node: EnhancedAST, instruction: dis.Instruction) -> None:
+ """
+ checks if this node could gererate this instruction
+ """
+
+ op_name = instruction.opname
+ extra_filter: Callable[[EnhancedAST], bool] = lambda e: True
+ ctx: Type = type(None)
+
+ def inst_match(opnames: Union[str, Sequence[str]], **kwargs: Any) -> bool:
+ """
+ match instruction
+
+ Parameters:
+ opnames: (str|Seq[str]): inst.opname has to be equal to or in `opname`
+ **kwargs: every arg has to match inst.arg
+
+ Returns:
+ True if all conditions match the instruction
+
+ """
+
+ if isinstance(opnames, str):
+ opnames = [opnames]
+ return instruction.opname in opnames and kwargs == {
+ k: getattr(instruction, k) for k in kwargs
+ }
+
+ def node_match(node_type: Union[Type, Tuple[Type, ...]], **kwargs: Any) -> bool:
+ """
+ match the ast-node
+
+ Parameters:
+ node_type: type of the node
+ **kwargs: every `arg` has to be equal `node.arg`
+ or `node.arg` has to be an instance of `arg` if it is a type.
+ """
+ return isinstance(node, node_type) and all(
+ isinstance(getattr(node, k), v)
+ if isinstance(v, type)
+ else getattr(node, k) == v
+ for k, v in kwargs.items()
+ )
+
+ if op_name == "CACHE":
+ return
+
+ if inst_match("CALL") and node_match((ast.With, ast.AsyncWith)):
+ # call to context.__exit__
+ return
+
+ if inst_match(("CALL", "LOAD_FAST")) and node_match(
+ (ast.ListComp, ast.GeneratorExp, ast.SetComp, ast.DictComp)
+ ):
+ # call to the generator function
+ return
+
+ if inst_match(("CALL", "CALL_FUNCTION_EX")) and node_match(
+ (ast.ClassDef, ast.Call)
+ ):
+ return
+
+ if inst_match(("COMPARE_OP", "IS_OP", "CONTAINS_OP")) and node_match(
+ ast.Compare
+ ):
+ return
+
+ if inst_match("LOAD_NAME", argval="__annotations__") and node_match(
+ ast.AnnAssign
+ ):
+ return
+
+ if (
+ (
+ inst_match("LOAD_METHOD", argval="join")
+ or inst_match(("CALL", "BUILD_STRING"))
+ )
+ and node_match(ast.BinOp, left=ast.Constant, op=ast.Mod)
+ and isinstance(cast(ast.Constant, cast(ast.BinOp, node).left).value, str)
+ ):
+ # "..."%(...) uses "".join
+ return
+
+ if inst_match("STORE_SUBSCR") and node_match(ast.AnnAssign):
+ # data: int
+ return
+
+ if self.is_except_cleanup(instruction, node):
+ return
+
+ if inst_match(("DELETE_NAME", "DELETE_FAST")) and node_match(
+ ast.Name, id=instruction.argval, ctx=ast.Del
+ ):
+ return
+
+ if inst_match("BUILD_STRING") and (
+ node_match(ast.JoinedStr) or node_match(ast.BinOp, op=ast.Mod)
+ ):
+ return
+
+ if inst_match(("BEFORE_WITH","WITH_EXCEPT_START")) and node_match(ast.With):
+ return
+
+ if inst_match(("STORE_NAME", "STORE_GLOBAL"), argval="__doc__") and node_match(
+ ast.Constant
+ ):
+ # store docstrings
+ return
+
+ if (
+ inst_match(("STORE_NAME", "STORE_FAST", "STORE_GLOBAL", "STORE_DEREF"))
+ and node_match(ast.ExceptHandler)
+ and instruction.argval == mangled_name(node)
+ ):
+ # store exception in variable
+ return
+
+ if (
+ inst_match(("STORE_NAME", "STORE_FAST", "STORE_DEREF", "STORE_GLOBAL"))
+ and node_match((ast.Import, ast.ImportFrom))
+ and any(mangled_name(cast(EnhancedAST, alias)) == instruction.argval for alias in cast(ast.Import, node).names)
+ ):
+ # store imported module in variable
+ return
+
+ if (
+ inst_match(("STORE_FAST", "STORE_DEREF", "STORE_NAME", "STORE_GLOBAL"))
+ and (
+ node_match((ast.FunctionDef, ast.ClassDef, ast.AsyncFunctionDef))
+ or node_match(
+ ast.Name,
+ ctx=ast.Store,
+ )
+ )
+ and instruction.argval == mangled_name(node)
+ ):
+ return
+
+ if False:
+ # TODO: match expressions are not supported for now
+ if inst_match(("STORE_FAST", "STORE_NAME")) and node_match(
+ ast.MatchAs, name=instruction.argval
+ ):
+ return
+
+ if inst_match("COMPARE_OP", argval="==") and node_match(ast.MatchSequence):
+ return
+
+ if inst_match("COMPARE_OP", argval="==") and node_match(ast.MatchValue):
+ return
+
+ if inst_match("BINARY_OP") and node_match(
+ ast.AugAssign, op=op_type_map[instruction.argrepr.removesuffix("=")]
+ ):
+ # a+=5
+ return
+
+ if node_match(ast.Attribute, ctx=ast.Del) and inst_match(
+ "DELETE_ATTR", argval=mangled_name(node)
+ ):
+ return
+
+ if inst_match(("JUMP_IF_TRUE_OR_POP", "JUMP_IF_FALSE_OR_POP")) and node_match(
+ ast.BoolOp
+ ):
+ # and/or short circuit
+ return
+
+ if inst_match("DELETE_SUBSCR") and node_match(ast.Subscript, ctx=ast.Del):
+ return
+
+ if node_match(ast.Name, ctx=ast.Load) and inst_match(
+ ("LOAD_NAME", "LOAD_FAST", "LOAD_GLOBAL"), argval=mangled_name(node)
+ ):
+ return
+
+ if node_match(ast.Name, ctx=ast.Del) and inst_match(
+ ("DELETE_NAME", "DELETE_GLOBAL"), argval=mangled_name(node)
+ ):
+ return
+
+ # old verifier
+
+ typ: Type = type(None)
+ op_type: Type = type(None)
+
+ if op_name.startswith(("BINARY_SUBSCR", "SLICE+")):
+ typ = ast.Subscript
+ ctx = ast.Load
+ elif op_name.startswith("BINARY_"):
+ typ = ast.BinOp
+ op_type = op_type_map[instruction.argrepr]
+ extra_filter = lambda e: isinstance(cast(ast.BinOp, e).op, op_type)
+ elif op_name.startswith("UNARY_"):
+ typ = ast.UnaryOp
+ op_type = dict(
+ UNARY_POSITIVE=ast.UAdd,
+ UNARY_NEGATIVE=ast.USub,
+ UNARY_NOT=ast.Not,
+ UNARY_INVERT=ast.Invert,
+ )[op_name]
+ extra_filter = lambda e: isinstance(cast(ast.UnaryOp, e).op, op_type)
+ elif op_name in ("LOAD_ATTR", "LOAD_METHOD", "LOOKUP_METHOD"):
+ typ = ast.Attribute
+ ctx = ast.Load
+ extra_filter = lambda e: mangled_name(e) == instruction.argval
+ elif op_name in (
+ "LOAD_NAME",
+ "LOAD_GLOBAL",
+ "LOAD_FAST",
+ "LOAD_DEREF",
+ "LOAD_CLASSDEREF",
+ ):
+ typ = ast.Name
+ ctx = ast.Load
+ extra_filter = lambda e: cast(ast.Name, e).id == instruction.argval
+ elif op_name in ("COMPARE_OP", "IS_OP", "CONTAINS_OP"):
+ typ = ast.Compare
+ extra_filter = lambda e: len(cast(ast.Compare, e).ops) == 1
+ elif op_name.startswith(("STORE_SLICE", "STORE_SUBSCR")):
+ ctx = ast.Store
+ typ = ast.Subscript
+ elif op_name.startswith("STORE_ATTR"):
+ ctx = ast.Store
+ typ = ast.Attribute
+ extra_filter = lambda e: mangled_name(e) == instruction.argval
+
+ node_ctx = getattr(node, "ctx", None)
+
+ ctx_match = (
+ ctx is not type(None)
+ or not hasattr(node, "ctx")
+ or isinstance(node_ctx, ctx)
+ )
+
+ # check for old verifier
+ if isinstance(node, typ) and ctx_match and extra_filter(node):
+ return
+
+ # generate error
+
+ title = "ast.%s is not created from %s" % (
+ type(node).__name__,
+ instruction.opname,
+ )
+
+ raise VerifierFailure(title, node, instruction)
+
+ def instruction(self, index: int) -> dis.Instruction:
+ return self.bc_list[index // 2]
+
+ def opname(self, index: int) -> str:
+ return self.instruction(index).opname
+
+ def find_node(
+ self,
+ index: int,
+ match_positions: Sequence[str]=("lineno", "end_lineno", "col_offset", "end_col_offset"),
+ typ: tuple[Type, ...]=(ast.expr, ast.stmt, ast.excepthandler, ast.pattern),
+ ) -> EnhancedAST:
+ position = self.instruction(index).positions
+ assert position is not None and position.lineno is not None
+
+ return only(
+ cast(EnhancedAST, node)
+ for node in self.source._nodes_by_line[position.lineno]
+ if isinstance(node, typ)
+ if not isinstance(node, ast.Expr)
+ # matchvalue.value has the same positions as matchvalue themself, so we exclude ast.MatchValue
+ if not isinstance(node, ast.MatchValue)
+ if all(
+ getattr(position, attr) == getattr(node, attr)
+ for attr in match_positions
+ )
+ )
diff --git a/contrib/python/executing/executing/executing.py b/contrib/python/executing/executing/executing.py
new file mode 100644
index 00000000000..c28091d3f84
--- /dev/null
+++ b/contrib/python/executing/executing/executing.py
@@ -0,0 +1,1260 @@
+"""
+MIT License
+
+Copyright (c) 2021 Alex Hall
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+"""
+
+import __future__
+import ast
+import dis
+import functools
+import inspect
+import io
+import linecache
+import re
+import sys
+import types
+from collections import defaultdict, namedtuple
+from copy import deepcopy
+from itertools import islice
+from operator import attrgetter
+from threading import RLock
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast
+
+if TYPE_CHECKING: # pragma: no cover
+ from asttokens import ASTTokens, ASTText
+ from asttokens.asttokens import ASTTextBase
+
+function_node_types = (ast.FunctionDef,) # type: Tuple[Type, ...]
+if sys.version_info[0] == 3:
+ function_node_types += (ast.AsyncFunctionDef,)
+
+
+if sys.version_info[0] == 3:
+ # noinspection PyUnresolvedReferences
+ from functools import lru_cache
+ # noinspection PyUnresolvedReferences
+ from tokenize import detect_encoding
+ from itertools import zip_longest
+ # noinspection PyUnresolvedReferences,PyCompatibility
+ from pathlib import Path
+
+ cache = lru_cache(maxsize=None)
+ text_type = str
+else:
+ from lib2to3.pgen2.tokenize import detect_encoding, cookie_re as encoding_pattern # type: ignore[attr-defined]
+ from itertools import izip_longest as zip_longest
+
+ class Path(object):
+ pass
+
+
+ def cache(func):
+ # type: (Callable) -> Callable
+ d = {} # type: Dict[Tuple, Callable]
+
+ @functools.wraps(func)
+ def wrapper(*args):
+ # type: (Any) -> Any
+ if args in d:
+ return d[args]
+ result = d[args] = func(*args)
+ return result
+
+ return wrapper
+
+
+ # noinspection PyUnresolvedReferences
+ text_type = unicode
+
+# Type class used to expand out the definition of AST to include fields added by this library
+# It's not actually used for anything other than type checking though!
+class EnhancedAST(ast.AST):
+ parent = None # type: EnhancedAST
+
+if sys.version_info >= (3, 4):
+ # noinspection PyUnresolvedReferences
+ _get_instructions = dis.get_instructions
+ from dis import Instruction as _Instruction
+
+ class Instruction(_Instruction):
+ lineno = None # type: int
+else:
+ class Instruction(namedtuple('Instruction', 'offset argval opname starts_line')):
+ lineno = None # type: int
+
+ from dis import HAVE_ARGUMENT, EXTENDED_ARG, hasconst, opname, findlinestarts, hasname
+
+ # Based on dis.disassemble from 2.7
+ # Left as similar as possible for easy diff
+
+ def _get_instructions(co):
+ # type: (types.CodeType) -> Iterator[Instruction]
+ code = co.co_code
+ linestarts = dict(findlinestarts(co))
+ n = len(code)
+ i = 0
+ extended_arg = 0
+ while i < n:
+ offset = i
+ c = code[i]
+ op = ord(c)
+ lineno = linestarts.get(i)
+ argval = None
+ i = i + 1
+ if op >= HAVE_ARGUMENT:
+ oparg = ord(code[i]) + ord(code[i + 1]) * 256 + extended_arg
+ extended_arg = 0
+ i = i + 2
+ if op == EXTENDED_ARG:
+ extended_arg = oparg * 65536
+
+ if op in hasconst:
+ argval = co.co_consts[oparg]
+ elif op in hasname:
+ argval = co.co_names[oparg]
+ elif opname[op] == 'LOAD_FAST':
+ argval = co.co_varnames[oparg]
+ yield Instruction(offset, argval, opname[op], lineno)
+
+
+# Type class used to expand out the definition of AST to include fields added by this library
+# It's not actually used for anything other than type checking though!
+class EnhancedInstruction(Instruction):
+ _copied = None # type: bool
+
+
+
+def assert_(condition, message=""):
+ # type: (Any, str) -> None
+ """
+ Like an assert statement, but unaffected by -O
+ :param condition: value that is expected to be truthy
+ :type message: Any
+ """
+ if not condition:
+ raise AssertionError(str(message))
+
+
+def get_instructions(co):
+ # type: (types.CodeType) -> Iterator[EnhancedInstruction]
+ lineno = co.co_firstlineno
+ for inst in _get_instructions(co):
+ inst = cast(EnhancedInstruction, inst)
+ lineno = inst.starts_line or lineno
+ assert_(lineno)
+ inst.lineno = lineno
+ yield inst
+
+
+TESTING = 0
+
+
+class NotOneValueFound(Exception):
+ def __init__(self,msg,values=[]):
+ # type: (str, Sequence) -> None
+ self.values=values
+ super(NotOneValueFound,self).__init__(msg)
+
+T = TypeVar('T')
+
+
+def only(it):
+ # type: (Iterable[T]) -> T
+ if isinstance(it, Sized):
+ if len(it) != 1:
+ raise NotOneValueFound('Expected one value, found %s' % len(it))
+ # noinspection PyTypeChecker
+ return list(it)[0]
+
+ lst = tuple(islice(it, 2))
+ if len(lst) == 0:
+ raise NotOneValueFound('Expected one value, found 0')
+ if len(lst) > 1:
+ raise NotOneValueFound('Expected one value, found several',lst)
+ return lst[0]
+
+
+class Source(object):
+ """
+ The source code of a single file and associated metadata.
+
+ The main method of interest is the classmethod `executing(frame)`.
+
+ If you want an instance of this class, don't construct it.
+ Ideally use the classmethod `for_frame(frame)`.
+ If you don't have a frame, use `for_filename(filename [, module_globals])`.
+ These methods cache instances by filename, so at most one instance exists per filename.
+
+ Attributes:
+ - filename
+ - text
+ - lines
+ - tree: AST parsed from text, or None if text is not valid Python
+ All nodes in the tree have an extra `parent` attribute
+
+ Other methods of interest:
+ - statements_at_line
+ - asttokens
+ - code_qualname
+ """
+
+ def __init__(self, filename, lines):
+ # type: (str, Sequence[str]) -> None
+ """
+ Don't call this constructor, see the class docstring.
+ """
+
+ self.filename = filename
+ text = ''.join(lines)
+
+ if not isinstance(text, text_type):
+ encoding = self.detect_encoding(text)
+ # noinspection PyUnresolvedReferences
+ text = text.decode(encoding)
+ lines = [line.decode(encoding) for line in lines]
+
+ self.text = text
+ self.lines = [line.rstrip('\r\n') for line in lines]
+
+ if sys.version_info[0] == 3:
+ ast_text = text
+ else:
+ # In python 2 it's a syntax error to parse unicode
+ # with an encoding declaration, so we remove it but
+ # leave empty lines in its place to keep line numbers the same
+ ast_text = ''.join([
+ '\n' if i < 2 and encoding_pattern.match(line)
+ else line
+ for i, line in enumerate(lines)
+ ])
+
+ self._nodes_by_line = defaultdict(list)
+ self.tree = None
+ self._qualnames = {}
+ self._asttokens = None # type: Optional[ASTTokens]
+ self._asttext = None # type: Optional[ASTText]
+
+ try:
+ self.tree = ast.parse(ast_text, filename=filename)
+ except (SyntaxError, ValueError):
+ pass
+ else:
+ for node in ast.walk(self.tree):
+ for child in ast.iter_child_nodes(node):
+ cast(EnhancedAST, child).parent = cast(EnhancedAST, node)
+ for lineno in node_linenos(node):
+ self._nodes_by_line[lineno].append(node)
+
+ visitor = QualnameVisitor()
+ visitor.visit(self.tree)
+ self._qualnames = visitor.qualnames
+
+ @classmethod
+ def for_frame(cls, frame, use_cache=True):
+ # type: (types.FrameType, bool) -> "Source"
+ """
+ Returns the `Source` object corresponding to the file the frame is executing in.
+ """
+ return cls.for_filename(frame.f_code.co_filename, frame.f_globals or {}, use_cache)
+
+ @classmethod
+ def for_filename(
+ cls,
+ filename,
+ module_globals=None,
+ use_cache=True, # noqa no longer used
+ ):
+ # type: (Union[str, Path], Optional[Dict[str, Any]], bool) -> "Source"
+ if isinstance(filename, Path):
+ filename = str(filename)
+
+ def get_lines():
+ # type: () -> List[str]
+ return linecache.getlines(cast(text_type, filename), module_globals)
+
+ # Save the current linecache entry, then ensure the cache is up to date.
+ entry = linecache.cache.get(filename) # type: ignore[attr-defined]
+ linecache.checkcache(filename)
+ lines = get_lines()
+ if entry is not None and not lines:
+ # There was an entry, checkcache removed it, and nothing replaced it.
+ # This means the file wasn't simply changed (because the `lines` wouldn't be empty)
+ # but rather the file was found not to exist, probably because `filename` was fake.
+ # Restore the original entry so that we still have something.
+ linecache.cache[filename] = entry # type: ignore[attr-defined]
+ lines = get_lines()
+
+ return cls._for_filename_and_lines(filename, tuple(lines))
+
+ @classmethod
+ def _for_filename_and_lines(cls, filename, lines):
+ # type: (str, Sequence[str]) -> "Source"
+ source_cache = cls._class_local('__source_cache_with_lines', {}) # type: Dict[Tuple[str, Sequence[str]], Source]
+ try:
+ return source_cache[(filename, lines)]
+ except KeyError:
+ pass
+
+ result = source_cache[(filename, lines)] = cls(filename, lines)
+ return result
+
+ @classmethod
+ def lazycache(cls, frame):
+ # type: (types.FrameType) -> None
+ if sys.version_info >= (3, 5):
+ linecache.lazycache(frame.f_code.co_filename, frame.f_globals)
+
+ @classmethod
+ def executing(cls, frame_or_tb):
+ # type: (Union[types.TracebackType, types.FrameType]) -> "Executing"
+ """
+ Returns an `Executing` object representing the operation
+ currently executing in the given frame or traceback object.
+ """
+ if isinstance(frame_or_tb, types.TracebackType):
+ # https://docs.python.org/3/reference/datamodel.html#traceback-objects
+ # "tb_lineno gives the line number where the exception occurred;
+ # tb_lasti indicates the precise instruction.
+ # The line number and last instruction in the traceback may differ
+ # from the line number of its frame object
+ # if the exception occurred in a try statement with no matching except clause
+ # or with a finally clause."
+ tb = frame_or_tb
+ frame = tb.tb_frame
+ lineno = tb.tb_lineno
+ lasti = tb.tb_lasti
+ else:
+ frame = frame_or_tb
+ lineno = frame.f_lineno
+ lasti = frame.f_lasti
+
+
+
+ code = frame.f_code
+ key = (code, id(code), lasti)
+ executing_cache = cls._class_local('__executing_cache', {}) # type: Dict[Tuple[types.CodeType, int, int], Any]
+
+ args = executing_cache.get(key)
+ if not args:
+ node = stmts = decorator = None
+ source = cls.for_frame(frame)
+ tree = source.tree
+ if tree:
+ try:
+ stmts = source.statements_at_line(lineno)
+ if stmts:
+ if is_ipython_cell_code(code):
+ decorator, node = find_node_ipython(frame, lasti, stmts, source)
+ else:
+ node_finder = NodeFinder(frame, stmts, tree, lasti, source)
+ node = node_finder.result
+ decorator = node_finder.decorator
+ except Exception:
+ if TESTING:
+ raise
+
+ assert stmts is not None
+ if node:
+ new_stmts = {statement_containing_node(node)}
+ assert_(new_stmts <= stmts)
+ stmts = new_stmts
+
+ executing_cache[key] = args = source, node, stmts, decorator
+
+ return Executing(frame, *args)
+
+ @classmethod
+ def _class_local(cls, name, default):
+ # type: (str, T) -> T
+ """
+ Returns an attribute directly associated with this class
+ (as opposed to subclasses), setting default if necessary
+ """
+ # classes have a mappingproxy preventing us from using setdefault
+ result = cls.__dict__.get(name, default)
+ setattr(cls, name, result)
+ return result
+
+ @cache
+ def statements_at_line(self, lineno):
+ # type: (int) -> Set[EnhancedAST]
+ """
+ Returns the statement nodes overlapping the given line.
+
+ Returns at most one statement unless semicolons are present.
+
+ If the `text` attribute is not valid python, meaning
+ `tree` is None, returns an empty set.
+
+ Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)`
+ should return at least one statement.
+ """
+
+ return {
+ statement_containing_node(node)
+ for node in
+ self._nodes_by_line[lineno]
+ }
+
+ def asttext(self):
+ # type: () -> ASTText
+ """
+ Returns an ASTText object for getting the source of specific AST nodes.
+
+ See http://asttokens.readthedocs.io/en/latest/api-index.html
+ """
+ from asttokens import ASTText # must be installed separately
+
+ if self._asttext is None:
+ self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename)
+
+ return self._asttext
+
+ def asttokens(self):
+ # type: () -> ASTTokens
+ """
+ Returns an ASTTokens object for getting the source of specific AST nodes.
+
+ See http://asttokens.readthedocs.io/en/latest/api-index.html
+ """
+ import asttokens # must be installed separately
+
+ if self._asttokens is None:
+ if hasattr(asttokens, 'ASTText'):
+ self._asttokens = self.asttext().asttokens
+ else: # pragma: no cover
+ self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename)
+ return self._asttokens
+
+ def _asttext_base(self):
+ # type: () -> ASTTextBase
+ import asttokens # must be installed separately
+
+ if hasattr(asttokens, 'ASTText'):
+ return self.asttext()
+ else: # pragma: no cover
+ return self.asttokens()
+
+ @staticmethod
+ def decode_source(source):
+ # type: (Union[str, bytes]) -> text_type
+ if isinstance(source, bytes):
+ encoding = Source.detect_encoding(source)
+ return source.decode(encoding)
+ else:
+ return source
+
+ @staticmethod
+ def detect_encoding(source):
+ # type: (bytes) -> str
+ return detect_encoding(io.BytesIO(source).readline)[0]
+
+ def code_qualname(self, code):
+ # type: (types.CodeType) -> str
+ """
+ Imitates the __qualname__ attribute of functions for code objects.
+ Given:
+
+ - A function `func`
+ - A frame `frame` for an execution of `func`, meaning:
+ `frame.f_code is func.__code__`
+
+ `Source.for_frame(frame).code_qualname(frame.f_code)`
+ will be equal to `func.__qualname__`*. Works for Python 2 as well,
+ where of course no `__qualname__` attribute exists.
+
+ Falls back to `code.co_name` if there is no appropriate qualname.
+
+ Based on https://github.com/wbolster/qualname
+
+ (* unless `func` is a lambda
+ nested inside another lambda on the same line, in which case
+ the outer lambda's qualname will be returned for the codes
+ of both lambdas)
+ """
+ assert_(code.co_filename == self.filename)
+ return self._qualnames.get((code.co_name, code.co_firstlineno), code.co_name)
+
+
+class Executing(object):
+ """
+ Information about the operation a frame is currently executing.
+
+ Generally you will just want `node`, which is the AST node being executed,
+ or None if it's unknown.
+
+ If a decorator is currently being called, then:
+ - `node` is a function or class definition
+ - `decorator` is the expression in `node.decorator_list` being called
+ - `statements == {node}`
+ """
+
+ def __init__(self, frame, source, node, stmts, decorator):
+ # type: (types.FrameType, Source, EnhancedAST, Set[ast.stmt], Optional[EnhancedAST]) -> None
+ self.frame = frame
+ self.source = source
+ self.node = node
+ self.statements = stmts
+ self.decorator = decorator
+
+ def code_qualname(self):
+ # type: () -> str
+ return self.source.code_qualname(self.frame.f_code)
+
+ def text(self):
+ # type: () -> str
+ return self.source._asttext_base().get_text(self.node)
+
+ def text_range(self):
+ # type: () -> Tuple[int, int]
+ return self.source._asttext_base().get_text_range(self.node)
+
+
+class QualnameVisitor(ast.NodeVisitor):
+ def __init__(self):
+ # type: () -> None
+ super(QualnameVisitor, self).__init__()
+ self.stack = [] # type: List[str]
+ self.qualnames = {} # type: Dict[Tuple[str, int], str]
+
+ def add_qualname(self, node, name=None):
+ # type: (ast.AST, Optional[str]) -> None
+ name = name or node.name # type: ignore[attr-defined]
+ self.stack.append(name)
+ if getattr(node, 'decorator_list', ()):
+ lineno = node.decorator_list[0].lineno # type: ignore[attr-defined]
+ else:
+ lineno = node.lineno # type: ignore[attr-defined]
+ self.qualnames.setdefault((name, lineno), ".".join(self.stack))
+
+ def visit_FunctionDef(self, node, name=None):
+ # type: (ast.AST, Optional[str]) -> None
+ if sys.version_info[0] == 3:
+ assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node
+ else:
+ assert isinstance(node, (ast.FunctionDef, ast.Lambda)), node
+ self.add_qualname(node, name)
+ self.stack.append('<locals>')
+ children = [] # type: Sequence[ast.AST]
+ if isinstance(node, ast.Lambda):
+ children = [node.body]
+ else:
+ children = node.body
+ for child in children:
+ self.visit(child)
+ self.stack.pop()
+ self.stack.pop()
+
+ # Find lambdas in the function definition outside the body,
+ # e.g. decorators or default arguments
+ # Based on iter_child_nodes
+ for field, child in ast.iter_fields(node):
+ if field == 'body':
+ continue
+ if isinstance(child, ast.AST):
+ self.visit(child)
+ elif isinstance(child, list):
+ for grandchild in child:
+ if isinstance(grandchild, ast.AST):
+ self.visit(grandchild)
+
+ visit_AsyncFunctionDef = visit_FunctionDef
+
+ def visit_Lambda(self, node):
+ # type: (ast.AST) -> None
+ assert isinstance(node, ast.Lambda)
+ self.visit_FunctionDef(node, '<lambda>')
+
+ def visit_ClassDef(self, node):
+ # type: (ast.AST) -> None
+ assert isinstance(node, ast.ClassDef)
+ self.add_qualname(node)
+ self.generic_visit(node)
+ self.stack.pop()
+
+
+
+
+
+future_flags = sum(
+ getattr(__future__, fname).compiler_flag for fname in __future__.all_feature_names
+)
+
+
+def compile_similar_to(source, matching_code):
+ # type: (ast.Module, types.CodeType) -> Any
+ return compile(
+ source,
+ matching_code.co_filename,
+ 'exec',
+ flags=future_flags & matching_code.co_flags,
+ dont_inherit=True,
+ )
+
+
+sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698'
+
+def is_rewritten_by_pytest(code):
+ # type: (types.CodeType) -> bool
+ return any(
+ bc.opname != "LOAD_CONST" and isinstance(bc.argval,str) and bc.argval.startswith("@py")
+ for bc in get_instructions(code)
+ )
+
+
+class SentinelNodeFinder(object):
+ result = None # type: EnhancedAST
+
+ def __init__(self, frame, stmts, tree, lasti, source):
+ # type: (types.FrameType, Set[EnhancedAST], ast.Module, int, Source) -> None
+ assert_(stmts)
+ self.frame = frame
+ self.tree = tree
+ self.code = code = frame.f_code
+ self.is_pytest = is_rewritten_by_pytest(code)
+
+ if self.is_pytest:
+ self.ignore_linenos = frozenset(assert_linenos(tree))
+ else:
+ self.ignore_linenos = frozenset()
+
+ self.decorator = None
+
+ self.instruction = instruction = self.get_actual_current_instruction(lasti)
+ op_name = instruction.opname
+ extra_filter = lambda e: True # type: Callable[[Any], bool]
+ ctx = type(None) # type: Type
+
+ typ = type(None) # type: Type
+ if op_name.startswith('CALL_'):
+ typ = ast.Call
+ elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')):
+ typ = ast.Subscript
+ ctx = ast.Load
+ elif op_name.startswith('BINARY_'):
+ typ = ast.BinOp
+ op_type = dict(
+ BINARY_POWER=ast.Pow,
+ BINARY_MULTIPLY=ast.Mult,
+ BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()),
+ BINARY_FLOOR_DIVIDE=ast.FloorDiv,
+ BINARY_TRUE_DIVIDE=ast.Div,
+ BINARY_MODULO=ast.Mod,
+ BINARY_ADD=ast.Add,
+ BINARY_SUBTRACT=ast.Sub,
+ BINARY_LSHIFT=ast.LShift,
+ BINARY_RSHIFT=ast.RShift,
+ BINARY_AND=ast.BitAnd,
+ BINARY_XOR=ast.BitXor,
+ BINARY_OR=ast.BitOr,
+ )[op_name]
+ extra_filter = lambda e: isinstance(e.op, op_type)
+ elif op_name.startswith('UNARY_'):
+ typ = ast.UnaryOp
+ op_type = dict(
+ UNARY_POSITIVE=ast.UAdd,
+ UNARY_NEGATIVE=ast.USub,
+ UNARY_NOT=ast.Not,
+ UNARY_INVERT=ast.Invert,
+ )[op_name]
+ extra_filter = lambda e: isinstance(e.op, op_type)
+ elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'):
+ typ = ast.Attribute
+ ctx = ast.Load
+ extra_filter = lambda e: attr_names_match(e.attr, instruction.argval)
+ elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'):
+ typ = ast.Name
+ ctx = ast.Load
+ if sys.version_info[0] == 3 or instruction.argval:
+ extra_filter = lambda e: e.id == instruction.argval
+ elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'):
+ typ = ast.Compare
+ extra_filter = lambda e: len(e.ops) == 1
+ elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')):
+ ctx = ast.Store
+ typ = ast.Subscript
+ elif op_name.startswith('STORE_ATTR'):
+ ctx = ast.Store
+ typ = ast.Attribute
+ extra_filter = lambda e: attr_names_match(e.attr, instruction.argval)
+ else:
+ raise RuntimeError(op_name)
+
+ with lock:
+ exprs = {
+ cast(EnhancedAST, node)
+ for stmt in stmts
+ for node in ast.walk(stmt)
+ if isinstance(node, typ)
+ if isinstance(getattr(node, "ctx", None), ctx)
+ if extra_filter(node)
+ if statement_containing_node(node) == stmt
+ }
+
+ if ctx == ast.Store:
+ # No special bytecode tricks here.
+ # We can handle multiple assigned attributes with different names,
+ # but only one assigned subscript.
+ self.result = only(exprs)
+ return
+
+ matching = list(self.matching_nodes(exprs))
+ if not matching and typ == ast.Call:
+ self.find_decorator(stmts)
+ else:
+ self.result = only(matching)
+
+ def find_decorator(self, stmts):
+ # type: (Union[List[EnhancedAST], Set[EnhancedAST]]) -> None
+ stmt = only(stmts)
+ assert_(isinstance(stmt, (ast.ClassDef, function_node_types)))
+ decorators = stmt.decorator_list # type: ignore[attr-defined]
+ assert_(decorators)
+ line_instructions = [
+ inst
+ for inst in self.clean_instructions(self.code)
+ if inst.lineno == self.frame.f_lineno
+ ]
+ last_decorator_instruction_index = [
+ i
+ for i, inst in enumerate(line_instructions)
+ if inst.opname == "CALL_FUNCTION"
+ ][-1]
+ assert_(
+ line_instructions[last_decorator_instruction_index + 1].opname.startswith(
+ "STORE_"
+ )
+ )
+ decorator_instructions = line_instructions[
+ last_decorator_instruction_index
+ - len(decorators)
+ + 1 : last_decorator_instruction_index
+ + 1
+ ]
+ assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"})
+ decorator_index = decorator_instructions.index(self.instruction)
+ decorator = decorators[::-1][decorator_index]
+ self.decorator = decorator
+ self.result = stmt
+
+ def clean_instructions(self, code):
+ # type: (types.CodeType) -> List[EnhancedInstruction]
+ return [
+ inst
+ for inst in get_instructions(code)
+ if inst.opname not in ("EXTENDED_ARG", "NOP")
+ if inst.lineno not in self.ignore_linenos
+ ]
+
+ def get_original_clean_instructions(self):
+ # type: () -> List[EnhancedInstruction]
+ result = self.clean_instructions(self.code)
+
+ # pypy sometimes (when is not clear)
+ # inserts JUMP_IF_NOT_DEBUG instructions in bytecode
+ # If they're not present in our compiled instructions,
+ # ignore them in the original bytecode
+ if not any(
+ inst.opname == "JUMP_IF_NOT_DEBUG"
+ for inst in self.compile_instructions()
+ ):
+ result = [
+ inst for inst in result
+ if inst.opname != "JUMP_IF_NOT_DEBUG"
+ ]
+
+ return result
+
+ def matching_nodes(self, exprs):
+ # type: (Set[EnhancedAST]) -> Iterator[EnhancedAST]
+ original_instructions = self.get_original_clean_instructions()
+ original_index = only(
+ i
+ for i, inst in enumerate(original_instructions)
+ if inst == self.instruction
+ )
+ for expr_index, expr in enumerate(exprs):
+ setter = get_setter(expr)
+ assert setter is not None
+ # noinspection PyArgumentList
+ replacement = ast.BinOp(
+ left=expr,
+ op=ast.Pow(),
+ right=ast.Str(s=sentinel),
+ )
+ ast.fix_missing_locations(replacement)
+ setter(replacement)
+ try:
+ instructions = self.compile_instructions()
+ finally:
+ setter(expr)
+
+ if sys.version_info >= (3, 10):
+ try:
+ handle_jumps(instructions, original_instructions)
+ except Exception:
+ # Give other candidates a chance
+ if TESTING or expr_index < len(exprs) - 1:
+ continue
+ raise
+
+ indices = [
+ i
+ for i, instruction in enumerate(instructions)
+ if instruction.argval == sentinel
+ ]
+
+ # There can be several indices when the bytecode is duplicated,
+ # as happens in a finally block in 3.9+
+ # First we remove the opcodes caused by our modifications
+ for index_num, sentinel_index in enumerate(indices):
+ # Adjustment for removing sentinel instructions below
+ # in past iterations
+ sentinel_index -= index_num * 2
+
+ assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST')
+ assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER')
+
+ # Then we see if any of the instruction indices match
+ for index_num, sentinel_index in enumerate(indices):
+ sentinel_index -= index_num * 2
+ new_index = sentinel_index - 1
+
+ if new_index != original_index:
+ continue
+
+ original_inst = original_instructions[original_index]
+ new_inst = instructions[new_index]
+
+ # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)'
+ # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT
+ if (
+ original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP')
+ and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
+ and (
+ original_instructions[original_index + 1].opname
+ != instructions[new_index + 1].opname == 'UNARY_NOT'
+ )):
+ # Remove the difference for the upcoming assert
+ instructions.pop(new_index + 1)
+
+ # Check that the modified instructions don't have anything unexpected
+ # 3.10 is a bit too weird to assert this in all cases but things still work
+ if sys.version_info < (3, 10):
+ for inst1, inst2 in zip_longest(
+ original_instructions, instructions
+ ):
+ assert_(inst1 and inst2 and opnames_match(inst1, inst2))
+
+ yield expr
+
+ def compile_instructions(self):
+ # type: () -> List[EnhancedInstruction]
+ module_code = compile_similar_to(self.tree, self.code)
+ code = only(self.find_codes(module_code))
+ return self.clean_instructions(code)
+
+ def find_codes(self, root_code):
+ # type: (types.CodeType) -> list
+ checks = [
+ attrgetter('co_firstlineno'),
+ attrgetter('co_freevars'),
+ attrgetter('co_cellvars'),
+ lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name,
+ ] # type: List[Callable]
+ if not self.is_pytest:
+ checks += [
+ attrgetter('co_names'),
+ attrgetter('co_varnames'),
+ ]
+
+ def matches(c):
+ # type: (types.CodeType) -> bool
+ return all(
+ f(c) == f(self.code)
+ for f in checks
+ )
+
+ code_options = []
+ if matches(root_code):
+ code_options.append(root_code)
+
+ def finder(code):
+ # type: (types.CodeType) -> None
+ for const in code.co_consts:
+ if not inspect.iscode(const):
+ continue
+
+ if matches(const):
+ code_options.append(const)
+ finder(const)
+
+ finder(root_code)
+ return code_options
+
+ def get_actual_current_instruction(self, lasti):
+ # type: (int) -> EnhancedInstruction
+ """
+ Get the instruction corresponding to the current
+ frame offset, skipping EXTENDED_ARG instructions
+ """
+ # Don't use get_original_clean_instructions
+ # because we need the actual instructions including
+ # EXTENDED_ARG
+ instructions = list(get_instructions(self.code))
+ index = only(
+ i
+ for i, inst in enumerate(instructions)
+ if inst.offset == lasti
+ )
+
+ while True:
+ instruction = instructions[index]
+ if instruction.opname != "EXTENDED_ARG":
+ return instruction
+ index += 1
+
+
+
+def non_sentinel_instructions(instructions, start):
+ # type: (List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction]]
+ """
+ Yields (index, instruction) pairs excluding the basic
+ instructions introduced by the sentinel transformation
+ """
+ skip_power = False
+ for i, inst in islice(enumerate(instructions), start, None):
+ if inst.argval == sentinel:
+ assert_(inst.opname == "LOAD_CONST")
+ skip_power = True
+ continue
+ elif skip_power:
+ assert_(inst.opname == "BINARY_POWER")
+ skip_power = False
+ continue
+ yield i, inst
+
+
+def walk_both_instructions(original_instructions, original_start, instructions, start):
+ # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]]
+ """
+ Yields matching indices and instructions from the new and original instructions,
+ leaving out changes made by the sentinel transformation.
+ """
+ original_iter = islice(enumerate(original_instructions), original_start, None)
+ new_iter = non_sentinel_instructions(instructions, start)
+ inverted_comparison = False
+ while True:
+ try:
+ original_i, original_inst = next(original_iter)
+ new_i, new_inst = next(new_iter)
+ except StopIteration:
+ return
+ if (
+ inverted_comparison
+ and original_inst.opname != new_inst.opname == "UNARY_NOT"
+ ):
+ new_i, new_inst = next(new_iter)
+ inverted_comparison = (
+ original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP")
+ and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
+ )
+ yield original_i, original_inst, new_i, new_inst
+
+
+def handle_jumps(instructions, original_instructions):
+ # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> None
+ """
+ Transforms instructions in place until it looks more like original_instructions.
+ This is only needed in 3.10+ where optimisations lead to more drastic changes
+ after the sentinel transformation.
+ Replaces JUMP instructions that aren't also present in original_instructions
+ with the sections that they jump to until a raise or return.
+ In some other cases duplication found in `original_instructions`
+ is replicated in `instructions`.
+ """
+ while True:
+ for original_i, original_inst, new_i, new_inst in walk_both_instructions(
+ original_instructions, 0, instructions, 0
+ ):
+ if opnames_match(original_inst, new_inst):
+ continue
+
+ if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname:
+ # Find where the new instruction is jumping to, ignoring
+ # instructions which have been copied in previous iterations
+ start = only(
+ i
+ for i, inst in enumerate(instructions)
+ if inst.offset == new_inst.argval
+ and not getattr(inst, "_copied", False)
+ )
+ # Replace the jump instruction with the jumped to section of instructions
+ # That section may also be deleted if it's not similarly duplicated
+ # in original_instructions
+ new_instructions = handle_jump(
+ original_instructions, original_i, instructions, start
+ )
+ assert new_instructions is not None
+ instructions[new_i : new_i + 1] = new_instructions
+ else:
+ # Extract a section of original_instructions from original_i to return/raise
+ orig_section = []
+ for section_inst in original_instructions[original_i:]:
+ orig_section.append(section_inst)
+ if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
+ break
+ else:
+ # No return/raise - this is just a mismatch we can't handle
+ raise AssertionError
+
+ instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions))
+
+ # instructions has been modified, the for loop can't sensibly continue
+ # Restart it from the beginning, checking for other issues
+ break
+
+ else: # No mismatched jumps found, we're done
+ return
+
+
+def find_new_matching(orig_section, instructions):
+ # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]]
+ """
+ Yields sections of `instructions` which match `orig_section`.
+ The yielded sections include sentinel instructions, but these
+ are ignored when checking for matches.
+ """
+ for start in range(len(instructions) - len(orig_section)):
+ indices, dup_section = zip(
+ *islice(
+ non_sentinel_instructions(instructions, start),
+ len(orig_section),
+ )
+ )
+ if len(dup_section) < len(orig_section):
+ return
+ if sections_match(orig_section, dup_section):
+ yield instructions[start:indices[-1] + 1]
+
+
+def handle_jump(original_instructions, original_start, instructions, start):
+ # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Optional[List[EnhancedInstruction]]
+ """
+ Returns the section of instructions starting at `start` and ending
+ with a RETURN_VALUE or RAISE_VARARGS instruction.
+ There should be a matching section in original_instructions starting at original_start.
+ If that section doesn't appear elsewhere in original_instructions,
+ then also delete the returned section of instructions.
+ """
+ for original_j, original_inst, new_j, new_inst in walk_both_instructions(
+ original_instructions, original_start, instructions, start
+ ):
+ assert_(opnames_match(original_inst, new_inst))
+ if original_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
+ inlined = deepcopy(instructions[start : new_j + 1])
+ for inl in inlined:
+ inl._copied = True
+ orig_section = original_instructions[original_start : original_j + 1]
+ if not check_duplicates(
+ original_start, orig_section, original_instructions
+ ):
+ instructions[start : new_j + 1] = []
+ return inlined
+
+ return None
+
+
+def check_duplicates(original_i, orig_section, original_instructions):
+ # type: (int, List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
+ """
+ Returns True if a section of original_instructions starting somewhere other
+ than original_i and matching orig_section is found, i.e. orig_section is duplicated.
+ """
+ for dup_start in range(len(original_instructions)):
+ if dup_start == original_i:
+ continue
+ dup_section = original_instructions[dup_start : dup_start + len(orig_section)]
+ if len(dup_section) < len(orig_section):
+ return False
+ if sections_match(orig_section, dup_section):
+ return True
+
+ return False
+
+def sections_match(orig_section, dup_section):
+ # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
+ """
+ Returns True if the given lists of instructions have matching linenos and opnames.
+ """
+ return all(
+ (
+ orig_inst.lineno == dup_inst.lineno
+ # POP_BLOCKs have been found to have differing linenos in innocent cases
+ or "POP_BLOCK" == orig_inst.opname == dup_inst.opname
+ )
+ and opnames_match(orig_inst, dup_inst)
+ for orig_inst, dup_inst in zip(orig_section, dup_section)
+ )
+
+
+def opnames_match(inst1, inst2):
+ # type: (Instruction, Instruction) -> bool
+ return (
+ inst1.opname == inst2.opname
+ or "JUMP" in inst1.opname
+ and "JUMP" in inst2.opname
+ or (inst1.opname == "PRINT_EXPR" and inst2.opname == "POP_TOP")
+ or (
+ inst1.opname in ("LOAD_METHOD", "LOOKUP_METHOD")
+ and inst2.opname == "LOAD_ATTR"
+ )
+ or (inst1.opname == "CALL_METHOD" and inst2.opname == "CALL_FUNCTION")
+ )
+
+
+def get_setter(node):
+ # type: (EnhancedAST) -> Optional[Callable[[ast.AST], None]]
+ parent = node.parent
+ for name, field in ast.iter_fields(parent):
+ if field is node:
+ def setter(new_node):
+ # type: (ast.AST) -> None
+ return setattr(parent, name, new_node)
+ return setter
+ elif isinstance(field, list):
+ for i, item in enumerate(field):
+ if item is node:
+ def setter(new_node):
+ # type: (ast.AST) -> None
+ field[i] = new_node
+
+ return setter
+ return None
+
+lock = RLock()
+
+
+@cache
+def statement_containing_node(node):
+ # type: (ast.AST) -> EnhancedAST
+ while not isinstance(node, ast.stmt):
+ node = cast(EnhancedAST, node).parent
+ return cast(EnhancedAST, node)
+
+
+def assert_linenos(tree):
+ # type: (ast.AST) -> Iterator[int]
+ for node in ast.walk(tree):
+ if (
+ hasattr(node, 'parent') and
+ isinstance(statement_containing_node(node), ast.Assert)
+ ):
+ for lineno in node_linenos(node):
+ yield lineno
+
+
+def _extract_ipython_statement(stmt):
+ # type: (EnhancedAST) -> ast.Module
+ # IPython separates each statement in a cell to be executed separately
+ # So NodeFinder should only compile one statement at a time or it
+ # will find a code mismatch.
+ while not isinstance(stmt.parent, ast.Module):
+ stmt = stmt.parent
+ # use `ast.parse` instead of `ast.Module` for better portability
+ # python3.8 changes the signature of `ast.Module`
+ # Inspired by https://github.com/pallets/werkzeug/pull/1552/files
+ tree = ast.parse("")
+ tree.body = [cast(ast.stmt, stmt)]
+ ast.copy_location(tree, stmt)
+ return tree
+
+
+def is_ipython_cell_code_name(code_name):
+ # type: (str) -> bool
+ return bool(re.match(r"(<module>|<cell line: \d+>)$", code_name))
+
+
+def is_ipython_cell_filename(filename):
+ # type: (str) -> bool
+ return bool(re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename))
+
+
+def is_ipython_cell_code(code_obj):
+ # type: (types.CodeType) -> bool
+ return (
+ is_ipython_cell_filename(code_obj.co_filename) and
+ is_ipython_cell_code_name(code_obj.co_name)
+ )
+
+
+def find_node_ipython(frame, lasti, stmts, source):
+ # type: (types.FrameType, int, Set[EnhancedAST], Source) -> Tuple[Optional[Any], Optional[Any]]
+ node = decorator = None
+ for stmt in stmts:
+ tree = _extract_ipython_statement(stmt)
+ try:
+ node_finder = NodeFinder(frame, stmts, tree, lasti, source)
+ if (node or decorator) and (node_finder.result or node_finder.decorator):
+ # Found potential nodes in separate statements,
+ # cannot resolve ambiguity, give up here
+ return None, None
+
+ node = node_finder.result
+ decorator = node_finder.decorator
+ except Exception:
+ pass
+ return decorator, node
+
+
+def attr_names_match(attr, argval):
+ # type: (str, str) -> bool
+ """
+ Checks that the user-visible attr (from ast) can correspond to
+ the argval in the bytecode, i.e. the real attribute fetched internally,
+ which may be mangled for private attributes.
+ """
+ if attr == argval:
+ return True
+ if not attr.startswith("__"):
+ return False
+ return bool(re.match(r"^_\w+%s$" % attr, argval))
+
+
+def node_linenos(node):
+ # type: (ast.AST) -> Iterator[int]
+ if hasattr(node, "lineno"):
+ linenos = [] # type: Sequence[int]
+ if hasattr(node, "end_lineno") and isinstance(node, ast.expr):
+ assert node.end_lineno is not None # type: ignore[attr-defined]
+ linenos = range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined]
+ else:
+ linenos = [node.lineno] # type: ignore[attr-defined]
+ for lineno in linenos:
+ yield lineno
+
+
+if sys.version_info >= (3, 11):
+ from ._position_node_finder import PositionNodeFinder as NodeFinder
+else:
+ NodeFinder = SentinelNodeFinder
+
diff --git a/contrib/python/executing/executing/py.typed b/contrib/python/executing/executing/py.typed
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/contrib/python/executing/executing/py.typed
diff --git a/contrib/python/executing/executing/version.py b/contrib/python/executing/executing/version.py
new file mode 100644
index 00000000000..e916218b9c8
--- /dev/null
+++ b/contrib/python/executing/executing/version.py
@@ -0,0 +1 @@
+__version__ = '1.2.0' \ No newline at end of file
diff --git a/contrib/python/executing/ya.make b/contrib/python/executing/ya.make
new file mode 100644
index 00000000000..626c0fd5593
--- /dev/null
+++ b/contrib/python/executing/ya.make
@@ -0,0 +1,27 @@
+# Generated by devtools/yamaker (pypi).
+
+PY3_LIBRARY()
+
+VERSION(1.2.0)
+
+LICENSE(MIT)
+
+NO_LINT()
+
+PY_SRCS(
+ TOP_LEVEL
+ executing/__init__.py
+ executing/_exceptions.py
+ executing/_position_node_finder.py
+ executing/executing.py
+ executing/version.py
+)
+
+RESOURCE_FILES(
+ PREFIX contrib/python/executing/
+ .dist-info/METADATA
+ .dist-info/top_level.txt
+ executing/py.typed
+)
+
+END()