diff options
Diffstat (limited to 'contrib/tools/python3/Lib/_pyrepl')
24 files changed, 6136 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/_pyrepl/__init__.py b/contrib/tools/python3/Lib/_pyrepl/__init__.py new file mode 100644 index 00000000000..1693cbd0b98 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]> +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/contrib/tools/python3/Lib/_pyrepl/__main__.py b/contrib/tools/python3/Lib/_pyrepl/__main__.py new file mode 100644 index 00000000000..9c66812e13a --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/__main__.py @@ -0,0 +1,10 @@ +# Important: don't add things to this module, as they will end up in the REPL's +# default globals. Use _pyrepl.main instead. + +# Avoid caching this file by linecache and incorrectly report tracebacks. +# See https://github.com/python/cpython/issues/129098. +__spec__ = __loader__ = None + +if __name__ == "__main__": + from .main import interactive_console as __pyrepl_interactive_console + __pyrepl_interactive_console() diff --git a/contrib/tools/python3/Lib/_pyrepl/_threading_handler.py b/contrib/tools/python3/Lib/_pyrepl/_threading_handler.py new file mode 100644 index 00000000000..82f5e8650a2 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/_threading_handler.py @@ -0,0 +1,74 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +import traceback + + +TYPE_CHECKING = False +if TYPE_CHECKING: + from threading import Thread + from types import TracebackType + from typing import Protocol + + class ExceptHookArgs(Protocol): + @property + def exc_type(self) -> type[BaseException]: ... + @property + def exc_value(self) -> BaseException | None: ... + @property + def exc_traceback(self) -> TracebackType | None: ... + @property + def thread(self) -> Thread | None: ... + + class ShowExceptions(Protocol): + def __call__(self) -> int: ... + def add(self, s: str) -> None: ... + + from .reader import Reader + + +def install_threading_hook(reader: Reader) -> None: + import threading + + @dataclass + class ExceptHookHandler: + lock: threading.Lock = field(default_factory=threading.Lock) + messages: list[str] = field(default_factory=list) + + def show(self) -> int: + count = 0 + with self.lock: + if not self.messages: + return 0 + reader.restore() + for tb in self.messages: + count += 1 + if tb: + print(tb) + self.messages.clear() + reader.scheduled_commands.append("ctrl-c") + reader.prepare() + return count + + def add(self, s: str) -> None: + with self.lock: + self.messages.append(s) + + def exception(self, args: ExceptHookArgs) -> None: + lines = traceback.format_exception( + args.exc_type, + args.exc_value, + args.exc_traceback, + colorize=reader.can_colorize, + ) # type: ignore[call-overload] + pre = f"\nException in {args.thread.name}:\n" if args.thread else "\n" + tb = pre + "".join(lines) + self.add(tb) + + def __call__(self) -> int: + return self.show() + + + handler = ExceptHookHandler() + reader.threading_hook = handler + threading.excepthook = handler.exception diff --git a/contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py b/contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py new file mode 100644 index 00000000000..0589a0f437e --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py @@ -0,0 +1,110 @@ +# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]> +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +""" +OS-independent base for an event and VT sequence scanner + +See unix_eventqueue and windows_eventqueue for subclasses. +""" + +from collections import deque + +from . import keymap +from .console import Event +from .trace import trace + +class BaseEventQueue: + def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None: + self.compiled_keymap = keymap.compile_keymap(keymap_dict) + self.keymap = self.compiled_keymap + trace("keymap {k!r}", k=self.keymap) + self.encoding = encoding + self.events: deque[Event] = deque() + self.buf = bytearray() + + def get(self) -> Event | None: + """ + Retrieves the next event from the queue. + """ + if self.events: + return self.events.popleft() + else: + return None + + def empty(self) -> bool: + """ + Checks if the queue is empty. + """ + return not self.events + + def flush_buf(self) -> bytearray: + """ + Flushes the buffer and returns its contents. + """ + old = self.buf + self.buf = bytearray() + return old + + def insert(self, event: Event) -> None: + """ + Inserts an event into the queue. + """ + trace('added event {event}', event=event) + self.events.append(event) + + def push(self, char: int | bytes) -> None: + """ + Processes a character by updating the buffer and handling special key mappings. + """ + assert isinstance(char, (int, bytes)) + ord_char = char if isinstance(char, int) else ord(char) + char = ord_char.to_bytes() + self.buf.append(ord_char) + + if char in self.keymap: + if self.keymap is self.compiled_keymap: + # sanity check, buffer is empty when a special key comes + assert len(self.buf) == 1 + k = self.keymap[char] + trace('found map {k!r}', k=k) + if isinstance(k, dict): + self.keymap = k + else: + self.insert(Event('key', k, bytes(self.flush_buf()))) + self.keymap = self.compiled_keymap + + elif self.buf and self.buf[0] == 27: # escape + # escape sequence not recognized by our keymap: propagate it + # outside so that i can be recognized as an M-... key (see also + # the docstring in keymap.py + trace('unrecognized escape sequence, propagating...') + self.keymap = self.compiled_keymap + self.insert(Event('key', '\033', b'\033')) + for _c in self.flush_buf()[1:]: + self.push(_c) + + else: + try: + decoded = bytes(self.buf).decode(self.encoding) + except UnicodeError: + return + else: + self.insert(Event('key', decoded, bytes(self.flush_buf()))) + self.keymap = self.compiled_keymap diff --git a/contrib/tools/python3/Lib/_pyrepl/commands.py b/contrib/tools/python3/Lib/_pyrepl/commands.py new file mode 100644 index 00000000000..f3bd6ead46c --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/commands.py @@ -0,0 +1,492 @@ +# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]> +# Antonio Cuni +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations +import os + +# Categories of actions: +# killing +# yanking +# motion +# editing +# history +# finishing +# [completion] + + +# types +if False: + from .historical_reader import HistoricalReader + + +class Command: + finish: bool = False + kills_digit_arg: bool = True + + def __init__( + self, reader: HistoricalReader, event_name: str, event: list[str] + ) -> None: + # Reader should really be "any reader" but there's too much usage of + # HistoricalReader methods and fields in the code below for us to + # refactor at the moment. + + self.reader = reader + self.event = event + self.event_name = event_name + + def do(self) -> None: + pass + + +class KillCommand(Command): + def kill_range(self, start: int, end: int) -> None: + if start == end: + return + r = self.reader + b = r.buffer + text = b[start:end] + del b[start:end] + if is_kill(r.last_command): + if start < r.pos: + r.kill_ring[-1] = text + r.kill_ring[-1] + else: + r.kill_ring[-1] = r.kill_ring[-1] + text + else: + r.kill_ring.append(text) + r.pos = start + r.dirty = True + + +class YankCommand(Command): + pass + + +class MotionCommand(Command): + pass + + +class EditCommand(Command): + pass + + +class FinishCommand(Command): + finish = True + pass + + +def is_kill(command: type[Command] | None) -> bool: + return command is not None and issubclass(command, KillCommand) + + +def is_yank(command: type[Command] | None) -> bool: + return command is not None and issubclass(command, YankCommand) + + +# etc + + +class digit_arg(Command): + kills_digit_arg = False + + def do(self) -> None: + r = self.reader + c = self.event[-1] + if c == "-": + if r.arg is not None: + r.arg = -r.arg + else: + r.arg = -1 + else: + d = int(c) + if r.arg is None: + r.arg = d + else: + if r.arg < 0: + r.arg = 10 * r.arg - d + else: + r.arg = 10 * r.arg + d + r.dirty = True + + +class clear_screen(Command): + def do(self) -> None: + r = self.reader + r.console.clear() + r.dirty = True + + +class refresh(Command): + def do(self) -> None: + self.reader.dirty = True + + +class repaint(Command): + def do(self) -> None: + self.reader.dirty = True + self.reader.console.repaint() + + +class kill_line(KillCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + eol = r.eol() + for c in b[r.pos : eol]: + if not c.isspace(): + self.kill_range(r.pos, eol) + return + else: + self.kill_range(r.pos, eol + 1) + + +class unix_line_discard(KillCommand): + def do(self) -> None: + r = self.reader + self.kill_range(r.bol(), r.pos) + + +class unix_word_rubout(KillCommand): + def do(self) -> None: + r = self.reader + for i in range(r.get_arg()): + self.kill_range(r.bow(), r.pos) + + +class kill_word(KillCommand): + def do(self) -> None: + r = self.reader + for i in range(r.get_arg()): + self.kill_range(r.pos, r.eow()) + + +class backward_kill_word(KillCommand): + def do(self) -> None: + r = self.reader + for i in range(r.get_arg()): + self.kill_range(r.bow(), r.pos) + + +class yank(YankCommand): + def do(self) -> None: + r = self.reader + if not r.kill_ring: + r.error("nothing to yank") + return + r.insert(r.kill_ring[-1]) + + +class yank_pop(YankCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + if not r.kill_ring: + r.error("nothing to yank") + return + if not is_yank(r.last_command): + r.error("previous command was not a yank") + return + repl = len(r.kill_ring[-1]) + r.kill_ring.insert(0, r.kill_ring.pop()) + t = r.kill_ring[-1] + b[r.pos - repl : r.pos] = t + r.pos = r.pos - repl + len(t) + r.dirty = True + + +class interrupt(FinishCommand): + def do(self) -> None: + import signal + + self.reader.console.finish() + self.reader.finish() + os.kill(os.getpid(), signal.SIGINT) + + +class ctrl_c(Command): + def do(self) -> None: + self.reader.console.finish() + self.reader.finish() + raise KeyboardInterrupt + + +class suspend(Command): + def do(self) -> None: + import signal + + r = self.reader + p = r.pos + r.console.finish() + os.kill(os.getpid(), signal.SIGSTOP) + ## this should probably be done + ## in a handler for SIGCONT? + r.console.prepare() + r.pos = p + # r.posxy = 0, 0 # XXX this is invalid + r.dirty = True + r.console.screen = [] + + +class up(MotionCommand): + def do(self) -> None: + r = self.reader + for _ in range(r.get_arg()): + x, y = r.pos2xy() + new_y = y - 1 + + if r.bol() == 0: + if r.historyi > 0: + r.select_item(r.historyi - 1) + return + r.pos = 0 + r.error("start of buffer") + return + + if ( + x + > ( + new_x := r.max_column(new_y) + ) # we're past the end of the previous line + or x == r.max_column(y) + and any( + not i.isspace() for i in r.buffer[r.bol() :] + ) # move between eols + ): + x = new_x + + r.setpos_from_xy(x, new_y) + + +class down(MotionCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + for _ in range(r.get_arg()): + x, y = r.pos2xy() + new_y = y + 1 + + if r.eol() == len(b): + if r.historyi < len(r.history): + r.select_item(r.historyi + 1) + r.pos = r.eol(0) + return + r.pos = len(b) + r.error("end of buffer") + return + + if ( + x + > ( + new_x := r.max_column(new_y) + ) # we're past the end of the previous line + or x == r.max_column(y) + and any( + not i.isspace() for i in r.buffer[r.bol() :] + ) # move between eols + ): + x = new_x + + r.setpos_from_xy(x, new_y) + + +class left(MotionCommand): + def do(self) -> None: + r = self.reader + for _ in range(r.get_arg()): + p = r.pos - 1 + if p >= 0: + r.pos = p + else: + self.reader.error("start of buffer") + + +class right(MotionCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + for _ in range(r.get_arg()): + p = r.pos + 1 + if p <= len(b): + r.pos = p + else: + self.reader.error("end of buffer") + + +class beginning_of_line(MotionCommand): + def do(self) -> None: + self.reader.pos = self.reader.bol() + + +class end_of_line(MotionCommand): + def do(self) -> None: + self.reader.pos = self.reader.eol() + + +class home(MotionCommand): + def do(self) -> None: + self.reader.pos = 0 + + +class end(MotionCommand): + def do(self) -> None: + self.reader.pos = len(self.reader.buffer) + + +class forward_word(MotionCommand): + def do(self) -> None: + r = self.reader + for i in range(r.get_arg()): + r.pos = r.eow() + + +class backward_word(MotionCommand): + def do(self) -> None: + r = self.reader + for i in range(r.get_arg()): + r.pos = r.bow() + + +class self_insert(EditCommand): + def do(self) -> None: + r = self.reader + text = self.event * r.get_arg() + r.insert(text) + + +class insert_nl(EditCommand): + def do(self) -> None: + r = self.reader + r.insert("\n" * r.get_arg()) + + +class transpose_characters(EditCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + s = r.pos - 1 + if s < 0: + r.error("cannot transpose at start of buffer") + else: + if s == len(b): + s -= 1 + t = min(s + r.get_arg(), len(b) - 1) + c = b[s] + del b[s] + b.insert(t, c) + r.pos = t + r.dirty = True + + +class backspace(EditCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + for i in range(r.get_arg()): + if r.pos > 0: + r.pos -= 1 + del b[r.pos] + r.dirty = True + else: + self.reader.error("can't backspace at start") + + +class delete(EditCommand): + def do(self) -> None: + r = self.reader + b = r.buffer + if self.event[-1] == "\004": + if b and b[-1].endswith("\n"): + self.finish = True + elif ( + r.pos == 0 + and len(b) == 0 # this is something of a hack + ): + r.update_screen() + r.console.finish() + raise EOFError + + for i in range(r.get_arg()): + if r.pos != len(b): + del b[r.pos] + r.dirty = True + else: + self.reader.error("end of buffer") + + +class accept(FinishCommand): + def do(self) -> None: + pass + + +class help(Command): + def do(self) -> None: + import _sitebuiltins + + with self.reader.suspend(): + self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment] + + +class invalid_key(Command): + def do(self) -> None: + pending = self.reader.console.getpending() + s = "".join(self.event) + pending.data + self.reader.error("`%r' not bound" % s) + + +class invalid_command(Command): + def do(self) -> None: + s = self.event_name + self.reader.error("command `%s' not known" % s) + + +class show_history(Command): + def do(self) -> None: + from .pager import get_pager + from site import gethistoryfile + + history = os.linesep.join(self.reader.history[:]) + self.reader.console.restore() + pager = get_pager() + pager(history, gethistoryfile()) + self.reader.console.prepare() + + # We need to copy over the state so that it's consistent between + # console and reader, and console does not overwrite/append stuff + self.reader.console.screen = self.reader.screen.copy() + self.reader.console.posxy = self.reader.cxy + + +class paste_mode(Command): + + def do(self) -> None: + self.reader.paste_mode = not self.reader.paste_mode + self.reader.dirty = True + + +class enable_bracketed_paste(Command): + def do(self) -> None: + self.reader.paste_mode = True + self.reader.in_bracketed_paste = True + +class disable_bracketed_paste(Command): + def do(self) -> None: + self.reader.paste_mode = False + self.reader.in_bracketed_paste = False + self.reader.dirty = True diff --git a/contrib/tools/python3/Lib/_pyrepl/completing_reader.py b/contrib/tools/python3/Lib/_pyrepl/completing_reader.py new file mode 100644 index 00000000000..9a005281dab --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/completing_reader.py @@ -0,0 +1,295 @@ +# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]> +# Antonio Cuni +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations + +from dataclasses import dataclass, field + +import re +from . import commands, console, reader +from .reader import Reader + + +# types +Command = commands.Command +if False: + from .types import KeySpec, CommandName + + +def prefix(wordlist: list[str], j: int = 0) -> str: + d = {} + i = j + try: + while 1: + for word in wordlist: + d[word[i]] = 1 + if len(d) > 1: + return wordlist[0][j:i] + i += 1 + d = {} + except IndexError: + return wordlist[0][j:i] + return "" + + +STRIPCOLOR_REGEX = re.compile(r"\x1B\[([0-9]{1,3}(;[0-9]{1,2})?)?[m|K]") + +def stripcolor(s: str) -> str: + return STRIPCOLOR_REGEX.sub('', s) + + +def real_len(s: str) -> int: + return len(stripcolor(s)) + + +def left_align(s: str, maxlen: int) -> str: + stripped = stripcolor(s) + if len(stripped) > maxlen: + # too bad, we remove the color + return stripped[:maxlen] + padding = maxlen - len(stripped) + return s + ' '*padding + + +def build_menu( + cons: console.Console, + wordlist: list[str], + start: int, + use_brackets: bool, + sort_in_column: bool, +) -> tuple[list[str], int]: + if use_brackets: + item = "[ %s ]" + padding = 4 + else: + item = "%s " + padding = 2 + maxlen = min(max(map(real_len, wordlist)), cons.width - padding) + cols = int(cons.width / (maxlen + padding)) + rows = int((len(wordlist) - 1)/cols + 1) + + if sort_in_column: + # sort_in_column=False (default) sort_in_column=True + # A B C A D G + # D E F B E + # G C F + # + # "fill" the table with empty words, so we always have the same amout + # of rows for each column + missing = cols*rows - len(wordlist) + wordlist = wordlist + ['']*missing + indexes = [(i % cols) * rows + i // cols for i in range(len(wordlist))] + wordlist = [wordlist[i] for i in indexes] + menu = [] + i = start + for r in range(rows): + row = [] + for col in range(cols): + row.append(item % left_align(wordlist[i], maxlen)) + i += 1 + if i >= len(wordlist): + break + menu.append(''.join(row)) + if i >= len(wordlist): + i = 0 + break + if r + 5 > cons.height: + menu.append(" %d more... " % (len(wordlist) - i)) + break + return menu, i + +# this gets somewhat user interface-y, and as a result the logic gets +# very convoluted. +# +# To summarise the summary of the summary:- people are a problem. +# -- The Hitch-Hikers Guide to the Galaxy, Episode 12 + +#### Desired behaviour of the completions commands. +# the considerations are: +# (1) how many completions are possible +# (2) whether the last command was a completion +# (3) if we can assume that the completer is going to return the same set of +# completions: this is controlled by the ``assume_immutable_completions`` +# variable on the reader, which is True by default to match the historical +# behaviour of pyrepl, but e.g. False in the ReadlineAlikeReader to match +# more closely readline's semantics (this is needed e.g. by +# fancycompleter) +# +# if there's no possible completion, beep at the user and point this out. +# this is easy. +# +# if there's only one possible completion, stick it in. if the last thing +# user did was a completion, point out that he isn't getting anywhere, but +# only if the ``assume_immutable_completions`` is True. +# +# now it gets complicated. +# +# for the first press of a completion key: +# if there's a common prefix, stick it in. + +# irrespective of whether anything got stuck in, if the word is now +# complete, show the "complete but not unique" message + +# if there's no common prefix and if the word is not now complete, +# beep. + +# common prefix -> yes no +# word complete \/ +# yes "cbnu" "cbnu" +# no - beep + +# for the second bang on the completion key +# there will necessarily be no common prefix +# show a menu of the choices. + +# for subsequent bangs, rotate the menu around (if there are sufficient +# choices). + + +class complete(commands.Command): + def do(self) -> None: + r: CompletingReader + r = self.reader # type: ignore[assignment] + last_is_completer = r.last_command_is(self.__class__) + immutable_completions = r.assume_immutable_completions + completions_unchangable = last_is_completer and immutable_completions + stem = r.get_stem() + if not completions_unchangable: + r.cmpltn_menu_choices = r.get_completions(stem) + + completions = r.cmpltn_menu_choices + if not completions: + r.error("no matches") + elif len(completions) == 1: + if completions_unchangable and len(completions[0]) == len(stem): + r.msg = "[ sole completion ]" + r.dirty = True + r.insert(completions[0][len(stem):]) + else: + p = prefix(completions, len(stem)) + if p: + r.insert(p) + if last_is_completer: + r.cmpltn_menu_visible = True + r.cmpltn_message_visible = False + r.cmpltn_menu, r.cmpltn_menu_end = build_menu( + r.console, completions, r.cmpltn_menu_end, + r.use_brackets, r.sort_in_column) + r.dirty = True + elif not r.cmpltn_menu_visible: + r.cmpltn_message_visible = True + if stem + p in completions: + r.msg = "[ complete but not unique ]" + r.dirty = True + else: + r.msg = "[ not unique ]" + r.dirty = True + + +class self_insert(commands.self_insert): + def do(self) -> None: + r: CompletingReader + r = self.reader # type: ignore[assignment] + + commands.self_insert.do(self) + if r.cmpltn_menu_visible: + stem = r.get_stem() + if len(stem) < 1: + r.cmpltn_reset() + else: + completions = [w for w in r.cmpltn_menu_choices + if w.startswith(stem)] + if completions: + r.cmpltn_menu, r.cmpltn_menu_end = build_menu( + r.console, completions, 0, + r.use_brackets, r.sort_in_column) + else: + r.cmpltn_reset() + + +@dataclass +class CompletingReader(Reader): + """Adds completion support""" + + ### Class variables + # see the comment for the complete command + assume_immutable_completions = True + use_brackets = True # display completions inside [] + sort_in_column = False + + ### Instance variables + cmpltn_menu: list[str] = field(init=False) + cmpltn_menu_visible: bool = field(init=False) + cmpltn_message_visible: bool = field(init=False) + cmpltn_menu_end: int = field(init=False) + cmpltn_menu_choices: list[str] = field(init=False) + + def __post_init__(self) -> None: + super().__post_init__() + self.cmpltn_reset() + for c in (complete, self_insert): + self.commands[c.__name__] = c + self.commands[c.__name__.replace('_', '-')] = c + + def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: + return super().collect_keymap() + ( + (r'\t', 'complete'),) + + def after_command(self, cmd: Command) -> None: + super().after_command(cmd) + if not isinstance(cmd, (complete, self_insert)): + self.cmpltn_reset() + + def calc_screen(self) -> list[str]: + screen = super().calc_screen() + if self.cmpltn_menu_visible: + # We display the completions menu below the current prompt + ly = self.lxy[1] + 1 + screen[ly:ly] = self.cmpltn_menu + # If we're not in the middle of multiline edit, don't append to screeninfo + # since that screws up the position calculation in pos2xy function. + # This is a hack to prevent the cursor jumping + # into the completions menu when pressing left or down arrow. + if self.pos != len(self.buffer): + self.screeninfo[ly:ly] = [(0, [])]*len(self.cmpltn_menu) + return screen + + def finish(self) -> None: + super().finish() + self.cmpltn_reset() + + def cmpltn_reset(self) -> None: + self.cmpltn_menu = [] + self.cmpltn_menu_visible = False + self.cmpltn_message_visible = False + self.cmpltn_menu_end = 0 + self.cmpltn_menu_choices = [] + + def get_stem(self) -> str: + st = self.syntax_table + SW = reader.SYNTAX_WORD + b = self.buffer + p = self.pos - 1 + while p >= 0 and st.get(b[p], SW) == SW: + p -= 1 + return ''.join(b[p+1:self.pos]) + + def get_completions(self, stem: str) -> list[str]: + return [] diff --git a/contrib/tools/python3/Lib/_pyrepl/console.py b/contrib/tools/python3/Lib/_pyrepl/console.py new file mode 100644 index 00000000000..8956fb1242e --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/console.py @@ -0,0 +1,229 @@ +# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]> +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations + +import _colorize + +from abc import ABC, abstractmethod +import ast +import code +import linecache +from dataclasses import dataclass, field +import os.path +import sys + + +TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import IO + from typing import Callable + + +@dataclass +class Event: + evt: str + data: str + raw: bytes = b"" + + +@dataclass +class Console(ABC): + posxy: tuple[int, int] + screen: list[str] = field(default_factory=list) + height: int = 25 + width: int = 80 + + def __init__( + self, + f_in: IO[bytes] | int = 0, + f_out: IO[bytes] | int = 1, + term: str = "", + encoding: str = "", + ): + self.encoding = encoding or sys.getdefaultencoding() + + if isinstance(f_in, int): + self.input_fd = f_in + else: + self.input_fd = f_in.fileno() + + if isinstance(f_out, int): + self.output_fd = f_out + else: + self.output_fd = f_out.fileno() + + @abstractmethod + def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ... + + @abstractmethod + def prepare(self) -> None: ... + + @abstractmethod + def restore(self) -> None: ... + + @abstractmethod + def move_cursor(self, x: int, y: int) -> None: ... + + @abstractmethod + def set_cursor_vis(self, visible: bool) -> None: ... + + @abstractmethod + def getheightwidth(self) -> tuple[int, int]: + """Return (height, width) where height and width are the height + and width of the terminal window in characters.""" + ... + + @abstractmethod + def get_event(self, block: bool = True) -> Event | None: + """Return an Event instance. Returns None if |block| is false + and there is no event pending, otherwise waits for the + completion of an event.""" + ... + + @abstractmethod + def push_char(self, char: int | bytes) -> None: + """ + Push a character to the console event queue. + """ + ... + + @abstractmethod + def beep(self) -> None: ... + + @abstractmethod + def clear(self) -> None: + """Wipe the screen""" + ... + + @abstractmethod + def finish(self) -> None: + """Move the cursor to the end of the display and otherwise get + ready for end. XXX could be merged with restore? Hmm.""" + ... + + @abstractmethod + def flushoutput(self) -> None: + """Flush all output to the screen (assuming there's some + buffering going on somewhere).""" + ... + + @abstractmethod + def forgetinput(self) -> None: + """Forget all pending, but not yet processed input.""" + ... + + @abstractmethod + def getpending(self) -> Event: + """Return the characters that have been typed but not yet + processed.""" + ... + + @abstractmethod + def wait(self, timeout: float | None) -> bool: + """Wait for an event. The return value is True if an event is + available, False if the timeout has been reached. If timeout is + None, wait forever. The timeout is in milliseconds.""" + ... + + @property + def input_hook(self) -> Callable[[], int] | None: + """Returns the current input hook.""" + ... + + @abstractmethod + def repaint(self) -> None: ... + + +class InteractiveColoredConsole(code.InteractiveConsole): + STATEMENT_FAILED = object() + + def __init__( + self, + locals: dict[str, object] | None = None, + filename: str = "<console>", + *, + local_exit: bool = False, + ) -> None: + super().__init__(locals=locals, filename=filename, local_exit=local_exit) + self.can_colorize = _colorize.can_colorize() + + def showsyntaxerror(self, filename=None, **kwargs): + super().showsyntaxerror(filename=filename, **kwargs) + + def _excepthook(self, typ, value, tb): + import traceback + lines = traceback.format_exception( + typ, value, tb, + colorize=self.can_colorize, + limit=traceback.BUILTIN_EXCEPTION_LIMIT) + self.write(''.join(lines)) + + def runcode(self, code): + try: + exec(code, self.locals) + except SystemExit: + raise + except BaseException: + self.showtraceback() + return self.STATEMENT_FAILED + return None + + def runsource(self, source, filename="<input>", symbol="single"): + try: + tree = self.compile.compiler( + source, + filename, + "exec", + ast.PyCF_ONLY_AST, + incomplete_input=False, + ) + except (SyntaxError, OverflowError, ValueError): + self.showsyntaxerror(filename, source=source) + return False + if tree.body: + *_, last_stmt = tree.body + for stmt in tree.body: + wrapper = ast.Interactive if stmt is last_stmt else ast.Module + the_symbol = symbol if stmt is last_stmt else "exec" + item = wrapper([stmt]) + try: + code = self.compile.compiler(item, filename, the_symbol) + linecache._register_code(code, source, filename) + except SyntaxError as e: + if e.args[0] == "'await' outside function": + python = os.path.basename(sys.executable) + e.add_note( + f"Try the asyncio REPL ({python} -m asyncio) to use" + f" top-level 'await' and run background asyncio tasks." + ) + self.showsyntaxerror(filename, source=source) + return False + except (OverflowError, ValueError): + self.showsyntaxerror(filename, source=source) + return False + + if code is None: + return True + + result = self.runcode(code) + if result is self.STATEMENT_FAILED: + break + return False diff --git a/contrib/tools/python3/Lib/_pyrepl/fancy_termios.py b/contrib/tools/python3/Lib/_pyrepl/fancy_termios.py new file mode 100644 index 00000000000..8d5bd183f21 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/fancy_termios.py @@ -0,0 +1,82 @@ +# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]> +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +import termios + + +TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import cast +else: + cast = lambda typ, val: val + + +class TermState: + def __init__(self, attrs: list[int | list[bytes]]) -> None: + self.iflag = cast(int, attrs[0]) + self.oflag = cast(int, attrs[1]) + self.cflag = cast(int, attrs[2]) + self.lflag = cast(int, attrs[3]) + self.ispeed = cast(int, attrs[4]) + self.ospeed = cast(int, attrs[5]) + self.cc = cast(list[bytes], attrs[6]) + + def as_list(self) -> list[int | list[bytes]]: + return [ + self.iflag, + self.oflag, + self.cflag, + self.lflag, + self.ispeed, + self.ospeed, + # Always return a copy of the control characters list to ensure + # there are not any additional references to self.cc + self.cc[:], + ] + + def copy(self) -> "TermState": + return self.__class__(self.as_list()) + + +def tcgetattr(fd: int) -> TermState: + return TermState(termios.tcgetattr(fd)) + + +def tcsetattr(fd: int, when: int, attrs: TermState) -> None: + termios.tcsetattr(fd, when, attrs.as_list()) + + +class Term(TermState): + TS__init__ = TermState.__init__ + + def __init__(self, fd: int = 0) -> None: + self.TS__init__(termios.tcgetattr(fd)) + self.fd = fd + self.stack: list[list[int | list[bytes]]] = [] + + def save(self) -> None: + self.stack.append(self.as_list()) + + def set(self, when: int = termios.TCSANOW) -> None: + termios.tcsetattr(self.fd, when, self.as_list()) + + def restore(self) -> None: + self.TS__init__(self.stack.pop()) + self.set() diff --git a/contrib/tools/python3/Lib/_pyrepl/historical_reader.py b/contrib/tools/python3/Lib/_pyrepl/historical_reader.py new file mode 100644 index 00000000000..c4b95fa2e81 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/historical_reader.py @@ -0,0 +1,419 @@ +# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]> +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations + +from contextlib import contextmanager +from dataclasses import dataclass, field + +from . import commands, input +from .reader import Reader + + +if False: + from .types import SimpleContextManager, KeySpec, CommandName + + +isearch_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( + [("\\%03o" % c, "isearch-end") for c in range(256) if chr(c) != "\\"] + + [(c, "isearch-add-character") for c in map(chr, range(32, 127)) if c != "\\"] + + [ + ("\\%03o" % c, "isearch-add-character") + for c in range(256) + if chr(c).isalpha() and chr(c) != "\\" + ] + + [ + ("\\\\", "self-insert"), + (r"\C-r", "isearch-backwards"), + (r"\C-s", "isearch-forwards"), + (r"\C-c", "isearch-cancel"), + (r"\C-g", "isearch-cancel"), + (r"\<backspace>", "isearch-backspace"), + ] +) + +ISEARCH_DIRECTION_NONE = "" +ISEARCH_DIRECTION_BACKWARDS = "r" +ISEARCH_DIRECTION_FORWARDS = "f" + + +class next_history(commands.Command): + def do(self) -> None: + r = self.reader + if r.historyi == len(r.history): + r.error("end of history list") + return + r.select_item(r.historyi + 1) + + +class previous_history(commands.Command): + def do(self) -> None: + r = self.reader + if r.historyi == 0: + r.error("start of history list") + return + r.select_item(r.historyi - 1) + + +class history_search_backward(commands.Command): + def do(self) -> None: + r = self.reader + r.search_next(forwards=False) + + +class history_search_forward(commands.Command): + def do(self) -> None: + r = self.reader + r.search_next(forwards=True) + + +class restore_history(commands.Command): + def do(self) -> None: + r = self.reader + if r.historyi != len(r.history): + if r.get_unicode() != r.history[r.historyi]: + r.buffer = list(r.history[r.historyi]) + r.pos = len(r.buffer) + r.dirty = True + + +class first_history(commands.Command): + def do(self) -> None: + self.reader.select_item(0) + + +class last_history(commands.Command): + def do(self) -> None: + self.reader.select_item(len(self.reader.history)) + + +class operate_and_get_next(commands.FinishCommand): + def do(self) -> None: + self.reader.next_history = self.reader.historyi + 1 + + +class yank_arg(commands.Command): + def do(self) -> None: + r = self.reader + if r.last_command is self.__class__: + r.yank_arg_i += 1 + else: + r.yank_arg_i = 0 + if r.historyi < r.yank_arg_i: + r.error("beginning of history list") + return + a = r.get_arg(-1) + # XXX how to split? + words = r.get_item(r.historyi - r.yank_arg_i - 1).split() + if a < -len(words) or a >= len(words): + r.error("no such arg") + return + w = words[a] + b = r.buffer + if r.yank_arg_i > 0: + o = len(r.yank_arg_yanked) + else: + o = 0 + b[r.pos - o : r.pos] = list(w) + r.yank_arg_yanked = w + r.pos += len(w) - o + r.dirty = True + + +class forward_history_isearch(commands.Command): + def do(self) -> None: + r = self.reader + r.isearch_direction = ISEARCH_DIRECTION_FORWARDS + r.isearch_start = r.historyi, r.pos + r.isearch_term = "" + r.dirty = True + r.push_input_trans(r.isearch_trans) + + +class reverse_history_isearch(commands.Command): + def do(self) -> None: + r = self.reader + r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS + r.dirty = True + r.isearch_term = "" + r.push_input_trans(r.isearch_trans) + r.isearch_start = r.historyi, r.pos + + +class isearch_cancel(commands.Command): + def do(self) -> None: + r = self.reader + r.isearch_direction = ISEARCH_DIRECTION_NONE + r.pop_input_trans() + r.select_item(r.isearch_start[0]) + r.pos = r.isearch_start[1] + r.dirty = True + + +class isearch_add_character(commands.Command): + def do(self) -> None: + r = self.reader + b = r.buffer + r.isearch_term += self.event[-1] + r.dirty = True + p = r.pos + len(r.isearch_term) - 1 + if b[p : p + 1] != [r.isearch_term[-1]]: + r.isearch_next() + + +class isearch_backspace(commands.Command): + def do(self) -> None: + r = self.reader + if len(r.isearch_term) > 0: + r.isearch_term = r.isearch_term[:-1] + r.dirty = True + else: + r.error("nothing to rubout") + + +class isearch_forwards(commands.Command): + def do(self) -> None: + r = self.reader + r.isearch_direction = ISEARCH_DIRECTION_FORWARDS + r.isearch_next() + + +class isearch_backwards(commands.Command): + def do(self) -> None: + r = self.reader + r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS + r.isearch_next() + + +class isearch_end(commands.Command): + def do(self) -> None: + r = self.reader + r.isearch_direction = ISEARCH_DIRECTION_NONE + r.console.forgetinput() + r.pop_input_trans() + r.dirty = True + + +@dataclass +class HistoricalReader(Reader): + """Adds history support (with incremental history searching) to the + Reader class. + """ + + history: list[str] = field(default_factory=list) + historyi: int = 0 + next_history: int | None = None + transient_history: dict[int, str] = field(default_factory=dict) + isearch_term: str = "" + isearch_direction: str = ISEARCH_DIRECTION_NONE + isearch_start: tuple[int, int] = field(init=False) + isearch_trans: input.KeymapTranslator = field(init=False) + yank_arg_i: int = 0 + yank_arg_yanked: str = "" + + def __post_init__(self) -> None: + super().__post_init__() + for c in [ + next_history, + previous_history, + restore_history, + first_history, + last_history, + yank_arg, + forward_history_isearch, + reverse_history_isearch, + isearch_end, + isearch_add_character, + isearch_cancel, + isearch_add_character, + isearch_backspace, + isearch_forwards, + isearch_backwards, + operate_and_get_next, + history_search_backward, + history_search_forward, + ]: + self.commands[c.__name__] = c + self.commands[c.__name__.replace("_", "-")] = c + self.isearch_start = self.historyi, self.pos + self.isearch_trans = input.KeymapTranslator( + isearch_keymap, invalid_cls=isearch_end, character_cls=isearch_add_character + ) + + def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: + return super().collect_keymap() + ( + (r"\C-n", "next-history"), + (r"\C-p", "previous-history"), + (r"\C-o", "operate-and-get-next"), + (r"\C-r", "reverse-history-isearch"), + (r"\C-s", "forward-history-isearch"), + (r"\M-r", "restore-history"), + (r"\M-.", "yank-arg"), + (r"\<page down>", "history-search-forward"), + (r"\x1b[6~", "history-search-forward"), + (r"\<page up>", "history-search-backward"), + (r"\x1b[5~", "history-search-backward"), + ) + + def select_item(self, i: int) -> None: + self.transient_history[self.historyi] = self.get_unicode() + buf = self.transient_history.get(i) + if buf is None: + buf = self.history[i].rstrip() + self.buffer = list(buf) + self.historyi = i + self.pos = len(self.buffer) + self.dirty = True + self.last_refresh_cache.invalidated = True + + def get_item(self, i: int) -> str: + if i != len(self.history): + return self.transient_history.get(i, self.history[i]) + else: + return self.transient_history.get(i, self.get_unicode()) + + @contextmanager + def suspend(self) -> SimpleContextManager: + with super().suspend(), self.suspend_history(): + yield + + @contextmanager + def suspend_history(self) -> SimpleContextManager: + try: + old_history = self.history[:] + del self.history[:] + yield + finally: + self.history[:] = old_history + + def prepare(self) -> None: + super().prepare() + try: + self.transient_history = {} + if self.next_history is not None and self.next_history < len(self.history): + self.historyi = self.next_history + self.buffer[:] = list(self.history[self.next_history]) + self.pos = len(self.buffer) + self.transient_history[len(self.history)] = "" + else: + self.historyi = len(self.history) + self.next_history = None + except: + self.restore() + raise + + def get_prompt(self, lineno: int, cursor_on_line: bool) -> str: + if cursor_on_line and self.isearch_direction != ISEARCH_DIRECTION_NONE: + d = "rf"[self.isearch_direction == ISEARCH_DIRECTION_FORWARDS] + return "(%s-search `%s') " % (d, self.isearch_term) + else: + return super().get_prompt(lineno, cursor_on_line) + + def search_next(self, *, forwards: bool) -> None: + """Search history for the current line contents up to the cursor. + + Selects the first item found. If nothing is under the cursor, any next + item in history is selected. + """ + pos = self.pos + s = self.get_unicode() + history_index = self.historyi + + # In multiline contexts, we're only interested in the current line. + nl_index = s.rfind('\n', 0, pos) + prefix = s[nl_index + 1:pos] + pos = len(prefix) + + match_prefix = len(prefix) + len_item = 0 + if history_index < len(self.history): + len_item = len(self.get_item(history_index)) + if len_item and pos == len_item: + match_prefix = False + elif not pos: + match_prefix = False + + while 1: + if forwards: + out_of_bounds = history_index >= len(self.history) - 1 + else: + out_of_bounds = history_index == 0 + if out_of_bounds: + if forwards and not match_prefix: + self.pos = 0 + self.buffer = [] + self.dirty = True + else: + self.error("not found") + return + + history_index += 1 if forwards else -1 + s = self.get_item(history_index) + + if not match_prefix: + self.select_item(history_index) + return + + len_acc = 0 + for i, line in enumerate(s.splitlines(keepends=True)): + if line.startswith(prefix): + self.select_item(history_index) + self.pos = pos + len_acc + return + len_acc += len(line) + + def isearch_next(self) -> None: + st = self.isearch_term + p = self.pos + i = self.historyi + s = self.get_unicode() + forwards = self.isearch_direction == ISEARCH_DIRECTION_FORWARDS + while 1: + if forwards: + p = s.find(st, p + 1) + else: + p = s.rfind(st, 0, p + len(st) - 1) + if p != -1: + self.select_item(i) + self.pos = p + return + elif (forwards and i >= len(self.history) - 1) or (not forwards and i == 0): + self.error("not found") + return + else: + if forwards: + i += 1 + s = self.get_item(i) + p = -1 + else: + i -= 1 + s = self.get_item(i) + p = len(s) + + def finish(self) -> None: + super().finish() + ret = self.get_unicode() + for i, t in self.transient_history.items(): + if i < len(self.history) and i != self.historyi: + self.history[i] = t + if ret and should_auto_add_history: + self.history.append(ret) + + +should_auto_add_history = True diff --git a/contrib/tools/python3/Lib/_pyrepl/input.py b/contrib/tools/python3/Lib/_pyrepl/input.py new file mode 100644 index 00000000000..21c24eb5cde --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/input.py @@ -0,0 +1,114 @@ +# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]> +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +# (naming modules after builtin functions is not such a hot idea...) + +# an KeyTrans instance translates Event objects into Command objects + +# hmm, at what level do we want [C-i] and [tab] to be equivalent? +# [meta-a] and [esc a]? obviously, these are going to be equivalent +# for the UnixConsole, but should they be for PygameConsole? + +# it would in any situation seem to be a bad idea to bind, say, [tab] +# and [C-i] to *different* things... but should binding one bind the +# other? + +# executive, temporary decision: [tab] and [C-i] are distinct, but +# [meta-key] is identified with [esc key]. We demand that any console +# class does quite a lot towards emulating a unix terminal. + +from __future__ import annotations + +from abc import ABC, abstractmethod +import unicodedata +from collections import deque + + +# types +if False: + from .types import EventTuple + + +class InputTranslator(ABC): + @abstractmethod + def push(self, evt: EventTuple) -> None: + pass + + @abstractmethod + def get(self) -> EventTuple | None: + return None + + @abstractmethod + def empty(self) -> bool: + return True + + +class KeymapTranslator(InputTranslator): + def __init__(self, keymap, verbose=False, invalid_cls=None, character_cls=None): + self.verbose = verbose + from .keymap import compile_keymap, parse_keys + + self.keymap = keymap + self.invalid_cls = invalid_cls + self.character_cls = character_cls + d = {} + for keyspec, command in keymap: + keyseq = tuple(parse_keys(keyspec)) + d[keyseq] = command + if self.verbose: + print(d) + self.k = self.ck = compile_keymap(d, ()) + self.results = deque() + self.stack = [] + + def push(self, evt): + if self.verbose: + print("pushed", evt.data, end="") + key = evt.data + d = self.k.get(key) + if isinstance(d, dict): + if self.verbose: + print("transition") + self.stack.append(key) + self.k = d + else: + if d is None: + if self.verbose: + print("invalid") + if self.stack or len(key) > 1 or unicodedata.category(key) == "C": + self.results.append((self.invalid_cls, self.stack + [key])) + else: + # small optimization: + self.k[key] = self.character_cls + self.results.append((self.character_cls, [key])) + else: + if self.verbose: + print("matched", d) + self.results.append((d, self.stack + [key])) + self.stack = [] + self.k = self.ck + + def get(self): + if self.results: + return self.results.popleft() + else: + return None + + def empty(self) -> bool: + return not self.results diff --git a/contrib/tools/python3/Lib/_pyrepl/keymap.py b/contrib/tools/python3/Lib/_pyrepl/keymap.py new file mode 100644 index 00000000000..2fb03d19523 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/keymap.py @@ -0,0 +1,213 @@ +# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]> +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +""" +Keymap contains functions for parsing keyspecs and turning keyspecs into +appropriate sequences. + +A keyspec is a string representing a sequence of key presses that can +be bound to a command. All characters other than the backslash represent +themselves. In the traditional manner, a backslash introduces an escape +sequence. + +pyrepl uses its own keyspec format that is meant to be a strict superset of +readline's KEYSEQ format. This means that if a spec is found that readline +accepts that this doesn't, it should be logged as a bug. Note that this means +we're using the `\\C-o' style of readline's keyspec, not the `Control-o' sort. + +The extension to readline is that the sequence \\<KEY> denotes the +sequence of characters produced by hitting KEY. + +Examples: +`a' - what you get when you hit the `a' key +`\\EOA' - Escape - O - A (up, on my terminal) +`\\<UP>' - the up arrow key +`\\<up>' - ditto (keynames are case-insensitive) +`\\C-o', `\\c-o' - control-o +`\\M-.' - meta-period +`\\E.' - ditto (that's how meta works for pyrepl) +`\\<tab>', `\\<TAB>', `\\t', `\\011', '\\x09', '\\X09', '\\C-i', '\\C-I' + - all of these are the tab character. +""" + +_escapes = { + "\\": "\\", + "'": "'", + '"': '"', + "a": "\a", + "b": "\b", + "e": "\033", + "f": "\f", + "n": "\n", + "r": "\r", + "t": "\t", + "v": "\v", +} + +_keynames = { + "backspace": "backspace", + "delete": "delete", + "down": "down", + "end": "end", + "enter": "\r", + "escape": "\033", + "f1": "f1", + "f2": "f2", + "f3": "f3", + "f4": "f4", + "f5": "f5", + "f6": "f6", + "f7": "f7", + "f8": "f8", + "f9": "f9", + "f10": "f10", + "f11": "f11", + "f12": "f12", + "f13": "f13", + "f14": "f14", + "f15": "f15", + "f16": "f16", + "f17": "f17", + "f18": "f18", + "f19": "f19", + "f20": "f20", + "home": "home", + "insert": "insert", + "left": "left", + "page down": "page down", + "page up": "page up", + "return": "\r", + "right": "right", + "space": " ", + "tab": "\t", + "up": "up", +} + + +class KeySpecError(Exception): + pass + + +def parse_keys(keys: str) -> list[str]: + """Parse keys in keyspec format to a sequence of keys.""" + s = 0 + r: list[str] = [] + while s < len(keys): + k, s = _parse_single_key_sequence(keys, s) + r.extend(k) + return r + + +def _parse_single_key_sequence(key: str, s: int) -> tuple[list[str], int]: + ctrl = 0 + meta = 0 + ret = "" + while not ret and s < len(key): + if key[s] == "\\": + c = key[s + 1].lower() + if c in _escapes: + ret = _escapes[c] + s += 2 + elif c == "c": + if key[s + 2] != "-": + raise KeySpecError( + "\\C must be followed by `-' (char %d of %s)" + % (s + 2, repr(key)) + ) + if ctrl: + raise KeySpecError( + "doubled \\C- (char %d of %s)" % (s + 1, repr(key)) + ) + ctrl = 1 + s += 3 + elif c == "m": + if key[s + 2] != "-": + raise KeySpecError( + "\\M must be followed by `-' (char %d of %s)" + % (s + 2, repr(key)) + ) + if meta: + raise KeySpecError( + "doubled \\M- (char %d of %s)" % (s + 1, repr(key)) + ) + meta = 1 + s += 3 + elif c.isdigit(): + n = key[s + 1 : s + 4] + ret = chr(int(n, 8)) + s += 4 + elif c == "x": + n = key[s + 2 : s + 4] + ret = chr(int(n, 16)) + s += 4 + elif c == "<": + t = key.find(">", s) + if t == -1: + raise KeySpecError( + "unterminated \\< starting at char %d of %s" + % (s + 1, repr(key)) + ) + ret = key[s + 2 : t].lower() + if ret not in _keynames: + raise KeySpecError( + "unrecognised keyname `%s' at char %d of %s" + % (ret, s + 2, repr(key)) + ) + ret = _keynames[ret] + s = t + 1 + else: + raise KeySpecError( + "unknown backslash escape %s at char %d of %s" + % (repr(c), s + 2, repr(key)) + ) + else: + ret = key[s] + s += 1 + if ctrl: + if len(ret) == 1: + ret = chr(ord(ret) & 0x1F) # curses.ascii.ctrl() + elif ret in {"left", "right"}: + ret = f"ctrl {ret}" + else: + raise KeySpecError("\\C- followed by invalid key") + + result = [ret], s + if meta: + result[0].insert(0, "\033") + return result + + +def compile_keymap(keymap, empty=b""): + r = {} + for key, value in keymap.items(): + if isinstance(key, bytes): + first = key[:1] + else: + first = key[0] + r.setdefault(first, {})[key[1:]] = value + for key, value in r.items(): + if empty in value: + if len(value) != 1: + raise KeySpecError("key definitions for %s clash" % (value.values(),)) + else: + r[key] = value[empty] + else: + r[key] = compile_keymap(value, empty) + return r diff --git a/contrib/tools/python3/Lib/_pyrepl/main.py b/contrib/tools/python3/Lib/_pyrepl/main.py new file mode 100644 index 00000000000..a6f824dcc4a --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/main.py @@ -0,0 +1,59 @@ +import errno +import os +import sys + + +CAN_USE_PYREPL: bool +FAIL_REASON: str +try: + if sys.platform == "win32" and sys.getwindowsversion().build < 10586: + raise RuntimeError("Windows 10 TH2 or later required") + if not os.isatty(sys.stdin.fileno()): + raise OSError(errno.ENOTTY, "tty required", "stdin") + from .simple_interact import check + if err := check(): + raise RuntimeError(err) +except Exception as e: + CAN_USE_PYREPL = False + FAIL_REASON = f"warning: can't use pyrepl: {e}" +else: + CAN_USE_PYREPL = True + FAIL_REASON = "" + + +def interactive_console(mainmodule=None, quiet=False, pythonstartup=False): + if not CAN_USE_PYREPL: + if not os.getenv('PYTHON_BASIC_REPL') and FAIL_REASON: + from .trace import trace + trace(FAIL_REASON) + print(FAIL_REASON, file=sys.stderr) + return sys._baserepl() + + if mainmodule: + namespace = mainmodule.__dict__ + else: + import __main__ + namespace = __main__.__dict__ + namespace.pop("__pyrepl_interactive_console", None) + + # sys._baserepl() above does this internally, we do it here + startup_path = os.getenv("PYTHONSTARTUP") + if pythonstartup and startup_path: + sys.audit("cpython.run_startup", startup_path) + + import tokenize + with tokenize.open(startup_path) as f: + startup_code = compile(f.read(), startup_path, "exec") + exec(startup_code, namespace) + + # set sys.{ps1,ps2} just before invoking the interactive interpreter. This + # mimics what CPython does in pythonrun.c + if not hasattr(sys, "ps1"): + sys.ps1 = ">>> " + if not hasattr(sys, "ps2"): + sys.ps2 = "... " + + from .console import InteractiveColoredConsole + from .simple_interact import run_multiline_interactive_console + console = InteractiveColoredConsole(namespace, filename="<stdin>") + run_multiline_interactive_console(console) diff --git a/contrib/tools/python3/Lib/_pyrepl/pager.py b/contrib/tools/python3/Lib/_pyrepl/pager.py new file mode 100644 index 00000000000..1fddc63e3ee --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/pager.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +import io +import os +import re +import sys + + +# types +if False: + from typing import Protocol + class Pager(Protocol): + def __call__(self, text: str, title: str = "") -> None: + ... + + +def get_pager() -> Pager: + """Decide what method to use for paging through text.""" + if not hasattr(sys.stdin, "isatty"): + return plain_pager + if not hasattr(sys.stdout, "isatty"): + return plain_pager + if not sys.stdin.isatty() or not sys.stdout.isatty(): + return plain_pager + if sys.platform == "emscripten": + return plain_pager + use_pager = os.environ.get('MANPAGER') or os.environ.get('PAGER') + if use_pager: + if sys.platform == 'win32': # pipes completely broken in Windows + return lambda text, title='': tempfile_pager(plain(text), use_pager) + elif os.environ.get('TERM') in ('dumb', 'emacs'): + return lambda text, title='': pipe_pager(plain(text), use_pager, title) + else: + return lambda text, title='': pipe_pager(text, use_pager, title) + if os.environ.get('TERM') in ('dumb', 'emacs'): + return plain_pager + if sys.platform == 'win32': + return lambda text, title='': tempfile_pager(plain(text), 'more <') + if hasattr(os, 'system') and os.system('(pager) 2>/dev/null') == 0: + return lambda text, title='': pipe_pager(text, 'pager', title) + if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0: + return lambda text, title='': pipe_pager(text, 'less', title) + + import tempfile + (fd, filename) = tempfile.mkstemp() + os.close(fd) + try: + if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0: + return lambda text, title='': pipe_pager(text, 'more', title) + else: + return tty_pager + finally: + os.unlink(filename) + + +def escape_stdout(text: str) -> str: + # Escape non-encodable characters to avoid encoding errors later + encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8' + return text.encode(encoding, 'backslashreplace').decode(encoding) + + +def escape_less(s: str) -> str: + return re.sub(r'([?:.%\\])', r'\\\1', s) + + +def plain(text: str) -> str: + """Remove boldface formatting from text.""" + return re.sub('.\b', '', text) + + +def tty_pager(text: str, title: str = '') -> None: + """Page through text on a text terminal.""" + lines = plain(escape_stdout(text)).split('\n') + has_tty = False + try: + import tty + import termios + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + tty.setcbreak(fd) + has_tty = True + + def getchar() -> str: + return sys.stdin.read(1) + + except (ImportError, AttributeError, io.UnsupportedOperation): + def getchar() -> str: + return sys.stdin.readline()[:-1][:1] + + try: + try: + h = int(os.environ.get('LINES', 0)) + except ValueError: + h = 0 + if h <= 1: + h = 25 + r = inc = h - 1 + sys.stdout.write('\n'.join(lines[:inc]) + '\n') + while lines[r:]: + sys.stdout.write('-- more --') + sys.stdout.flush() + c = getchar() + + if c in ('q', 'Q'): + sys.stdout.write('\r \r') + break + elif c in ('\r', '\n'): + sys.stdout.write('\r \r' + lines[r] + '\n') + r = r + 1 + continue + if c in ('b', 'B', '\x1b'): + r = r - inc - inc + if r < 0: r = 0 + sys.stdout.write('\n' + '\n'.join(lines[r:r+inc]) + '\n') + r = r + inc + + finally: + if has_tty: + termios.tcsetattr(fd, termios.TCSAFLUSH, old) + + +def plain_pager(text: str, title: str = '') -> None: + """Simply print unformatted text. This is the ultimate fallback.""" + sys.stdout.write(plain(escape_stdout(text))) + + +def pipe_pager(text: str, cmd: str, title: str = '') -> None: + """Page through text by feeding it to another program.""" + import subprocess + env = os.environ.copy() + if title: + title += ' ' + esc_title = escape_less(title) + prompt_string = ( + f' {esc_title}' + + '?ltline %lt?L/%L.' + ':byte %bB?s/%s.' + '.' + '?e (END):?pB %pB\\%..' + ' (press h for help or q to quit)') + env['LESS'] = '-RmPm{0}$PM{0}$'.format(prompt_string) + proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, + errors='backslashreplace', env=env) + assert proc.stdin is not None + try: + with proc.stdin as pipe: + try: + pipe.write(text) + except KeyboardInterrupt: + # We've hereby abandoned whatever text hasn't been written, + # but the pager is still in control of the terminal. + pass + except OSError: + pass # Ignore broken pipes caused by quitting the pager program. + while True: + try: + proc.wait() + break + except KeyboardInterrupt: + # Ignore ctl-c like the pager itself does. Otherwise the pager is + # left running and the terminal is in raw mode and unusable. + pass + + +def tempfile_pager(text: str, cmd: str, title: str = '') -> None: + """Page through text by invoking a program on a temporary file.""" + import tempfile + with tempfile.TemporaryDirectory() as tempdir: + filename = os.path.join(tempdir, 'pydoc.out') + with open(filename, 'w', errors='backslashreplace', + encoding=os.device_encoding(0) if + sys.platform == 'win32' else None + ) as file: + file.write(text) + os.system(cmd + ' "' + filename + '"') diff --git a/contrib/tools/python3/Lib/_pyrepl/reader.py b/contrib/tools/python3/Lib/_pyrepl/reader.py new file mode 100644 index 00000000000..012871e7650 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/reader.py @@ -0,0 +1,764 @@ +# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]> +# Antonio Cuni +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations + +import sys + +from contextlib import contextmanager +from dataclasses import dataclass, field, fields +from _colorize import can_colorize, ANSIColors + + +from . import commands, console, input +from .utils import wlen, unbracket, disp_str +from .trace import trace + + +# types +Command = commands.Command +from .types import Callback, SimpleContextManager, KeySpec, CommandName + + +# syntax classes: + +SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3) + + +def make_default_syntax_table() -> dict[str, int]: + # XXX perhaps should use some unicodedata here? + st: dict[str, int] = {} + for c in map(chr, range(256)): + st[c] = SYNTAX_SYMBOL + for c in [a for a in map(chr, range(256)) if a.isalnum()]: + st[c] = SYNTAX_WORD + st["\n"] = st[" "] = SYNTAX_WHITESPACE + return st + + +def make_default_commands() -> dict[CommandName, type[Command]]: + result: dict[CommandName, type[Command]] = {} + for v in vars(commands).values(): + if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower(): + result[v.__name__] = v + result[v.__name__.replace("_", "-")] = v + return result + + +default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( + [ + (r"\C-a", "beginning-of-line"), + (r"\C-b", "left"), + (r"\C-c", "interrupt"), + (r"\C-d", "delete"), + (r"\C-e", "end-of-line"), + (r"\C-f", "right"), + (r"\C-g", "cancel"), + (r"\C-h", "backspace"), + (r"\C-j", "accept"), + (r"\<return>", "accept"), + (r"\C-k", "kill-line"), + (r"\C-l", "clear-screen"), + (r"\C-m", "accept"), + (r"\C-t", "transpose-characters"), + (r"\C-u", "unix-line-discard"), + (r"\C-w", "unix-word-rubout"), + (r"\C-x\C-u", "upcase-region"), + (r"\C-y", "yank"), + *(() if sys.platform == "win32" else ((r"\C-z", "suspend"), )), + (r"\M-b", "backward-word"), + (r"\M-c", "capitalize-word"), + (r"\M-d", "kill-word"), + (r"\M-f", "forward-word"), + (r"\M-l", "downcase-word"), + (r"\M-t", "transpose-words"), + (r"\M-u", "upcase-word"), + (r"\M-y", "yank-pop"), + (r"\M--", "digit-arg"), + (r"\M-0", "digit-arg"), + (r"\M-1", "digit-arg"), + (r"\M-2", "digit-arg"), + (r"\M-3", "digit-arg"), + (r"\M-4", "digit-arg"), + (r"\M-5", "digit-arg"), + (r"\M-6", "digit-arg"), + (r"\M-7", "digit-arg"), + (r"\M-8", "digit-arg"), + (r"\M-9", "digit-arg"), + (r"\M-\n", "accept"), + ("\\\\", "self-insert"), + (r"\x1b[200~", "enable_bracketed_paste"), + (r"\x1b[201~", "disable_bracketed_paste"), + (r"\x03", "ctrl-c"), + ] + + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"] + + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()] + + [ + (r"\<up>", "up"), + (r"\<down>", "down"), + (r"\<left>", "left"), + (r"\C-\<left>", "backward-word"), + (r"\<right>", "right"), + (r"\C-\<right>", "forward-word"), + (r"\<delete>", "delete"), + (r"\x1b[3~", "delete"), + (r"\<backspace>", "backspace"), + (r"\M-\<backspace>", "backward-kill-word"), + (r"\<end>", "end-of-line"), # was 'end' + (r"\<home>", "beginning-of-line"), # was 'home' + (r"\<f1>", "help"), + (r"\<f2>", "show-history"), + (r"\<f3>", "paste-mode"), + (r"\EOF", "end"), # the entries in the terminfo database for xterms + (r"\EOH", "home"), # seem to be wrong. this is a less than ideal + # workaround + ] +) + + +@dataclass(slots=True) +class Reader: + """The Reader class implements the bare bones of a command reader, + handling such details as editing and cursor motion. What it does + not support are such things as completion or history support - + these are implemented elsewhere. + + Instance variables of note include: + + * buffer: + A *list* (*not* a string at the moment :-) containing all the + characters that have been entered. + * console: + Hopefully encapsulates the OS dependent stuff. + * pos: + A 0-based index into `buffer' for where the insertion point + is. + * screeninfo: + Ahem. This list contains some info needed to move the + insertion point around reasonably efficiently. + * cxy, lxy: + the position of the insertion point in screen ... + * syntax_table: + Dictionary mapping characters to `syntax class'; read the + emacs docs to see what this means :-) + * commands: + Dictionary mapping command names to command classes. + * arg: + The emacs-style prefix argument. It will be None if no such + argument has been provided. + * dirty: + True if we need to refresh the display. + * kill_ring: + The emacs-style kill-ring; manipulated with yank & yank-pop + * ps1, ps2, ps3, ps4: + prompts. ps1 is the prompt for a one-line input; for a + multiline input it looks like: + ps2> first line of input goes here + ps3> second and further + ps3> lines get ps3 + ... + ps4> and the last one gets ps4 + As with the usual top-level, you can set these to instances if + you like; str() will be called on them (once) at the beginning + of each command. Don't put really long or newline containing + strings here, please! + This is just the default policy; you can change it freely by + overriding get_prompt() (and indeed some standard subclasses + do). + * finished: + handle1 will set this to a true value if a command signals + that we're done. + """ + + console: console.Console + + ## state + buffer: list[str] = field(default_factory=list) + pos: int = 0 + ps1: str = "->> " + ps2: str = "/>> " + ps3: str = "|.. " + ps4: str = R"\__ " + kill_ring: list[list[str]] = field(default_factory=list) + msg: str = "" + arg: int | None = None + dirty: bool = False + finished: bool = False + paste_mode: bool = False + in_bracketed_paste: bool = False + commands: dict[str, type[Command]] = field(default_factory=make_default_commands) + last_command: type[Command] | None = None + syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table) + keymap: tuple[tuple[str, str], ...] = () + input_trans: input.KeymapTranslator = field(init=False) + input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list) + screen: list[str] = field(default_factory=list) + screeninfo: list[tuple[int, list[int]]] = field(init=False) + cxy: tuple[int, int] = field(init=False) + lxy: tuple[int, int] = field(init=False) + scheduled_commands: list[str] = field(default_factory=list) + can_colorize: bool = False + threading_hook: Callback | None = None + + ## cached metadata to speed up screen refreshes + @dataclass + class RefreshCache: + in_bracketed_paste: bool = False + screen: list[str] = field(default_factory=list) + screeninfo: list[tuple[int, list[int]]] = field(init=False) + line_end_offsets: list[int] = field(default_factory=list) + pos: int = field(init=False) + cxy: tuple[int, int] = field(init=False) + dimensions: tuple[int, int] = field(init=False) + invalidated: bool = False + + def update_cache(self, + reader: Reader, + screen: list[str], + screeninfo: list[tuple[int, list[int]]], + ) -> None: + self.in_bracketed_paste = reader.in_bracketed_paste + self.screen = screen.copy() + self.screeninfo = screeninfo.copy() + self.pos = reader.pos + self.cxy = reader.cxy + self.dimensions = reader.console.width, reader.console.height + self.invalidated = False + + def valid(self, reader: Reader) -> bool: + if self.invalidated: + return False + dimensions = reader.console.width, reader.console.height + dimensions_changed = dimensions != self.dimensions + paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste + return not (dimensions_changed or paste_changed) + + def get_cached_location(self, reader: Reader) -> tuple[int, int]: + if self.invalidated: + raise ValueError("Cache is invalidated") + offset = 0 + earliest_common_pos = min(reader.pos, self.pos) + num_common_lines = len(self.line_end_offsets) + while num_common_lines > 0: + offset = self.line_end_offsets[num_common_lines - 1] + if earliest_common_pos > offset: + break + num_common_lines -= 1 + else: + offset = 0 + return offset, num_common_lines + + last_refresh_cache: RefreshCache = field(default_factory=RefreshCache) + + def __post_init__(self) -> None: + # Enable the use of `insert` without a `prepare` call - necessary to + # facilitate the tab completion hack implemented for + # <https://bugs.python.org/issue25660>. + self.keymap = self.collect_keymap() + self.input_trans = input.KeymapTranslator( + self.keymap, invalid_cls="invalid-key", character_cls="self-insert" + ) + self.screeninfo = [(0, [])] + self.cxy = self.pos2xy() + self.lxy = (self.pos, 0) + self.can_colorize = can_colorize() + + self.last_refresh_cache.screeninfo = self.screeninfo + self.last_refresh_cache.pos = self.pos + self.last_refresh_cache.cxy = self.cxy + self.last_refresh_cache.dimensions = (0, 0) + + def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: + return default_keymap + + def calc_screen(self) -> list[str]: + """Translate changes in self.buffer into changes in self.console.screen.""" + # Since the last call to calc_screen: + # screen and screeninfo may differ due to a completion menu being shown + # pos and cxy may differ due to edits, cursor movements, or completion menus + + # Lines that are above both the old and new cursor position can't have changed, + # unless the terminal has been resized (which might cause reflowing) or we've + # entered or left paste mode (which changes prompts, causing reflowing). + num_common_lines = 0 + offset = 0 + if self.last_refresh_cache.valid(self): + offset, num_common_lines = self.last_refresh_cache.get_cached_location(self) + + screen = self.last_refresh_cache.screen + del screen[num_common_lines:] + + screeninfo = self.last_refresh_cache.screeninfo + del screeninfo[num_common_lines:] + + last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets + del last_refresh_line_end_offsets[num_common_lines:] + + pos = self.pos + pos -= offset + + prompt_from_cache = (offset and self.buffer[offset - 1] != "\n") + lines = "".join(self.buffer[offset:]).split("\n") + cursor_found = False + lines_beyond_cursor = 0 + for ln, line in enumerate(lines, num_common_lines): + line_len = len(line) + if 0 <= pos <= line_len: + self.lxy = pos, ln + cursor_found = True + elif cursor_found: + lines_beyond_cursor += 1 + if lines_beyond_cursor > self.console.height: + # No need to keep formatting lines. + # The console can't show them. + break + if prompt_from_cache: + # Only the first line's prompt can come from the cache + prompt_from_cache = False + prompt = "" + else: + prompt = self.get_prompt(ln, line_len >= pos >= 0) + while "\n" in prompt: + pre_prompt, _, prompt = prompt.partition("\n") + last_refresh_line_end_offsets.append(offset) + screen.append(pre_prompt) + screeninfo.append((0, [])) + pos -= line_len + 1 + prompt, prompt_len = self.process_prompt(prompt) + chars, char_widths = disp_str(line) + wrapcount = (sum(char_widths) + prompt_len) // self.console.width + trace("wrapcount = {wrapcount}", wrapcount=wrapcount) + if wrapcount == 0 or not char_widths: + offset += line_len + 1 # Takes all of the line plus the newline + last_refresh_line_end_offsets.append(offset) + screen.append(prompt + "".join(chars)) + screeninfo.append((prompt_len, char_widths)) + else: + pre = prompt + prelen = prompt_len + for wrap in range(wrapcount + 1): + index_to_wrap_before = 0 + column = 0 + for char_width in char_widths: + if column + char_width + prelen >= self.console.width: + break + index_to_wrap_before += 1 + column += char_width + if len(chars) > index_to_wrap_before: + offset += index_to_wrap_before + post = "\\" + after = [1] + else: + offset += index_to_wrap_before + 1 # Takes the newline + post = "" + after = [] + last_refresh_line_end_offsets.append(offset) + render = pre + "".join(chars[:index_to_wrap_before]) + post + render_widths = char_widths[:index_to_wrap_before] + after + screen.append(render) + screeninfo.append((prelen, render_widths)) + chars = chars[index_to_wrap_before:] + char_widths = char_widths[index_to_wrap_before:] + pre = "" + prelen = 0 + self.screeninfo = screeninfo + self.cxy = self.pos2xy() + if self.msg: + for mline in self.msg.split("\n"): + screen.append(mline) + screeninfo.append((0, [])) + + self.last_refresh_cache.update_cache(self, screen, screeninfo) + return screen + + @staticmethod + def process_prompt(prompt: str) -> tuple[str, int]: + r"""Return a tuple with the prompt string and its visible length. + + The prompt string has the zero-width brackets recognized by shells + (\x01 and \x02) removed. The length ignores anything between those + brackets as well as any ANSI escape sequences. + """ + out_prompt = unbracket(prompt, including_content=False) + visible_prompt = unbracket(prompt, including_content=True) + return out_prompt, wlen(visible_prompt) + + def bow(self, p: int | None = None) -> int: + """Return the 0-based index of the word break preceding p most + immediately. + + p defaults to self.pos; word boundaries are determined using + self.syntax_table.""" + if p is None: + p = self.pos + st = self.syntax_table + b = self.buffer + p -= 1 + while p >= 0 and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: + p -= 1 + while p >= 0 and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: + p -= 1 + return p + 1 + + def eow(self, p: int | None = None) -> int: + """Return the 0-based index of the word break following p most + immediately. + + p defaults to self.pos; word boundaries are determined using + self.syntax_table.""" + if p is None: + p = self.pos + st = self.syntax_table + b = self.buffer + while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: + p += 1 + while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: + p += 1 + return p + + def bol(self, p: int | None = None) -> int: + """Return the 0-based index of the line break preceding p most + immediately. + + p defaults to self.pos.""" + if p is None: + p = self.pos + b = self.buffer + p -= 1 + while p >= 0 and b[p] != "\n": + p -= 1 + return p + 1 + + def eol(self, p: int | None = None) -> int: + """Return the 0-based index of the line break following p most + immediately. + + p defaults to self.pos.""" + if p is None: + p = self.pos + b = self.buffer + while p < len(b) and b[p] != "\n": + p += 1 + return p + + def max_column(self, y: int) -> int: + """Return the last x-offset for line y""" + return self.screeninfo[y][0] + sum(self.screeninfo[y][1]) + + def max_row(self) -> int: + return len(self.screeninfo) - 1 + + def get_arg(self, default: int = 1) -> int: + """Return any prefix argument that the user has supplied, + returning `default' if there is None. Defaults to 1. + """ + if self.arg is None: + return default + return self.arg + + def get_prompt(self, lineno: int, cursor_on_line: bool) -> str: + """Return what should be in the left-hand margin for line + `lineno'.""" + if self.arg is not None and cursor_on_line: + prompt = f"(arg: {self.arg}) " + elif self.paste_mode and not self.in_bracketed_paste: + prompt = "(paste) " + elif "\n" in self.buffer: + if lineno == 0: + prompt = self.ps2 + elif self.ps4 and lineno == self.buffer.count("\n"): + prompt = self.ps4 + else: + prompt = self.ps3 + else: + prompt = self.ps1 + + if self.can_colorize: + prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}" + return prompt + + def push_input_trans(self, itrans: input.KeymapTranslator) -> None: + self.input_trans_stack.append(self.input_trans) + self.input_trans = itrans + + def pop_input_trans(self) -> None: + self.input_trans = self.input_trans_stack.pop() + + def setpos_from_xy(self, x: int, y: int) -> None: + """Set pos according to coordinates x, y""" + pos = 0 + i = 0 + while i < y: + prompt_len, char_widths = self.screeninfo[i] + offset = len(char_widths) + in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width + if in_wrapped_line: + pos += offset - 1 # -1 cause backslash is not in buffer + else: + pos += offset + 1 # +1 cause newline is in buffer + i += 1 + + j = 0 + cur_x = self.screeninfo[i][0] + while cur_x < x: + if self.screeninfo[i][1][j] == 0: + j += 1 # prevent potential future infinite loop + continue + cur_x += self.screeninfo[i][1][j] + j += 1 + pos += 1 + + self.pos = pos + + def pos2xy(self) -> tuple[int, int]: + """Return the x, y coordinates of position 'pos'.""" + + prompt_len, y = 0, 0 + char_widths: list[int] = [] + pos = self.pos + assert 0 <= pos <= len(self.buffer) + + # optimize for the common case: typing at the end of the buffer + if pos == len(self.buffer) and len(self.screeninfo) > 0: + y = len(self.screeninfo) - 1 + prompt_len, char_widths = self.screeninfo[y] + return prompt_len + sum(char_widths), y + + for prompt_len, char_widths in self.screeninfo: + offset = len(char_widths) + in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width + if in_wrapped_line: + offset -= 1 # need to remove line-wrapping backslash + + if offset >= pos: + break + + if not in_wrapped_line: + offset += 1 # there's a newline in buffer + + pos -= offset + y += 1 + return prompt_len + sum(char_widths[:pos]), y + + def insert(self, text: str | list[str]) -> None: + """Insert 'text' at the insertion point.""" + self.buffer[self.pos : self.pos] = list(text) + self.pos += len(text) + self.dirty = True + + def update_cursor(self) -> None: + """Move the cursor to reflect changes in self.pos""" + self.cxy = self.pos2xy() + self.console.move_cursor(*self.cxy) + + def after_command(self, cmd: Command) -> None: + """This function is called to allow post command cleanup.""" + if getattr(cmd, "kills_digit_arg", True): + if self.arg is not None: + self.dirty = True + self.arg = None + + def prepare(self) -> None: + """Get ready to run. Call restore when finished. You must not + write to the console in between the calls to prepare and + restore.""" + try: + self.console.prepare() + self.arg = None + self.finished = False + del self.buffer[:] + self.pos = 0 + self.dirty = True + self.last_command = None + self.calc_screen() + except BaseException: + self.restore() + raise + + while self.scheduled_commands: + cmd = self.scheduled_commands.pop() + self.do_cmd((cmd, [])) + + def last_command_is(self, cls: type) -> bool: + if not self.last_command: + return False + return issubclass(cls, self.last_command) + + def restore(self) -> None: + """Clean up after a run.""" + self.console.restore() + + @contextmanager + def suspend(self) -> SimpleContextManager: + """A context manager to delegate to another reader.""" + prev_state = {f.name: getattr(self, f.name) for f in fields(self)} + try: + self.restore() + yield + finally: + for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"): + setattr(self, arg, prev_state[arg]) + self.prepare() + + def finish(self) -> None: + """Called when a command signals that we're finished.""" + pass + + def error(self, msg: str = "none") -> None: + self.msg = "! " + msg + " " + self.dirty = True + self.console.beep() + + def update_screen(self) -> None: + if self.dirty: + self.refresh() + + def refresh(self) -> None: + """Recalculate and refresh the screen.""" + if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n": + return + + # this call sets up self.cxy, so call it first. + self.screen = self.calc_screen() + self.console.refresh(self.screen, self.cxy) + self.dirty = False + + def do_cmd(self, cmd: tuple[str, list[str]]) -> None: + """`cmd` is a tuple of "event_name" and "event", which in the current + implementation is always just the "buffer" which happens to be a list + of single-character strings.""" + + trace("received command {cmd}", cmd=cmd) + if isinstance(cmd[0], str): + command_type = self.commands.get(cmd[0], commands.invalid_command) + elif isinstance(cmd[0], type): + command_type = cmd[0] + else: + return # nothing to do + + command = command_type(self, *cmd) # type: ignore[arg-type] + command.do() + + self.after_command(command) + + if self.dirty: + self.refresh() + else: + self.update_cursor() + + if not isinstance(cmd, commands.digit_arg): + self.last_command = command_type + + self.finished = bool(command.finish) + if self.finished: + self.console.finish() + self.finish() + + def run_hooks(self) -> None: + threading_hook = self.threading_hook + if threading_hook is None and 'threading' in sys.modules: + from ._threading_handler import install_threading_hook + install_threading_hook(self) + if threading_hook is not None: + try: + threading_hook() + except Exception: + pass + + input_hook = self.console.input_hook + if input_hook: + try: + input_hook() + except Exception: + pass + + def handle1(self, block: bool = True) -> bool: + """Handle a single event. Wait as long as it takes if block + is true (the default), otherwise return False if no event is + pending.""" + + if self.msg: + self.msg = "" + self.dirty = True + + while True: + # We use the same timeout as in readline.c: 100ms + self.run_hooks() + self.console.wait(100) + event = self.console.get_event(block=False) + if not event: + if block: + continue + return False + + translate = True + + if event.evt == "key": + self.input_trans.push(event) + elif event.evt == "scroll": + self.refresh() + elif event.evt == "resize": + self.refresh() + else: + translate = False + + if translate: + cmd = self.input_trans.get() + else: + cmd = [event.evt, event.data] + + if cmd is None: + if block: + continue + return False + + self.do_cmd(cmd) + return True + + def push_char(self, char: int | bytes) -> None: + self.console.push_char(char) + self.handle1(block=False) + + def readline(self, startup_hook: Callback | None = None) -> str: + """Read a line. The implementation of this method also shows + how to drive Reader if you want more control over the event + loop.""" + self.prepare() + try: + if startup_hook is not None: + startup_hook() + self.refresh() + while not self.finished: + self.handle1() + return self.get_unicode() + + finally: + self.restore() + + def bind(self, spec: KeySpec, command: CommandName) -> None: + self.keymap = self.keymap + ((spec, command),) + self.input_trans = input.KeymapTranslator( + self.keymap, invalid_cls="invalid-key", character_cls="self-insert" + ) + + def get_unicode(self) -> str: + """Return the current buffer as a unicode string.""" + return "".join(self.buffer) diff --git a/contrib/tools/python3/Lib/_pyrepl/readline.py b/contrib/tools/python3/Lib/_pyrepl/readline.py new file mode 100644 index 00000000000..f802a952690 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/readline.py @@ -0,0 +1,599 @@ +# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]> +# Alex Gaynor +# Antonio Cuni +# Armin Rigo +# Holger Krekel +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +"""A compatibility wrapper reimplementing the 'readline' standard module +on top of pyrepl. Not all functionalities are supported. Contains +extensions for multiline input. +""" + +from __future__ import annotations + +import warnings +from dataclasses import dataclass, field + +import os +from site import gethistoryfile +import sys +from rlcompleter import Completer as RLCompleter + +from . import commands, historical_reader +from .completing_reader import CompletingReader +from .console import Console as ConsoleType + +Console: type[ConsoleType] +_error: tuple[type[Exception], ...] | type[Exception] + +if os.name == "nt": + from .windows_console import WindowsConsole as Console, _error +else: + from .unix_console import UnixConsole as Console, _error + +ENCODING = sys.getdefaultencoding() or "latin1" + + +# types +Command = commands.Command +from collections.abc import Callable, Collection +from .types import Callback, Completer, KeySpec, CommandName + +TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import Any, Mapping + + +MoreLinesCallable = Callable[[str], bool] + + +__all__ = [ + "add_history", + "clear_history", + "get_begidx", + "get_completer", + "get_completer_delims", + "get_current_history_length", + "get_endidx", + "get_history_item", + "get_history_length", + "get_line_buffer", + "insert_text", + "parse_and_bind", + "read_history_file", + # "read_init_file", + # "redisplay", + "remove_history_item", + "replace_history_item", + "set_auto_history", + "set_completer", + "set_completer_delims", + "set_history_length", + # "set_pre_input_hook", + "set_startup_hook", + "write_history_file", + # ---- multiline extensions ---- + "multiline_input", +] + +# ____________________________________________________________ + +@dataclass +class ReadlineConfig: + readline_completer: Completer | None = None + completer_delims: frozenset[str] = frozenset(" \t\n`~!@#$%^&*()-=+[{]}\\|;:'\",<>/?") + + +@dataclass(kw_only=True) +class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader): + # Class fields + assume_immutable_completions = False + use_brackets = False + sort_in_column = True + + # Instance fields + config: ReadlineConfig + more_lines: MoreLinesCallable | None = None + last_used_indentation: str | None = None + + def __post_init__(self) -> None: + super().__post_init__() + self.commands["maybe_accept"] = maybe_accept + self.commands["maybe-accept"] = maybe_accept + self.commands["backspace_dedent"] = backspace_dedent + self.commands["backspace-dedent"] = backspace_dedent + + def error(self, msg: str = "none") -> None: + pass # don't show error messages by default + + def get_stem(self) -> str: + b = self.buffer + p = self.pos - 1 + completer_delims = self.config.completer_delims + while p >= 0 and b[p] not in completer_delims: + p -= 1 + return "".join(b[p + 1 : self.pos]) + + def get_completions(self, stem: str) -> list[str]: + if len(stem) == 0 and self.more_lines is not None: + b = self.buffer + p = self.pos + while p > 0 and b[p - 1] != "\n": + p -= 1 + num_spaces = 4 - ((self.pos - p) % 4) + return [" " * num_spaces] + result = [] + function = self.config.readline_completer + if function is not None: + try: + stem = str(stem) # rlcompleter.py seems to not like unicode + except UnicodeEncodeError: + pass # but feed unicode anyway if we have no choice + state = 0 + while True: + try: + next = function(stem, state) + except Exception: + break + if not isinstance(next, str): + break + result.append(next) + state += 1 + # emulate the behavior of the standard readline that sorts + # the completions before displaying them. + result.sort() + return result + + def get_trimmed_history(self, maxlength: int) -> list[str]: + if maxlength >= 0: + cut = len(self.history) - maxlength + if cut < 0: + cut = 0 + else: + cut = 0 + return self.history[cut:] + + def update_last_used_indentation(self) -> None: + indentation = _get_first_indentation(self.buffer) + if indentation is not None: + self.last_used_indentation = indentation + + # --- simplified support for reading multiline Python statements --- + + def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: + return super().collect_keymap() + ( + (r"\n", "maybe-accept"), + (r"\<backspace>", "backspace-dedent"), + ) + + def after_command(self, cmd: Command) -> None: + super().after_command(cmd) + if self.more_lines is None: + # Force single-line input if we are in raw_input() mode. + # Although there is no direct way to add a \n in this mode, + # multiline buffers can still show up using various + # commands, e.g. navigating the history. + try: + index = self.buffer.index("\n") + except ValueError: + pass + else: + self.buffer = self.buffer[:index] + if self.pos > len(self.buffer): + self.pos = len(self.buffer) + + +def set_auto_history(_should_auto_add_history: bool) -> None: + """Enable or disable automatic history""" + historical_reader.should_auto_add_history = bool(_should_auto_add_history) + + +def _get_this_line_indent(buffer: list[str], pos: int) -> int: + indent = 0 + while pos > 0 and buffer[pos - 1] in " \t": + indent += 1 + pos -= 1 + if pos > 0 and buffer[pos - 1] == "\n": + return indent + return 0 + + +def _get_previous_line_indent(buffer: list[str], pos: int) -> tuple[int, int | None]: + prevlinestart = pos + while prevlinestart > 0 and buffer[prevlinestart - 1] != "\n": + prevlinestart -= 1 + prevlinetext = prevlinestart + while prevlinetext < pos and buffer[prevlinetext] in " \t": + prevlinetext += 1 + if prevlinetext == pos: + indent = None + else: + indent = prevlinetext - prevlinestart + return prevlinestart, indent + + +def _get_first_indentation(buffer: list[str]) -> str | None: + indented_line_start = None + for i in range(len(buffer)): + if (i < len(buffer) - 1 + and buffer[i] == "\n" + and buffer[i + 1] in " \t" + ): + indented_line_start = i + 1 + elif indented_line_start is not None and buffer[i] not in " \t\n": + return ''.join(buffer[indented_line_start : i]) + return None + + +def _should_auto_indent(buffer: list[str], pos: int) -> bool: + # check if last character before "pos" is a colon, ignoring + # whitespaces and comments. + last_char = None + while pos > 0: + pos -= 1 + if last_char is None: + if buffer[pos] not in " \t\n#": # ignore whitespaces and comments + last_char = buffer[pos] + else: + # even if we found a non-whitespace character before + # original pos, we keep going back until newline is reached + # to make sure we ignore comments + if buffer[pos] == "\n": + break + if buffer[pos] == "#": + last_char = None + return last_char == ":" + + +class maybe_accept(commands.Command): + def do(self) -> None: + r: ReadlineAlikeReader + r = self.reader # type: ignore[assignment] + r.dirty = True # this is needed to hide the completion menu, if visible + + if self.reader.in_bracketed_paste: + r.insert("\n") + return + + # if there are already several lines and the cursor + # is not on the last one, always insert a new \n. + text = r.get_unicode() + + if "\n" in r.buffer[r.pos :] or ( + r.more_lines is not None and r.more_lines(text) + ): + def _newline_before_pos(): + before_idx = r.pos - 1 + while before_idx > 0 and text[before_idx].isspace(): + before_idx -= 1 + return text[before_idx : r.pos].count("\n") > 0 + + # if there's already a new line before the cursor then + # even if the cursor is followed by whitespace, we assume + # the user is trying to terminate the block + if _newline_before_pos() and text[r.pos:].isspace(): + self.finish = True + return + + # auto-indent the next line like the previous line + prevlinestart, indent = _get_previous_line_indent(r.buffer, r.pos) + r.insert("\n") + if not self.reader.paste_mode: + if indent: + for i in range(prevlinestart, prevlinestart + indent): + r.insert(r.buffer[i]) + r.update_last_used_indentation() + if _should_auto_indent(r.buffer, r.pos): + if r.last_used_indentation is not None: + indentation = r.last_used_indentation + else: + # default + indentation = " " * 4 + r.insert(indentation) + elif not self.reader.paste_mode: + self.finish = True + else: + r.insert("\n") + + +class backspace_dedent(commands.Command): + def do(self) -> None: + r = self.reader + b = r.buffer + if r.pos > 0: + repeat = 1 + if b[r.pos - 1] != "\n": + indent = _get_this_line_indent(b, r.pos) + if indent > 0: + ls = r.pos - indent + while ls > 0: + ls, pi = _get_previous_line_indent(b, ls - 1) + if pi is not None and pi < indent: + repeat = indent - pi + break + r.pos -= repeat + del b[r.pos : r.pos + repeat] + r.dirty = True + else: + self.reader.error("can't backspace at start") + + +# ____________________________________________________________ + + +@dataclass(slots=True) +class _ReadlineWrapper: + f_in: int = -1 + f_out: int = -1 + reader: ReadlineAlikeReader | None = field(default=None, repr=False) + saved_history_length: int = -1 + startup_hook: Callback | None = None + config: ReadlineConfig = field(default_factory=ReadlineConfig, repr=False) + + def __post_init__(self) -> None: + if self.f_in == -1: + self.f_in = os.dup(0) + if self.f_out == -1: + self.f_out = os.dup(1) + + def get_reader(self) -> ReadlineAlikeReader: + if self.reader is None: + console = Console(self.f_in, self.f_out, encoding=ENCODING) + self.reader = ReadlineAlikeReader(console=console, config=self.config) + return self.reader + + def input(self, prompt: object = "") -> str: + try: + reader = self.get_reader() + except _error: + assert raw_input is not None + return raw_input(prompt) + prompt_str = str(prompt) + reader.ps1 = prompt_str + sys.audit("builtins.input", prompt_str) + result = reader.readline(startup_hook=self.startup_hook) + sys.audit("builtins.input/result", result) + return result + + def multiline_input(self, more_lines: MoreLinesCallable, ps1: str, ps2: str) -> str: + """Read an input on possibly multiple lines, asking for more + lines as long as 'more_lines(unicodetext)' returns an object whose + boolean value is true. + """ + reader = self.get_reader() + saved = reader.more_lines + try: + reader.more_lines = more_lines + reader.ps1 = ps1 + reader.ps2 = ps1 + reader.ps3 = ps2 + reader.ps4 = "" + with warnings.catch_warnings(action="ignore"): + return reader.readline() + finally: + reader.more_lines = saved + reader.paste_mode = False + + def parse_and_bind(self, string: str) -> None: + pass # XXX we don't support parsing GNU-readline-style init files + + def set_completer(self, function: Completer | None = None) -> None: + self.config.readline_completer = function + + def get_completer(self) -> Completer | None: + return self.config.readline_completer + + def set_completer_delims(self, delimiters: Collection[str]) -> None: + self.config.completer_delims = frozenset(delimiters) + + def get_completer_delims(self) -> str: + return "".join(sorted(self.config.completer_delims)) + + def _histline(self, line: str) -> str: + line = line.rstrip("\n") + return line + + def get_history_length(self) -> int: + return self.saved_history_length + + def set_history_length(self, length: int) -> None: + self.saved_history_length = length + + def get_current_history_length(self) -> int: + return len(self.get_reader().history) + + def read_history_file(self, filename: str = gethistoryfile()) -> None: + # multiline extension (really a hack) for the end of lines that + # are actually continuations inside a single multiline_input() + # history item: we use \r\n instead of just \n. If the history + # file is passed to GNU readline, the extra \r are just ignored. + history = self.get_reader().history + + with open(os.path.expanduser(filename), 'rb') as f: + is_editline = f.readline().startswith(b"_HiStOrY_V2_") + if is_editline: + encoding = "unicode-escape" + else: + f.seek(0) + encoding = "utf-8" + + lines = [line.decode(encoding, errors='replace') for line in f.read().split(b'\n')] + buffer = [] + for line in lines: + if line.endswith("\r"): + buffer.append(line+'\n') + else: + line = self._histline(line) + if buffer: + line = self._histline("".join(buffer).replace("\r", "") + line) + del buffer[:] + if line: + history.append(line) + + def write_history_file(self, filename: str = gethistoryfile()) -> None: + maxlength = self.saved_history_length + history = self.get_reader().get_trimmed_history(maxlength) + f = open(os.path.expanduser(filename), "w", + encoding="utf-8", newline="\n") + with f: + for entry in history: + entry = entry.replace("\n", "\r\n") # multiline history support + f.write(entry + "\n") + + def clear_history(self) -> None: + del self.get_reader().history[:] + + def get_history_item(self, index: int) -> str | None: + history = self.get_reader().history + if 1 <= index <= len(history): + return history[index - 1] + else: + return None # like readline.c + + def remove_history_item(self, index: int) -> None: + history = self.get_reader().history + if 0 <= index < len(history): + del history[index] + else: + raise ValueError("No history item at position %d" % index) + # like readline.c + + def replace_history_item(self, index: int, line: str) -> None: + history = self.get_reader().history + if 0 <= index < len(history): + history[index] = self._histline(line) + else: + raise ValueError("No history item at position %d" % index) + # like readline.c + + def add_history(self, line: str) -> None: + self.get_reader().history.append(self._histline(line)) + + def set_startup_hook(self, function: Callback | None = None) -> None: + self.startup_hook = function + + def get_line_buffer(self) -> str: + return self.get_reader().get_unicode() + + def _get_idxs(self) -> tuple[int, int]: + start = cursor = self.get_reader().pos + buf = self.get_line_buffer() + for i in range(cursor - 1, -1, -1): + if buf[i] in self.get_completer_delims(): + break + start = i + return start, cursor + + def get_begidx(self) -> int: + return self._get_idxs()[0] + + def get_endidx(self) -> int: + return self._get_idxs()[1] + + def insert_text(self, text: str) -> None: + self.get_reader().insert(text) + + +_wrapper = _ReadlineWrapper() + +# ____________________________________________________________ +# Public API + +parse_and_bind = _wrapper.parse_and_bind +set_completer = _wrapper.set_completer +get_completer = _wrapper.get_completer +set_completer_delims = _wrapper.set_completer_delims +get_completer_delims = _wrapper.get_completer_delims +get_history_length = _wrapper.get_history_length +set_history_length = _wrapper.set_history_length +get_current_history_length = _wrapper.get_current_history_length +read_history_file = _wrapper.read_history_file +write_history_file = _wrapper.write_history_file +clear_history = _wrapper.clear_history +get_history_item = _wrapper.get_history_item +remove_history_item = _wrapper.remove_history_item +replace_history_item = _wrapper.replace_history_item +add_history = _wrapper.add_history +set_startup_hook = _wrapper.set_startup_hook +get_line_buffer = _wrapper.get_line_buffer +get_begidx = _wrapper.get_begidx +get_endidx = _wrapper.get_endidx +insert_text = _wrapper.insert_text + +# Extension +multiline_input = _wrapper.multiline_input + +# Internal hook +_get_reader = _wrapper.get_reader + +# ____________________________________________________________ +# Stubs + + +def _make_stub(_name: str, _ret: object) -> None: + def stub(*args: object, **kwds: object) -> None: + import warnings + + warnings.warn("readline.%s() not implemented" % _name, stacklevel=2) + + stub.__name__ = _name + globals()[_name] = stub + + +for _name, _ret in [ + ("read_init_file", None), + ("redisplay", None), + ("set_pre_input_hook", None), +]: + assert _name not in globals(), _name + _make_stub(_name, _ret) + +# ____________________________________________________________ + + +def _setup(namespace: Mapping[str, Any]) -> None: + global raw_input + if raw_input is not None: + return # don't run _setup twice + + try: + f_in = sys.stdin.fileno() + f_out = sys.stdout.fileno() + except (AttributeError, ValueError): + return + if not os.isatty(f_in) or not os.isatty(f_out): + return + + _wrapper.f_in = f_in + _wrapper.f_out = f_out + + # set up namespace in rlcompleter, which requires it to be a bona fide dict + if not isinstance(namespace, dict): + namespace = dict(namespace) + _wrapper.config.readline_completer = RLCompleter(namespace).complete + + # this is not really what readline.c does. Better than nothing I guess + import builtins + raw_input = builtins.input + builtins.input = _wrapper.input + + +raw_input: Callable[[object], str] | None = None diff --git a/contrib/tools/python3/Lib/_pyrepl/simple_interact.py b/contrib/tools/python3/Lib/_pyrepl/simple_interact.py new file mode 100644 index 00000000000..8fb2359fb51 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/simple_interact.py @@ -0,0 +1,180 @@ +# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]> +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +"""This is an alternative to python_reader which tries to emulate +the CPython prompt as closely as possible, with the exception of +allowing multiline input and multiline history entries. +""" + +from __future__ import annotations + +import _sitebuiltins +import functools +import os +import sys +import code + +from .readline import _get_reader, multiline_input + +TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import Any + + +_error: tuple[type[Exception], ...] | type[Exception] +try: + from .unix_console import _error +except ModuleNotFoundError: + from .windows_console import _error + +def check() -> str: + """Returns the error message if there is a problem initializing the state.""" + try: + _get_reader() + except _error as e: + if term := os.environ.get("TERM", ""): + term = f"; TERM={term}" + return str(str(e) or repr(e) or "unknown error") + term + return "" + + +def _strip_final_indent(text: str) -> str: + # kill spaces and tabs at the end, but only if they follow '\n'. + # meant to remove the auto-indentation only (although it would of + # course also remove explicitly-added indentation). + short = text.rstrip(" \t") + n = len(short) + if n > 0 and text[n - 1] == "\n": + return short + return text + + +def _clear_screen(): + reader = _get_reader() + reader.scheduled_commands.append("clear_screen") + + +REPL_COMMANDS = { + "exit": _sitebuiltins.Quitter('exit', ''), + "quit": _sitebuiltins.Quitter('quit' ,''), + "copyright": _sitebuiltins._Printer('copyright', sys.copyright), + "help": _sitebuiltins._Helper(), + "clear": _clear_screen, + "\x1a": _sitebuiltins.Quitter('\x1a', ''), +} + + +def _more_lines(console: code.InteractiveConsole, unicodetext: str) -> bool: + # ooh, look at the hack: + src = _strip_final_indent(unicodetext) + try: + code = console.compile(src, "<stdin>", "single") + except (OverflowError, SyntaxError, ValueError): + lines = src.splitlines(keepends=True) + if len(lines) == 1: + return False + + last_line = lines[-1] + was_indented = last_line.startswith((" ", "\t")) + not_empty = last_line.strip() != "" + incomplete = not last_line.endswith("\n") + return (was_indented or not_empty) and incomplete + else: + return code is None + + +def run_multiline_interactive_console( + console: code.InteractiveConsole, + *, + future_flags: int = 0, +) -> None: + from .readline import _setup + _setup(console.locals) + if future_flags: + console.compile.compiler.flags |= future_flags + + more_lines = functools.partial(_more_lines, console) + input_n = 0 + + _is_x_showrefcount_set = sys._xoptions.get("showrefcount") + _is_pydebug_build = hasattr(sys, "gettotalrefcount") + show_ref_count = _is_x_showrefcount_set and _is_pydebug_build + + def maybe_run_command(statement: str) -> bool: + statement = statement.strip() + if statement in console.locals or statement not in REPL_COMMANDS: + return False + + reader = _get_reader() + reader.history.pop() # skip internal commands in history + command = REPL_COMMANDS[statement] + if callable(command): + # Make sure that history does not change because of commands + with reader.suspend_history(): + command() + return True + return False + + while 1: + try: + try: + sys.stdout.flush() + except Exception: + pass + + ps1 = getattr(sys, "ps1", ">>> ") + ps2 = getattr(sys, "ps2", "... ") + try: + statement = multiline_input(more_lines, ps1, ps2) + except EOFError: + break + + if maybe_run_command(statement): + continue + + input_name = f"<python-input-{input_n}>" + more = console.push(_strip_final_indent(statement), filename=input_name, _symbol="single") # type: ignore[call-arg] + assert not more + input_n += 1 + except KeyboardInterrupt: + r = _get_reader() + r.cmpltn_reset() + if r.input_trans is r.isearch_trans: + r.do_cmd(("isearch-end", [""])) + r.pos = len(r.get_unicode()) + r.dirty = True + r.refresh() + r.in_bracketed_paste = False + console.write("\nKeyboardInterrupt\n") + console.resetbuffer() + except MemoryError: + console.write("\nMemoryError\n") + console.resetbuffer() + except SystemExit: + raise + except: + console.showtraceback() + console.resetbuffer() + if show_ref_count: + console.write( + f"[{sys.gettotalrefcount()} refs," + f" {sys.getallocatedblocks()} blocks]\n" + ) diff --git a/contrib/tools/python3/Lib/_pyrepl/terminfo.py b/contrib/tools/python3/Lib/_pyrepl/terminfo.py new file mode 100644 index 00000000000..063a285bb99 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/terminfo.py @@ -0,0 +1,530 @@ +"""Pure Python curses-like terminal capability queries.""" + +from dataclasses import dataclass, field +import errno +import os +from pathlib import Path +import re +import struct + + +# Terminfo constants +MAGIC16 = 0o432 # Magic number for 16-bit terminfo format +MAGIC32 = 0o1036 # Magic number for 32-bit terminfo format + +# Special values for absent/cancelled capabilities +ABSENT_BOOLEAN = -1 +ABSENT_NUMERIC = -1 +CANCELLED_NUMERIC = -2 +ABSENT_STRING = None +CANCELLED_STRING = None + + +# Standard string capability names from ncurses Caps file +# This matches the order used by ncurses when compiling terminfo +# fmt: off +_STRING_NAMES: tuple[str, ...] = ( + "cbt", "bel", "cr", "csr", "tbc", "clear", "el", "ed", "hpa", "cmdch", + "cup", "cud1", "home", "civis", "cub1", "mrcup", "cnorm", "cuf1", "ll", + "cuu1", "cvvis", "dch1", "dl1", "dsl", "hd", "smacs", "blink", "bold", + "smcup", "smdc", "dim", "smir", "invis", "prot", "rev", "smso", "smul", + "ech", "rmacs", "sgr0", "rmcup", "rmdc", "rmir", "rmso", "rmul", "flash", + "ff", "fsl", "is1", "is2", "is3", "if", "ich1", "il1", "ip", "kbs", "ktbc", + "kclr", "kctab", "kdch1", "kdl1", "kcud1", "krmir", "kel", "ked", "kf0", + "kf1", "kf10", "kf2", "kf3", "kf4", "kf5", "kf6", "kf7", "kf8", "kf9", + "khome", "kich1", "kil1", "kcub1", "kll", "knp", "kpp", "kcuf1", "kind", + "kri", "khts", "kcuu1", "rmkx", "smkx", "lf0", "lf1", "lf10", "lf2", "lf3", + "lf4", "lf5", "lf6", "lf7", "lf8", "lf9", "rmm", "smm", "nel", "pad", "dch", + "dl", "cud", "ich", "indn", "il", "cub", "cuf", "rin", "cuu", "pfkey", + "pfloc", "pfx", "mc0", "mc4", "mc5", "rep", "rs1", "rs2", "rs3", "rf", "rc", + "vpa", "sc", "ind", "ri", "sgr", "hts", "wind", "ht", "tsl", "uc", "hu", + "iprog", "ka1", "ka3", "kb2", "kc1", "kc3", "mc5p", "rmp", "acsc", "pln", + "kcbt", "smxon", "rmxon", "smam", "rmam", "xonc", "xoffc", "enacs", "smln", + "rmln", "kbeg", "kcan", "kclo", "kcmd", "kcpy", "kcrt", "kend", "kent", + "kext", "kfnd", "khlp", "kmrk", "kmsg", "kmov", "knxt", "kopn", "kopt", + "kprv", "kprt", "krdo", "kref", "krfr", "krpl", "krst", "kres", "ksav", + "kspd", "kund", "kBEG", "kCAN", "kCMD", "kCPY", "kCRT", "kDC", "kDL", + "kslt", "kEND", "kEOL", "kEXT", "kFND", "kHLP", "kHOM", "kIC", "kLFT", + "kMSG", "kMOV", "kNXT", "kOPT", "kPRV", "kPRT", "kRDO", "kRPL", "kRIT", + "kRES", "kSAV", "kSPD", "kUND", "rfi", "kf11", "kf12", "kf13", "kf14", + "kf15", "kf16", "kf17", "kf18", "kf19", "kf20", "kf21", "kf22", "kf23", + "kf24", "kf25", "kf26", "kf27", "kf28", "kf29", "kf30", "kf31", "kf32", + "kf33", "kf34", "kf35", "kf36", "kf37", "kf38", "kf39", "kf40", "kf41", + "kf42", "kf43", "kf44", "kf45", "kf46", "kf47", "kf48", "kf49", "kf50", + "kf51", "kf52", "kf53", "kf54", "kf55", "kf56", "kf57", "kf58", "kf59", + "kf60", "kf61", "kf62", "kf63", "el1", "mgc", "smgl", "smgr", "fln", "sclk", + "dclk", "rmclk", "cwin", "wingo", "hup","dial", "qdial", "tone", "pulse", + "hook", "pause", "wait", "u0", "u1", "u2", "u3", "u4", "u5", "u6", "u7", + "u8", "u9", "op", "oc", "initc", "initp", "scp", "setf", "setb", "cpi", + "lpi", "chr", "cvr", "defc", "swidm", "sdrfq", "sitm", "slm", "smicm", + "snlq", "snrmq", "sshm", "ssubm", "ssupm", "sum", "rwidm", "ritm", "rlm", + "rmicm", "rshm", "rsubm", "rsupm", "rum", "mhpa", "mcud1", "mcub1", "mcuf1", + "mvpa", "mcuu1", "porder", "mcud", "mcub", "mcuf", "mcuu", "scs", "smgb", + "smgbp", "smglp", "smgrp", "smgt", "smgtp", "sbim", "scsd", "rbim", "rcsd", + "subcs", "supcs", "docr", "zerom", "csnm", "kmous", "minfo", "reqmp", + "getm", "setaf", "setab", "pfxl", "devt", "csin", "s0ds", "s1ds", "s2ds", + "s3ds", "smglr", "smgtb", "birep", "binel", "bicr", "colornm", "defbi", + "endbi", "setcolor", "slines", "dispc", "smpch", "rmpch", "smsc", "rmsc", + "pctrm", "scesc", "scesa", "ehhlm", "elhlm", "elohlm", "erhlm", "ethlm", + "evhlm", "sgr1", "slength", "OTi2", "OTrs", "OTnl", "OTbc", "OTko", "OTma", + "OTG2", "OTG3", "OTG1", "OTG4", "OTGR", "OTGL", "OTGU", "OTGD", "OTGH", + "OTGV", "OTGC","meml", "memu", "box1" +) +# fmt: on +_STRING_CAPABILITY_NAMES = {name: i for i, name in enumerate(_STRING_NAMES)} + + +def _get_terminfo_dirs() -> list[Path]: + """Get list of directories to search for terminfo files. + + Based on ncurses behavior in: + - ncurses/tinfo/db_iterator.c:_nc_next_db() + - ncurses/tinfo/read_entry.c:_nc_read_entry() + """ + dirs = [] + + terminfo = os.environ.get("TERMINFO") + if terminfo: + dirs.append(terminfo) + + try: + home = Path.home() + dirs.append(str(home / ".terminfo")) + except RuntimeError: + pass + + # Check TERMINFO_DIRS + terminfo_dirs = os.environ.get("TERMINFO_DIRS", "") + if terminfo_dirs: + for d in terminfo_dirs.split(":"): + if d: + dirs.append(d) + + dirs.extend( + [ + "/etc/terminfo", + "/lib/terminfo", + "/usr/lib/terminfo", + "/usr/share/terminfo", + "/usr/share/lib/terminfo", + "/usr/share/misc/terminfo", + "/usr/local/lib/terminfo", + "/usr/local/share/terminfo", + ] + ) + + return [Path(d) for d in dirs if Path(d).is_dir()] + + +def _validate_terminal_name_or_raise(terminal_name: str) -> None: + if not isinstance(terminal_name, str): + raise TypeError("`terminal_name` must be a string") + + if not terminal_name: + raise ValueError("`terminal_name` cannot be empty") + + if "\x00" in terminal_name: + raise ValueError("NUL character found in `terminal_name`") + + t = Path(terminal_name) + if len(t.parts) > 1: + raise ValueError("`terminal_name` cannot contain path separators") + + +def _read_terminfo_file(terminal_name: str) -> bytes: + """Find and read terminfo file for given terminal name. + + Terminfo files are stored in directories using the first character + of the terminal name as a subdirectory. + """ + _validate_terminal_name_or_raise(terminal_name) + first_char = terminal_name[0].lower() + filename = terminal_name + + for directory in _get_terminfo_dirs(): + path = directory / first_char / filename + if path.is_file(): + return path.read_bytes() + + # Try with hex encoding of first char (for special chars) + hex_dir = "%02x" % ord(first_char) + path = directory / hex_dir / filename + if path.is_file(): + return path.read_bytes() + + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename) + + +# Hard-coded terminal capabilities for common terminals +# This is a minimal subset needed by PyREPL +_TERMINAL_CAPABILITIES = { + # ANSI/xterm-compatible terminals + "ansi": { + # Bell + "bel": b"\x07", + # Cursor movement + "cub": b"\x1b[%p1%dD", # Move cursor left N columns + "cud": b"\x1b[%p1%dB", # Move cursor down N rows + "cuf": b"\x1b[%p1%dC", # Move cursor right N columns + "cuu": b"\x1b[%p1%dA", # Move cursor up N rows + "cub1": b"\x08", # Move cursor left 1 column + "cud1": b"\n", # Move cursor down 1 row + "cuf1": b"\x1b[C", # Move cursor right 1 column + "cuu1": b"\x1b[A", # Move cursor up 1 row + "cup": b"\x1b[%i%p1%d;%p2%dH", # Move cursor to row, column + "hpa": b"\x1b[%i%p1%dG", # Move cursor to column + # Clear operations + "clear": b"\x1b[H\x1b[2J", # Clear screen and home cursor + "el": b"\x1b[K", # Clear to end of line + # Insert/delete + "dch": b"\x1b[%p1%dP", # Delete N characters + "dch1": b"\x1b[P", # Delete 1 character + "ich": b"\x1b[%p1%d@", # Insert N characters + "ich1": b"", # Insert 1 character + # Cursor visibility + "civis": b"\x1b[?25l", # Make cursor invisible + "cnorm": b"\x1b[?12l\x1b[?25h", # Make cursor normal (visible) + # Scrolling + "ind": b"\n", # Scroll up one line + "ri": b"\x1bM", # Scroll down one line + # Keypad mode + "smkx": b"\x1b[?1h\x1b=", # Enable keypad mode + "rmkx": b"\x1b[?1l\x1b>", # Disable keypad mode + # Padding (not used in modern terminals) + "pad": b"", + # Function keys and special keys + "kdch1": b"\x1b[3~", # Delete key + "kcud1": b"\x1bOB", # Down arrow + "kend": b"\x1bOF", # End key + "kent": b"\x1bOM", # Enter key + "khome": b"\x1bOH", # Home key + "kich1": b"\x1b[2~", # Insert key + "kcub1": b"\x1bOD", # Left arrow + "knp": b"\x1b[6~", # Page down + "kpp": b"\x1b[5~", # Page up + "kcuf1": b"\x1bOC", # Right arrow + "kcuu1": b"\x1bOA", # Up arrow + # Function keys F1-F20 + "kf1": b"\x1bOP", + "kf2": b"\x1bOQ", + "kf3": b"\x1bOR", + "kf4": b"\x1bOS", + "kf5": b"\x1b[15~", + "kf6": b"\x1b[17~", + "kf7": b"\x1b[18~", + "kf8": b"\x1b[19~", + "kf9": b"\x1b[20~", + "kf10": b"\x1b[21~", + "kf11": b"\x1b[23~", + "kf12": b"\x1b[24~", + "kf13": b"\x1b[1;2P", + "kf14": b"\x1b[1;2Q", + "kf15": b"\x1b[1;2R", + "kf16": b"\x1b[1;2S", + "kf17": b"\x1b[15;2~", + "kf18": b"\x1b[17;2~", + "kf19": b"\x1b[18;2~", + "kf20": b"\x1b[19;2~", + }, + # Dumb terminal - minimal capabilities + "dumb": { + "bel": b"\x07", # Bell + "cud1": b"\n", # Move down 1 row (newline) + "ind": b"\n", # Scroll up one line (newline) + }, + # Linux console + "linux": { + # Bell + "bel": b"\x07", + # Cursor movement + "cub": b"\x1b[%p1%dD", # Move cursor left N columns + "cud": b"\x1b[%p1%dB", # Move cursor down N rows + "cuf": b"\x1b[%p1%dC", # Move cursor right N columns + "cuu": b"\x1b[%p1%dA", # Move cursor up N rows + "cub1": b"\x08", # Move cursor left 1 column (backspace) + "cud1": b"\n", # Move cursor down 1 row (newline) + "cuf1": b"\x1b[C", # Move cursor right 1 column + "cuu1": b"\x1b[A", # Move cursor up 1 row + "cup": b"\x1b[%i%p1%d;%p2%dH", # Move cursor to row, column + "hpa": b"\x1b[%i%p1%dG", # Move cursor to column + # Clear operations + "clear": b"\x1b[H\x1b[J", # Clear screen and home cursor (different from ansi!) + "el": b"\x1b[K", # Clear to end of line + # Insert/delete + "dch": b"\x1b[%p1%dP", # Delete N characters + "dch1": b"\x1b[P", # Delete 1 character + "ich": b"\x1b[%p1%d@", # Insert N characters + "ich1": b"\x1b[@", # Insert 1 character + # Cursor visibility + "civis": b"\x1b[?25l\x1b[?1c", # Make cursor invisible + "cnorm": b"\x1b[?25h\x1b[?0c", # Make cursor normal + # Scrolling + "ind": b"\n", # Scroll up one line + "ri": b"\x1bM", # Scroll down one line + # Keypad mode + "smkx": b"\x1b[?1h\x1b=", # Enable keypad mode + "rmkx": b"\x1b[?1l\x1b>", # Disable keypad mode + # Function keys and special keys + "kdch1": b"\x1b[3~", # Delete key + "kcud1": b"\x1b[B", # Down arrow + "kend": b"\x1b[4~", # End key (different from ansi!) + "khome": b"\x1b[1~", # Home key (different from ansi!) + "kich1": b"\x1b[2~", # Insert key + "kcub1": b"\x1b[D", # Left arrow + "knp": b"\x1b[6~", # Page down + "kpp": b"\x1b[5~", # Page up + "kcuf1": b"\x1b[C", # Right arrow + "kcuu1": b"\x1b[A", # Up arrow + # Function keys + "kf1": b"\x1b[[A", + "kf2": b"\x1b[[B", + "kf3": b"\x1b[[C", + "kf4": b"\x1b[[D", + "kf5": b"\x1b[[E", + "kf6": b"\x1b[17~", + "kf7": b"\x1b[18~", + "kf8": b"\x1b[19~", + "kf9": b"\x1b[20~", + "kf10": b"\x1b[21~", + "kf11": b"\x1b[23~", + "kf12": b"\x1b[24~", + "kf13": b"\x1b[25~", + "kf14": b"\x1b[26~", + "kf15": b"\x1b[28~", + "kf16": b"\x1b[29~", + "kf17": b"\x1b[31~", + "kf18": b"\x1b[32~", + "kf19": b"\x1b[33~", + "kf20": b"\x1b[34~", + }, +} + +# Map common TERM values to capability sets +_TERM_ALIASES = { + "xterm": "ansi", + "xterm-color": "ansi", + "xterm-256color": "ansi", + "screen": "ansi", + "screen-256color": "ansi", + "tmux": "ansi", + "tmux-256color": "ansi", + "vt100": "ansi", + "vt220": "ansi", + "rxvt": "ansi", + "rxvt-unicode": "ansi", + "rxvt-unicode-256color": "ansi", + "unknown": "dumb", +} + + +@dataclass +class TermInfo: + terminal_name: str | bytes | None + fallback: bool = True + + _names: list[str] = field(default_factory=list) + _booleans: list[int] = field(default_factory=list) + _numbers: list[int] = field(default_factory=list) + _strings: list[bytes | None] = field(default_factory=list) + _capabilities: dict[str, bytes] = field(default_factory=dict) + + def __post_init__(self) -> None: + """Initialize terminal capabilities for the given terminal type. + + Based on ncurses implementation in: + - ncurses/tinfo/lib_setup.c:setupterm() and _nc_setupterm() + - ncurses/tinfo/lib_setup.c:TINFO_SETUP_TERM() + + This version first attempts to read terminfo database files like ncurses, + then, if `fallback` is True, falls back to hardcoded capabilities for + common terminal types. + """ + # If termstr is None or empty, try to get from environment + if not self.terminal_name: + self.terminal_name = os.environ.get("TERM") or "ANSI" + + if isinstance(self.terminal_name, bytes): + self.terminal_name = self.terminal_name.decode("ascii") + + try: + self._parse_terminfo_file(self.terminal_name) + except (OSError, ValueError): + if not self.fallback: + raise + + term_type = _TERM_ALIASES.get( + self.terminal_name, self.terminal_name + ) + if term_type not in _TERMINAL_CAPABILITIES: + term_type = "dumb" + self._capabilities = _TERMINAL_CAPABILITIES[term_type].copy() + + def _parse_terminfo_file(self, terminal_name: str) -> None: + """Parse a terminfo file. + + Based on ncurses implementation in: + - ncurses/tinfo/read_entry.c:_nc_read_termtype() + - ncurses/tinfo/read_entry.c:_nc_read_file_entry() + """ + data = _read_terminfo_file(terminal_name) + too_short = f"TermInfo file for {terminal_name!r} too short" + offset = 12 + if len(data) < offset: + raise ValueError(too_short) + + magic, name_size, bool_count, num_count, str_count, str_size = ( + struct.unpack("<Hhhhhh", data[:offset]) + ) + + if magic == MAGIC16: + number_format = "<h" # 16-bit signed + number_size = 2 + elif magic == MAGIC32: + number_format = "<i" # 32-bit signed + number_size = 4 + else: + raise ValueError( + f"TermInfo file for {terminal_name!r} uses unknown magic" + ) + + # Read terminal names + if offset + name_size > len(data): + raise ValueError(too_short) + names = data[offset : offset + name_size - 1].decode( + "ascii", errors="ignore" + ) + offset += name_size + + # Read boolean capabilities + if offset + bool_count > len(data): + raise ValueError(too_short) + booleans = list(data[offset : offset + bool_count]) + offset += bool_count + + # Align to even byte boundary for numbers + if offset % 2: + offset += 1 + + # Read numeric capabilities + numbers = [] + for i in range(num_count): + if offset + number_size > len(data): + raise ValueError(too_short) + num = struct.unpack( + number_format, data[offset : offset + number_size] + )[0] + numbers.append(num) + offset += number_size + + # Read string offsets + string_offsets = [] + for i in range(str_count): + if offset + 2 > len(data): + raise ValueError(too_short) + off = struct.unpack("<h", data[offset : offset + 2])[0] + string_offsets.append(off) + offset += 2 + + # Read string table + if offset + str_size > len(data): + raise ValueError(too_short) + string_table = data[offset : offset + str_size] + + # Extract strings from string table + strings: list[bytes | None] = [] + for off in string_offsets: + if off < 0: + strings.append(CANCELLED_STRING) + elif off < len(string_table): + # Find null terminator + end = off + while end < len(string_table) and string_table[end] != 0: + end += 1 + if end <= len(string_table): + strings.append(string_table[off:end]) + else: + strings.append(ABSENT_STRING) + else: + strings.append(ABSENT_STRING) + + self._names = names.split("|") + self._booleans = booleans + self._numbers = numbers + self._strings = strings + + def get(self, cap: str) -> bytes | None: + """Get terminal capability string by name. + + Based on ncurses implementation in: + - ncurses/tinfo/lib_ti.c:tigetstr() + + The ncurses version searches through compiled terminfo data structures. + This version first checks parsed terminfo data, then falls back to + hardcoded capabilities. + """ + if not isinstance(cap, str): + raise TypeError(f"`cap` must be a string, not {type(cap)}") + + if self._capabilities: + # Fallbacks populated, use them + return self._capabilities.get(cap) + + # Look up in standard capabilities first + if cap in _STRING_CAPABILITY_NAMES: + index = _STRING_CAPABILITY_NAMES[cap] + if index < len(self._strings): + return self._strings[index] + + # Note: we don't support extended capabilities since PyREPL doesn't + # need them. + return None + + +def tparm(cap_bytes: bytes, *params: int) -> bytes: + """Parameterize a terminal capability string. + + Based on ncurses implementation in: + - ncurses/tinfo/lib_tparm.c:tparm() + - ncurses/tinfo/lib_tparm.c:tparam_internal() + + The ncurses version implements a full stack-based interpreter for + terminfo parameter strings. This pure Python version implements only + the subset of parameter substitution operations needed by PyREPL: + - %i (increment parameters for 1-based indexing) + - %p[1-9]%d (parameter substitution) + - %p[1-9]%{n}%+%d (parameter plus constant) + """ + if not isinstance(cap_bytes, bytes): + raise TypeError(f"`cap` must be bytes, not {type(cap_bytes)}") + + result = cap_bytes + + # %i - increment parameters (1-based instead of 0-based) + increment = b"%i" in result + if increment: + result = result.replace(b"%i", b"") + + # Replace %p1%d, %p2%d, etc. with actual parameter values + for i in range(len(params)): + pattern = b"%%p%d%%d" % (i + 1) + if pattern in result: + value = params[i] + if increment: + value += 1 + result = result.replace(pattern, str(value).encode("ascii")) + + # Handle %p1%{1}%+%d (parameter plus constant) + # Used in some cursor positioning sequences + pattern_re = re.compile(rb"%p(\d)%\{(\d+)\}%\+%d") + matches = list(pattern_re.finditer(result)) + for match in reversed(matches): # reversed to maintain positions + param_idx = int(match.group(1)) + constant = int(match.group(2)) + value = params[param_idx] + constant + result = ( + result[: match.start()] + + str(value).encode("ascii") + + result[match.end() :] + ) + + return result diff --git a/contrib/tools/python3/Lib/_pyrepl/trace.py b/contrib/tools/python3/Lib/_pyrepl/trace.py new file mode 100644 index 00000000000..a8eb2433cd3 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/trace.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import os + +# types +if False: + from typing import IO + + +trace_file: IO[str] | None = None +if trace_filename := os.environ.get("PYREPL_TRACE"): + trace_file = open(trace_filename, "a") + + +def trace(line: str, *k: object, **kw: object) -> None: + if trace_file is None: + return + if k or kw: + line = line.format(*k, **kw) + trace_file.write(line + "\n") + trace_file.flush() diff --git a/contrib/tools/python3/Lib/_pyrepl/types.py b/contrib/tools/python3/Lib/_pyrepl/types.py new file mode 100644 index 00000000000..c5b7ebc1a40 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/types.py @@ -0,0 +1,10 @@ +from collections.abc import Callable, Iterator + +type Callback = Callable[[], object] +type SimpleContextManager = Iterator[None] +type KeySpec = str # like r"\C-c" +type CommandName = str # like "interrupt" +type EventTuple = tuple[CommandName, str] +type Completer = Callable[[str, int], str | None] +type CharBuffer = list[str] +type CharWidths = list[int] diff --git a/contrib/tools/python3/Lib/_pyrepl/unix_console.py b/contrib/tools/python3/Lib/_pyrepl/unix_console.py new file mode 100644 index 00000000000..db5b8ddadce --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/unix_console.py @@ -0,0 +1,852 @@ +# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]> +# Antonio Cuni +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations + +import errno +import os +import re +import select +import signal +import struct +import termios +import time +import platform +from fcntl import ioctl + +from . import terminfo +from .console import Console, Event +from .fancy_termios import tcgetattr, tcsetattr, TermState +from .trace import trace +from .unix_eventqueue import EventQueue +from .utils import wlen + + +TYPE_CHECKING = False + +# types +if TYPE_CHECKING: + from typing import AbstractSet, IO, Literal, overload, cast +else: + overload = lambda func: None + cast = lambda typ, val: val + + +class InvalidTerminal(RuntimeError): + def __init__(self, message: str) -> None: + super().__init__(errno.EIO, message) + + +_error = (termios.error, InvalidTerminal) +_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM]) + +SIGWINCH_EVENT = "repaint" + +FIONREAD = getattr(termios, "FIONREAD", None) +TIOCGWINSZ = getattr(termios, "TIOCGWINSZ", None) + +# ------------ start of baudrate definitions ------------ + +# Add (possibly) missing baudrates (check termios man page) to termios + + +def add_baudrate_if_supported(dictionary: dict[int, int], rate: int) -> None: + baudrate_name = "B%d" % rate + if hasattr(termios, baudrate_name): + dictionary[getattr(termios, baudrate_name)] = rate + + +# Check the termios man page (Line speed) to know where these +# values come from. +potential_baudrates = [ + 0, + 110, + 115200, + 1200, + 134, + 150, + 1800, + 19200, + 200, + 230400, + 2400, + 300, + 38400, + 460800, + 4800, + 50, + 57600, + 600, + 75, + 9600, +] + +ratedict: dict[int, int] = {} +for rate in potential_baudrates: + add_baudrate_if_supported(ratedict, rate) + +# Clean up variables to avoid unintended usage +del rate, add_baudrate_if_supported + +# ------------ end of baudrate definitions ------------ + +delayprog = re.compile(b"\\$<([0-9]+)((?:/|\\*){0,2})>") + +try: + poll: type[select.poll] = select.poll +except AttributeError: + # this is exactly the minumum necessary to support what we + # do with poll objects + class MinimalPoll: + def __init__(self): + pass + + def register(self, fd, flag): + self.fd = fd + + # note: The 'timeout' argument is received as *milliseconds* + def poll(self, timeout: float | None = None) -> list[int]: + if timeout is None: + r, w, e = select.select([self.fd], [], []) + else: + r, w, e = select.select([self.fd], [], [], timeout / 1000) + return r + + poll = MinimalPoll # type: ignore[assignment] + + +class UnixConsole(Console): + def __init__( + self, + f_in: IO[bytes] | int = 0, + f_out: IO[bytes] | int = 1, + term: str = "", + encoding: str = "", + ): + """ + Initialize the UnixConsole. + + Parameters: + - f_in (int or file-like object): Input file descriptor or object. + - f_out (int or file-like object): Output file descriptor or object. + - term (str): Terminal name. + - encoding (str): Encoding to use for I/O operations. + """ + super().__init__(f_in, f_out, term, encoding) + + self.pollob = poll() + self.pollob.register(self.input_fd, select.POLLIN) + self.input_buffer = b"" + self.input_buffer_pos = 0 + self.terminfo = terminfo.TermInfo(term or None) + self.term = term + self.is_apple_terminal = ( + platform.system() == "Darwin" + and os.getenv("TERM_PROGRAM") == "Apple_Terminal" + ) + + try: + self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset()) + except _error as e: + raise RuntimeError(f"termios failure ({e.args[1]})") + + @overload + def _my_getstr( + cap: str, optional: Literal[False] = False + ) -> bytes: ... + + @overload + def _my_getstr(cap: str, optional: bool) -> bytes | None: ... + + def _my_getstr(cap: str, optional: bool = False) -> bytes | None: + r = self.terminfo.get(cap) + if not optional and r is None: + raise InvalidTerminal( + f"terminal doesn't have the required {cap} capability" + ) + return r + + self._bel = _my_getstr("bel") + self._civis = _my_getstr("civis", optional=True) + self._clear = _my_getstr("clear") + self._cnorm = _my_getstr("cnorm", optional=True) + self._cub = _my_getstr("cub", optional=True) + self._cub1 = _my_getstr("cub1", optional=True) + self._cud = _my_getstr("cud", optional=True) + self._cud1 = _my_getstr("cud1", optional=True) + self._cuf = _my_getstr("cuf", optional=True) + self._cuf1 = _my_getstr("cuf1", optional=True) + self._cup = _my_getstr("cup") + self._cuu = _my_getstr("cuu", optional=True) + self._cuu1 = _my_getstr("cuu1", optional=True) + self._dch1 = _my_getstr("dch1", optional=True) + self._dch = _my_getstr("dch", optional=True) + self._el = _my_getstr("el") + self._hpa = _my_getstr("hpa", optional=True) + self._ich = _my_getstr("ich", optional=True) + self._ich1 = _my_getstr("ich1", optional=True) + self._ind = _my_getstr("ind", optional=True) + self._pad = _my_getstr("pad", optional=True) + self._ri = _my_getstr("ri", optional=True) + self._rmkx = _my_getstr("rmkx", optional=True) + self._smkx = _my_getstr("smkx", optional=True) + + self.__setup_movement() + + self.event_queue = EventQueue(self.input_fd, self.encoding, self.terminfo) + self.cursor_visible = 1 + + signal.signal(signal.SIGCONT, self._sigcont_handler) + + def _sigcont_handler(self, signum, frame): + self.restore() + self.prepare() + + def more_in_buffer(self) -> bool: + return bool( + self.input_buffer + and self.input_buffer_pos < len(self.input_buffer) + ) + + def __read(self, n: int) -> bytes: + if not self.more_in_buffer(): + self.input_buffer = os.read(self.input_fd, 10000) + + ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n] + self.input_buffer_pos += len(ret) + if self.input_buffer_pos >= len(self.input_buffer): + self.input_buffer = b"" + self.input_buffer_pos = 0 + return ret + + def change_encoding(self, encoding: str) -> None: + """ + Change the encoding used for I/O operations. + + Parameters: + - encoding (str): New encoding to use. + """ + self.encoding = encoding + + def refresh(self, screen, c_xy): + """ + Refresh the console screen. + + Parameters: + - screen (list): List of strings representing the screen contents. + - c_xy (tuple): Cursor position (x, y) on the screen. + """ + cx, cy = c_xy + if not self.__gone_tall: + while len(self.screen) < min(len(screen), self.height): + self.__hide_cursor() + if self.screen: + self.__move(0, len(self.screen) - 1) + self.__write("\n") + self.posxy = 0, len(self.screen) + self.screen.append("") + else: + while len(self.screen) < len(screen): + self.screen.append("") + + if len(screen) > self.height: + self.__gone_tall = 1 + self.__move = self.__move_tall + + px, py = self.posxy + old_offset = offset = self.__offset + height = self.height + + # we make sure the cursor is on the screen, and that we're + # using all of the screen if we can + if cy < offset: + offset = cy + elif cy >= offset + height: + offset = cy - height + 1 + elif offset > 0 and len(screen) < offset + height: + offset = max(len(screen) - height, 0) + screen.append("") + + oldscr = self.screen[old_offset : old_offset + height] + newscr = screen[offset : offset + height] + + # use hardware scrolling if we have it. + if old_offset > offset and self._ri: + self.__hide_cursor() + self.__write_code(self._cup, 0, 0) + self.posxy = 0, old_offset + for i in range(old_offset - offset): + self.__write_code(self._ri) + oldscr.pop(-1) + oldscr.insert(0, "") + elif old_offset < offset and self._ind: + self.__hide_cursor() + self.__write_code(self._cup, self.height - 1, 0) + self.posxy = 0, old_offset + self.height - 1 + for i in range(offset - old_offset): + self.__write_code(self._ind) + oldscr.pop(0) + oldscr.append("") + + self.__offset = offset + + for ( + y, + oldline, + newline, + ) in zip(range(offset, offset + height), oldscr, newscr): + if oldline != newline: + self.__write_changed_line(y, oldline, newline, px) + + y = len(newscr) + while y < len(oldscr): + self.__hide_cursor() + self.__move(0, y) + self.posxy = 0, y + self.__write_code(self._el) + y += 1 + + self.__show_cursor() + + self.screen = screen.copy() + self.move_cursor(cx, cy) + self.flushoutput() + + def move_cursor(self, x, y): + """ + Move the cursor to the specified position on the screen. + + Parameters: + - x (int): X coordinate. + - y (int): Y coordinate. + """ + if y < self.__offset or y >= self.__offset + self.height: + self.event_queue.insert(Event("scroll", None)) + else: + self.__move(x, y) + self.posxy = x, y + self.flushoutput() + + def prepare(self): + """ + Prepare the console for input/output operations. + """ + self.__buffer = [] + + self.__svtermstate = tcgetattr(self.input_fd) + raw = self.__svtermstate.copy() + raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON) + raw.oflag &= ~(termios.OPOST) + raw.cflag &= ~(termios.CSIZE | termios.PARENB) + raw.cflag |= termios.CS8 + raw.iflag |= termios.BRKINT + raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN) + raw.lflag |= termios.ISIG + raw.cc[termios.VMIN] = 1 + raw.cc[termios.VTIME] = 0 + self.__input_fd_set(raw) + + # In macOS terminal we need to deactivate line wrap via ANSI escape code + if self.is_apple_terminal: + os.write(self.output_fd, b"\033[?7l") + + self.screen = [] + self.height, self.width = self.getheightwidth() + + self.posxy = 0, 0 + self.__gone_tall = 0 + self.__move = self.__move_short + self.__offset = 0 + + self.__maybe_write_code(self._smkx) + + try: + self.old_sigwinch = signal.signal(signal.SIGWINCH, self.__sigwinch) + except ValueError: + pass + + self.__enable_bracketed_paste() + + def restore(self): + """ + Restore the console to the default state + """ + self.__disable_bracketed_paste() + self.__maybe_write_code(self._rmkx) + self.flushoutput() + self.__input_fd_set(self.__svtermstate) + + if self.is_apple_terminal: + os.write(self.output_fd, b"\033[?7h") + + if hasattr(self, "old_sigwinch"): + try: + signal.signal(signal.SIGWINCH, self.old_sigwinch) + except ValueError as e: + import threading + if threading.current_thread() is threading.main_thread(): + raise e + del self.old_sigwinch + + def push_char(self, char: int | bytes) -> None: + """ + Push a character to the console event queue. + """ + trace("push char {char!r}", char=char) + self.event_queue.push(char) + + def get_event(self, block: bool = True) -> Event | None: + """ + Get an event from the console event queue. + + Parameters: + - block (bool): Whether to block until an event is available. + + Returns: + - Event: Event object from the event queue. + """ + if not block and not self.wait(timeout=0): + return None + + while self.event_queue.empty(): + while True: + try: + self.push_char(self.__read(1)) + except OSError as err: + if err.errno == errno.EINTR: + if not self.event_queue.empty(): + return self.event_queue.get() + else: + continue + elif err.errno == errno.EIO: + raise SystemExit(errno.EIO) + else: + raise + else: + break + return self.event_queue.get() + + def wait(self, timeout: float | None = None) -> bool: + """ + Wait for events on the console. + """ + return ( + not self.event_queue.empty() + or self.more_in_buffer() + or bool(self.pollob.poll(timeout)) + ) + + def set_cursor_vis(self, visible): + """ + Set the visibility of the cursor. + + Parameters: + - visible (bool): Visibility flag. + """ + if visible: + self.__show_cursor() + else: + self.__hide_cursor() + + if TIOCGWINSZ: + + def getheightwidth(self): + """ + Get the height and width of the console. + + Returns: + - tuple: Height and width of the console. + """ + try: + return int(os.environ["LINES"]), int(os.environ["COLUMNS"]) + except (KeyError, TypeError, ValueError): + try: + size = ioctl(self.input_fd, TIOCGWINSZ, b"\000" * 8) + except OSError: + return 25, 80 + height, width = struct.unpack("hhhh", size)[0:2] + if not height: + return 25, 80 + return height, width + + else: + + def getheightwidth(self): + """ + Get the height and width of the console. + + Returns: + - tuple: Height and width of the console. + """ + try: + return int(os.environ["LINES"]), int(os.environ["COLUMNS"]) + except (KeyError, TypeError, ValueError): + return 25, 80 + + def forgetinput(self): + """ + Discard any pending input on the console. + """ + termios.tcflush(self.input_fd, termios.TCIFLUSH) + + def flushoutput(self): + """ + Flush the output buffer. + """ + for text, iscode in self.__buffer: + if iscode: + self.__tputs(text) + else: + os.write(self.output_fd, text.encode(self.encoding, "replace")) + del self.__buffer[:] + + def finish(self): + """ + Finish console operations and flush the output buffer. + """ + y = len(self.screen) - 1 + while y >= 0 and not self.screen[y]: + y -= 1 + self.__move(0, min(y, self.height + self.__offset - 1)) + self.__write("\n\r") + self.flushoutput() + + def beep(self): + """ + Emit a beep sound. + """ + self.__maybe_write_code(self._bel) + self.flushoutput() + + if FIONREAD: + + def getpending(self): + """ + Get pending events from the console event queue. + + Returns: + - Event: Pending event from the event queue. + """ + e = Event("key", "", b"") + + while not self.event_queue.empty(): + e2 = self.event_queue.get() + e.data += e2.data + e.raw += e.raw + + amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0] + raw = self.__read(amount) + data = str(raw, self.encoding, "replace") + e.data += data + e.raw += raw + return e + + else: + + def getpending(self): + """ + Get pending events from the console event queue. + + Returns: + - Event: Pending event from the event queue. + """ + e = Event("key", "", b"") + + while not self.event_queue.empty(): + e2 = self.event_queue.get() + e.data += e2.data + e.raw += e.raw + + amount = 10000 + raw = self.__read(amount) + data = str(raw, self.encoding, "replace") + e.data += data + e.raw += raw + return e + + def clear(self): + """ + Clear the console screen. + """ + self.__write_code(self._clear) + self.__gone_tall = 1 + self.__move = self.__move_tall + self.posxy = 0, 0 + self.screen = [] + + @property + def input_hook(self): + try: + import posix + except ImportError: + return None + if posix._is_inputhook_installed(): + return posix._inputhook + + def __enable_bracketed_paste(self) -> None: + os.write(self.output_fd, b"\x1b[?2004h") + + def __disable_bracketed_paste(self) -> None: + os.write(self.output_fd, b"\x1b[?2004l") + + def __setup_movement(self): + """ + Set up the movement functions based on the terminal capabilities. + """ + if 0 and self._hpa: # hpa don't work in windows telnet :-( + self.__move_x = self.__move_x_hpa + elif self._cub and self._cuf: + self.__move_x = self.__move_x_cub_cuf + elif self._cub1 and self._cuf1: + self.__move_x = self.__move_x_cub1_cuf1 + else: + raise RuntimeError("insufficient terminal (horizontal)") + + if self._cuu and self._cud: + self.__move_y = self.__move_y_cuu_cud + elif self._cuu1 and self._cud1: + self.__move_y = self.__move_y_cuu1_cud1 + else: + raise RuntimeError("insufficient terminal (vertical)") + + if self._dch1: + self.dch1 = self._dch1 + elif self._dch: + self.dch1 = terminfo.tparm(self._dch, 1) + else: + self.dch1 = None + + if self._ich1: + self.ich1 = self._ich1 + elif self._ich: + self.ich1 = terminfo.tparm(self._ich, 1) + else: + self.ich1 = None + + self.__move = self.__move_short + + def __write_changed_line(self, y, oldline, newline, px_coord): + # this is frustrating; there's no reason to test (say) + # self.dch1 inside the loop -- but alternative ways of + # structuring this function are equally painful (I'm trying to + # avoid writing code generators these days...) + minlen = min(wlen(oldline), wlen(newline)) + x_pos = 0 + x_coord = 0 + + px_pos = 0 + j = 0 + for c in oldline: + if j >= px_coord: + break + j += wlen(c) + px_pos += 1 + + # reuse the oldline as much as possible, but stop as soon as we + # encounter an ESCAPE, because it might be the start of an escape + # sequene + while ( + x_coord < minlen + and oldline[x_pos] == newline[x_pos] + and newline[x_pos] != "\x1b" + ): + x_coord += wlen(newline[x_pos]) + x_pos += 1 + + # if we need to insert a single character right after the first detected change + if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1: + if ( + y == self.posxy[1] + and x_coord > self.posxy[0] + and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1] + ): + x_pos = px_pos + x_coord = px_coord + character_width = wlen(newline[x_pos]) + self.__move(x_coord, y) + self.__write_code(self.ich1) + self.__write(newline[x_pos]) + self.posxy = x_coord + character_width, y + + # if it's a single character change in the middle of the line + elif ( + x_coord < minlen + and oldline[x_pos + 1 :] == newline[x_pos + 1 :] + and wlen(oldline[x_pos]) == wlen(newline[x_pos]) + ): + character_width = wlen(newline[x_pos]) + self.__move(x_coord, y) + self.__write(newline[x_pos]) + self.posxy = x_coord + character_width, y + + # if this is the last character to fit in the line and we edit in the middle of the line + elif ( + self.dch1 + and self.ich1 + and wlen(newline) == self.width + and x_coord < wlen(newline) - 2 + and newline[x_pos + 1 : -1] == oldline[x_pos:-2] + ): + self.__hide_cursor() + self.__move(self.width - 2, y) + self.posxy = self.width - 2, y + self.__write_code(self.dch1) + + character_width = wlen(newline[x_pos]) + self.__move(x_coord, y) + self.__write_code(self.ich1) + self.__write(newline[x_pos]) + self.posxy = character_width + 1, y + + else: + self.__hide_cursor() + self.__move(x_coord, y) + if wlen(oldline) > wlen(newline): + self.__write_code(self._el) + self.__write(newline[x_pos:]) + self.posxy = wlen(newline), y + + if "\x1b" in newline: + # ANSI escape characters are present, so we can't assume + # anything about the position of the cursor. Moving the cursor + # to the left margin should work to get to a known position. + self.move_cursor(0, y) + + def __write(self, text): + self.__buffer.append((text, 0)) + + def __write_code(self, fmt, *args): + self.__buffer.append((terminfo.tparm(fmt, *args), 1)) + + def __maybe_write_code(self, fmt, *args): + if fmt: + self.__write_code(fmt, *args) + + def __move_y_cuu1_cud1(self, y): + assert self._cud1 is not None + assert self._cuu1 is not None + dy = y - self.posxy[1] + if dy > 0: + self.__write_code(dy * self._cud1) + elif dy < 0: + self.__write_code((-dy) * self._cuu1) + + def __move_y_cuu_cud(self, y): + dy = y - self.posxy[1] + if dy > 0: + self.__write_code(self._cud, dy) + elif dy < 0: + self.__write_code(self._cuu, -dy) + + def __move_x_hpa(self, x: int) -> None: + if x != self.posxy[0]: + self.__write_code(self._hpa, x) + + def __move_x_cub1_cuf1(self, x: int) -> None: + assert self._cuf1 is not None + assert self._cub1 is not None + dx = x - self.posxy[0] + if dx > 0: + self.__write_code(self._cuf1 * dx) + elif dx < 0: + self.__write_code(self._cub1 * (-dx)) + + def __move_x_cub_cuf(self, x: int) -> None: + dx = x - self.posxy[0] + if dx > 0: + self.__write_code(self._cuf, dx) + elif dx < 0: + self.__write_code(self._cub, -dx) + + def __move_short(self, x, y): + self.__move_x(x) + self.__move_y(y) + + def __move_tall(self, x, y): + assert 0 <= y - self.__offset < self.height, y - self.__offset + self.__write_code(self._cup, y - self.__offset, x) + + def __sigwinch(self, signum, frame): + self.height, self.width = self.getheightwidth() + self.event_queue.insert(Event("resize", None)) + + def __hide_cursor(self): + if self.cursor_visible: + self.__maybe_write_code(self._civis) + self.cursor_visible = 0 + + def __show_cursor(self): + if not self.cursor_visible: + self.__maybe_write_code(self._cnorm) + self.cursor_visible = 1 + + def repaint(self): + if not self.__gone_tall: + self.posxy = 0, self.posxy[1] + self.__write("\r") + ns = len(self.screen) * ["\000" * self.width] + self.screen = ns + else: + self.posxy = 0, self.__offset + self.__move(0, self.__offset) + ns = self.height * ["\000" * self.width] + self.screen = ns + + def __tputs(self, fmt, prog=delayprog): + """A Python implementation of the curses tputs function; the + curses one can't really be wrapped in a sane manner. + + I have the strong suspicion that this is complexity that + will never do anyone any good.""" + # using .get() means that things will blow up + # only if the bps is actually needed (which I'm + # betting is pretty unlikely) + bps = ratedict.get(self.__svtermstate.ospeed) + while 1: + m = prog.search(fmt) + if not m: + os.write(self.output_fd, fmt) + break + x, y = m.span() + os.write(self.output_fd, fmt[:x]) + fmt = fmt[y:] + delay = int(m.group(1)) + if b"*" in m.group(2): + delay *= self.height + if self._pad and bps is not None: + nchars = (bps * delay) / 1000 + os.write(self.output_fd, self._pad * nchars) + else: + time.sleep(float(delay) / 1000.0) + + def __input_fd_set( + self, + state: TermState, + ignore: AbstractSet[int] = _error_codes_to_ignore, + ) -> bool: + try: + tcsetattr(self.input_fd, termios.TCSADRAIN, state) + except termios.error as te: + if te.args[0] not in ignore: + raise + return False + else: + return True diff --git a/contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py b/contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py new file mode 100644 index 00000000000..2a9cca59e74 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py @@ -0,0 +1,77 @@ +# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]> +# Armin Rigo +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from .terminfo import TermInfo +from .trace import trace +from .base_eventqueue import BaseEventQueue +from termios import tcgetattr, VERASE +import os + + +# Mapping of human-readable key names to their terminal-specific codes +TERMINAL_KEYNAMES = { + "delete": "kdch1", + "down": "kcud1", + "end": "kend", + "enter": "kent", + "home": "khome", + "insert": "kich1", + "left": "kcub1", + "page down": "knp", + "page up": "kpp", + "right": "kcuf1", + "up": "kcuu1", +} + + +# Function keys F1-F20 mapping +TERMINAL_KEYNAMES.update(("f%d" % i, "kf%d" % i) for i in range(1, 21)) + +# Known CTRL-arrow keycodes +CTRL_ARROW_KEYCODES= { + # for xterm, gnome-terminal, xfce terminal, etc. + b'\033[1;5D': 'ctrl left', + b'\033[1;5C': 'ctrl right', + # for rxvt + b'\033Od': 'ctrl left', + b'\033Oc': 'ctrl right', +} + +def get_terminal_keycodes(ti: TermInfo) -> dict[bytes, str]: + """ + Generates a dictionary mapping terminal keycodes to human-readable names. + """ + keycodes = {} + for key, terminal_code in TERMINAL_KEYNAMES.items(): + keycode = ti.get(terminal_code) + trace('key {key} tiname {terminal_code} keycode {keycode!r}', **locals()) + if keycode: + keycodes[keycode] = key + keycodes.update(CTRL_ARROW_KEYCODES) + return keycodes + + +class EventQueue(BaseEventQueue): + def __init__(self, fd: int, encoding: str, ti: TermInfo) -> None: + keycodes = get_terminal_keycodes(ti) + if os.isatty(fd): + backspace = tcgetattr(fd)[6][VERASE] + keycodes[backspace] = "backspace" + BaseEventQueue.__init__(self, encoding, keycodes) diff --git a/contrib/tools/python3/Lib/_pyrepl/utils.py b/contrib/tools/python3/Lib/_pyrepl/utils.py new file mode 100644 index 00000000000..a30fbdee3a4 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/utils.py @@ -0,0 +1,83 @@ +import re +import unicodedata +import functools + +from .types import CharBuffer, CharWidths +from .trace import trace + +ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]") +ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02") +ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""}) + + +def str_width(c: str) -> int: + if ord(c) < 128: + return 1 + # gh-139246 for zero-width joiner and combining characters + if unicodedata.combining(c): + return 0 + category = unicodedata.category(c) + if category == "Cf" and c != "\u00ad": + return 0 + w = unicodedata.east_asian_width(c) + if w in ("N", "Na", "H", "A"): + return 1 + return 2 + + +def wlen(s: str) -> int: + if len(s) == 1 and s != "\x1a": + return str_width(s) + length = sum(str_width(i) for i in s) + # remove lengths of any escape sequences + sequence = ANSI_ESCAPE_SEQUENCE.findall(s) + ctrl_z_cnt = s.count("\x1a") + return length - sum(len(i) for i in sequence) + ctrl_z_cnt + + +def unbracket(s: str, including_content: bool = False) -> str: + r"""Return `s` with \001 and \002 characters removed. + + If `including_content` is True, content between \001 and \002 is also + stripped. + """ + if including_content: + return ZERO_WIDTH_BRACKET.sub("", s) + return s.translate(ZERO_WIDTH_TRANS) + + +def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]: + r"""Decompose the input buffer into a printable variant. + + Returns a tuple of two lists: + - the first list is the input buffer, character by character; + - the second list is the visible width of each character in the input + buffer. + + Examples: + >>> utils.disp_str("a = 9") + (['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1]) + """ + chars: CharBuffer = [] + char_widths: CharWidths = [] + + if not buffer: + return chars, char_widths + + for c in buffer: + if c == "\x1a": # CTRL-Z on Windows + chars.append(c) + char_widths.append(2) + elif ord(c) < 128: + chars.append(c) + char_widths.append(1) + elif unicodedata.category(c).startswith("C"): + c = r"\u%04x" % ord(c) + chars.append(c) + char_widths.append(len(c)) + else: + chars.append(c) + char_widths.append(str_width(c)) + trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths) + return chars, char_widths diff --git a/contrib/tools/python3/Lib/_pyrepl/windows_console.py b/contrib/tools/python3/Lib/_pyrepl/windows_console.py new file mode 100644 index 00000000000..bb2ef1aa51f --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/windows_console.py @@ -0,0 +1,687 @@ +# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]> +# +# All Rights Reserved +# +# +# Permission to use, copy, modify, and distribute this software and +# its documentation for any purpose is hereby granted without fee, +# provided that the above copyright notice appear in all copies and +# that both that copyright notice and this permission notice appear in +# supporting documentation. +# +# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO +# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, +# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER +# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF +# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN +# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +from __future__ import annotations + +import io +import os +import sys +import time +import msvcrt + +from collections import deque +import ctypes +from ctypes.wintypes import ( + _COORD, + WORD, + SMALL_RECT, + BOOL, + HANDLE, + CHAR, + DWORD, + WCHAR, + SHORT, +) +from ctypes import Structure, POINTER, Union +from .console import Event, Console +from .trace import trace +from .utils import wlen +from .windows_eventqueue import EventQueue + +try: + from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined] +except: + # Keep MyPy happy off Windows + from ctypes import CDLL as WinDLL, cdll as windll + + def GetLastError() -> int: + return 42 + + class WinError(OSError): # type: ignore[no-redef] + def __init__(self, err: int | None, descr: str | None = None) -> None: + self.err = err + self.descr = descr + + +TYPE_CHECKING = False + +if TYPE_CHECKING: + from typing import IO + +# Virtual-Key Codes: https://learn.microsoft.com/en-us/windows/win32/inputdev/virtual-key-codes +VK_MAP: dict[int, str] = { + 0x23: "end", # VK_END + 0x24: "home", # VK_HOME + 0x25: "left", # VK_LEFT + 0x26: "up", # VK_UP + 0x27: "right", # VK_RIGHT + 0x28: "down", # VK_DOWN + 0x2E: "delete", # VK_DELETE + 0x70: "f1", # VK_F1 + 0x71: "f2", # VK_F2 + 0x72: "f3", # VK_F3 + 0x73: "f4", # VK_F4 + 0x74: "f5", # VK_F5 + 0x75: "f6", # VK_F6 + 0x76: "f7", # VK_F7 + 0x77: "f8", # VK_F8 + 0x78: "f9", # VK_F9 + 0x79: "f10", # VK_F10 + 0x7A: "f11", # VK_F11 + 0x7B: "f12", # VK_F12 + 0x7C: "f13", # VK_F13 + 0x7D: "f14", # VK_F14 + 0x7E: "f15", # VK_F15 + 0x7F: "f16", # VK_F16 + 0x80: "f17", # VK_F17 + 0x81: "f18", # VK_F18 + 0x82: "f19", # VK_F19 + 0x83: "f20", # VK_F20 +} + +# Virtual terminal output sequences +# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences +# Check `windows_eventqueue.py` for input sequences +ERASE_IN_LINE = "\x1b[K" +MOVE_LEFT = "\x1b[{}D" +MOVE_RIGHT = "\x1b[{}C" +MOVE_UP = "\x1b[{}A" +MOVE_DOWN = "\x1b[{}B" +CLEAR = "\x1b[H\x1b[J" + +# State of control keys: https://learn.microsoft.com/en-us/windows/console/key-event-record-str +ALT_ACTIVE = 0x01 | 0x02 +CTRL_ACTIVE = 0x04 | 0x08 + + +class _error(Exception): + pass + +def _supports_vt(): + try: + import nt + return nt._supports_virtual_terminal() + except (ImportError, AttributeError): + return False + +class WindowsConsole(Console): + def __init__( + self, + f_in: IO[bytes] | int = 0, + f_out: IO[bytes] | int = 1, + term: str = "", + encoding: str = "", + ): + super().__init__(f_in, f_out, term, encoding) + + self.__vt_support = _supports_vt() + + if self.__vt_support: + trace('console supports virtual terminal') + + # Save original console modes so we can recover on cleanup. + original_input_mode = DWORD() + GetConsoleMode(InHandle, original_input_mode) + trace(f'saved original input mode 0x{original_input_mode.value:x}') + self.__original_input_mode = original_input_mode.value + + SetConsoleMode( + OutHandle, + ENABLE_WRAP_AT_EOL_OUTPUT + | ENABLE_PROCESSED_OUTPUT + | ENABLE_VIRTUAL_TERMINAL_PROCESSING, + ) + + self.screen: list[str] = [] + self.width = 80 + self.height = 25 + self.__offset = 0 + self.event_queue = EventQueue(encoding) + try: + self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined] + except ValueError: + # Console I/O is redirected, fallback... + self.out = None + + def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None: + """ + Refresh the console screen. + + Parameters: + - screen (list): List of strings representing the screen contents. + - c_xy (tuple): Cursor position (x, y) on the screen. + """ + cx, cy = c_xy + + while len(self.screen) < min(len(screen), self.height): + self._hide_cursor() + if self.screen: + self._move_relative(0, len(self.screen) - 1) + self.__write("\n") + self.posxy = 0, len(self.screen) + self.screen.append("") + + px, py = self.posxy + old_offset = offset = self.__offset + height = self.height + + # we make sure the cursor is on the screen, and that we're + # using all of the screen if we can + if cy < offset: + offset = cy + elif cy >= offset + height: + offset = cy - height + 1 + scroll_lines = offset - old_offset + + # Scrolling the buffer as the current input is greater than the visible + # portion of the window. We need to scroll the visible portion and the + # entire history + self._scroll(scroll_lines, self._getscrollbacksize()) + self.posxy = self.posxy[0], self.posxy[1] + scroll_lines + self.__offset += scroll_lines + + for i in range(scroll_lines): + self.screen.append("") + elif offset > 0 and len(screen) < offset + height: + offset = max(len(screen) - height, 0) + screen.append("") + + oldscr = self.screen[old_offset : old_offset + height] + newscr = screen[offset : offset + height] + + self.__offset = offset + + self._hide_cursor() + for ( + y, + oldline, + newline, + ) in zip(range(offset, offset + height), oldscr, newscr): + if oldline != newline: + self.__write_changed_line(y, oldline, newline, px) + + y = len(newscr) + while y < len(oldscr): + self._move_relative(0, y) + self.posxy = 0, y + self._erase_to_end() + y += 1 + + self._show_cursor() + + self.screen = screen + self.move_cursor(cx, cy) + + @property + def input_hook(self): + try: + import nt + except ImportError: + return None + if nt._is_inputhook_installed(): + return nt._inputhook + + def __write_changed_line( + self, y: int, oldline: str, newline: str, px_coord: int + ) -> None: + # this is frustrating; there's no reason to test (say) + # self.dch1 inside the loop -- but alternative ways of + # structuring this function are equally painful (I'm trying to + # avoid writing code generators these days...) + minlen = min(wlen(oldline), wlen(newline)) + x_pos = 0 + x_coord = 0 + + px_pos = 0 + j = 0 + for c in oldline: + if j >= px_coord: + break + j += wlen(c) + px_pos += 1 + + # reuse the oldline as much as possible, but stop as soon as we + # encounter an ESCAPE, because it might be the start of an escape + # sequene + while ( + x_coord < minlen + and oldline[x_pos] == newline[x_pos] + and newline[x_pos] != "\x1b" + ): + x_coord += wlen(newline[x_pos]) + x_pos += 1 + + self._hide_cursor() + self._move_relative(x_coord, y) + if wlen(oldline) > wlen(newline): + self._erase_to_end() + + self.__write(newline[x_pos:]) + self.posxy = min(wlen(newline), self.width - 1), y + + if "\x1b" in newline or y != self.posxy[1] or '\x1a' in newline: + # ANSI escape characters are present, so we can't assume + # anything about the position of the cursor. Moving the cursor + # to the left margin should work to get to a known position. + self.move_cursor(0, y) + + def _scroll( + self, top: int, bottom: int, left: int | None = None, right: int | None = None + ) -> None: + scroll_rect = SMALL_RECT() + scroll_rect.Top = SHORT(top) + scroll_rect.Bottom = SHORT(bottom) + scroll_rect.Left = SHORT(0 if left is None else left) + scroll_rect.Right = SHORT( + self.getheightwidth()[1] - 1 if right is None else right + ) + destination_origin = _COORD() + fill_info = CHAR_INFO() + fill_info.UnicodeChar = " " + + if not ScrollConsoleScreenBuffer( + OutHandle, scroll_rect, None, destination_origin, fill_info + ): + raise WinError(GetLastError()) + + def _hide_cursor(self): + self.__write("\x1b[?25l") + + def _show_cursor(self): + self.__write("\x1b[?25h") + + def _enable_blinking(self): + self.__write("\x1b[?12h") + + def _disable_blinking(self): + self.__write("\x1b[?12l") + + def _enable_bracketed_paste(self) -> None: + self.__write("\x1b[?2004h") + + def _disable_bracketed_paste(self) -> None: + self.__write("\x1b[?2004l") + + def __write(self, text: str) -> None: + if "\x1a" in text: + text = ''.join(["^Z" if x == '\x1a' else x for x in text]) + + if self.out is not None: + self.out.write(text.encode(self.encoding, "replace")) + self.out.flush() + else: + os.write(self.output_fd, text.encode(self.encoding, "replace")) + + @property + def screen_xy(self) -> tuple[int, int]: + info = CONSOLE_SCREEN_BUFFER_INFO() + if not GetConsoleScreenBufferInfo(OutHandle, info): + raise WinError(GetLastError()) + return info.dwCursorPosition.X, info.dwCursorPosition.Y + + def _erase_to_end(self) -> None: + self.__write(ERASE_IN_LINE) + + def prepare(self) -> None: + trace("prepare") + self.screen = [] + self.height, self.width = self.getheightwidth() + + self.posxy = 0, 0 + self.__gone_tall = 0 + self.__offset = 0 + + if self.__vt_support: + SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT) + self._enable_bracketed_paste() + + def restore(self) -> None: + if self.__vt_support: + # Recover to original mode before running REPL + self._disable_bracketed_paste() + SetConsoleMode(InHandle, self.__original_input_mode) + + def _move_relative(self, x: int, y: int) -> None: + """Moves relative to the current posxy""" + dx = x - self.posxy[0] + dy = y - self.posxy[1] + if dx < 0: + self.__write(MOVE_LEFT.format(-dx)) + elif dx > 0: + self.__write(MOVE_RIGHT.format(dx)) + + if dy < 0: + self.__write(MOVE_UP.format(-dy)) + elif dy > 0: + self.__write(MOVE_DOWN.format(dy)) + + def move_cursor(self, x: int, y: int) -> None: + if x < 0 or y < 0: + raise ValueError(f"Bad cursor position {x}, {y}") + + if y < self.__offset or y >= self.__offset + self.height: + self.event_queue.insert(Event("scroll", "")) + else: + self._move_relative(x, y) + self.posxy = x, y + + def set_cursor_vis(self, visible: bool) -> None: + if visible: + self._show_cursor() + else: + self._hide_cursor() + + def getheightwidth(self) -> tuple[int, int]: + """Return (height, width) where height and width are the height + and width of the terminal window in characters.""" + info = CONSOLE_SCREEN_BUFFER_INFO() + if not GetConsoleScreenBufferInfo(OutHandle, info): + raise WinError(GetLastError()) + return ( + info.srWindow.Bottom - info.srWindow.Top + 1, + info.srWindow.Right - info.srWindow.Left + 1, + ) + + def _getscrollbacksize(self) -> int: + info = CONSOLE_SCREEN_BUFFER_INFO() + if not GetConsoleScreenBufferInfo(OutHandle, info): + raise WinError(GetLastError()) + + return info.srWindow.Bottom # type: ignore[no-any-return] + + def _read_input(self, block: bool = True) -> INPUT_RECORD | None: + if not block: + events = DWORD() + if not GetNumberOfConsoleInputEvents(InHandle, events): + raise WinError(GetLastError()) + if not events.value: + return None + + rec = INPUT_RECORD() + read = DWORD() + if not ReadConsoleInput(InHandle, rec, 1, read): + raise WinError(GetLastError()) + + return rec + + def get_event(self, block: bool = True) -> Event | None: + """Return an Event instance. Returns None if |block| is false + and there is no event pending, otherwise waits for the + completion of an event.""" + + while self.event_queue.empty(): + rec = self._read_input(block) + if rec is None: + return None + + if rec.EventType == WINDOW_BUFFER_SIZE_EVENT: + return Event("resize", "") + + if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown: + # Only process keys and keydown events + if block: + continue + return None + + key_event = rec.Event.KeyEvent + raw_key = key = key_event.uChar.UnicodeChar + + if key == "\r": + # Make enter unix-like + return Event(evt="key", data="\n") + elif key_event.wVirtualKeyCode == 8: + # Turn backspace directly into the command + key = "backspace" + elif key == "\x00": + # Handle special keys like arrow keys and translate them into the appropriate command + key = VK_MAP.get(key_event.wVirtualKeyCode) + if key: + if key_event.dwControlKeyState & CTRL_ACTIVE: + key = f"ctrl {key}" + elif key_event.dwControlKeyState & ALT_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key)) + return Event(evt="key", data="\033") # keymap.py uses this for meta + return Event(evt="key", data=key) + if block: + continue + + return None + elif self.__vt_support: + # If virtual terminal is enabled, scanning VT sequences + for char in raw_key.encode(self.event_queue.encoding, "replace"): + self.event_queue.push(char) + continue + + if key_event.dwControlKeyState & ALT_ACTIVE: + # Do not swallow characters that have been entered via AltGr: + # Windows internally converts AltGr to CTRL+ALT, see + # https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-vkkeyscanw + if not key_event.dwControlKeyState & CTRL_ACTIVE: + # queue the key, return the meta command + self.event_queue.insert(Event(evt="key", data=key)) + return Event(evt="key", data="\033") # keymap.py uses this for meta + + return Event(evt="key", data=key) + return self.event_queue.get() + + def push_char(self, char: int | bytes) -> None: + """ + Push a character to the console event queue. + """ + raise NotImplementedError("push_char not supported on Windows") + + def beep(self) -> None: + self.__write("\x07") + + def clear(self) -> None: + """Wipe the screen""" + self.__write(CLEAR) + self.posxy = 0, 0 + self.screen = [] + + def finish(self) -> None: + """Move the cursor to the end of the display and otherwise get + ready for end. XXX could be merged with restore? Hmm.""" + y = len(self.screen) - 1 + while y >= 0 and not self.screen[y]: + y -= 1 + self._move_relative(0, min(y, self.height + self.__offset - 1)) + self.__write("\r\n") + + def flushoutput(self) -> None: + """Flush all output to the screen (assuming there's some + buffering going on somewhere). + + All output on Windows is unbuffered so this is a nop""" + pass + + def forgetinput(self) -> None: + """Forget all pending, but not yet processed input.""" + if not FlushConsoleInputBuffer(InHandle): + raise WinError(GetLastError()) + + def getpending(self) -> Event: + """Return the characters that have been typed but not yet + processed.""" + return Event("key", "", b"") + + def wait_for_event(self, timeout: float | None) -> bool: + """Wait for an event.""" + # Poor man's Windows select loop + start_time = time.time() + while True: + if msvcrt.kbhit(): # type: ignore[attr-defined] + return True + if timeout and time.time() - start_time > timeout / 1000: + return False + time.sleep(0.01) + + def wait(self, timeout: float | None) -> bool: + """ + Wait for events on the console. + """ + return ( + not self.event_queue.empty() + or self.wait_for_event(timeout) + ) + + def repaint(self) -> None: + raise NotImplementedError("No repaint support") + + +# Windows interop +class CONSOLE_SCREEN_BUFFER_INFO(Structure): + _fields_ = [ + ("dwSize", _COORD), + ("dwCursorPosition", _COORD), + ("wAttributes", WORD), + ("srWindow", SMALL_RECT), + ("dwMaximumWindowSize", _COORD), + ] + + +class CONSOLE_CURSOR_INFO(Structure): + _fields_ = [ + ("dwSize", DWORD), + ("bVisible", BOOL), + ] + + +class CHAR_INFO(Structure): + _fields_ = [ + ("UnicodeChar", WCHAR), + ("Attributes", WORD), + ] + + +class Char(Union): + _fields_ = [ + ("UnicodeChar", WCHAR), + ("Char", CHAR), + ] + + +class KeyEvent(ctypes.Structure): + _fields_ = [ + ("bKeyDown", BOOL), + ("wRepeatCount", WORD), + ("wVirtualKeyCode", WORD), + ("wVirtualScanCode", WORD), + ("uChar", Char), + ("dwControlKeyState", DWORD), + ] + + +class WindowsBufferSizeEvent(ctypes.Structure): + _fields_ = [("dwSize", _COORD)] + + +class ConsoleEvent(ctypes.Union): + _fields_ = [ + ("KeyEvent", KeyEvent), + ("WindowsBufferSizeEvent", WindowsBufferSizeEvent), + ] + + +class INPUT_RECORD(Structure): + _fields_ = [("EventType", WORD), ("Event", ConsoleEvent)] + + +KEY_EVENT = 0x01 +FOCUS_EVENT = 0x10 +MENU_EVENT = 0x08 +MOUSE_EVENT = 0x02 +WINDOW_BUFFER_SIZE_EVENT = 0x04 + +ENABLE_PROCESSED_INPUT = 0x0001 +ENABLE_LINE_INPUT = 0x0002 +ENABLE_ECHO_INPUT = 0x0004 +ENABLE_MOUSE_INPUT = 0x0010 +ENABLE_INSERT_MODE = 0x0020 +ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200 + +ENABLE_PROCESSED_OUTPUT = 0x01 +ENABLE_WRAP_AT_EOL_OUTPUT = 0x02 +ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04 + +STD_INPUT_HANDLE = -10 +STD_OUTPUT_HANDLE = -11 + +if sys.platform == "win32": + _KERNEL32 = WinDLL("kernel32", use_last_error=True) + + GetStdHandle = windll.kernel32.GetStdHandle + GetStdHandle.argtypes = [DWORD] + GetStdHandle.restype = HANDLE + + GetConsoleScreenBufferInfo = _KERNEL32.GetConsoleScreenBufferInfo + GetConsoleScreenBufferInfo.argtypes = [ + HANDLE, + ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO), + ] + GetConsoleScreenBufferInfo.restype = BOOL + + ScrollConsoleScreenBuffer = _KERNEL32.ScrollConsoleScreenBufferW + ScrollConsoleScreenBuffer.argtypes = [ + HANDLE, + POINTER(SMALL_RECT), + POINTER(SMALL_RECT), + _COORD, + POINTER(CHAR_INFO), + ] + ScrollConsoleScreenBuffer.restype = BOOL + + GetConsoleMode = _KERNEL32.GetConsoleMode + GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)] + GetConsoleMode.restype = BOOL + + SetConsoleMode = _KERNEL32.SetConsoleMode + SetConsoleMode.argtypes = [HANDLE, DWORD] + SetConsoleMode.restype = BOOL + + ReadConsoleInput = _KERNEL32.ReadConsoleInputW + ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)] + ReadConsoleInput.restype = BOOL + + GetNumberOfConsoleInputEvents = _KERNEL32.GetNumberOfConsoleInputEvents + GetNumberOfConsoleInputEvents.argtypes = [HANDLE, POINTER(DWORD)] + GetNumberOfConsoleInputEvents.restype = BOOL + + FlushConsoleInputBuffer = _KERNEL32.FlushConsoleInputBuffer + FlushConsoleInputBuffer.argtypes = [HANDLE] + FlushConsoleInputBuffer.restype = BOOL + + OutHandle = GetStdHandle(STD_OUTPUT_HANDLE) + InHandle = GetStdHandle(STD_INPUT_HANDLE) +else: + + def _win_only(*args, **kwargs): + raise NotImplementedError("Windows only") + + GetStdHandle = _win_only + GetConsoleScreenBufferInfo = _win_only + ScrollConsoleScreenBuffer = _win_only + GetConsoleMode = _win_only + SetConsoleMode = _win_only + ReadConsoleInput = _win_only + GetNumberOfConsoleInputEvents = _win_only + FlushConsoleInputBuffer = _win_only + OutHandle = 0 + InHandle = 0 diff --git a/contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py b/contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py new file mode 100644 index 00000000000..d99722f9a16 --- /dev/null +++ b/contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py @@ -0,0 +1,42 @@ +""" +Windows event and VT sequence scanner +""" + +from .base_eventqueue import BaseEventQueue + + +# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences +VT_MAP: dict[bytes, str] = { + b'\x1b[A': 'up', + b'\x1b[B': 'down', + b'\x1b[C': 'right', + b'\x1b[D': 'left', + b'\x1b[1;5D': 'ctrl left', + b'\x1b[1;5C': 'ctrl right', + + b'\x1b[H': 'home', + b'\x1b[F': 'end', + + b'\x7f': 'backspace', + b'\x1b[2~': 'insert', + b'\x1b[3~': 'delete', + b'\x1b[5~': 'page up', + b'\x1b[6~': 'page down', + + b'\x1bOP': 'f1', + b'\x1bOQ': 'f2', + b'\x1bOR': 'f3', + b'\x1bOS': 'f4', + b'\x1b[15~': 'f5', + b'\x1b[17~': 'f6', + b'\x1b[18~': 'f7', + b'\x1b[19~': 'f8', + b'\x1b[20~': 'f9', + b'\x1b[21~': 'f10', + b'\x1b[23~': 'f11', + b'\x1b[24~': 'f12', +} + +class EventQueue(BaseEventQueue): + def __init__(self, encoding: str) -> None: + BaseEventQueue.__init__(self, encoding, VT_MAP) |
