diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:24:06 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-09-29 12:41:34 +0300 |
commit | e0e3e1717e3d33762ce61950504f9637a6e669ed (patch) | |
tree | bca3ff6939b10ed60c3d5c12439963a1146b9711 /contrib/python/ipython/py3/IPython/terminal | |
parent | 38f2c5852db84c7b4d83adfcb009eb61541d1ccd (diff) | |
download | ydb-e0e3e1717e3d33762ce61950504f9637a6e669ed.tar.gz |
add ydb deps
Diffstat (limited to 'contrib/python/ipython/py3/IPython/terminal')
24 files changed, 4994 insertions, 0 deletions
diff --git a/contrib/python/ipython/py3/IPython/terminal/__init__.py b/contrib/python/ipython/py3/IPython/terminal/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/__init__.py diff --git a/contrib/python/ipython/py3/IPython/terminal/console.py b/contrib/python/ipython/py3/IPython/terminal/console.py new file mode 100644 index 0000000000..65571a7572 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/console.py @@ -0,0 +1,19 @@ +""" +Shim to maintain backwards compatibility with old IPython.terminal.console imports. +""" +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +import sys +from warnings import warn + +from IPython.utils.shimmodule import ShimModule, ShimWarning + +warn("The `IPython.terminal.console` package has been deprecated since IPython 4.0. " + "You should import from jupyter_console instead.", ShimWarning) + +# Unconditionally insert the shim into sys.modules so that further import calls +# trigger the custom attribute access above + +sys.modules['IPython.terminal.console'] = ShimModule( + src='IPython.terminal.console', mirror='jupyter_console') diff --git a/contrib/python/ipython/py3/IPython/terminal/debugger.py b/contrib/python/ipython/py3/IPython/terminal/debugger.py new file mode 100644 index 0000000000..7a0623c847 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/debugger.py @@ -0,0 +1,177 @@ +import asyncio +import os +import sys + +from IPython.core.debugger import Pdb +from IPython.core.completer import IPCompleter +from .ptutils import IPythonPTCompleter +from .shortcuts import create_ipython_shortcuts +from . import embed + +from pathlib import Path +from pygments.token import Token +from prompt_toolkit.shortcuts.prompt import PromptSession +from prompt_toolkit.enums import EditingMode +from prompt_toolkit.formatted_text import PygmentsTokens +from prompt_toolkit.history import InMemoryHistory, FileHistory +from concurrent.futures import ThreadPoolExecutor + +from prompt_toolkit import __version__ as ptk_version +PTK3 = ptk_version.startswith('3.') + + +# we want to avoid ptk as much as possible when using subprocesses +# as it uses cursor positioning requests, deletes color .... +_use_simple_prompt = "IPY_TEST_SIMPLE_PROMPT" in os.environ + + +class TerminalPdb(Pdb): + """Standalone IPython debugger.""" + + def __init__(self, *args, pt_session_options=None, **kwargs): + Pdb.__init__(self, *args, **kwargs) + self._ptcomp = None + self.pt_init(pt_session_options) + self.thread_executor = ThreadPoolExecutor(1) + + def pt_init(self, pt_session_options=None): + """Initialize the prompt session and the prompt loop + and store them in self.pt_app and self.pt_loop. + + Additional keyword arguments for the PromptSession class + can be specified in pt_session_options. + """ + if pt_session_options is None: + pt_session_options = {} + + def get_prompt_tokens(): + return [(Token.Prompt, self.prompt)] + + if self._ptcomp is None: + compl = IPCompleter( + shell=self.shell, namespace={}, global_namespace={}, parent=self.shell + ) + # add a completer for all the do_ methods + methods_names = [m[3:] for m in dir(self) if m.startswith("do_")] + + def gen_comp(self, text): + return [m for m in methods_names if m.startswith(text)] + import types + newcomp = types.MethodType(gen_comp, compl) + compl.custom_matchers.insert(0, newcomp) + # end add completer. + + self._ptcomp = IPythonPTCompleter(compl) + + # setup history only when we start pdb + if self.shell.debugger_history is None: + if self.shell.debugger_history_file is not None: + p = Path(self.shell.debugger_history_file).expanduser() + if not p.exists(): + p.touch() + self.debugger_history = FileHistory(os.path.expanduser(str(p))) + else: + self.debugger_history = InMemoryHistory() + else: + self.debugger_history = self.shell.debugger_history + + options = dict( + message=(lambda: PygmentsTokens(get_prompt_tokens())), + editing_mode=getattr(EditingMode, self.shell.editing_mode.upper()), + key_bindings=create_ipython_shortcuts(self.shell), + history=self.debugger_history, + completer=self._ptcomp, + enable_history_search=True, + mouse_support=self.shell.mouse_support, + complete_style=self.shell.pt_complete_style, + style=getattr(self.shell, "style", None), + color_depth=self.shell.color_depth, + ) + + if not PTK3: + options['inputhook'] = self.shell.inputhook + options.update(pt_session_options) + if not _use_simple_prompt: + self.pt_loop = asyncio.new_event_loop() + self.pt_app = PromptSession(**options) + + def cmdloop(self, intro=None): + """Repeatedly issue a prompt, accept input, parse an initial prefix + off the received input, and dispatch to action methods, passing them + the remainder of the line as argument. + + override the same methods from cmd.Cmd to provide prompt toolkit replacement. + """ + if not self.use_rawinput: + raise ValueError('Sorry ipdb does not support use_rawinput=False') + + # In order to make sure that prompt, which uses asyncio doesn't + # interfere with applications in which it's used, we always run the + # prompt itself in a different thread (we can't start an event loop + # within an event loop). This new thread won't have any event loop + # running, and here we run our prompt-loop. + self.preloop() + + try: + if intro is not None: + self.intro = intro + if self.intro: + print(self.intro, file=self.stdout) + stop = None + while not stop: + if self.cmdqueue: + line = self.cmdqueue.pop(0) + else: + self._ptcomp.ipy_completer.namespace = self.curframe_locals + self._ptcomp.ipy_completer.global_namespace = self.curframe.f_globals + + # Run the prompt in a different thread. + if not _use_simple_prompt: + try: + line = self.thread_executor.submit( + self.pt_app.prompt + ).result() + except EOFError: + line = "EOF" + else: + line = input("ipdb> ") + + line = self.precmd(line) + stop = self.onecmd(line) + stop = self.postcmd(stop, line) + self.postloop() + except Exception: + raise + + def do_interact(self, arg): + ipshell = embed.InteractiveShellEmbed( + config=self.shell.config, + banner1="*interactive*", + exit_msg="*exiting interactive console...*", + ) + global_ns = self.curframe.f_globals + ipshell( + module=sys.modules.get(global_ns["__name__"], None), + local_ns=self.curframe_locals, + ) + + +def set_trace(frame=None): + """ + Start debugging from `frame`. + + If frame is not specified, debugging starts from caller's frame. + """ + TerminalPdb().set_trace(frame or sys._getframe().f_back) + + +if __name__ == '__main__': + import pdb + # IPython.core.debugger.Pdb.trace_dispatch shall not catch + # bdb.BdbQuit. When started through __main__ and an exception + # happened after hitting "c", this is needed in order to + # be able to quit the debugging session (see #9950). + old_trace_dispatch = pdb.Pdb.trace_dispatch + pdb.Pdb = TerminalPdb # type: ignore + pdb.Pdb.trace_dispatch = old_trace_dispatch # type: ignore + pdb.main() diff --git a/contrib/python/ipython/py3/IPython/terminal/embed.py b/contrib/python/ipython/py3/IPython/terminal/embed.py new file mode 100644 index 0000000000..ce5ee01ff1 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/embed.py @@ -0,0 +1,420 @@ +# encoding: utf-8 +""" +An embedded IPython shell. +""" +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + + +import sys +import warnings + +from IPython.core import ultratb, compilerop +from IPython.core import magic_arguments +from IPython.core.magic import Magics, magics_class, line_magic +from IPython.core.interactiveshell import DummyMod, InteractiveShell +from IPython.terminal.interactiveshell import TerminalInteractiveShell +from IPython.terminal.ipapp import load_default_config + +from traitlets import Bool, CBool, Unicode +from IPython.utils.io import ask_yes_no + +from typing import Set + +class KillEmbedded(Exception):pass + +# kept for backward compatibility as IPython 6 was released with +# the typo. See https://github.com/ipython/ipython/pull/10706 +KillEmbeded = KillEmbedded + +# This is an additional magic that is exposed in embedded shells. +@magics_class +class EmbeddedMagics(Magics): + + @line_magic + @magic_arguments.magic_arguments() + @magic_arguments.argument('-i', '--instance', action='store_true', + help='Kill instance instead of call location') + @magic_arguments.argument('-x', '--exit', action='store_true', + help='Also exit the current session') + @magic_arguments.argument('-y', '--yes', action='store_true', + help='Do not ask confirmation') + def kill_embedded(self, parameter_s=''): + """%kill_embedded : deactivate for good the current embedded IPython + + This function (after asking for confirmation) sets an internal flag so + that an embedded IPython will never activate again for the given call + location. This is useful to permanently disable a shell that is being + called inside a loop: once you've figured out what you needed from it, + you may then kill it and the program will then continue to run without + the interactive shell interfering again. + + Kill Instance Option: + + If for some reasons you need to kill the location where the instance + is created and not called, for example if you create a single + instance in one place and debug in many locations, you can use the + ``--instance`` option to kill this specific instance. Like for the + ``call location`` killing an "instance" should work even if it is + recreated within a loop. + + .. note:: + + This was the default behavior before IPython 5.2 + + """ + + args = magic_arguments.parse_argstring(self.kill_embedded, parameter_s) + print(args) + if args.instance: + # let no ask + if not args.yes: + kill = ask_yes_no( + "Are you sure you want to kill this embedded instance? [y/N] ", 'n') + else: + kill = True + if kill: + self.shell._disable_init_location() + print("This embedded IPython instance will not reactivate anymore " + "once you exit.") + else: + if not args.yes: + kill = ask_yes_no( + "Are you sure you want to kill this embedded call_location? [y/N] ", 'n') + else: + kill = True + if kill: + self.shell.embedded_active = False + print("This embedded IPython call location will not reactivate anymore " + "once you exit.") + + if args.exit: + # Ask-exit does not really ask, it just set internals flags to exit + # on next loop. + self.shell.ask_exit() + + + @line_magic + def exit_raise(self, parameter_s=''): + """%exit_raise Make the current embedded kernel exit and raise and exception. + + This function sets an internal flag so that an embedded IPython will + raise a `IPython.terminal.embed.KillEmbedded` Exception on exit, and then exit the current I. This is + useful to permanently exit a loop that create IPython embed instance. + """ + + self.shell.should_raise = True + self.shell.ask_exit() + + +class _Sentinel: + def __init__(self, repr): + assert isinstance(repr, str) + self.repr = repr + + def __repr__(self): + return repr + + +class InteractiveShellEmbed(TerminalInteractiveShell): + + dummy_mode = Bool(False) + exit_msg = Unicode('') + embedded = CBool(True) + should_raise = CBool(False) + # Like the base class display_banner is not configurable, but here it + # is True by default. + display_banner = CBool(True) + exit_msg = Unicode() + + # When embedding, by default we don't change the terminal title + term_title = Bool(False, + help="Automatically set the terminal title" + ).tag(config=True) + + _inactive_locations: Set[str] = set() + + def _disable_init_location(self): + """Disable the current Instance creation location""" + InteractiveShellEmbed._inactive_locations.add(self._init_location_id) + + @property + def embedded_active(self): + return (self._call_location_id not in InteractiveShellEmbed._inactive_locations)\ + and (self._init_location_id not in InteractiveShellEmbed._inactive_locations) + + @embedded_active.setter + def embedded_active(self, value): + if value: + InteractiveShellEmbed._inactive_locations.discard( + self._call_location_id) + InteractiveShellEmbed._inactive_locations.discard( + self._init_location_id) + else: + InteractiveShellEmbed._inactive_locations.add( + self._call_location_id) + + def __init__(self, **kw): + assert ( + "user_global_ns" not in kw + ), "Key word argument `user_global_ns` has been replaced by `user_module` since IPython 4.0." + + clid = kw.pop('_init_location_id', None) + if not clid: + frame = sys._getframe(1) + clid = '%s:%s' % (frame.f_code.co_filename, frame.f_lineno) + self._init_location_id = clid + + super(InteractiveShellEmbed,self).__init__(**kw) + + # don't use the ipython crash handler so that user exceptions aren't + # trapped + sys.excepthook = ultratb.FormattedTB(color_scheme=self.colors, + mode=self.xmode, + call_pdb=self.pdb) + + def init_sys_modules(self): + """ + Explicitly overwrite :mod:`IPython.core.interactiveshell` to do nothing. + """ + pass + + def init_magics(self): + super(InteractiveShellEmbed, self).init_magics() + self.register_magics(EmbeddedMagics) + + def __call__( + self, + header="", + local_ns=None, + module=None, + dummy=None, + stack_depth=1, + compile_flags=None, + **kw + ): + """Activate the interactive interpreter. + + __call__(self,header='',local_ns=None,module=None,dummy=None) -> Start + the interpreter shell with the given local and global namespaces, and + optionally print a header string at startup. + + The shell can be globally activated/deactivated using the + dummy_mode attribute. This allows you to turn off a shell used + for debugging globally. + + However, *each* time you call the shell you can override the current + state of dummy_mode with the optional keyword parameter 'dummy'. For + example, if you set dummy mode on with IPShell.dummy_mode = True, you + can still have a specific call work by making it as IPShell(dummy=False). + """ + + # we are called, set the underlying interactiveshell not to exit. + self.keep_running = True + + # If the user has turned it off, go away + clid = kw.pop('_call_location_id', None) + if not clid: + frame = sys._getframe(1) + clid = '%s:%s' % (frame.f_code.co_filename, frame.f_lineno) + self._call_location_id = clid + + if not self.embedded_active: + return + + # Normal exits from interactive mode set this flag, so the shell can't + # re-enter (it checks this variable at the start of interactive mode). + self.exit_now = False + + # Allow the dummy parameter to override the global __dummy_mode + if dummy or (dummy != 0 and self.dummy_mode): + return + + # self.banner is auto computed + if header: + self.old_banner2 = self.banner2 + self.banner2 = self.banner2 + '\n' + header + '\n' + else: + self.old_banner2 = '' + + if self.display_banner: + self.show_banner() + + # Call the embedding code with a stack depth of 1 so it can skip over + # our call and get the original caller's namespaces. + self.mainloop( + local_ns, module, stack_depth=stack_depth, compile_flags=compile_flags + ) + + self.banner2 = self.old_banner2 + + if self.exit_msg is not None: + print(self.exit_msg) + + if self.should_raise: + raise KillEmbedded('Embedded IPython raising error, as user requested.') + + def mainloop( + self, + local_ns=None, + module=None, + stack_depth=0, + compile_flags=None, + ): + """Embeds IPython into a running python program. + + Parameters + ---------- + local_ns, module + Working local namespace (a dict) and module (a module or similar + object). If given as None, they are automatically taken from the scope + where the shell was called, so that program variables become visible. + stack_depth : int + How many levels in the stack to go to looking for namespaces (when + local_ns or module is None). This allows an intermediate caller to + make sure that this function gets the namespace from the intended + level in the stack. By default (0) it will get its locals and globals + from the immediate caller. + compile_flags + A bit field identifying the __future__ features + that are enabled, as passed to the builtin :func:`compile` function. + If given as None, they are automatically taken from the scope where + the shell was called. + + """ + + # Get locals and globals from caller + if ((local_ns is None or module is None or compile_flags is None) + and self.default_user_namespaces): + call_frame = sys._getframe(stack_depth).f_back + + if local_ns is None: + local_ns = call_frame.f_locals + if module is None: + global_ns = call_frame.f_globals + try: + module = sys.modules[global_ns['__name__']] + except KeyError: + warnings.warn("Failed to get module %s" % \ + global_ns.get('__name__', 'unknown module') + ) + module = DummyMod() + module.__dict__ = global_ns + if compile_flags is None: + compile_flags = (call_frame.f_code.co_flags & + compilerop.PyCF_MASK) + + # Save original namespace and module so we can restore them after + # embedding; otherwise the shell doesn't shut down correctly. + orig_user_module = self.user_module + orig_user_ns = self.user_ns + orig_compile_flags = self.compile.flags + + # Update namespaces and fire up interpreter + + # The global one is easy, we can just throw it in + if module is not None: + self.user_module = module + + # But the user/local one is tricky: ipython needs it to store internal + # data, but we also need the locals. We'll throw our hidden variables + # like _ih and get_ipython() into the local namespace, but delete them + # later. + if local_ns is not None: + reentrant_local_ns = {k: v for (k, v) in local_ns.items() if k not in self.user_ns_hidden.keys()} + self.user_ns = reentrant_local_ns + self.init_user_ns() + + # Compiler flags + if compile_flags is not None: + self.compile.flags = compile_flags + + # make sure the tab-completer has the correct frame information, so it + # actually completes using the frame's locals/globals + self.set_completer_frame() + + with self.builtin_trap, self.display_trap: + self.interact() + + # now, purge out the local namespace of IPython's hidden variables. + if local_ns is not None: + local_ns.update({k: v for (k, v) in self.user_ns.items() if k not in self.user_ns_hidden.keys()}) + + + # Restore original namespace so shell can shut down when we exit. + self.user_module = orig_user_module + self.user_ns = orig_user_ns + self.compile.flags = orig_compile_flags + + +def embed(*, header="", compile_flags=None, **kwargs): + """Call this to embed IPython at the current point in your program. + + The first invocation of this will create a :class:`terminal.embed.InteractiveShellEmbed` + instance and then call it. Consecutive calls just call the already + created instance. + + If you don't want the kernel to initialize the namespace + from the scope of the surrounding function, + and/or you want to load full IPython configuration, + you probably want `IPython.start_ipython()` instead. + + Here is a simple example:: + + from IPython import embed + a = 10 + b = 20 + embed(header='First time') + c = 30 + d = 40 + embed() + + Parameters + ---------- + + header : str + Optional header string to print at startup. + compile_flags + Passed to the `compile_flags` parameter of :py:meth:`terminal.embed.InteractiveShellEmbed.mainloop()`, + which is called when the :class:`terminal.embed.InteractiveShellEmbed` instance is called. + **kwargs : various, optional + Any other kwargs will be passed to the :class:`terminal.embed.InteractiveShellEmbed` constructor. + Full customization can be done by passing a traitlets :class:`Config` in as the + `config` argument (see :ref:`configure_start_ipython` and :ref:`terminal_options`). + """ + config = kwargs.get('config') + if config is None: + config = load_default_config() + config.InteractiveShellEmbed = config.TerminalInteractiveShell + kwargs['config'] = config + using = kwargs.get('using', 'sync') + if using : + kwargs['config'].update({'TerminalInteractiveShell':{'loop_runner':using, 'colors':'NoColor', 'autoawait': using!='sync'}}) + #save ps1/ps2 if defined + ps1 = None + ps2 = None + try: + ps1 = sys.ps1 + ps2 = sys.ps2 + except AttributeError: + pass + #save previous instance + saved_shell_instance = InteractiveShell._instance + if saved_shell_instance is not None: + cls = type(saved_shell_instance) + cls.clear_instance() + frame = sys._getframe(1) + shell = InteractiveShellEmbed.instance(_init_location_id='%s:%s' % ( + frame.f_code.co_filename, frame.f_lineno), **kwargs) + shell(header=header, stack_depth=2, compile_flags=compile_flags, + _call_location_id='%s:%s' % (frame.f_code.co_filename, frame.f_lineno)) + InteractiveShellEmbed.clear_instance() + #restore previous instance + if saved_shell_instance is not None: + cls = type(saved_shell_instance) + cls.clear_instance() + for subclass in cls._walk_mro(): + subclass._instance = saved_shell_instance + if ps1 is not None: + sys.ps1 = ps1 + sys.ps2 = ps2 diff --git a/contrib/python/ipython/py3/IPython/terminal/interactiveshell.py b/contrib/python/ipython/py3/IPython/terminal/interactiveshell.py new file mode 100644 index 0000000000..75cf25ea66 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/interactiveshell.py @@ -0,0 +1,993 @@ +"""IPython terminal interface using prompt_toolkit""" + +import asyncio +import os +import sys +from warnings import warn +from typing import Union as UnionType + +from IPython.core.async_helpers import get_asyncio_loop +from IPython.core.interactiveshell import InteractiveShell, InteractiveShellABC +from IPython.utils.py3compat import input +from IPython.utils.terminal import toggle_set_term_title, set_term_title, restore_term_title +from IPython.utils.process import abbrev_cwd +from traitlets import ( + Bool, + Unicode, + Dict, + Integer, + List, + observe, + Instance, + Type, + default, + Enum, + Union, + Any, + validate, + Float, +) + +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode +from prompt_toolkit.filters import HasFocus, Condition, IsDone +from prompt_toolkit.formatted_text import PygmentsTokens +from prompt_toolkit.history import History +from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor +from prompt_toolkit.output import ColorDepth +from prompt_toolkit.patch_stdout import patch_stdout +from prompt_toolkit.shortcuts import PromptSession, CompleteStyle, print_formatted_text +from prompt_toolkit.styles import DynamicStyle, merge_styles +from prompt_toolkit.styles.pygments import style_from_pygments_cls, style_from_pygments_dict +from prompt_toolkit import __version__ as ptk_version + +from pygments.styles import get_style_by_name +from pygments.style import Style +from pygments.token import Token + +from .debugger import TerminalPdb, Pdb +from .magics import TerminalMagics +from .pt_inputhooks import get_inputhook_name_and_func +from .prompts import Prompts, ClassicPrompts, RichPromptDisplayHook +from .ptutils import IPythonPTCompleter, IPythonPTLexer +from .shortcuts import ( + KEY_BINDINGS, + create_ipython_shortcuts, + create_identifier, + RuntimeBinding, + add_binding, +) +from .shortcuts.filters import KEYBINDING_FILTERS, filter_from_string +from .shortcuts.auto_suggest import ( + NavigableAutoSuggestFromHistory, + AppendAutoSuggestionInAnyLine, +) + +PTK3 = ptk_version.startswith('3.') + + +class _NoStyle(Style): pass + + + +_style_overrides_light_bg = { + Token.Prompt: '#ansibrightblue', + Token.PromptNum: '#ansiblue bold', + Token.OutPrompt: '#ansibrightred', + Token.OutPromptNum: '#ansired bold', +} + +_style_overrides_linux = { + Token.Prompt: '#ansibrightgreen', + Token.PromptNum: '#ansigreen bold', + Token.OutPrompt: '#ansibrightred', + Token.OutPromptNum: '#ansired bold', +} + +def get_default_editor(): + try: + return os.environ['EDITOR'] + except KeyError: + pass + except UnicodeError: + warn("$EDITOR environment variable is not pure ASCII. Using platform " + "default editor.") + + if os.name == 'posix': + return 'vi' # the only one guaranteed to be there! + else: + return 'notepad' # same in Windows! + +# conservatively check for tty +# overridden streams can result in things like: +# - sys.stdin = None +# - no isatty method +for _name in ('stdin', 'stdout', 'stderr'): + _stream = getattr(sys, _name) + try: + if not _stream or not hasattr(_stream, "isatty") or not _stream.isatty(): + _is_tty = False + break + except ValueError: + # stream is closed + _is_tty = False + break +else: + _is_tty = True + + +_use_simple_prompt = ('IPY_TEST_SIMPLE_PROMPT' in os.environ) or (not _is_tty) + +def black_reformat_handler(text_before_cursor): + """ + We do not need to protect against error, + this is taken care at a higher level where any reformat error is ignored. + Indeed we may call reformatting on incomplete code. + """ + import black + + formatted_text = black.format_str(text_before_cursor, mode=black.FileMode()) + if not text_before_cursor.endswith("\n") and formatted_text.endswith("\n"): + formatted_text = formatted_text[:-1] + return formatted_text + + +def yapf_reformat_handler(text_before_cursor): + from yapf.yapflib import file_resources + from yapf.yapflib import yapf_api + + style_config = file_resources.GetDefaultStyleForDir(os.getcwd()) + formatted_text, was_formatted = yapf_api.FormatCode( + text_before_cursor, style_config=style_config + ) + if was_formatted: + if not text_before_cursor.endswith("\n") and formatted_text.endswith("\n"): + formatted_text = formatted_text[:-1] + return formatted_text + else: + return text_before_cursor + + +class PtkHistoryAdapter(History): + """ + Prompt toolkit has it's own way of handling history, Where it assumes it can + Push/pull from history. + + """ + + def __init__(self, shell): + super().__init__() + self.shell = shell + self._refresh() + + def append_string(self, string): + # we rely on sql for that. + self._loaded = False + self._refresh() + + def _refresh(self): + if not self._loaded: + self._loaded_strings = list(self.load_history_strings()) + + def load_history_strings(self): + last_cell = "" + res = [] + for __, ___, cell in self.shell.history_manager.get_tail( + self.shell.history_load_length, include_latest=True + ): + # Ignore blank lines and consecutive duplicates + cell = cell.rstrip() + if cell and (cell != last_cell): + res.append(cell) + last_cell = cell + yield from res[::-1] + + def store_string(self, string: str) -> None: + pass + +class TerminalInteractiveShell(InteractiveShell): + mime_renderers = Dict().tag(config=True) + + space_for_menu = Integer(6, help='Number of line at the bottom of the screen ' + 'to reserve for the tab completion menu, ' + 'search history, ...etc, the height of ' + 'these menus will at most this value. ' + 'Increase it is you prefer long and skinny ' + 'menus, decrease for short and wide.' + ).tag(config=True) + + pt_app: UnionType[PromptSession, None] = None + auto_suggest: UnionType[ + AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None + ] = None + debugger_history = None + + debugger_history_file = Unicode( + "~/.pdbhistory", help="File in which to store and read history" + ).tag(config=True) + + simple_prompt = Bool(_use_simple_prompt, + help="""Use `raw_input` for the REPL, without completion and prompt colors. + + Useful when controlling IPython as a subprocess, and piping STDIN/OUT/ERR. Known usage are: + IPython own testing machinery, and emacs inferior-shell integration through elpy. + + This mode default to `True` if the `IPY_TEST_SIMPLE_PROMPT` + environment variable is set, or the current terminal is not a tty.""" + ).tag(config=True) + + @property + def debugger_cls(self): + return Pdb if self.simple_prompt else TerminalPdb + + confirm_exit = Bool(True, + help=""" + Set to confirm when you try to exit IPython with an EOF (Control-D + in Unix, Control-Z/Enter in Windows). By typing 'exit' or 'quit', + you can force a direct exit without any confirmation.""", + ).tag(config=True) + + editing_mode = Unicode('emacs', + help="Shortcut style to use at the prompt. 'vi' or 'emacs'.", + ).tag(config=True) + + emacs_bindings_in_vi_insert_mode = Bool( + True, + help="Add shortcuts from 'emacs' insert mode to 'vi' insert mode.", + ).tag(config=True) + + modal_cursor = Bool( + True, + help=""" + Cursor shape changes depending on vi mode: beam in vi insert mode, + block in nav mode, underscore in replace mode.""", + ).tag(config=True) + + ttimeoutlen = Float( + 0.01, + help="""The time in milliseconds that is waited for a key code + to complete.""", + ).tag(config=True) + + timeoutlen = Float( + 0.5, + help="""The time in milliseconds that is waited for a mapped key + sequence to complete.""", + ).tag(config=True) + + autoformatter = Unicode( + None, + help="Autoformatter to reformat Terminal code. Can be `'black'`, `'yapf'` or `None`", + allow_none=True + ).tag(config=True) + + auto_match = Bool( + False, + help=""" + Automatically add/delete closing bracket or quote when opening bracket or quote is entered/deleted. + Brackets: (), [], {} + Quotes: '', \"\" + """, + ).tag(config=True) + + mouse_support = Bool(False, + help="Enable mouse support in the prompt\n(Note: prevents selecting text with the mouse)" + ).tag(config=True) + + # We don't load the list of styles for the help string, because loading + # Pygments plugins takes time and can cause unexpected errors. + highlighting_style = Union([Unicode('legacy'), Type(klass=Style)], + help="""The name or class of a Pygments style to use for syntax + highlighting. To see available styles, run `pygmentize -L styles`.""" + ).tag(config=True) + + @validate('editing_mode') + def _validate_editing_mode(self, proposal): + if proposal['value'].lower() == 'vim': + proposal['value']= 'vi' + elif proposal['value'].lower() == 'default': + proposal['value']= 'emacs' + + if hasattr(EditingMode, proposal['value'].upper()): + return proposal['value'].lower() + + return self.editing_mode + + + @observe('editing_mode') + def _editing_mode(self, change): + if self.pt_app: + self.pt_app.editing_mode = getattr(EditingMode, change.new.upper()) + + def _set_formatter(self, formatter): + if formatter is None: + self.reformat_handler = lambda x:x + elif formatter == 'black': + self.reformat_handler = black_reformat_handler + elif formatter == "yapf": + self.reformat_handler = yapf_reformat_handler + else: + raise ValueError + + @observe("autoformatter") + def _autoformatter_changed(self, change): + formatter = change.new + self._set_formatter(formatter) + + @observe('highlighting_style') + @observe('colors') + def _highlighting_style_changed(self, change): + self.refresh_style() + + def refresh_style(self): + self._style = self._make_style_from_name_or_cls(self.highlighting_style) + + + highlighting_style_overrides = Dict( + help="Override highlighting format for specific tokens" + ).tag(config=True) + + true_color = Bool(False, + help="""Use 24bit colors instead of 256 colors in prompt highlighting. + If your terminal supports true color, the following command should + print ``TRUECOLOR`` in orange:: + + printf \"\\x1b[38;2;255;100;0mTRUECOLOR\\x1b[0m\\n\" + """, + ).tag(config=True) + + editor = Unicode(get_default_editor(), + help="Set the editor used by IPython (default to $EDITOR/vi/notepad)." + ).tag(config=True) + + prompts_class = Type(Prompts, help='Class used to generate Prompt token for prompt_toolkit').tag(config=True) + + prompts = Instance(Prompts) + + @default('prompts') + def _prompts_default(self): + return self.prompts_class(self) + +# @observe('prompts') +# def _(self, change): +# self._update_layout() + + @default('displayhook_class') + def _displayhook_class_default(self): + return RichPromptDisplayHook + + term_title = Bool(True, + help="Automatically set the terminal title" + ).tag(config=True) + + term_title_format = Unicode("IPython: {cwd}", + help="Customize the terminal title format. This is a python format string. " + + "Available substitutions are: {cwd}." + ).tag(config=True) + + display_completions = Enum(('column', 'multicolumn','readlinelike'), + help= ( "Options for displaying tab completions, 'column', 'multicolumn', and " + "'readlinelike'. These options are for `prompt_toolkit`, see " + "`prompt_toolkit` documentation for more information." + ), + default_value='multicolumn').tag(config=True) + + highlight_matching_brackets = Bool(True, + help="Highlight matching brackets.", + ).tag(config=True) + + extra_open_editor_shortcuts = Bool(False, + help="Enable vi (v) or Emacs (C-X C-E) shortcuts to open an external editor. " + "This is in addition to the F2 binding, which is always enabled." + ).tag(config=True) + + handle_return = Any(None, + help="Provide an alternative handler to be called when the user presses " + "Return. This is an advanced option intended for debugging, which " + "may be changed or removed in later releases." + ).tag(config=True) + + enable_history_search = Bool(True, + help="Allows to enable/disable the prompt toolkit history search" + ).tag(config=True) + + autosuggestions_provider = Unicode( + "NavigableAutoSuggestFromHistory", + help="Specifies from which source automatic suggestions are provided. " + "Can be set to ``'NavigableAutoSuggestFromHistory'`` (:kbd:`up` and " + ":kbd:`down` swap suggestions), ``'AutoSuggestFromHistory'``, " + " or ``None`` to disable automatic suggestions. " + "Default is `'NavigableAutoSuggestFromHistory`'.", + allow_none=True, + ).tag(config=True) + + def _set_autosuggestions(self, provider): + # disconnect old handler + if self.auto_suggest and isinstance( + self.auto_suggest, NavigableAutoSuggestFromHistory + ): + self.auto_suggest.disconnect() + if provider is None: + self.auto_suggest = None + elif provider == "AutoSuggestFromHistory": + self.auto_suggest = AutoSuggestFromHistory() + elif provider == "NavigableAutoSuggestFromHistory": + self.auto_suggest = NavigableAutoSuggestFromHistory() + else: + raise ValueError("No valid provider.") + if self.pt_app: + self.pt_app.auto_suggest = self.auto_suggest + + @observe("autosuggestions_provider") + def _autosuggestions_provider_changed(self, change): + provider = change.new + self._set_autosuggestions(provider) + + shortcuts = List( + trait=Dict( + key_trait=Enum( + [ + "command", + "match_keys", + "match_filter", + "new_keys", + "new_filter", + "create", + ] + ), + per_key_traits={ + "command": Unicode(), + "match_keys": List(Unicode()), + "match_filter": Unicode(), + "new_keys": List(Unicode()), + "new_filter": Unicode(), + "create": Bool(False), + }, + ), + help="""Add, disable or modifying shortcuts. + + Each entry on the list should be a dictionary with ``command`` key + identifying the target function executed by the shortcut and at least + one of the following: + + - ``match_keys``: list of keys used to match an existing shortcut, + - ``match_filter``: shortcut filter used to match an existing shortcut, + - ``new_keys``: list of keys to set, + - ``new_filter``: a new shortcut filter to set + + The filters have to be composed of pre-defined verbs and joined by one + of the following conjunctions: ``&`` (and), ``|`` (or), ``~`` (not). + The pre-defined verbs are: + + {} + + + To disable a shortcut set ``new_keys`` to an empty list. + To add a shortcut add key ``create`` with value ``True``. + + When modifying/disabling shortcuts, ``match_keys``/``match_filter`` can + be omitted if the provided specification uniquely identifies a shortcut + to be modified/disabled. When modifying a shortcut ``new_filter`` or + ``new_keys`` can be omitted which will result in reuse of the existing + filter/keys. + + Only shortcuts defined in IPython (and not default prompt-toolkit + shortcuts) can be modified or disabled. The full list of shortcuts, + command identifiers and filters is available under + :ref:`terminal-shortcuts-list`. + """.format( + "\n ".join([f"- `{k}`" for k in KEYBINDING_FILTERS]) + ), + ).tag(config=True) + + @observe("shortcuts") + def _shortcuts_changed(self, change): + if self.pt_app: + self.pt_app.key_bindings = self._merge_shortcuts(user_shortcuts=change.new) + + def _merge_shortcuts(self, user_shortcuts): + # rebuild the bindings list from scratch + key_bindings = create_ipython_shortcuts(self) + + # for now we only allow adding shortcuts for commands which are already + # registered; this is a security precaution. + known_commands = { + create_identifier(binding.command): binding.command + for binding in KEY_BINDINGS + } + shortcuts_to_skip = [] + shortcuts_to_add = [] + + for shortcut in user_shortcuts: + command_id = shortcut["command"] + if command_id not in known_commands: + allowed_commands = "\n - ".join(known_commands) + raise ValueError( + f"{command_id} is not a known shortcut command." + f" Allowed commands are: \n - {allowed_commands}" + ) + old_keys = shortcut.get("match_keys", None) + old_filter = ( + filter_from_string(shortcut["match_filter"]) + if "match_filter" in shortcut + else None + ) + matching = [ + binding + for binding in KEY_BINDINGS + if ( + (old_filter is None or binding.filter == old_filter) + and (old_keys is None or [k for k in binding.keys] == old_keys) + and create_identifier(binding.command) == command_id + ) + ] + + new_keys = shortcut.get("new_keys", None) + new_filter = shortcut.get("new_filter", None) + + command = known_commands[command_id] + + creating_new = shortcut.get("create", False) + modifying_existing = not creating_new and ( + new_keys is not None or new_filter + ) + + if creating_new and new_keys == []: + raise ValueError("Cannot add a shortcut without keys") + + if modifying_existing: + specification = { + key: shortcut[key] + for key in ["command", "filter"] + if key in shortcut + } + if len(matching) == 0: + raise ValueError( + f"No shortcuts matching {specification} found in {KEY_BINDINGS}" + ) + elif len(matching) > 1: + raise ValueError( + f"Multiple shortcuts matching {specification} found," + f" please add keys/filter to select one of: {matching}" + ) + + matched = matching[0] + old_filter = matched.filter + old_keys = list(matched.keys) + shortcuts_to_skip.append( + RuntimeBinding( + command, + keys=old_keys, + filter=old_filter, + ) + ) + + if new_keys != []: + shortcuts_to_add.append( + RuntimeBinding( + command, + keys=new_keys or old_keys, + filter=filter_from_string(new_filter) + if new_filter is not None + else ( + old_filter + if old_filter is not None + else filter_from_string("always") + ), + ) + ) + + # rebuild the bindings list from scratch + key_bindings = create_ipython_shortcuts(self, skip=shortcuts_to_skip) + for binding in shortcuts_to_add: + add_binding(key_bindings, binding) + + return key_bindings + + prompt_includes_vi_mode = Bool(True, + help="Display the current vi mode (when using vi editing mode)." + ).tag(config=True) + + @observe('term_title') + def init_term_title(self, change=None): + # Enable or disable the terminal title. + if self.term_title and _is_tty: + toggle_set_term_title(True) + set_term_title(self.term_title_format.format(cwd=abbrev_cwd())) + else: + toggle_set_term_title(False) + + def restore_term_title(self): + if self.term_title and _is_tty: + restore_term_title() + + def init_display_formatter(self): + super(TerminalInteractiveShell, self).init_display_formatter() + # terminal only supports plain text + self.display_formatter.active_types = ["text/plain"] + + def init_prompt_toolkit_cli(self): + if self.simple_prompt: + # Fall back to plain non-interactive output for tests. + # This is very limited. + def prompt(): + prompt_text = "".join(x[1] for x in self.prompts.in_prompt_tokens()) + lines = [input(prompt_text)] + prompt_continuation = "".join(x[1] for x in self.prompts.continuation_prompt_tokens()) + while self.check_complete('\n'.join(lines))[0] == 'incomplete': + lines.append( input(prompt_continuation) ) + return '\n'.join(lines) + self.prompt_for_code = prompt + return + + # Set up keyboard shortcuts + key_bindings = self._merge_shortcuts(user_shortcuts=self.shortcuts) + + # Pre-populate history from IPython's history database + history = PtkHistoryAdapter(self) + + self._style = self._make_style_from_name_or_cls(self.highlighting_style) + self.style = DynamicStyle(lambda: self._style) + + editing_mode = getattr(EditingMode, self.editing_mode.upper()) + + self.pt_loop = asyncio.new_event_loop() + self.pt_app = PromptSession( + auto_suggest=self.auto_suggest, + editing_mode=editing_mode, + key_bindings=key_bindings, + history=history, + completer=IPythonPTCompleter(shell=self), + enable_history_search=self.enable_history_search, + style=self.style, + include_default_pygments_style=False, + mouse_support=self.mouse_support, + enable_open_in_editor=self.extra_open_editor_shortcuts, + color_depth=self.color_depth, + tempfile_suffix=".py", + **self._extra_prompt_options(), + ) + if isinstance(self.auto_suggest, NavigableAutoSuggestFromHistory): + self.auto_suggest.connect(self.pt_app) + + def _make_style_from_name_or_cls(self, name_or_cls): + """ + Small wrapper that make an IPython compatible style from a style name + + We need that to add style for prompt ... etc. + """ + style_overrides = {} + if name_or_cls == 'legacy': + legacy = self.colors.lower() + if legacy == 'linux': + style_cls = get_style_by_name('monokai') + style_overrides = _style_overrides_linux + elif legacy == 'lightbg': + style_overrides = _style_overrides_light_bg + style_cls = get_style_by_name('pastie') + elif legacy == 'neutral': + # The default theme needs to be visible on both a dark background + # and a light background, because we can't tell what the terminal + # looks like. These tweaks to the default theme help with that. + style_cls = get_style_by_name('default') + style_overrides.update({ + Token.Number: '#ansigreen', + Token.Operator: 'noinherit', + Token.String: '#ansiyellow', + Token.Name.Function: '#ansiblue', + Token.Name.Class: 'bold #ansiblue', + Token.Name.Namespace: 'bold #ansiblue', + Token.Name.Variable.Magic: '#ansiblue', + Token.Prompt: '#ansigreen', + Token.PromptNum: '#ansibrightgreen bold', + Token.OutPrompt: '#ansired', + Token.OutPromptNum: '#ansibrightred bold', + }) + + # Hack: Due to limited color support on the Windows console + # the prompt colors will be wrong without this + if os.name == 'nt': + style_overrides.update({ + Token.Prompt: '#ansidarkgreen', + Token.PromptNum: '#ansigreen bold', + Token.OutPrompt: '#ansidarkred', + Token.OutPromptNum: '#ansired bold', + }) + elif legacy =='nocolor': + style_cls=_NoStyle + style_overrides = {} + else : + raise ValueError('Got unknown colors: ', legacy) + else : + if isinstance(name_or_cls, str): + style_cls = get_style_by_name(name_or_cls) + else: + style_cls = name_or_cls + style_overrides = { + Token.Prompt: '#ansigreen', + Token.PromptNum: '#ansibrightgreen bold', + Token.OutPrompt: '#ansired', + Token.OutPromptNum: '#ansibrightred bold', + } + style_overrides.update(self.highlighting_style_overrides) + style = merge_styles([ + style_from_pygments_cls(style_cls), + style_from_pygments_dict(style_overrides), + ]) + + return style + + @property + def pt_complete_style(self): + return { + 'multicolumn': CompleteStyle.MULTI_COLUMN, + 'column': CompleteStyle.COLUMN, + 'readlinelike': CompleteStyle.READLINE_LIKE, + }[self.display_completions] + + @property + def color_depth(self): + return (ColorDepth.TRUE_COLOR if self.true_color else None) + + def _extra_prompt_options(self): + """ + Return the current layout option for the current Terminal InteractiveShell + """ + def get_message(): + return PygmentsTokens(self.prompts.in_prompt_tokens()) + + if self.editing_mode == 'emacs': + # with emacs mode the prompt is (usually) static, so we call only + # the function once. With VI mode it can toggle between [ins] and + # [nor] so we can't precompute. + # here I'm going to favor the default keybinding which almost + # everybody uses to decrease CPU usage. + # if we have issues with users with custom Prompts we can see how to + # work around this. + get_message = get_message() + + options = { + "complete_in_thread": False, + "lexer": IPythonPTLexer(), + "reserve_space_for_menu": self.space_for_menu, + "message": get_message, + "prompt_continuation": ( + lambda width, lineno, is_soft_wrap: PygmentsTokens( + self.prompts.continuation_prompt_tokens(width) + ) + ), + "multiline": True, + "complete_style": self.pt_complete_style, + "input_processors": [ + # Highlight matching brackets, but only when this setting is + # enabled, and only when the DEFAULT_BUFFER has the focus. + ConditionalProcessor( + processor=HighlightMatchingBracketProcessor(chars="[](){}"), + filter=HasFocus(DEFAULT_BUFFER) + & ~IsDone() + & Condition(lambda: self.highlight_matching_brackets), + ), + # Show auto-suggestion in lines other than the last line. + ConditionalProcessor( + processor=AppendAutoSuggestionInAnyLine(), + filter=HasFocus(DEFAULT_BUFFER) + & ~IsDone() + & Condition( + lambda: isinstance( + self.auto_suggest, NavigableAutoSuggestFromHistory + ) + ), + ), + ], + } + if not PTK3: + options['inputhook'] = self.inputhook + + return options + + def prompt_for_code(self): + if self.rl_next_input: + default = self.rl_next_input + self.rl_next_input = None + else: + default = '' + + # In order to make sure that asyncio code written in the + # interactive shell doesn't interfere with the prompt, we run the + # prompt in a different event loop. + # If we don't do this, people could spawn coroutine with a + # while/true inside which will freeze the prompt. + + policy = asyncio.get_event_loop_policy() + old_loop = get_asyncio_loop() + + # FIXME: prompt_toolkit is using the deprecated `asyncio.get_event_loop` + # to get the current event loop. + # This will probably be replaced by an attribute or input argument, + # at which point we can stop calling the soon-to-be-deprecated `set_event_loop` here. + if old_loop is not self.pt_loop: + policy.set_event_loop(self.pt_loop) + try: + with patch_stdout(raw=True): + text = self.pt_app.prompt( + default=default, + **self._extra_prompt_options()) + finally: + # Restore the original event loop. + if old_loop is not None and old_loop is not self.pt_loop: + policy.set_event_loop(old_loop) + + return text + + def enable_win_unicode_console(self): + # Since IPython 7.10 doesn't support python < 3.6 and PEP 528, Python uses the unicode APIs for the Windows + # console by default, so WUC shouldn't be needed. + warn("`enable_win_unicode_console` is deprecated since IPython 7.10, does not do anything and will be removed in the future", + DeprecationWarning, + stacklevel=2) + + def init_io(self): + if sys.platform not in {'win32', 'cli'}: + return + + import colorama + colorama.init() + + def init_magics(self): + super(TerminalInteractiveShell, self).init_magics() + self.register_magics(TerminalMagics) + + def init_alias(self): + # The parent class defines aliases that can be safely used with any + # frontend. + super(TerminalInteractiveShell, self).init_alias() + + # Now define aliases that only make sense on the terminal, because they + # need direct access to the console in a way that we can't emulate in + # GUI or web frontend + if os.name == 'posix': + for cmd in ('clear', 'more', 'less', 'man'): + self.alias_manager.soft_define_alias(cmd, cmd) + + + def __init__(self, *args, **kwargs) -> None: + super(TerminalInteractiveShell, self).__init__(*args, **kwargs) + self._set_autosuggestions(self.autosuggestions_provider) + self.init_prompt_toolkit_cli() + self.init_term_title() + self.keep_running = True + self._set_formatter(self.autoformatter) + + + def ask_exit(self): + self.keep_running = False + + rl_next_input = None + + def interact(self): + self.keep_running = True + while self.keep_running: + print(self.separate_in, end='') + + try: + code = self.prompt_for_code() + except EOFError: + if (not self.confirm_exit) \ + or self.ask_yes_no('Do you really want to exit ([y]/n)?','y','n'): + self.ask_exit() + + else: + if code: + self.run_cell(code, store_history=True) + + def mainloop(self): + # An extra layer of protection in case someone mashing Ctrl-C breaks + # out of our internal code. + while True: + try: + self.interact() + break + except KeyboardInterrupt as e: + print("\n%s escaped interact()\n" % type(e).__name__) + finally: + # An interrupt during the eventloop will mess up the + # internal state of the prompt_toolkit library. + # Stopping the eventloop fixes this, see + # https://github.com/ipython/ipython/pull/9867 + if hasattr(self, '_eventloop'): + self._eventloop.stop() + + self.restore_term_title() + + # try to call some at-exit operation optimistically as some things can't + # be done during interpreter shutdown. this is technically inaccurate as + # this make mainlool not re-callable, but that should be a rare if not + # in existent use case. + + self._atexit_once() + + + _inputhook = None + def inputhook(self, context): + if self._inputhook is not None: + self._inputhook(context) + + active_eventloop = None + def enable_gui(self, gui=None): + if self._inputhook is None and gui is None: + print("No event loop hook running.") + return + + if self._inputhook is not None and gui is not None: + print( + f"Shell is already running a gui event loop for {self.active_eventloop}. " + "Call with no arguments to disable the current loop." + ) + return + if self._inputhook is not None and gui is None: + self.active_eventloop = self._inputhook = None + + if gui and (gui not in {"inline", "webagg"}): + # This hook runs with each cycle of the `prompt_toolkit`'s event loop. + self.active_eventloop, self._inputhook = get_inputhook_name_and_func(gui) + else: + self.active_eventloop = self._inputhook = None + + # For prompt_toolkit 3.0. We have to create an asyncio event loop with + # this inputhook. + if PTK3: + import asyncio + from prompt_toolkit.eventloop import new_eventloop_with_inputhook + + if gui == 'asyncio': + # When we integrate the asyncio event loop, run the UI in the + # same event loop as the rest of the code. don't use an actual + # input hook. (Asyncio is not made for nesting event loops.) + self.pt_loop = get_asyncio_loop() + print("Installed asyncio event loop hook.") + + elif self._inputhook: + # If an inputhook was set, create a new asyncio event loop with + # this inputhook for the prompt. + self.pt_loop = new_eventloop_with_inputhook(self._inputhook) + print(f"Installed {self.active_eventloop} event loop hook.") + else: + # When there's no inputhook, run the prompt in a separate + # asyncio event loop. + self.pt_loop = asyncio.new_event_loop() + print("GUI event loop hook disabled.") + + # Run !system commands directly, not through pipes, so terminal programs + # work correctly. + system = InteractiveShell.system_raw + + def auto_rewrite_input(self, cmd): + """Overridden from the parent class to use fancy rewriting prompt""" + if not self.show_rewritten_input: + return + + tokens = self.prompts.rewrite_prompt_tokens() + if self.pt_app: + print_formatted_text(PygmentsTokens(tokens), end='', + style=self.pt_app.app.style) + print(cmd) + else: + prompt = ''.join(s for t, s in tokens) + print(prompt, cmd, sep='') + + _prompts_before = None + def switch_doctest_mode(self, mode): + """Switch prompts to classic for %doctest_mode""" + if mode: + self._prompts_before = self.prompts + self.prompts = ClassicPrompts(self) + elif self._prompts_before: + self.prompts = self._prompts_before + self._prompts_before = None +# self._update_layout() + + +InteractiveShellABC.register(TerminalInteractiveShell) + +if __name__ == '__main__': + TerminalInteractiveShell.instance().interact() diff --git a/contrib/python/ipython/py3/IPython/terminal/ipapp.py b/contrib/python/ipython/py3/IPython/terminal/ipapp.py new file mode 100644 index 0000000000..6280bce3b2 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/ipapp.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +The :class:`~traitlets.config.application.Application` object for the command +line :command:`ipython` program. +""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + + +import logging +import os +import sys +import warnings + +from traitlets.config.loader import Config +from traitlets.config.application import boolean_flag, catch_config_error +from IPython.core import release +from IPython.core import usage +from IPython.core.completer import IPCompleter +from IPython.core.crashhandler import CrashHandler +from IPython.core.formatters import PlainTextFormatter +from IPython.core.history import HistoryManager +from IPython.core.application import ( + ProfileDir, BaseIPythonApplication, base_flags, base_aliases +) +from IPython.core.magic import MagicsManager +from IPython.core.magics import ( + ScriptMagics, LoggingMagics +) +from IPython.core.shellapp import ( + InteractiveShellApp, shell_flags, shell_aliases +) +from IPython.extensions.storemagic import StoreMagics +from .interactiveshell import TerminalInteractiveShell +from IPython.paths import get_ipython_dir +from traitlets import ( + Bool, List, default, observe, Type +) + +#----------------------------------------------------------------------------- +# Globals, utilities and helpers +#----------------------------------------------------------------------------- + +_examples = """ +ipython --matplotlib # enable matplotlib integration +ipython --matplotlib=qt # enable matplotlib integration with qt4 backend + +ipython --log-level=DEBUG # set logging to DEBUG +ipython --profile=foo # start with profile foo + +ipython profile create foo # create profile foo w/ default config files +ipython help profile # show the help for the profile subcmd + +ipython locate # print the path to the IPython directory +ipython locate profile foo # print the path to the directory for profile `foo` +""" + +#----------------------------------------------------------------------------- +# Crash handler for this application +#----------------------------------------------------------------------------- + +class IPAppCrashHandler(CrashHandler): + """sys.excepthook for IPython itself, leaves a detailed report on disk.""" + + def __init__(self, app): + contact_name = release.author + contact_email = release.author_email + bug_tracker = 'https://github.com/ipython/ipython/issues' + super(IPAppCrashHandler,self).__init__( + app, contact_name, contact_email, bug_tracker + ) + + def make_report(self,traceback): + """Return a string containing a crash report.""" + + sec_sep = self.section_sep + # Start with parent report + report = [super(IPAppCrashHandler, self).make_report(traceback)] + # Add interactive-specific info we may have + rpt_add = report.append + try: + rpt_add(sec_sep+"History of session input:") + for line in self.app.shell.user_ns['_ih']: + rpt_add(line) + rpt_add('\n*** Last line of input (may not be in above history):\n') + rpt_add(self.app.shell._last_input_line+'\n') + except: + pass + + return ''.join(report) + +#----------------------------------------------------------------------------- +# Aliases and Flags +#----------------------------------------------------------------------------- +flags = dict(base_flags) +flags.update(shell_flags) +frontend_flags = {} +addflag = lambda *args: frontend_flags.update(boolean_flag(*args)) +addflag('autoedit-syntax', 'TerminalInteractiveShell.autoedit_syntax', + 'Turn on auto editing of files with syntax errors.', + 'Turn off auto editing of files with syntax errors.' +) +addflag('simple-prompt', 'TerminalInteractiveShell.simple_prompt', + "Force simple minimal prompt using `raw_input`", + "Use a rich interactive prompt with prompt_toolkit", +) + +addflag('banner', 'TerminalIPythonApp.display_banner', + "Display a banner upon starting IPython.", + "Don't display a banner upon starting IPython." +) +addflag('confirm-exit', 'TerminalInteractiveShell.confirm_exit', + """Set to confirm when you try to exit IPython with an EOF (Control-D + in Unix, Control-Z/Enter in Windows). By typing 'exit' or 'quit', + you can force a direct exit without any confirmation.""", + "Don't prompt the user when exiting." +) +addflag('term-title', 'TerminalInteractiveShell.term_title', + "Enable auto setting the terminal title.", + "Disable auto setting the terminal title." +) +classic_config = Config() +classic_config.InteractiveShell.cache_size = 0 +classic_config.PlainTextFormatter.pprint = False +classic_config.TerminalInteractiveShell.prompts_class='IPython.terminal.prompts.ClassicPrompts' +classic_config.InteractiveShell.separate_in = '' +classic_config.InteractiveShell.separate_out = '' +classic_config.InteractiveShell.separate_out2 = '' +classic_config.InteractiveShell.colors = 'NoColor' +classic_config.InteractiveShell.xmode = 'Plain' + +frontend_flags['classic']=( + classic_config, + "Gives IPython a similar feel to the classic Python prompt." +) +# # log doesn't make so much sense this way anymore +# paa('--log','-l', +# action='store_true', dest='InteractiveShell.logstart', +# help="Start logging to the default log file (./ipython_log.py).") +# +# # quick is harder to implement +frontend_flags['quick']=( + {'TerminalIPythonApp' : {'quick' : True}}, + "Enable quick startup with no config files." +) + +frontend_flags['i'] = ( + {'TerminalIPythonApp' : {'force_interact' : True}}, + """If running code from the command line, become interactive afterwards. + It is often useful to follow this with `--` to treat remaining flags as + script arguments. + """ +) +flags.update(frontend_flags) + +aliases = dict(base_aliases) +aliases.update(shell_aliases) # type: ignore[arg-type] + +#----------------------------------------------------------------------------- +# Main classes and functions +#----------------------------------------------------------------------------- + + +class LocateIPythonApp(BaseIPythonApplication): + description = """print the path to the IPython dir""" + subcommands = dict( + profile=('IPython.core.profileapp.ProfileLocate', + "print the path to an IPython profile directory", + ), + ) + def start(self): + if self.subapp is not None: + return self.subapp.start() + else: + print(self.ipython_dir) + + +class TerminalIPythonApp(BaseIPythonApplication, InteractiveShellApp): + name = u'ipython' + description = usage.cl_usage + crash_handler_class = IPAppCrashHandler # typing: ignore[assignment] + examples = _examples + + flags = flags + aliases = aliases + classes = List() + + interactive_shell_class = Type( + klass=object, # use default_value otherwise which only allow subclasses. + default_value=TerminalInteractiveShell, + help="Class to use to instantiate the TerminalInteractiveShell object. Useful for custom Frontends" + ).tag(config=True) + + @default('classes') + def _classes_default(self): + """This has to be in a method, for TerminalIPythonApp to be available.""" + return [ + InteractiveShellApp, # ShellApp comes before TerminalApp, because + self.__class__, # it will also affect subclasses (e.g. QtConsole) + TerminalInteractiveShell, + HistoryManager, + MagicsManager, + ProfileDir, + PlainTextFormatter, + IPCompleter, + ScriptMagics, + LoggingMagics, + StoreMagics, + ] + + subcommands = dict( + profile = ("IPython.core.profileapp.ProfileApp", + "Create and manage IPython profiles." + ), + kernel = ("ipykernel.kernelapp.IPKernelApp", + "Start a kernel without an attached frontend." + ), + locate=('IPython.terminal.ipapp.LocateIPythonApp', + LocateIPythonApp.description + ), + history=('IPython.core.historyapp.HistoryApp', + "Manage the IPython history database." + ), + ) + + + # *do* autocreate requested profile, but don't create the config file. + auto_create=Bool(True) + # configurables + quick = Bool(False, + help="""Start IPython quickly by skipping the loading of config files.""" + ).tag(config=True) + @observe('quick') + def _quick_changed(self, change): + if change['new']: + self.load_config_file = lambda *a, **kw: None + + display_banner = Bool(True, + help="Whether to display a banner upon starting IPython." + ).tag(config=True) + + # if there is code of files to run from the cmd line, don't interact + # unless the --i flag (App.force_interact) is true. + force_interact = Bool(False, + help="""If a command or file is given via the command-line, + e.g. 'ipython foo.py', start an interactive shell after executing the + file or command.""" + ).tag(config=True) + @observe('force_interact') + def _force_interact_changed(self, change): + if change['new']: + self.interact = True + + @observe('file_to_run', 'code_to_run', 'module_to_run') + def _file_to_run_changed(self, change): + new = change['new'] + if new: + self.something_to_run = True + if new and not self.force_interact: + self.interact = False + + # internal, not-configurable + something_to_run=Bool(False) + + @catch_config_error + def initialize(self, argv=None): + """Do actions after construct, but before starting the app.""" + super(TerminalIPythonApp, self).initialize(argv) + if self.subapp is not None: + # don't bother initializing further, starting subapp + return + # print self.extra_args + if self.extra_args and not self.something_to_run: + self.file_to_run = self.extra_args[0] + self.init_path() + # create the shell + self.init_shell() + # and draw the banner + self.init_banner() + # Now a variety of things that happen after the banner is printed. + self.init_gui_pylab() + self.init_extensions() + self.init_code() + + def init_shell(self): + """initialize the InteractiveShell instance""" + # Create an InteractiveShell instance. + # shell.display_banner should always be False for the terminal + # based app, because we call shell.show_banner() by hand below + # so the banner shows *before* all extension loading stuff. + self.shell = self.interactive_shell_class.instance(parent=self, + profile_dir=self.profile_dir, + ipython_dir=self.ipython_dir, user_ns=self.user_ns) + self.shell.configurables.append(self) + + def init_banner(self): + """optionally display the banner""" + if self.display_banner and self.interact: + self.shell.show_banner() + # Make sure there is a space below the banner. + if self.log_level <= logging.INFO: print() + + def _pylab_changed(self, name, old, new): + """Replace --pylab='inline' with --pylab='auto'""" + if new == 'inline': + warnings.warn("'inline' not available as pylab backend, " + "using 'auto' instead.") + self.pylab = 'auto' + + def start(self): + if self.subapp is not None: + return self.subapp.start() + # perform any prexec steps: + if self.interact: + self.log.debug("Starting IPython's mainloop...") + self.shell.mainloop() + else: + self.log.debug("IPython not interactive...") + self.shell.restore_term_title() + if not self.shell.last_execution_succeeded: + sys.exit(1) + +def load_default_config(ipython_dir=None): + """Load the default config file from the default ipython_dir. + + This is useful for embedded shells. + """ + if ipython_dir is None: + ipython_dir = get_ipython_dir() + + profile_dir = os.path.join(ipython_dir, 'profile_default') + app = TerminalIPythonApp() + app.config_file_paths.append(profile_dir) + app.load_config_file() + return app.config + +launch_new_instance = TerminalIPythonApp.launch_instance + + +if __name__ == '__main__': + launch_new_instance() diff --git a/contrib/python/ipython/py3/IPython/terminal/magics.py b/contrib/python/ipython/py3/IPython/terminal/magics.py new file mode 100644 index 0000000000..cea53e4a24 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/magics.py @@ -0,0 +1,214 @@ +"""Extra magics for terminal use.""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + + +from logging import error +import os +import sys + +from IPython.core.error import TryNext, UsageError +from IPython.core.magic import Magics, magics_class, line_magic +from IPython.lib.clipboard import ClipboardEmpty +from IPython.testing.skipdoctest import skip_doctest +from IPython.utils.text import SList, strip_email_quotes +from IPython.utils import py3compat + +def get_pasted_lines(sentinel, l_input=py3compat.input, quiet=False): + """ Yield pasted lines until the user enters the given sentinel value. + """ + if not quiet: + print("Pasting code; enter '%s' alone on the line to stop or use Ctrl-D." \ + % sentinel) + prompt = ":" + else: + prompt = "" + while True: + try: + l = l_input(prompt) + if l == sentinel: + return + else: + yield l + except EOFError: + print('<EOF>') + return + + +@magics_class +class TerminalMagics(Magics): + def __init__(self, shell): + super(TerminalMagics, self).__init__(shell) + + def store_or_execute(self, block, name, store_history=False): + """ Execute a block, or store it in a variable, per the user's request. + """ + if name: + # If storing it for further editing + self.shell.user_ns[name] = SList(block.splitlines()) + print("Block assigned to '%s'" % name) + else: + b = self.preclean_input(block) + self.shell.user_ns['pasted_block'] = b + self.shell.using_paste_magics = True + try: + self.shell.run_cell(b, store_history) + finally: + self.shell.using_paste_magics = False + + def preclean_input(self, block): + lines = block.splitlines() + while lines and not lines[0].strip(): + lines = lines[1:] + return strip_email_quotes('\n'.join(lines)) + + def rerun_pasted(self, name='pasted_block'): + """ Rerun a previously pasted command. + """ + b = self.shell.user_ns.get(name) + + # Sanity checks + if b is None: + raise UsageError('No previous pasted block available') + if not isinstance(b, str): + raise UsageError( + "Variable 'pasted_block' is not a string, can't execute") + + print("Re-executing '%s...' (%d chars)"% (b.split('\n',1)[0], len(b))) + self.shell.run_cell(b) + + @line_magic + def autoindent(self, parameter_s = ''): + """Toggle autoindent on/off (deprecated)""" + self.shell.set_autoindent() + print("Automatic indentation is:",['OFF','ON'][self.shell.autoindent]) + + @skip_doctest + @line_magic + def cpaste(self, parameter_s=''): + """Paste & execute a pre-formatted code block from clipboard. + + You must terminate the block with '--' (two minus-signs) or Ctrl-D + alone on the line. You can also provide your own sentinel with '%paste + -s %%' ('%%' is the new sentinel for this operation). + + The block is dedented prior to execution to enable execution of method + definitions. '>' and '+' characters at the beginning of a line are + ignored, to allow pasting directly from e-mails, diff files and + doctests (the '...' continuation prompt is also stripped). The + executed block is also assigned to variable named 'pasted_block' for + later editing with '%edit pasted_block'. + + You can also pass a variable name as an argument, e.g. '%cpaste foo'. + This assigns the pasted block to variable 'foo' as string, without + dedenting or executing it (preceding >>> and + is still stripped) + + '%cpaste -r' re-executes the block previously entered by cpaste. + '%cpaste -q' suppresses any additional output messages. + + Do not be alarmed by garbled output on Windows (it's a readline bug). + Just press enter and type -- (and press enter again) and the block + will be what was just pasted. + + Shell escapes are not supported (yet). + + See Also + -------- + paste : automatically pull code from clipboard. + + Examples + -------- + :: + + In [8]: %cpaste + Pasting code; enter '--' alone on the line to stop. + :>>> a = ["world!", "Hello"] + :>>> print(" ".join(sorted(a))) + :-- + Hello world! + + :: + In [8]: %cpaste + Pasting code; enter '--' alone on the line to stop. + :>>> %alias_magic t timeit + :>>> %t -n1 pass + :-- + Created `%t` as an alias for `%timeit`. + Created `%%t` as an alias for `%%timeit`. + 354 ns ± 224 ns per loop (mean ± std. dev. of 7 runs, 1 loop each) + """ + opts, name = self.parse_options(parameter_s, 'rqs:', mode='string') + if 'r' in opts: + self.rerun_pasted() + return + + quiet = ('q' in opts) + + sentinel = opts.get('s', u'--') + block = '\n'.join(get_pasted_lines(sentinel, quiet=quiet)) + self.store_or_execute(block, name, store_history=True) + + @line_magic + def paste(self, parameter_s=''): + """Paste & execute a pre-formatted code block from clipboard. + + The text is pulled directly from the clipboard without user + intervention and printed back on the screen before execution (unless + the -q flag is given to force quiet mode). + + The block is dedented prior to execution to enable execution of method + definitions. '>' and '+' characters at the beginning of a line are + ignored, to allow pasting directly from e-mails, diff files and + doctests (the '...' continuation prompt is also stripped). The + executed block is also assigned to variable named 'pasted_block' for + later editing with '%edit pasted_block'. + + You can also pass a variable name as an argument, e.g. '%paste foo'. + This assigns the pasted block to variable 'foo' as string, without + executing it (preceding >>> and + is still stripped). + + Options: + + -r: re-executes the block previously entered by cpaste. + + -q: quiet mode: do not echo the pasted text back to the terminal. + + IPython statements (magics, shell escapes) are not supported (yet). + + See Also + -------- + cpaste : manually paste code into terminal until you mark its end. + """ + opts, name = self.parse_options(parameter_s, 'rq', mode='string') + if 'r' in opts: + self.rerun_pasted() + return + try: + block = self.shell.hooks.clipboard_get() + except TryNext as clipboard_exc: + message = getattr(clipboard_exc, 'args') + if message: + error(message[0]) + else: + error('Could not get text from the clipboard.') + return + except ClipboardEmpty as e: + raise UsageError("The clipboard appears to be empty") from e + + # By default, echo back to terminal unless quiet mode is requested + if 'q' not in opts: + sys.stdout.write(self.shell.pycolorize(block)) + if not block.endswith("\n"): + sys.stdout.write("\n") + sys.stdout.write("## -- End pasted text --\n") + + self.store_or_execute(block, name, store_history=True) + + # Class-level: add a '%cls' magic only on Windows + if sys.platform == 'win32': + @line_magic + def cls(self, s): + """Clear screen. + """ + os.system("cls") diff --git a/contrib/python/ipython/py3/IPython/terminal/prompts.py b/contrib/python/ipython/py3/IPython/terminal/prompts.py new file mode 100644 index 0000000000..3f5c07b980 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/prompts.py @@ -0,0 +1,108 @@ +"""Terminal input and output prompts.""" + +from pygments.token import Token +import sys + +from IPython.core.displayhook import DisplayHook + +from prompt_toolkit.formatted_text import fragment_list_width, PygmentsTokens +from prompt_toolkit.shortcuts import print_formatted_text +from prompt_toolkit.enums import EditingMode + + +class Prompts(object): + def __init__(self, shell): + self.shell = shell + + def vi_mode(self): + if (getattr(self.shell.pt_app, 'editing_mode', None) == EditingMode.VI + and self.shell.prompt_includes_vi_mode): + mode = str(self.shell.pt_app.app.vi_state.input_mode) + if mode.startswith('InputMode.'): + mode = mode[10:13].lower() + elif mode.startswith('vi-'): + mode = mode[3:6] + return '['+mode+'] ' + return '' + + + def in_prompt_tokens(self): + return [ + (Token.Prompt, self.vi_mode() ), + (Token.Prompt, 'In ['), + (Token.PromptNum, str(self.shell.execution_count)), + (Token.Prompt, ']: '), + ] + + def _width(self): + return fragment_list_width(self.in_prompt_tokens()) + + def continuation_prompt_tokens(self, width=None): + if width is None: + width = self._width() + return [ + (Token.Prompt, (' ' * (width - 5)) + '...: '), + ] + + def rewrite_prompt_tokens(self): + width = self._width() + return [ + (Token.Prompt, ('-' * (width - 2)) + '> '), + ] + + def out_prompt_tokens(self): + return [ + (Token.OutPrompt, 'Out['), + (Token.OutPromptNum, str(self.shell.execution_count)), + (Token.OutPrompt, ']: '), + ] + +class ClassicPrompts(Prompts): + def in_prompt_tokens(self): + return [ + (Token.Prompt, '>>> '), + ] + + def continuation_prompt_tokens(self, width=None): + return [ + (Token.Prompt, '... ') + ] + + def rewrite_prompt_tokens(self): + return [] + + def out_prompt_tokens(self): + return [] + +class RichPromptDisplayHook(DisplayHook): + """Subclass of base display hook using coloured prompt""" + def write_output_prompt(self): + sys.stdout.write(self.shell.separate_out) + # If we're not displaying a prompt, it effectively ends with a newline, + # because the output will be left-aligned. + self.prompt_end_newline = True + + if self.do_full_cache: + tokens = self.shell.prompts.out_prompt_tokens() + prompt_txt = ''.join(s for t, s in tokens) + if prompt_txt and not prompt_txt.endswith('\n'): + # Ask for a newline before multiline output + self.prompt_end_newline = False + + if self.shell.pt_app: + print_formatted_text(PygmentsTokens(tokens), + style=self.shell.pt_app.app.style, end='', + ) + else: + sys.stdout.write(prompt_txt) + + def write_format_data(self, format_dict, md_dict=None) -> None: + if self.shell.mime_renderers: + + for mime, handler in self.shell.mime_renderers.items(): + if mime in format_dict: + handler(format_dict[mime], None) + return + + super().write_format_data(format_dict, md_dict) + diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/__init__.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/__init__.py new file mode 100644 index 0000000000..9043f15e86 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/__init__.py @@ -0,0 +1,138 @@ +import importlib +import os + +aliases = { + 'qt4': 'qt', + 'gtk2': 'gtk', +} + +backends = [ + "qt", + "qt5", + "qt6", + "gtk", + "gtk2", + "gtk3", + "gtk4", + "tk", + "wx", + "pyglet", + "glut", + "osx", + "asyncio", +] + +registered = {} + +def register(name, inputhook): + """Register the function *inputhook* as an event loop integration.""" + registered[name] = inputhook + + +class UnknownBackend(KeyError): + def __init__(self, name): + self.name = name + + def __str__(self): + return ("No event loop integration for {!r}. " + "Supported event loops are: {}").format(self.name, + ', '.join(backends + sorted(registered))) + + +def set_qt_api(gui): + """Sets the `QT_API` environment variable if it isn't already set.""" + + qt_api = os.environ.get("QT_API", None) + + from IPython.external.qt_loaders import ( + QT_API_PYQT, + QT_API_PYQT5, + QT_API_PYQT6, + QT_API_PYSIDE, + QT_API_PYSIDE2, + QT_API_PYSIDE6, + QT_API_PYQTv1, + loaded_api, + ) + + loaded = loaded_api() + + qt_env2gui = { + QT_API_PYSIDE: "qt4", + QT_API_PYQTv1: "qt4", + QT_API_PYQT: "qt4", + QT_API_PYSIDE2: "qt5", + QT_API_PYQT5: "qt5", + QT_API_PYSIDE6: "qt6", + QT_API_PYQT6: "qt6", + } + if loaded is not None and gui != "qt": + if qt_env2gui[loaded] != gui: + print( + f"Cannot switch Qt versions for this session; will use {qt_env2gui[loaded]}." + ) + return qt_env2gui[loaded] + + if qt_api is not None and gui != "qt": + if qt_env2gui[qt_api] != gui: + print( + f'Request for "{gui}" will be ignored because `QT_API` ' + f'environment variable is set to "{qt_api}"' + ) + return qt_env2gui[qt_api] + else: + if gui == "qt5": + try: + import PyQt5 # noqa + + os.environ["QT_API"] = "pyqt5" + except ImportError: + try: + import PySide2 # noqa + + os.environ["QT_API"] = "pyside2" + except ImportError: + os.environ["QT_API"] = "pyqt5" + elif gui == "qt6": + try: + import PyQt6 # noqa + + os.environ["QT_API"] = "pyqt6" + except ImportError: + try: + import PySide6 # noqa + + os.environ["QT_API"] = "pyside6" + except ImportError: + os.environ["QT_API"] = "pyqt6" + elif gui == "qt": + # Don't set QT_API; let IPython logic choose the version. + if "QT_API" in os.environ.keys(): + del os.environ["QT_API"] + else: + print(f'Unrecognized Qt version: {gui}. Should be "qt5", "qt6", or "qt".') + return + + # Import it now so we can figure out which version it is. + from IPython.external.qt_for_kernel import QT_API + + return qt_env2gui[QT_API] + + +def get_inputhook_name_and_func(gui): + if gui in registered: + return gui, registered[gui] + + if gui not in backends: + raise UnknownBackend(gui) + + if gui in aliases: + return get_inputhook_name_and_func(aliases[gui]) + + gui_mod = gui + if gui.startswith("qt"): + gui = set_qt_api(gui) + gui_mod = "qt" + + mod = importlib.import_module("IPython.terminal.pt_inputhooks." + gui_mod) + return gui, mod.inputhook diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/asyncio.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/asyncio.py new file mode 100644 index 0000000000..d2499e11e6 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/asyncio.py @@ -0,0 +1,62 @@ +""" +Inputhook for running the original asyncio event loop while we're waiting for +input. + +By default, in IPython, we run the prompt with a different asyncio event loop, +because otherwise we risk that people are freezing the prompt by scheduling bad +coroutines. E.g., a coroutine that does a while/true and never yield back +control to the loop. We can't cancel that. + +However, sometimes we want the asyncio loop to keep running while waiting for +a prompt. + +The following example will print the numbers from 1 to 10 above the prompt, +while we are waiting for input. (This works also because we use +prompt_toolkit`s `patch_stdout`):: + + In [1]: import asyncio + + In [2]: %gui asyncio + + In [3]: async def f(): + ...: for i in range(10): + ...: await asyncio.sleep(1) + ...: print(i) + + + In [4]: asyncio.ensure_future(f()) + +""" +from prompt_toolkit import __version__ as ptk_version + +from IPython.core.async_helpers import get_asyncio_loop + +PTK3 = ptk_version.startswith("3.") + + +def inputhook(context): + """ + Inputhook for asyncio event loop integration. + """ + # For prompt_toolkit 3.0, this input hook literally doesn't do anything. + # The event loop integration here is implemented in `interactiveshell.py` + # by running the prompt itself in the current asyncio loop. The main reason + # for this is that nesting asyncio event loops is unreliable. + if PTK3: + return + + # For prompt_toolkit 2.0, we can run the current asyncio event loop, + # because prompt_toolkit 2.0 uses a different event loop internally. + + # get the persistent asyncio event loop + loop = get_asyncio_loop() + + def stop(): + loop.stop() + + fileno = context.fileno() + loop.add_reader(fileno, stop) + try: + loop.run_forever() + finally: + loop.remove_reader(fileno) diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/glut.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/glut.py new file mode 100644 index 0000000000..835aadfc97 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/glut.py @@ -0,0 +1,140 @@ +"""GLUT Input hook for interactive use with prompt_toolkit +""" + + +# GLUT is quite an old library and it is difficult to ensure proper +# integration within IPython since original GLUT does not allow to handle +# events one by one. Instead, it requires for the mainloop to be entered +# and never returned (there is not even a function to exit he +# mainloop). Fortunately, there are alternatives such as freeglut +# (available for linux and windows) and the OSX implementation gives +# access to a glutCheckLoop() function that blocks itself until a new +# event is received. This means we have to setup the idle callback to +# ensure we got at least one event that will unblock the function. +# +# Furthermore, it is not possible to install these handlers without a window +# being first created. We choose to make this window invisible. This means that +# display mode options are set at this level and user won't be able to change +# them later without modifying the code. This should probably be made available +# via IPython options system. + +import sys +import time +import signal +import OpenGL.GLUT as glut +import OpenGL.platform as platform +from timeit import default_timer as clock + +# Frame per second : 60 +# Should probably be an IPython option +glut_fps = 60 + +# Display mode : double buffeed + rgba + depth +# Should probably be an IPython option +glut_display_mode = (glut.GLUT_DOUBLE | + glut.GLUT_RGBA | + glut.GLUT_DEPTH) + +glutMainLoopEvent = None +if sys.platform == 'darwin': + try: + glutCheckLoop = platform.createBaseFunction( + 'glutCheckLoop', dll=platform.GLUT, resultType=None, + argTypes=[], + doc='glutCheckLoop( ) -> None', + argNames=(), + ) + except AttributeError as e: + raise RuntimeError( + '''Your glut implementation does not allow interactive sessions. ''' + '''Consider installing freeglut.''') from e + glutMainLoopEvent = glutCheckLoop +elif glut.HAVE_FREEGLUT: + glutMainLoopEvent = glut.glutMainLoopEvent +else: + raise RuntimeError( + '''Your glut implementation does not allow interactive sessions. ''' + '''Consider installing freeglut.''') + + +def glut_display(): + # Dummy display function + pass + +def glut_idle(): + # Dummy idle function + pass + +def glut_close(): + # Close function only hides the current window + glut.glutHideWindow() + glutMainLoopEvent() + +def glut_int_handler(signum, frame): + # Catch sigint and print the defaultipyt message + signal.signal(signal.SIGINT, signal.default_int_handler) + print('\nKeyboardInterrupt') + # Need to reprint the prompt at this stage + +# Initialisation code +glut.glutInit( sys.argv ) +glut.glutInitDisplayMode( glut_display_mode ) +# This is specific to freeglut +if bool(glut.glutSetOption): + glut.glutSetOption( glut.GLUT_ACTION_ON_WINDOW_CLOSE, + glut.GLUT_ACTION_GLUTMAINLOOP_RETURNS ) +glut.glutCreateWindow( b'ipython' ) +glut.glutReshapeWindow( 1, 1 ) +glut.glutHideWindow( ) +glut.glutWMCloseFunc( glut_close ) +glut.glutDisplayFunc( glut_display ) +glut.glutIdleFunc( glut_idle ) + + +def inputhook(context): + """Run the pyglet event loop by processing pending events only. + + This keeps processing pending events until stdin is ready. After + processing all pending events, a call to time.sleep is inserted. This is + needed, otherwise, CPU usage is at 100%. This sleep time should be tuned + though for best performance. + """ + # We need to protect against a user pressing Control-C when IPython is + # idle and this is running. We trap KeyboardInterrupt and pass. + + signal.signal(signal.SIGINT, glut_int_handler) + + try: + t = clock() + + # Make sure the default window is set after a window has been closed + if glut.glutGetWindow() == 0: + glut.glutSetWindow( 1 ) + glutMainLoopEvent() + return 0 + + while not context.input_is_ready(): + glutMainLoopEvent() + # We need to sleep at this point to keep the idle CPU load + # low. However, if sleep to long, GUI response is poor. As + # a compromise, we watch how often GUI events are being processed + # and switch between a short and long sleep time. Here are some + # stats useful in helping to tune this. + # time CPU load + # 0.001 13% + # 0.005 3% + # 0.01 1.5% + # 0.05 0.5% + used_time = clock() - t + if used_time > 10.0: + # print 'Sleep for 1 s' # dbg + time.sleep(1.0) + elif used_time > 0.1: + # Few GUI events coming in, so we can sleep longer + # print 'Sleep for 0.05 s' # dbg + time.sleep(0.05) + else: + # Many GUI events coming in, so sleep only very little + time.sleep(0.001) + except KeyboardInterrupt: + pass diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk.py new file mode 100644 index 0000000000..5c201b65d7 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk.py @@ -0,0 +1,60 @@ +# Code borrowed from python-prompt-toolkit examples +# https://github.com/jonathanslenders/python-prompt-toolkit/blob/77cdcfbc7f4b4c34a9d2f9a34d422d7152f16209/examples/inputhook.py + +# Copyright (c) 2014, Jonathan Slenders +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, this +# list of conditions and the following disclaimer in the documentation and/or +# other materials provided with the distribution. +# +# * Neither the name of the {organization} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +PyGTK input hook for prompt_toolkit. + +Listens on the pipe prompt_toolkit sets up for a notification that it should +return control to the terminal event loop. +""" + +import gtk, gobject + +# Enable threading in GTK. (Otherwise, GTK will keep the GIL.) +gtk.gdk.threads_init() + + +def inputhook(context): + """ + When the eventloop of prompt-toolkit is idle, call this inputhook. + + This will run the GTK main loop until the file descriptor + `context.fileno()` becomes ready. + + :param context: An `InputHookContext` instance. + """ + + def _main_quit(*a, **kw): + gtk.main_quit() + return False + + gobject.io_add_watch(context.fileno(), gobject.IO_IN, _main_quit) + gtk.main() diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk3.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk3.py new file mode 100644 index 0000000000..b073bd94d9 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk3.py @@ -0,0 +1,14 @@ +"""prompt_toolkit input hook for GTK 3 +""" + +from gi.repository import Gtk, GLib + + +def _main_quit(*args, **kwargs): + Gtk.main_quit() + return False + + +def inputhook(context): + GLib.io_add_watch(context.fileno(), GLib.PRIORITY_DEFAULT, GLib.IO_IN, _main_quit) + Gtk.main() diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk4.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk4.py new file mode 100644 index 0000000000..009fbf1212 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/gtk4.py @@ -0,0 +1,27 @@ +""" +prompt_toolkit input hook for GTK 4. +""" + +from gi.repository import GLib + + +class _InputHook: + def __init__(self, context): + self._quit = False + GLib.io_add_watch( + context.fileno(), GLib.PRIORITY_DEFAULT, GLib.IO_IN, self.quit + ) + + def quit(self, *args, **kwargs): + self._quit = True + return False + + def run(self): + context = GLib.MainContext.default() + while not self._quit: + context.iteration(True) + + +def inputhook(context): + hook = _InputHook(context) + hook.run() diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/osx.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/osx.py new file mode 100644 index 0000000000..2754820efc --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/osx.py @@ -0,0 +1,157 @@ +"""Inputhook for OS X + +Calls NSApp / CoreFoundation APIs via ctypes. +""" + +# obj-c boilerplate from appnope, used under BSD 2-clause + +import ctypes +import ctypes.util +from threading import Event + +objc = ctypes.cdll.LoadLibrary(ctypes.util.find_library("objc")) # type: ignore + +void_p = ctypes.c_void_p + +objc.objc_getClass.restype = void_p +objc.sel_registerName.restype = void_p +objc.objc_msgSend.restype = void_p +objc.objc_msgSend.argtypes = [void_p, void_p] + +msg = objc.objc_msgSend + +def _utf8(s): + """ensure utf8 bytes""" + if not isinstance(s, bytes): + s = s.encode('utf8') + return s + +def n(name): + """create a selector name (for ObjC methods)""" + return objc.sel_registerName(_utf8(name)) + +def C(classname): + """get an ObjC Class by name""" + return objc.objc_getClass(_utf8(classname)) + +# end obj-c boilerplate from appnope + +# CoreFoundation C-API calls we will use: +CoreFoundation = ctypes.cdll.LoadLibrary(ctypes.util.find_library("CoreFoundation")) # type: ignore + +CFFileDescriptorCreate = CoreFoundation.CFFileDescriptorCreate +CFFileDescriptorCreate.restype = void_p +CFFileDescriptorCreate.argtypes = [void_p, ctypes.c_int, ctypes.c_bool, void_p, void_p] + +CFFileDescriptorGetNativeDescriptor = CoreFoundation.CFFileDescriptorGetNativeDescriptor +CFFileDescriptorGetNativeDescriptor.restype = ctypes.c_int +CFFileDescriptorGetNativeDescriptor.argtypes = [void_p] + +CFFileDescriptorEnableCallBacks = CoreFoundation.CFFileDescriptorEnableCallBacks +CFFileDescriptorEnableCallBacks.restype = None +CFFileDescriptorEnableCallBacks.argtypes = [void_p, ctypes.c_ulong] + +CFFileDescriptorCreateRunLoopSource = CoreFoundation.CFFileDescriptorCreateRunLoopSource +CFFileDescriptorCreateRunLoopSource.restype = void_p +CFFileDescriptorCreateRunLoopSource.argtypes = [void_p, void_p, void_p] + +CFRunLoopGetCurrent = CoreFoundation.CFRunLoopGetCurrent +CFRunLoopGetCurrent.restype = void_p + +CFRunLoopAddSource = CoreFoundation.CFRunLoopAddSource +CFRunLoopAddSource.restype = None +CFRunLoopAddSource.argtypes = [void_p, void_p, void_p] + +CFRelease = CoreFoundation.CFRelease +CFRelease.restype = None +CFRelease.argtypes = [void_p] + +CFFileDescriptorInvalidate = CoreFoundation.CFFileDescriptorInvalidate +CFFileDescriptorInvalidate.restype = None +CFFileDescriptorInvalidate.argtypes = [void_p] + +# From CFFileDescriptor.h +kCFFileDescriptorReadCallBack = 1 +kCFRunLoopCommonModes = void_p.in_dll(CoreFoundation, 'kCFRunLoopCommonModes') + + +def _NSApp(): + """Return the global NSApplication instance (NSApp)""" + objc.objc_msgSend.argtypes = [void_p, void_p] + return msg(C('NSApplication'), n('sharedApplication')) + + +def _wake(NSApp): + """Wake the Application""" + objc.objc_msgSend.argtypes = [ + void_p, + void_p, + void_p, + void_p, + void_p, + void_p, + void_p, + void_p, + void_p, + void_p, + void_p, + ] + event = msg( + C("NSEvent"), + n( + "otherEventWithType:location:modifierFlags:" + "timestamp:windowNumber:context:subtype:data1:data2:" + ), + 15, # Type + 0, # location + 0, # flags + 0, # timestamp + 0, # window + None, # context + 0, # subtype + 0, # data1 + 0, # data2 + ) + objc.objc_msgSend.argtypes = [void_p, void_p, void_p, void_p] + msg(NSApp, n('postEvent:atStart:'), void_p(event), True) + + +_triggered = Event() + +def _input_callback(fdref, flags, info): + """Callback to fire when there's input to be read""" + _triggered.set() + CFFileDescriptorInvalidate(fdref) + CFRelease(fdref) + NSApp = _NSApp() + objc.objc_msgSend.argtypes = [void_p, void_p, void_p] + msg(NSApp, n('stop:'), NSApp) + _wake(NSApp) + +_c_callback_func_type = ctypes.CFUNCTYPE(None, void_p, void_p, void_p) +_c_input_callback = _c_callback_func_type(_input_callback) + + +def _stop_on_read(fd): + """Register callback to stop eventloop when there's data on fd""" + _triggered.clear() + fdref = CFFileDescriptorCreate(None, fd, False, _c_input_callback, None) + CFFileDescriptorEnableCallBacks(fdref, kCFFileDescriptorReadCallBack) + source = CFFileDescriptorCreateRunLoopSource(None, fdref, 0) + loop = CFRunLoopGetCurrent() + CFRunLoopAddSource(loop, source, kCFRunLoopCommonModes) + CFRelease(source) + + +def inputhook(context): + """Inputhook for Cocoa (NSApp)""" + NSApp = _NSApp() + _stop_on_read(context.fileno()) + objc.objc_msgSend.argtypes = [void_p, void_p] + msg(NSApp, n('run')) + if not _triggered.is_set(): + # app closed without firing callback, + # probably due to last window being closed. + # Run the loop manually in this case, + # since there may be events still to process (#9734) + CoreFoundation.CFRunLoopRun() diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/pyglet.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/pyglet.py new file mode 100644 index 0000000000..49ec86d223 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/pyglet.py @@ -0,0 +1,66 @@ +"""Enable pyglet to be used interactively with prompt_toolkit +""" + +import sys +import time +from timeit import default_timer as clock +import pyglet + +# On linux only, window.flip() has a bug that causes an AttributeError on +# window close. For details, see: +# http://groups.google.com/group/pyglet-users/browse_thread/thread/47c1aab9aa4a3d23/c22f9e819826799e?#c22f9e819826799e + +if sys.platform.startswith('linux'): + def flip(window): + try: + window.flip() + except AttributeError: + pass +else: + def flip(window): + window.flip() + + +def inputhook(context): + """Run the pyglet event loop by processing pending events only. + + This keeps processing pending events until stdin is ready. After + processing all pending events, a call to time.sleep is inserted. This is + needed, otherwise, CPU usage is at 100%. This sleep time should be tuned + though for best performance. + """ + # We need to protect against a user pressing Control-C when IPython is + # idle and this is running. We trap KeyboardInterrupt and pass. + try: + t = clock() + while not context.input_is_ready(): + pyglet.clock.tick() + for window in pyglet.app.windows: + window.switch_to() + window.dispatch_events() + window.dispatch_event('on_draw') + flip(window) + + # We need to sleep at this point to keep the idle CPU load + # low. However, if sleep to long, GUI response is poor. As + # a compromise, we watch how often GUI events are being processed + # and switch between a short and long sleep time. Here are some + # stats useful in helping to tune this. + # time CPU load + # 0.001 13% + # 0.005 3% + # 0.01 1.5% + # 0.05 0.5% + used_time = clock() - t + if used_time > 10.0: + # print 'Sleep for 1 s' # dbg + time.sleep(1.0) + elif used_time > 0.1: + # Few GUI events coming in, so we can sleep longer + # print 'Sleep for 0.05 s' # dbg + time.sleep(0.05) + else: + # Many GUI events coming in, so sleep only very little + time.sleep(0.001) + except KeyboardInterrupt: + pass diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/qt.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/qt.py new file mode 100644 index 0000000000..cf6d11ea6c --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/qt.py @@ -0,0 +1,86 @@ +import sys +import os +from IPython.external.qt_for_kernel import QtCore, QtGui, enum_helper +from IPython import get_ipython + +# If we create a QApplication, keep a reference to it so that it doesn't get +# garbage collected. +_appref = None +_already_warned = False + + +def _exec(obj): + # exec on PyQt6, exec_ elsewhere. + obj.exec() if hasattr(obj, "exec") else obj.exec_() + + +def _reclaim_excepthook(): + shell = get_ipython() + if shell is not None: + sys.excepthook = shell.excepthook + + +def inputhook(context): + global _appref + app = QtCore.QCoreApplication.instance() + if not app: + if sys.platform == 'linux': + if not os.environ.get('DISPLAY') \ + and not os.environ.get('WAYLAND_DISPLAY'): + import warnings + global _already_warned + if not _already_warned: + _already_warned = True + warnings.warn( + 'The DISPLAY or WAYLAND_DISPLAY environment variable is ' + 'not set or empty and Qt5 requires this environment ' + 'variable. Deactivate Qt5 code.' + ) + return + try: + QtCore.QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling) + except AttributeError: # Only for Qt>=5.6, <6. + pass + try: + QtCore.QApplication.setHighDpiScaleFactorRoundingPolicy( + QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough + ) + except AttributeError: # Only for Qt>=5.14. + pass + _appref = app = QtGui.QApplication([" "]) + + # "reclaim" IPython sys.excepthook after event loop starts + # without this, it defaults back to BaseIPythonApplication.excepthook + # and exceptions in the Qt event loop are rendered without traceback + # formatting and look like "bug in IPython". + QtCore.QTimer.singleShot(0, _reclaim_excepthook) + + event_loop = QtCore.QEventLoop(app) + + if sys.platform == 'win32': + # The QSocketNotifier method doesn't appear to work on Windows. + # Use polling instead. + timer = QtCore.QTimer() + timer.timeout.connect(event_loop.quit) + while not context.input_is_ready(): + # NOTE: run the event loop, and after 50 ms, call `quit` to exit it. + timer.start(50) # 50 ms + _exec(event_loop) + timer.stop() + else: + # On POSIX platforms, we can use a file descriptor to quit the event + # loop when there is input ready to read. + notifier = QtCore.QSocketNotifier( + context.fileno(), enum_helper("QtCore.QSocketNotifier.Type").Read + ) + try: + # connect the callback we care about before we turn it on + # lambda is necessary as PyQT inspect the function signature to know + # what arguments to pass to. See https://github.com/ipython/ipython/pull/12355 + notifier.activated.connect(lambda: event_loop.exit()) + notifier.setEnabled(True) + # only start the event loop we are not already flipped + if not context.input_is_ready(): + _exec(event_loop) + finally: + notifier.setEnabled(False) diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/tk.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/tk.py new file mode 100644 index 0000000000..2715505f1f --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/tk.py @@ -0,0 +1,90 @@ +# Code borrowed from ptpython +# https://github.com/jonathanslenders/ptpython/blob/86b71a89626114b18898a0af463978bdb32eeb70/ptpython/eventloop.py + +# Copyright (c) 2015, Jonathan Slenders +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, this +# list of conditions and the following disclaimer in the documentation and/or +# other materials provided with the distribution. +# +# * Neither the name of the {organization} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Wrapper around the eventloop that gives some time to the Tkinter GUI to process +events when it's loaded and while we are waiting for input at the REPL. This +way we don't block the UI of for instance ``turtle`` and other Tk libraries. + +(Normally Tkinter registers it's callbacks in ``PyOS_InputHook`` to integrate +in readline. ``prompt-toolkit`` doesn't understand that input hook, but this +will fix it for Tk.) +""" +import time + +import _tkinter +import tkinter + +def inputhook(inputhook_context): + """ + Inputhook for Tk. + Run the Tk eventloop until prompt-toolkit needs to process the next input. + """ + # Get the current TK application. + root = tkinter._default_root + + def wait_using_filehandler(): + """ + Run the TK eventloop until the file handler that we got from the + inputhook becomes readable. + """ + # Add a handler that sets the stop flag when `prompt-toolkit` has input + # to process. + stop = [False] + def done(*a): + stop[0] = True + + root.createfilehandler(inputhook_context.fileno(), _tkinter.READABLE, done) + + # Run the TK event loop as long as we don't receive input. + while root.dooneevent(_tkinter.ALL_EVENTS): + if stop[0]: + break + + root.deletefilehandler(inputhook_context.fileno()) + + def wait_using_polling(): + """ + Windows TK doesn't support 'createfilehandler'. + So, run the TK eventloop and poll until input is ready. + """ + while not inputhook_context.input_is_ready(): + while root.dooneevent(_tkinter.ALL_EVENTS | _tkinter.DONT_WAIT): + pass + # Sleep to make the CPU idle, but not too long, so that the UI + # stays responsive. + time.sleep(.01) + + if root is not None: + if hasattr(root, 'createfilehandler'): + wait_using_filehandler() + else: + wait_using_polling() diff --git a/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/wx.py b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/wx.py new file mode 100644 index 0000000000..a0f4442c77 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/pt_inputhooks/wx.py @@ -0,0 +1,219 @@ +"""Enable wxPython to be used interactively in prompt_toolkit +""" + +import sys +import signal +import time +from timeit import default_timer as clock +import wx + + +def ignore_keyboardinterrupts(func): + """Decorator which causes KeyboardInterrupt exceptions to be ignored during + execution of the decorated function. + + This is used by the inputhook functions to handle the event where the user + presses CTRL+C while IPython is idle, and the inputhook loop is running. In + this case, we want to ignore interrupts. + """ + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except KeyboardInterrupt: + pass + return wrapper + + +@ignore_keyboardinterrupts +def inputhook_wx1(context): + """Run the wx event loop by processing pending events only. + + This approach seems to work, but its performance is not great as it + relies on having PyOS_InputHook called regularly. + """ + app = wx.GetApp() + if app is not None: + assert wx.Thread_IsMain() + + # Make a temporary event loop and process system events until + # there are no more waiting, then allow idle events (which + # will also deal with pending or posted wx events.) + evtloop = wx.EventLoop() + ea = wx.EventLoopActivator(evtloop) + while evtloop.Pending(): + evtloop.Dispatch() + app.ProcessIdle() + del ea + return 0 + + +class EventLoopTimer(wx.Timer): + + def __init__(self, func): + self.func = func + wx.Timer.__init__(self) + + def Notify(self): + self.func() + + +class EventLoopRunner(object): + + def Run(self, time, input_is_ready): + self.input_is_ready = input_is_ready + self.evtloop = wx.EventLoop() + self.timer = EventLoopTimer(self.check_stdin) + self.timer.Start(time) + self.evtloop.Run() + + def check_stdin(self): + if self.input_is_ready(): + self.timer.Stop() + self.evtloop.Exit() + + +@ignore_keyboardinterrupts +def inputhook_wx2(context): + """Run the wx event loop, polling for stdin. + + This version runs the wx eventloop for an undetermined amount of time, + during which it periodically checks to see if anything is ready on + stdin. If anything is ready on stdin, the event loop exits. + + The argument to elr.Run controls how often the event loop looks at stdin. + This determines the responsiveness at the keyboard. A setting of 1000 + enables a user to type at most 1 char per second. I have found that a + setting of 10 gives good keyboard response. We can shorten it further, + but eventually performance would suffer from calling select/kbhit too + often. + """ + app = wx.GetApp() + if app is not None: + assert wx.Thread_IsMain() + elr = EventLoopRunner() + # As this time is made shorter, keyboard response improves, but idle + # CPU load goes up. 10 ms seems like a good compromise. + elr.Run(time=10, # CHANGE time here to control polling interval + input_is_ready=context.input_is_ready) + return 0 + + +@ignore_keyboardinterrupts +def inputhook_wx3(context): + """Run the wx event loop by processing pending events only. + + This is like inputhook_wx1, but it keeps processing pending events + until stdin is ready. After processing all pending events, a call to + time.sleep is inserted. This is needed, otherwise, CPU usage is at 100%. + This sleep time should be tuned though for best performance. + """ + app = wx.GetApp() + if app is not None: + assert wx.Thread_IsMain() + + # The import of wx on Linux sets the handler for signal.SIGINT + # to 0. This is a bug in wx or gtk. We fix by just setting it + # back to the Python default. + if not callable(signal.getsignal(signal.SIGINT)): + signal.signal(signal.SIGINT, signal.default_int_handler) + + evtloop = wx.EventLoop() + ea = wx.EventLoopActivator(evtloop) + t = clock() + while not context.input_is_ready(): + while evtloop.Pending(): + t = clock() + evtloop.Dispatch() + app.ProcessIdle() + # We need to sleep at this point to keep the idle CPU load + # low. However, if sleep to long, GUI response is poor. As + # a compromise, we watch how often GUI events are being processed + # and switch between a short and long sleep time. Here are some + # stats useful in helping to tune this. + # time CPU load + # 0.001 13% + # 0.005 3% + # 0.01 1.5% + # 0.05 0.5% + used_time = clock() - t + if used_time > 10.0: + # print 'Sleep for 1 s' # dbg + time.sleep(1.0) + elif used_time > 0.1: + # Few GUI events coming in, so we can sleep longer + # print 'Sleep for 0.05 s' # dbg + time.sleep(0.05) + else: + # Many GUI events coming in, so sleep only very little + time.sleep(0.001) + del ea + return 0 + + +@ignore_keyboardinterrupts +def inputhook_wxphoenix(context): + """Run the wx event loop until the user provides more input. + + This input hook is suitable for use with wxPython >= 4 (a.k.a. Phoenix). + + It uses the same approach to that used in + ipykernel.eventloops.loop_wx. The wx.MainLoop is executed, and a wx.Timer + is used to periodically poll the context for input. As soon as input is + ready, the wx.MainLoop is stopped. + """ + + app = wx.GetApp() + + if app is None: + return + + if context.input_is_ready(): + return + + assert wx.IsMainThread() + + # Wx uses milliseconds + poll_interval = 100 + + # Use a wx.Timer to periodically check whether input is ready - as soon as + # it is, we exit the main loop + timer = wx.Timer() + + def poll(ev): + if context.input_is_ready(): + timer.Stop() + app.ExitMainLoop() + + timer.Start(poll_interval) + timer.Bind(wx.EVT_TIMER, poll) + + # The import of wx on Linux sets the handler for signal.SIGINT to 0. This + # is a bug in wx or gtk. We fix by just setting it back to the Python + # default. + if not callable(signal.getsignal(signal.SIGINT)): + signal.signal(signal.SIGINT, signal.default_int_handler) + + # The SetExitOnFrameDelete call allows us to run the wx mainloop without + # having a frame open. + app.SetExitOnFrameDelete(False) + app.MainLoop() + + +# Get the major wx version number to figure out what input hook we should use. +major_version = 3 + +try: + major_version = int(wx.__version__[0]) +except Exception: + pass + +# Use the phoenix hook on all platforms for wxpython >= 4 +if major_version >= 4: + inputhook = inputhook_wxphoenix +# On OSX, evtloop.Pending() always returns True, regardless of there being +# any events pending. As such we can't use implementations 1 or 3 of the +# inputhook as those depend on a pending/dispatch loop. +elif sys.platform == 'darwin': + inputhook = inputhook_wx2 +else: + inputhook = inputhook_wx3 diff --git a/contrib/python/ipython/py3/IPython/terminal/ptutils.py b/contrib/python/ipython/py3/IPython/terminal/ptutils.py new file mode 100644 index 0000000000..39bc2e15af --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/ptutils.py @@ -0,0 +1,204 @@ +"""prompt-toolkit utilities + +Everything in this module is a private API, +not to be used outside IPython. +""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +import unicodedata +from wcwidth import wcwidth + +from IPython.core.completer import ( + provisionalcompleter, cursor_to_position, + _deduplicate_completions) +from prompt_toolkit.completion import Completer, Completion +from prompt_toolkit.lexers import Lexer +from prompt_toolkit.lexers import PygmentsLexer +from prompt_toolkit.patch_stdout import patch_stdout + +import pygments.lexers as pygments_lexers +import os +import sys +import traceback + +_completion_sentinel = object() + +def _elide_point(string:str, *, min_elide=30)->str: + """ + If a string is long enough, and has at least 3 dots, + replace the middle part with ellipses. + + If a string naming a file is long enough, and has at least 3 slashes, + replace the middle part with ellipses. + + If three consecutive dots, or two consecutive dots are encountered these are + replaced by the equivalents HORIZONTAL ELLIPSIS or TWO DOT LEADER unicode + equivalents + """ + string = string.replace('...','\N{HORIZONTAL ELLIPSIS}') + string = string.replace('..','\N{TWO DOT LEADER}') + if len(string) < min_elide: + return string + + object_parts = string.split('.') + file_parts = string.split(os.sep) + if file_parts[-1] == '': + file_parts.pop() + + if len(object_parts) > 3: + return "{}.{}\N{HORIZONTAL ELLIPSIS}{}.{}".format( + object_parts[0], + object_parts[1][:1], + object_parts[-2][-1:], + object_parts[-1], + ) + + elif len(file_parts) > 3: + return ("{}" + os.sep + "{}\N{HORIZONTAL ELLIPSIS}{}" + os.sep + "{}").format( + file_parts[0], file_parts[1][:1], file_parts[-2][-1:], file_parts[-1] + ) + + return string + +def _elide_typed(string:str, typed:str, *, min_elide:int=30)->str: + """ + Elide the middle of a long string if the beginning has already been typed. + """ + + if len(string) < min_elide: + return string + cut_how_much = len(typed)-3 + if cut_how_much < 7: + return string + if string.startswith(typed) and len(string)> len(typed): + return f"{string[:3]}\N{HORIZONTAL ELLIPSIS}{string[cut_how_much:]}" + return string + +def _elide(string:str, typed:str, min_elide=30)->str: + return _elide_typed( + _elide_point(string, min_elide=min_elide), + typed, min_elide=min_elide) + + + +def _adjust_completion_text_based_on_context(text, body, offset): + if text.endswith('=') and len(body) > offset and body[offset] == '=': + return text[:-1] + else: + return text + + +class IPythonPTCompleter(Completer): + """Adaptor to provide IPython completions to prompt_toolkit""" + def __init__(self, ipy_completer=None, shell=None): + if shell is None and ipy_completer is None: + raise TypeError("Please pass shell=an InteractiveShell instance.") + self._ipy_completer = ipy_completer + self.shell = shell + + @property + def ipy_completer(self): + if self._ipy_completer: + return self._ipy_completer + else: + return self.shell.Completer + + def get_completions(self, document, complete_event): + if not document.current_line.strip(): + return + # Some bits of our completion system may print stuff (e.g. if a module + # is imported). This context manager ensures that doesn't interfere with + # the prompt. + + with patch_stdout(), provisionalcompleter(): + body = document.text + cursor_row = document.cursor_position_row + cursor_col = document.cursor_position_col + cursor_position = document.cursor_position + offset = cursor_to_position(body, cursor_row, cursor_col) + try: + yield from self._get_completions(body, offset, cursor_position, self.ipy_completer) + except Exception as e: + try: + exc_type, exc_value, exc_tb = sys.exc_info() + traceback.print_exception(exc_type, exc_value, exc_tb) + except AttributeError: + print('Unrecoverable Error in completions') + + @staticmethod + def _get_completions(body, offset, cursor_position, ipyc): + """ + Private equivalent of get_completions() use only for unit_testing. + """ + debug = getattr(ipyc, 'debug', False) + completions = _deduplicate_completions( + body, ipyc.completions(body, offset)) + for c in completions: + if not c.text: + # Guard against completion machinery giving us an empty string. + continue + text = unicodedata.normalize('NFC', c.text) + # When the first character of the completion has a zero length, + # then it's probably a decomposed unicode character. E.g. caused by + # the "\dot" completion. Try to compose again with the previous + # character. + if wcwidth(text[0]) == 0: + if cursor_position + c.start > 0: + char_before = body[c.start - 1] + fixed_text = unicodedata.normalize( + 'NFC', char_before + text) + + # Yield the modified completion instead, if this worked. + if wcwidth(text[0:1]) == 1: + yield Completion(fixed_text, start_position=c.start - offset - 1) + continue + + # TODO: Use Jedi to determine meta_text + # (Jedi currently has a bug that results in incorrect information.) + # meta_text = '' + # yield Completion(m, start_position=start_pos, + # display_meta=meta_text) + display_text = c.text + + adjusted_text = _adjust_completion_text_based_on_context(c.text, body, offset) + if c.type == 'function': + yield Completion(adjusted_text, start_position=c.start - offset, display=_elide(display_text+'()', body[c.start:c.end]), display_meta=c.type+c.signature) + else: + yield Completion(adjusted_text, start_position=c.start - offset, display=_elide(display_text, body[c.start:c.end]), display_meta=c.type) + +class IPythonPTLexer(Lexer): + """ + Wrapper around PythonLexer and BashLexer. + """ + def __init__(self): + l = pygments_lexers + self.python_lexer = PygmentsLexer(l.Python3Lexer) + self.shell_lexer = PygmentsLexer(l.BashLexer) + + self.magic_lexers = { + 'HTML': PygmentsLexer(l.HtmlLexer), + 'html': PygmentsLexer(l.HtmlLexer), + 'javascript': PygmentsLexer(l.JavascriptLexer), + 'js': PygmentsLexer(l.JavascriptLexer), + 'perl': PygmentsLexer(l.PerlLexer), + 'ruby': PygmentsLexer(l.RubyLexer), + 'latex': PygmentsLexer(l.TexLexer), + } + + def lex_document(self, document): + text = document.text.lstrip() + + lexer = self.python_lexer + + if text.startswith('!') or text.startswith('%%bash'): + lexer = self.shell_lexer + + elif text.startswith('%%'): + for magic, l in self.magic_lexers.items(): + if text.startswith('%%' + magic): + lexer = l + break + + return lexer.lex_document(document) diff --git a/contrib/python/ipython/py3/IPython/terminal/shortcuts/__init__.py b/contrib/python/ipython/py3/IPython/terminal/shortcuts/__init__.py new file mode 100644 index 0000000000..12890f4ab6 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/shortcuts/__init__.py @@ -0,0 +1,630 @@ +""" +Module to define and register Terminal IPython shortcuts with +:mod:`prompt_toolkit` +""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +import os +import signal +import sys +import warnings +from dataclasses import dataclass +from typing import Callable, Any, Optional, List + +from prompt_toolkit.application.current import get_app +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.key_binding.key_processor import KeyPressEvent +from prompt_toolkit.key_binding.bindings import named_commands as nc +from prompt_toolkit.key_binding.bindings.completion import ( + display_completions_like_readline, +) +from prompt_toolkit.key_binding.vi_state import InputMode, ViState +from prompt_toolkit.filters import Condition + +from IPython.core.getipython import get_ipython +from IPython.terminal.shortcuts import auto_match as match +from IPython.terminal.shortcuts import auto_suggest +from IPython.terminal.shortcuts.filters import filter_from_string +from IPython.utils.decorators import undoc + +from prompt_toolkit.enums import DEFAULT_BUFFER + +__all__ = ["create_ipython_shortcuts"] + + +@dataclass +class BaseBinding: + command: Callable[[KeyPressEvent], Any] + keys: List[str] + + +@dataclass +class RuntimeBinding(BaseBinding): + filter: Condition + + +@dataclass +class Binding(BaseBinding): + # while filter could be created by referencing variables directly (rather + # than created from strings), by using strings we ensure that users will + # be able to create filters in configuration (e.g. JSON) files too, which + # also benefits the documentation by enforcing human-readable filter names. + condition: Optional[str] = None + + def __post_init__(self): + if self.condition: + self.filter = filter_from_string(self.condition) + else: + self.filter = None + + +def create_identifier(handler: Callable): + parts = handler.__module__.split(".") + name = handler.__name__ + package = parts[0] + if len(parts) > 1: + final_module = parts[-1] + return f"{package}:{final_module}.{name}" + else: + return f"{package}:{name}" + + +AUTO_MATCH_BINDINGS = [ + *[ + Binding( + cmd, [key], "focused_insert & auto_match & followed_by_closing_paren_or_end" + ) + for key, cmd in match.auto_match_parens.items() + ], + *[ + # raw string + Binding(cmd, [key], "focused_insert & auto_match & preceded_by_raw_str_prefix") + for key, cmd in match.auto_match_parens_raw_string.items() + ], + Binding( + match.double_quote, + ['"'], + "focused_insert" + " & auto_match" + " & not_inside_unclosed_string" + " & preceded_by_paired_double_quotes" + " & followed_by_closing_paren_or_end", + ), + Binding( + match.single_quote, + ["'"], + "focused_insert" + " & auto_match" + " & not_inside_unclosed_string" + " & preceded_by_paired_single_quotes" + " & followed_by_closing_paren_or_end", + ), + Binding( + match.docstring_double_quotes, + ['"'], + "focused_insert" + " & auto_match" + " & not_inside_unclosed_string" + " & preceded_by_two_double_quotes", + ), + Binding( + match.docstring_single_quotes, + ["'"], + "focused_insert" + " & auto_match" + " & not_inside_unclosed_string" + " & preceded_by_two_single_quotes", + ), + Binding( + match.skip_over, + [")"], + "focused_insert & auto_match & followed_by_closing_round_paren", + ), + Binding( + match.skip_over, + ["]"], + "focused_insert & auto_match & followed_by_closing_bracket", + ), + Binding( + match.skip_over, + ["}"], + "focused_insert & auto_match & followed_by_closing_brace", + ), + Binding( + match.skip_over, ['"'], "focused_insert & auto_match & followed_by_double_quote" + ), + Binding( + match.skip_over, ["'"], "focused_insert & auto_match & followed_by_single_quote" + ), + Binding( + match.delete_pair, + ["backspace"], + "focused_insert" + " & preceded_by_opening_round_paren" + " & auto_match" + " & followed_by_closing_round_paren", + ), + Binding( + match.delete_pair, + ["backspace"], + "focused_insert" + " & preceded_by_opening_bracket" + " & auto_match" + " & followed_by_closing_bracket", + ), + Binding( + match.delete_pair, + ["backspace"], + "focused_insert" + " & preceded_by_opening_brace" + " & auto_match" + " & followed_by_closing_brace", + ), + Binding( + match.delete_pair, + ["backspace"], + "focused_insert" + " & preceded_by_double_quote" + " & auto_match" + " & followed_by_double_quote", + ), + Binding( + match.delete_pair, + ["backspace"], + "focused_insert" + " & preceded_by_single_quote" + " & auto_match" + " & followed_by_single_quote", + ), +] + +AUTO_SUGGEST_BINDINGS = [ + # there are two reasons for re-defining bindings defined upstream: + # 1) prompt-toolkit does not execute autosuggestion bindings in vi mode, + # 2) prompt-toolkit checks if we are at the end of text, not end of line + # hence it does not work in multi-line mode of navigable provider + Binding( + auto_suggest.accept_or_jump_to_end, + ["end"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept_or_jump_to_end, + ["c-e"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept, + ["c-f"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept, + ["right"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept_word, + ["escape", "f"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept_token, + ["c-right"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.discard, + ["escape"], + # note this one is using `emacs_insert_mode`, not `emacs_like_insert_mode` + # as in `vi_insert_mode` we do not want `escape` to be shadowed (ever). + "has_suggestion & default_buffer_focused & emacs_insert_mode", + ), + Binding( + auto_suggest.discard, + ["delete"], + "has_suggestion & default_buffer_focused & emacs_insert_mode", + ), + Binding( + auto_suggest.swap_autosuggestion_up, + ["c-up"], + "navigable_suggestions" + " & ~has_line_above" + " & has_suggestion" + " & default_buffer_focused", + ), + Binding( + auto_suggest.swap_autosuggestion_down, + ["c-down"], + "navigable_suggestions" + " & ~has_line_below" + " & has_suggestion" + " & default_buffer_focused", + ), + Binding( + auto_suggest.up_and_update_hint, + ["c-up"], + "has_line_above & navigable_suggestions & default_buffer_focused", + ), + Binding( + auto_suggest.down_and_update_hint, + ["c-down"], + "has_line_below & navigable_suggestions & default_buffer_focused", + ), + Binding( + auto_suggest.accept_character, + ["escape", "right"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept_and_move_cursor_left, + ["c-left"], + "has_suggestion & default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.accept_and_keep_cursor, + ["escape", "down"], + "has_suggestion & default_buffer_focused & emacs_insert_mode", + ), + Binding( + auto_suggest.backspace_and_resume_hint, + ["backspace"], + # no `has_suggestion` here to allow resuming if no suggestion + "default_buffer_focused & emacs_like_insert_mode", + ), + Binding( + auto_suggest.resume_hinting, + ["right"], + "is_cursor_at_the_end_of_line" + " & default_buffer_focused" + " & emacs_like_insert_mode" + " & pass_through", + ), +] + + +SIMPLE_CONTROL_BINDINGS = [ + Binding(cmd, [key], "vi_insert_mode & default_buffer_focused & ebivim") + for key, cmd in { + "c-a": nc.beginning_of_line, + "c-b": nc.backward_char, + "c-k": nc.kill_line, + "c-w": nc.backward_kill_word, + "c-y": nc.yank, + "c-_": nc.undo, + }.items() +] + + +ALT_AND_COMOBO_CONTROL_BINDINGS = [ + Binding(cmd, list(keys), "vi_insert_mode & default_buffer_focused & ebivim") + for keys, cmd in { + # Control Combos + ("c-x", "c-e"): nc.edit_and_execute, + ("c-x", "e"): nc.edit_and_execute, + # Alt + ("escape", "b"): nc.backward_word, + ("escape", "c"): nc.capitalize_word, + ("escape", "d"): nc.kill_word, + ("escape", "h"): nc.backward_kill_word, + ("escape", "l"): nc.downcase_word, + ("escape", "u"): nc.uppercase_word, + ("escape", "y"): nc.yank_pop, + ("escape", "."): nc.yank_last_arg, + }.items() +] + + +def add_binding(bindings: KeyBindings, binding: Binding): + bindings.add( + *binding.keys, + **({"filter": binding.filter} if binding.filter is not None else {}), + )(binding.command) + + +def create_ipython_shortcuts(shell, skip=None) -> KeyBindings: + """Set up the prompt_toolkit keyboard shortcuts for IPython. + + Parameters + ---------- + shell: InteractiveShell + The current IPython shell Instance + skip: List[Binding] + Bindings to skip. + + Returns + ------- + KeyBindings + the keybinding instance for prompt toolkit. + + """ + kb = KeyBindings() + skip = skip or [] + for binding in KEY_BINDINGS: + skip_this_one = False + for to_skip in skip: + if ( + to_skip.command == binding.command + and to_skip.filter == binding.filter + and to_skip.keys == binding.keys + ): + skip_this_one = True + break + if skip_this_one: + continue + add_binding(kb, binding) + + def get_input_mode(self): + app = get_app() + app.ttimeoutlen = shell.ttimeoutlen + app.timeoutlen = shell.timeoutlen + + return self._input_mode + + def set_input_mode(self, mode): + shape = {InputMode.NAVIGATION: 2, InputMode.REPLACE: 4}.get(mode, 6) + cursor = "\x1b[{} q".format(shape) + + sys.stdout.write(cursor) + sys.stdout.flush() + + self._input_mode = mode + + if shell.editing_mode == "vi" and shell.modal_cursor: + ViState._input_mode = InputMode.INSERT # type: ignore + ViState.input_mode = property(get_input_mode, set_input_mode) # type: ignore + + return kb + + +def reformat_and_execute(event): + """Reformat code and execute it""" + shell = get_ipython() + reformat_text_before_cursor( + event.current_buffer, event.current_buffer.document, shell + ) + event.current_buffer.validate_and_handle() + + +def reformat_text_before_cursor(buffer, document, shell): + text = buffer.delete_before_cursor(len(document.text[: document.cursor_position])) + try: + formatted_text = shell.reformat_handler(text) + buffer.insert_text(formatted_text) + except Exception as e: + buffer.insert_text(text) + + +def handle_return_or_newline_or_execute(event): + shell = get_ipython() + if getattr(shell, "handle_return", None): + return shell.handle_return(shell)(event) + else: + return newline_or_execute_outer(shell)(event) + + +def newline_or_execute_outer(shell): + def newline_or_execute(event): + """When the user presses return, insert a newline or execute the code.""" + b = event.current_buffer + d = b.document + + if b.complete_state: + cc = b.complete_state.current_completion + if cc: + b.apply_completion(cc) + else: + b.cancel_completion() + return + + # If there's only one line, treat it as if the cursor is at the end. + # See https://github.com/ipython/ipython/issues/10425 + if d.line_count == 1: + check_text = d.text + else: + check_text = d.text[: d.cursor_position] + status, indent = shell.check_complete(check_text) + + # if all we have after the cursor is whitespace: reformat current text + # before cursor + after_cursor = d.text[d.cursor_position :] + reformatted = False + if not after_cursor.strip(): + reformat_text_before_cursor(b, d, shell) + reformatted = True + if not ( + d.on_last_line + or d.cursor_position_row >= d.line_count - d.empty_line_count_at_the_end() + ): + if shell.autoindent: + b.insert_text("\n" + indent) + else: + b.insert_text("\n") + return + + if (status != "incomplete") and b.accept_handler: + if not reformatted: + reformat_text_before_cursor(b, d, shell) + b.validate_and_handle() + else: + if shell.autoindent: + b.insert_text("\n" + indent) + else: + b.insert_text("\n") + + return newline_or_execute + + +def previous_history_or_previous_completion(event): + """ + Control-P in vi edit mode on readline is history next, unlike default prompt toolkit. + + If completer is open this still select previous completion. + """ + event.current_buffer.auto_up() + + +def next_history_or_next_completion(event): + """ + Control-N in vi edit mode on readline is history previous, unlike default prompt toolkit. + + If completer is open this still select next completion. + """ + event.current_buffer.auto_down() + + +def dismiss_completion(event): + """Dismiss completion""" + b = event.current_buffer + if b.complete_state: + b.cancel_completion() + + +def reset_buffer(event): + """Reset buffer""" + b = event.current_buffer + if b.complete_state: + b.cancel_completion() + else: + b.reset() + + +def reset_search_buffer(event): + """Reset search buffer""" + if event.current_buffer.document.text: + event.current_buffer.reset() + else: + event.app.layout.focus(DEFAULT_BUFFER) + + +def suspend_to_bg(event): + """Suspend to background""" + event.app.suspend_to_background() + + +def quit(event): + """ + Quit application with ``SIGQUIT`` if supported or ``sys.exit`` otherwise. + + On platforms that support SIGQUIT, send SIGQUIT to the current process. + On other platforms, just exit the process with a message. + """ + sigquit = getattr(signal, "SIGQUIT", None) + if sigquit is not None: + os.kill(0, signal.SIGQUIT) + else: + sys.exit("Quit") + + +def indent_buffer(event): + """Indent buffer""" + event.current_buffer.insert_text(" " * 4) + + +def newline_autoindent(event): + """Insert a newline after the cursor indented appropriately. + + Fancier version of former ``newline_with_copy_margin`` which should + compute the correct indentation of the inserted line. That is to say, indent + by 4 extra space after a function definition, class definition, context + manager... And dedent by 4 space after ``pass``, ``return``, ``raise ...``. + """ + shell = get_ipython() + inputsplitter = shell.input_transformer_manager + b = event.current_buffer + d = b.document + + if b.complete_state: + b.cancel_completion() + text = d.text[: d.cursor_position] + "\n" + _, indent = inputsplitter.check_complete(text) + b.insert_text("\n" + (" " * (indent or 0)), move_cursor=False) + + +def open_input_in_editor(event): + """Open code from input in external editor""" + event.app.current_buffer.open_in_editor() + + +if sys.platform == "win32": + from IPython.core.error import TryNext + from IPython.lib.clipboard import ( + ClipboardEmpty, + tkinter_clipboard_get, + win32_clipboard_get, + ) + + @undoc + def win_paste(event): + try: + text = win32_clipboard_get() + except TryNext: + try: + text = tkinter_clipboard_get() + except (TryNext, ClipboardEmpty): + return + except ClipboardEmpty: + return + event.current_buffer.insert_text(text.replace("\t", " " * 4)) + +else: + + @undoc + def win_paste(event): + """Stub used on other platforms""" + pass + + +KEY_BINDINGS = [ + Binding( + handle_return_or_newline_or_execute, + ["enter"], + "default_buffer_focused & ~has_selection & insert_mode", + ), + Binding( + reformat_and_execute, + ["escape", "enter"], + "default_buffer_focused & ~has_selection & insert_mode & ebivim", + ), + Binding(quit, ["c-\\"]), + Binding( + previous_history_or_previous_completion, + ["c-p"], + "vi_insert_mode & default_buffer_focused", + ), + Binding( + next_history_or_next_completion, + ["c-n"], + "vi_insert_mode & default_buffer_focused", + ), + Binding(dismiss_completion, ["c-g"], "default_buffer_focused & has_completions"), + Binding(reset_buffer, ["c-c"], "default_buffer_focused"), + Binding(reset_search_buffer, ["c-c"], "search_buffer_focused"), + Binding(suspend_to_bg, ["c-z"], "supports_suspend"), + Binding( + indent_buffer, + ["tab"], # Ctrl+I == Tab + "default_buffer_focused" + " & ~has_selection" + " & insert_mode" + " & cursor_in_leading_ws", + ), + Binding(newline_autoindent, ["c-o"], "default_buffer_focused & emacs_insert_mode"), + Binding(open_input_in_editor, ["f2"], "default_buffer_focused"), + *AUTO_MATCH_BINDINGS, + *AUTO_SUGGEST_BINDINGS, + Binding( + display_completions_like_readline, + ["c-i"], + "readline_like_completions" + " & default_buffer_focused" + " & ~has_selection" + " & insert_mode" + " & ~cursor_in_leading_ws", + ), + Binding(win_paste, ["c-v"], "default_buffer_focused & ~vi_mode & is_windows_os"), + *SIMPLE_CONTROL_BINDINGS, + *ALT_AND_COMOBO_CONTROL_BINDINGS, +] diff --git a/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_match.py b/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_match.py new file mode 100644 index 0000000000..6c2b1ef70c --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_match.py @@ -0,0 +1,104 @@ +""" +Utilities function for keybinding with prompt toolkit. + +This will be bound to specific key press and filter modes, +like whether we are in edit mode, and whether the completer is open. +""" +import re +from prompt_toolkit.key_binding import KeyPressEvent + + +def parenthesis(event: KeyPressEvent): + """Auto-close parenthesis""" + event.current_buffer.insert_text("()") + event.current_buffer.cursor_left() + + +def brackets(event: KeyPressEvent): + """Auto-close brackets""" + event.current_buffer.insert_text("[]") + event.current_buffer.cursor_left() + + +def braces(event: KeyPressEvent): + """Auto-close braces""" + event.current_buffer.insert_text("{}") + event.current_buffer.cursor_left() + + +def double_quote(event: KeyPressEvent): + """Auto-close double quotes""" + event.current_buffer.insert_text('""') + event.current_buffer.cursor_left() + + +def single_quote(event: KeyPressEvent): + """Auto-close single quotes""" + event.current_buffer.insert_text("''") + event.current_buffer.cursor_left() + + +def docstring_double_quotes(event: KeyPressEvent): + """Auto-close docstring (double quotes)""" + event.current_buffer.insert_text('""""') + event.current_buffer.cursor_left(3) + + +def docstring_single_quotes(event: KeyPressEvent): + """Auto-close docstring (single quotes)""" + event.current_buffer.insert_text("''''") + event.current_buffer.cursor_left(3) + + +def raw_string_parenthesis(event: KeyPressEvent): + """Auto-close parenthesis in raw strings""" + matches = re.match( + r".*(r|R)[\"'](-*)", + event.current_buffer.document.current_line_before_cursor, + ) + dashes = matches.group(2) if matches else "" + event.current_buffer.insert_text("()" + dashes) + event.current_buffer.cursor_left(len(dashes) + 1) + + +def raw_string_bracket(event: KeyPressEvent): + """Auto-close bracker in raw strings""" + matches = re.match( + r".*(r|R)[\"'](-*)", + event.current_buffer.document.current_line_before_cursor, + ) + dashes = matches.group(2) if matches else "" + event.current_buffer.insert_text("[]" + dashes) + event.current_buffer.cursor_left(len(dashes) + 1) + + +def raw_string_braces(event: KeyPressEvent): + """Auto-close braces in raw strings""" + matches = re.match( + r".*(r|R)[\"'](-*)", + event.current_buffer.document.current_line_before_cursor, + ) + dashes = matches.group(2) if matches else "" + event.current_buffer.insert_text("{}" + dashes) + event.current_buffer.cursor_left(len(dashes) + 1) + + +def skip_over(event: KeyPressEvent): + """Skip over automatically added parenthesis/quote. + + (rather than adding another parenthesis/quote)""" + event.current_buffer.cursor_right() + + +def delete_pair(event: KeyPressEvent): + """Delete auto-closed parenthesis""" + event.current_buffer.delete() + event.current_buffer.delete_before_cursor() + + +auto_match_parens = {"(": parenthesis, "[": brackets, "{": braces} +auto_match_parens_raw_string = { + "(": raw_string_parenthesis, + "[": raw_string_bracket, + "{": raw_string_braces, +} diff --git a/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py b/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py new file mode 100644 index 0000000000..65f91577ce --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/shortcuts/auto_suggest.py @@ -0,0 +1,401 @@ +import re +import tokenize +from io import StringIO +from typing import Callable, List, Optional, Union, Generator, Tuple +import warnings + +from prompt_toolkit.buffer import Buffer +from prompt_toolkit.key_binding import KeyPressEvent +from prompt_toolkit.key_binding.bindings import named_commands as nc +from prompt_toolkit.auto_suggest import AutoSuggestFromHistory, Suggestion +from prompt_toolkit.document import Document +from prompt_toolkit.history import History +from prompt_toolkit.shortcuts import PromptSession +from prompt_toolkit.layout.processors import ( + Processor, + Transformation, + TransformationInput, +) + +from IPython.core.getipython import get_ipython +from IPython.utils.tokenutil import generate_tokens + +from .filters import pass_through + + +def _get_query(document: Document): + return document.lines[document.cursor_position_row] + + +class AppendAutoSuggestionInAnyLine(Processor): + """ + Append the auto suggestion to lines other than the last (appending to the + last line is natively supported by the prompt toolkit). + """ + + def __init__(self, style: str = "class:auto-suggestion") -> None: + self.style = style + + def apply_transformation(self, ti: TransformationInput) -> Transformation: + is_last_line = ti.lineno == ti.document.line_count - 1 + is_active_line = ti.lineno == ti.document.cursor_position_row + + if not is_last_line and is_active_line: + buffer = ti.buffer_control.buffer + + if buffer.suggestion and ti.document.is_cursor_at_the_end_of_line: + suggestion = buffer.suggestion.text + else: + suggestion = "" + + return Transformation(fragments=ti.fragments + [(self.style, suggestion)]) + else: + return Transformation(fragments=ti.fragments) + + +class NavigableAutoSuggestFromHistory(AutoSuggestFromHistory): + """ + A subclass of AutoSuggestFromHistory that allow navigation to next/previous + suggestion from history. To do so it remembers the current position, but it + state need to carefully be cleared on the right events. + """ + + def __init__( + self, + ): + self.skip_lines = 0 + self._connected_apps = [] + + def reset_history_position(self, _: Buffer): + self.skip_lines = 0 + + def disconnect(self): + for pt_app in self._connected_apps: + text_insert_event = pt_app.default_buffer.on_text_insert + text_insert_event.remove_handler(self.reset_history_position) + + def connect(self, pt_app: PromptSession): + self._connected_apps.append(pt_app) + # note: `on_text_changed` could be used for a bit different behaviour + # on character deletion (i.e. reseting history position on backspace) + pt_app.default_buffer.on_text_insert.add_handler(self.reset_history_position) + pt_app.default_buffer.on_cursor_position_changed.add_handler(self._dismiss) + + def get_suggestion( + self, buffer: Buffer, document: Document + ) -> Optional[Suggestion]: + text = _get_query(document) + + if text.strip(): + for suggestion, _ in self._find_next_match( + text, self.skip_lines, buffer.history + ): + return Suggestion(suggestion) + + return None + + def _dismiss(self, buffer, *args, **kwargs): + buffer.suggestion = None + + def _find_match( + self, text: str, skip_lines: float, history: History, previous: bool + ) -> Generator[Tuple[str, float], None, None]: + """ + text : str + Text content to find a match for, the user cursor is most of the + time at the end of this text. + skip_lines : float + number of items to skip in the search, this is used to indicate how + far in the list the user has navigated by pressing up or down. + The float type is used as the base value is +inf + history : History + prompt_toolkit History instance to fetch previous entries from. + previous : bool + Direction of the search, whether we are looking previous match + (True), or next match (False). + + Yields + ------ + Tuple with: + str: + current suggestion. + float: + will actually yield only ints, which is passed back via skip_lines, + which may be a +inf (float) + + + """ + line_number = -1 + for string in reversed(list(history.get_strings())): + for line in reversed(string.splitlines()): + line_number += 1 + if not previous and line_number < skip_lines: + continue + # do not return empty suggestions as these + # close the auto-suggestion overlay (and are useless) + if line.startswith(text) and len(line) > len(text): + yield line[len(text) :], line_number + if previous and line_number >= skip_lines: + return + + def _find_next_match( + self, text: str, skip_lines: float, history: History + ) -> Generator[Tuple[str, float], None, None]: + return self._find_match(text, skip_lines, history, previous=False) + + def _find_previous_match(self, text: str, skip_lines: float, history: History): + return reversed( + list(self._find_match(text, skip_lines, history, previous=True)) + ) + + def up(self, query: str, other_than: str, history: History) -> None: + for suggestion, line_number in self._find_next_match( + query, self.skip_lines, history + ): + # if user has history ['very.a', 'very', 'very.b'] and typed 'very' + # we want to switch from 'very.b' to 'very.a' because a) if the + # suggestion equals current text, prompt-toolkit aborts suggesting + # b) user likely would not be interested in 'very' anyways (they + # already typed it). + if query + suggestion != other_than: + self.skip_lines = line_number + break + else: + # no matches found, cycle back to beginning + self.skip_lines = 0 + + def down(self, query: str, other_than: str, history: History) -> None: + for suggestion, line_number in self._find_previous_match( + query, self.skip_lines, history + ): + if query + suggestion != other_than: + self.skip_lines = line_number + break + else: + # no matches found, cycle to end + for suggestion, line_number in self._find_previous_match( + query, float("Inf"), history + ): + if query + suggestion != other_than: + self.skip_lines = line_number + break + + +def accept_or_jump_to_end(event: KeyPressEvent): + """Apply autosuggestion or jump to end of line.""" + buffer = event.current_buffer + d = buffer.document + after_cursor = d.text[d.cursor_position :] + lines = after_cursor.split("\n") + end_of_current_line = lines[0].strip() + suggestion = buffer.suggestion + if (suggestion is not None) and (suggestion.text) and (end_of_current_line == ""): + buffer.insert_text(suggestion.text) + else: + nc.end_of_line(event) + + +def _deprected_accept_in_vi_insert_mode(event: KeyPressEvent): + """Accept autosuggestion or jump to end of line. + + .. deprecated:: 8.12 + Use `accept_or_jump_to_end` instead. + """ + return accept_or_jump_to_end(event) + + +def accept(event: KeyPressEvent): + """Accept autosuggestion""" + buffer = event.current_buffer + suggestion = buffer.suggestion + if suggestion: + buffer.insert_text(suggestion.text) + else: + nc.forward_char(event) + + +def discard(event: KeyPressEvent): + """Discard autosuggestion""" + buffer = event.current_buffer + buffer.suggestion = None + + +def accept_word(event: KeyPressEvent): + """Fill partial autosuggestion by word""" + buffer = event.current_buffer + suggestion = buffer.suggestion + if suggestion: + t = re.split(r"(\S+\s+)", suggestion.text) + buffer.insert_text(next((x for x in t if x), "")) + else: + nc.forward_word(event) + + +def accept_character(event: KeyPressEvent): + """Fill partial autosuggestion by character""" + b = event.current_buffer + suggestion = b.suggestion + if suggestion and suggestion.text: + b.insert_text(suggestion.text[0]) + + +def accept_and_keep_cursor(event: KeyPressEvent): + """Accept autosuggestion and keep cursor in place""" + buffer = event.current_buffer + old_position = buffer.cursor_position + suggestion = buffer.suggestion + if suggestion: + buffer.insert_text(suggestion.text) + buffer.cursor_position = old_position + + +def accept_and_move_cursor_left(event: KeyPressEvent): + """Accept autosuggestion and move cursor left in place""" + accept_and_keep_cursor(event) + nc.backward_char(event) + + +def _update_hint(buffer: Buffer): + if buffer.auto_suggest: + suggestion = buffer.auto_suggest.get_suggestion(buffer, buffer.document) + buffer.suggestion = suggestion + + +def backspace_and_resume_hint(event: KeyPressEvent): + """Resume autosuggestions after deleting last character""" + nc.backward_delete_char(event) + _update_hint(event.current_buffer) + + +def resume_hinting(event: KeyPressEvent): + """Resume autosuggestions""" + pass_through.reply(event) + # Order matters: if update happened first and event reply second, the + # suggestion would be auto-accepted if both actions are bound to same key. + _update_hint(event.current_buffer) + + +def up_and_update_hint(event: KeyPressEvent): + """Go up and update hint""" + current_buffer = event.current_buffer + + current_buffer.auto_up(count=event.arg) + _update_hint(current_buffer) + + +def down_and_update_hint(event: KeyPressEvent): + """Go down and update hint""" + current_buffer = event.current_buffer + + current_buffer.auto_down(count=event.arg) + _update_hint(current_buffer) + + +def accept_token(event: KeyPressEvent): + """Fill partial autosuggestion by token""" + b = event.current_buffer + suggestion = b.suggestion + + if suggestion: + prefix = _get_query(b.document) + text = prefix + suggestion.text + + tokens: List[Optional[str]] = [None, None, None] + substrings = [""] + i = 0 + + for token in generate_tokens(StringIO(text).readline): + if token.type == tokenize.NEWLINE: + index = len(text) + else: + index = text.index(token[1], len(substrings[-1])) + substrings.append(text[:index]) + tokenized_so_far = substrings[-1] + if tokenized_so_far.startswith(prefix): + if i == 0 and len(tokenized_so_far) > len(prefix): + tokens[0] = tokenized_so_far[len(prefix) :] + substrings.append(tokenized_so_far) + i += 1 + tokens[i] = token[1] + if i == 2: + break + i += 1 + + if tokens[0]: + to_insert: str + insert_text = substrings[-2] + if tokens[1] and len(tokens[1]) == 1: + insert_text = substrings[-1] + to_insert = insert_text[len(prefix) :] + b.insert_text(to_insert) + return + + nc.forward_word(event) + + +Provider = Union[AutoSuggestFromHistory, NavigableAutoSuggestFromHistory, None] + + +def _swap_autosuggestion( + buffer: Buffer, + provider: NavigableAutoSuggestFromHistory, + direction_method: Callable, +): + """ + We skip most recent history entry (in either direction) if it equals the + current autosuggestion because if user cycles when auto-suggestion is shown + they most likely want something else than what was suggested (otherwise + they would have accepted the suggestion). + """ + suggestion = buffer.suggestion + if not suggestion: + return + + query = _get_query(buffer.document) + current = query + suggestion.text + + direction_method(query=query, other_than=current, history=buffer.history) + + new_suggestion = provider.get_suggestion(buffer, buffer.document) + buffer.suggestion = new_suggestion + + +def swap_autosuggestion_up(event: KeyPressEvent): + """Get next autosuggestion from history.""" + shell = get_ipython() + provider = shell.auto_suggest + + if not isinstance(provider, NavigableAutoSuggestFromHistory): + return + + return _swap_autosuggestion( + buffer=event.current_buffer, provider=provider, direction_method=provider.up + ) + + +def swap_autosuggestion_down(event: KeyPressEvent): + """Get previous autosuggestion from history.""" + shell = get_ipython() + provider = shell.auto_suggest + + if not isinstance(provider, NavigableAutoSuggestFromHistory): + return + + return _swap_autosuggestion( + buffer=event.current_buffer, + provider=provider, + direction_method=provider.down, + ) + + +def __getattr__(key): + if key == "accept_in_vi_insert_mode": + warnings.warn( + "`accept_in_vi_insert_mode` is deprecated since IPython 8.12 and " + "renamed to `accept_or_jump_to_end`. Please update your configuration " + "accordingly", + DeprecationWarning, + stacklevel=2, + ) + return _deprected_accept_in_vi_insert_mode + raise AttributeError diff --git a/contrib/python/ipython/py3/IPython/terminal/shortcuts/filters.py b/contrib/python/ipython/py3/IPython/terminal/shortcuts/filters.py new file mode 100644 index 0000000000..7c9d6a9c41 --- /dev/null +++ b/contrib/python/ipython/py3/IPython/terminal/shortcuts/filters.py @@ -0,0 +1,322 @@ +""" +Filters restricting scope of IPython Terminal shortcuts. +""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +import ast +import re +import signal +import sys +from typing import Callable, Dict, Union + +from prompt_toolkit.application.current import get_app +from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER +from prompt_toolkit.key_binding import KeyPressEvent +from prompt_toolkit.filters import Condition, Filter, emacs_insert_mode, has_completions +from prompt_toolkit.filters import has_focus as has_focus_impl +from prompt_toolkit.filters import ( + Always, + Never, + has_selection, + has_suggestion, + vi_insert_mode, + vi_mode, +) +from prompt_toolkit.layout.layout import FocusableElement + +from IPython.core.getipython import get_ipython +from IPython.core.guarded_eval import _find_dunder, BINARY_OP_DUNDERS, UNARY_OP_DUNDERS +from IPython.terminal.shortcuts import auto_suggest +from IPython.utils.decorators import undoc + + +@undoc +@Condition +def cursor_in_leading_ws(): + before = get_app().current_buffer.document.current_line_before_cursor + return (not before) or before.isspace() + + +def has_focus(value: FocusableElement): + """Wrapper around has_focus adding a nice `__name__` to tester function""" + tester = has_focus_impl(value).func + tester.__name__ = f"is_focused({value})" + return Condition(tester) + + +@undoc +@Condition +def has_line_below() -> bool: + document = get_app().current_buffer.document + return document.cursor_position_row < len(document.lines) - 1 + + +@undoc +@Condition +def is_cursor_at_the_end_of_line() -> bool: + document = get_app().current_buffer.document + return document.is_cursor_at_the_end_of_line + + +@undoc +@Condition +def has_line_above() -> bool: + document = get_app().current_buffer.document + return document.cursor_position_row != 0 + + +@Condition +def ebivim(): + shell = get_ipython() + return shell.emacs_bindings_in_vi_insert_mode + + +@Condition +def supports_suspend(): + return hasattr(signal, "SIGTSTP") + + +@Condition +def auto_match(): + shell = get_ipython() + return shell.auto_match + + +def all_quotes_paired(quote, buf): + paired = True + i = 0 + while i < len(buf): + c = buf[i] + if c == quote: + paired = not paired + elif c == "\\": + i += 1 + i += 1 + return paired + + +_preceding_text_cache: Dict[Union[str, Callable], Condition] = {} +_following_text_cache: Dict[Union[str, Callable], Condition] = {} + + +def preceding_text(pattern: Union[str, Callable]): + if pattern in _preceding_text_cache: + return _preceding_text_cache[pattern] + + if callable(pattern): + + def _preceding_text(): + app = get_app() + before_cursor = app.current_buffer.document.current_line_before_cursor + # mypy can't infer if(callable): https://github.com/python/mypy/issues/3603 + return bool(pattern(before_cursor)) # type: ignore[operator] + + else: + m = re.compile(pattern) + + def _preceding_text(): + app = get_app() + before_cursor = app.current_buffer.document.current_line_before_cursor + return bool(m.match(before_cursor)) + + _preceding_text.__name__ = f"preceding_text({pattern!r})" + + condition = Condition(_preceding_text) + _preceding_text_cache[pattern] = condition + return condition + + +def following_text(pattern): + try: + return _following_text_cache[pattern] + except KeyError: + pass + m = re.compile(pattern) + + def _following_text(): + app = get_app() + return bool(m.match(app.current_buffer.document.current_line_after_cursor)) + + _following_text.__name__ = f"following_text({pattern!r})" + + condition = Condition(_following_text) + _following_text_cache[pattern] = condition + return condition + + +@Condition +def not_inside_unclosed_string(): + app = get_app() + s = app.current_buffer.document.text_before_cursor + # remove escaped quotes + s = s.replace('\\"', "").replace("\\'", "") + # remove triple-quoted string literals + s = re.sub(r"(?:\"\"\"[\s\S]*\"\"\"|'''[\s\S]*''')", "", s) + # remove single-quoted string literals + s = re.sub(r"""(?:"[^"]*["\n]|'[^']*['\n])""", "", s) + return not ('"' in s or "'" in s) + + +@Condition +def navigable_suggestions(): + shell = get_ipython() + return isinstance(shell.auto_suggest, auto_suggest.NavigableAutoSuggestFromHistory) + + +@Condition +def readline_like_completions(): + shell = get_ipython() + return shell.display_completions == "readlinelike" + + +@Condition +def is_windows_os(): + return sys.platform == "win32" + + +class PassThrough(Filter): + """A filter allowing to implement pass-through behaviour of keybindings. + + Prompt toolkit key processor dispatches only one event per binding match, + which means that adding a new shortcut will suppress the old shortcut + if the keybindings are the same (unless one is filtered out). + + To stop a shortcut binding from suppressing other shortcuts: + - add the `pass_through` filter to list of filter, and + - call `pass_through.reply(event)` in the shortcut handler. + """ + + def __init__(self): + self._is_replying = False + + def reply(self, event: KeyPressEvent): + self._is_replying = True + try: + event.key_processor.reset() + event.key_processor.feed_multiple(event.key_sequence) + event.key_processor.process_keys() + finally: + self._is_replying = False + + def __call__(self): + return not self._is_replying + + +pass_through = PassThrough() + +# these one is callable and re-used multiple times hence needs to be +# only defined once beforhand so that transforming back to human-readable +# names works well in the documentation. +default_buffer_focused = has_focus(DEFAULT_BUFFER) + +KEYBINDING_FILTERS = { + "always": Always(), + # never is used for exposing commands which have no default keybindings + "never": Never(), + "has_line_below": has_line_below, + "has_line_above": has_line_above, + "is_cursor_at_the_end_of_line": is_cursor_at_the_end_of_line, + "has_selection": has_selection, + "has_suggestion": has_suggestion, + "vi_mode": vi_mode, + "vi_insert_mode": vi_insert_mode, + "emacs_insert_mode": emacs_insert_mode, + # https://github.com/ipython/ipython/pull/12603 argued for inclusion of + # emacs key bindings with a configurable `emacs_bindings_in_vi_insert_mode` + # toggle; when the toggle is on user can access keybindigns like `ctrl + e` + # in vi insert mode. Because some of the emacs bindings involve `escape` + # followed by another key, e.g. `escape` followed by `f`, prompt-toolkit + # needs to wait to see if there will be another character typed in before + # executing pure `escape` keybinding; in vi insert mode `escape` switches to + # command mode which is common and performance critical action for vi users. + # To avoid the delay users employ a workaround: + # https://github.com/ipython/ipython/issues/13443#issuecomment-1032753703 + # which involves switching `emacs_bindings_in_vi_insert_mode` off. + # + # For the workaround to work: + # 1) end users need to toggle `emacs_bindings_in_vi_insert_mode` off + # 2) all keybindings which would involve `escape` need to respect that + # toggle by including either: + # - `vi_insert_mode & ebivim` for actions which have emacs keybindings + # predefined upstream in prompt-toolkit, or + # - `emacs_like_insert_mode` for actions which do not have existing + # emacs keybindings predefined upstream (or need overriding of the + # upstream bindings to modify behaviour), defined below. + "emacs_like_insert_mode": (vi_insert_mode & ebivim) | emacs_insert_mode, + "has_completions": has_completions, + "insert_mode": vi_insert_mode | emacs_insert_mode, + "default_buffer_focused": default_buffer_focused, + "search_buffer_focused": has_focus(SEARCH_BUFFER), + # `ebivim` stands for emacs bindings in vi insert mode + "ebivim": ebivim, + "supports_suspend": supports_suspend, + "is_windows_os": is_windows_os, + "auto_match": auto_match, + "focused_insert": (vi_insert_mode | emacs_insert_mode) & default_buffer_focused, + "not_inside_unclosed_string": not_inside_unclosed_string, + "readline_like_completions": readline_like_completions, + "preceded_by_paired_double_quotes": preceding_text( + lambda line: all_quotes_paired('"', line) + ), + "preceded_by_paired_single_quotes": preceding_text( + lambda line: all_quotes_paired("'", line) + ), + "preceded_by_raw_str_prefix": preceding_text(r".*(r|R)[\"'](-*)$"), + "preceded_by_two_double_quotes": preceding_text(r'^.*""$'), + "preceded_by_two_single_quotes": preceding_text(r"^.*''$"), + "followed_by_closing_paren_or_end": following_text(r"[,)}\]]|$"), + "preceded_by_opening_round_paren": preceding_text(r".*\($"), + "preceded_by_opening_bracket": preceding_text(r".*\[$"), + "preceded_by_opening_brace": preceding_text(r".*\{$"), + "preceded_by_double_quote": preceding_text('.*"$'), + "preceded_by_single_quote": preceding_text(r".*'$"), + "followed_by_closing_round_paren": following_text(r"^\)"), + "followed_by_closing_bracket": following_text(r"^\]"), + "followed_by_closing_brace": following_text(r"^\}"), + "followed_by_double_quote": following_text('^"'), + "followed_by_single_quote": following_text("^'"), + "navigable_suggestions": navigable_suggestions, + "cursor_in_leading_ws": cursor_in_leading_ws, + "pass_through": pass_through, +} + + +def eval_node(node: Union[ast.AST, None]): + if node is None: + return None + if isinstance(node, ast.Expression): + return eval_node(node.body) + if isinstance(node, ast.BinOp): + left = eval_node(node.left) + right = eval_node(node.right) + dunders = _find_dunder(node.op, BINARY_OP_DUNDERS) + if dunders: + return getattr(left, dunders[0])(right) + raise ValueError(f"Unknown binary operation: {node.op}") + if isinstance(node, ast.UnaryOp): + value = eval_node(node.operand) + dunders = _find_dunder(node.op, UNARY_OP_DUNDERS) + if dunders: + return getattr(value, dunders[0])() + raise ValueError(f"Unknown unary operation: {node.op}") + if isinstance(node, ast.Name): + if node.id in KEYBINDING_FILTERS: + return KEYBINDING_FILTERS[node.id] + else: + sep = "\n - " + known_filters = sep.join(sorted(KEYBINDING_FILTERS)) + raise NameError( + f"{node.id} is not a known shortcut filter." + f" Known filters are: {sep}{known_filters}." + ) + raise ValueError("Unhandled node", ast.dump(node)) + + +def filter_from_string(code: str): + expression = ast.parse(code, mode="eval") + return eval_node(expression) + + +__all__ = ["KEYBINDING_FILTERS", "filter_from_string"] |