summaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/_pyrepl
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/tools/python3/Lib/_pyrepl')
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/__init__.py19
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/__main__.py10
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/_threading_handler.py74
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py110
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/commands.py492
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/completing_reader.py295
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/console.py229
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/fancy_termios.py82
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/historical_reader.py419
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/input.py114
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/keymap.py213
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/main.py59
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/pager.py175
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/reader.py764
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/readline.py599
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/simple_interact.py180
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/terminfo.py530
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/trace.py21
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/types.py10
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/unix_console.py852
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py77
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/utils.py83
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/windows_console.py687
-rw-r--r--contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py42
24 files changed, 6136 insertions, 0 deletions
diff --git a/contrib/tools/python3/Lib/_pyrepl/__init__.py b/contrib/tools/python3/Lib/_pyrepl/__init__.py
new file mode 100644
index 00000000000..1693cbd0b98
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
diff --git a/contrib/tools/python3/Lib/_pyrepl/__main__.py b/contrib/tools/python3/Lib/_pyrepl/__main__.py
new file mode 100644
index 00000000000..9c66812e13a
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/__main__.py
@@ -0,0 +1,10 @@
+# Important: don't add things to this module, as they will end up in the REPL's
+# default globals. Use _pyrepl.main instead.
+
+# Avoid caching this file by linecache and incorrectly report tracebacks.
+# See https://github.com/python/cpython/issues/129098.
+__spec__ = __loader__ = None
+
+if __name__ == "__main__":
+ from .main import interactive_console as __pyrepl_interactive_console
+ __pyrepl_interactive_console()
diff --git a/contrib/tools/python3/Lib/_pyrepl/_threading_handler.py b/contrib/tools/python3/Lib/_pyrepl/_threading_handler.py
new file mode 100644
index 00000000000..82f5e8650a2
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/_threading_handler.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+import traceback
+
+
+TYPE_CHECKING = False
+if TYPE_CHECKING:
+ from threading import Thread
+ from types import TracebackType
+ from typing import Protocol
+
+ class ExceptHookArgs(Protocol):
+ @property
+ def exc_type(self) -> type[BaseException]: ...
+ @property
+ def exc_value(self) -> BaseException | None: ...
+ @property
+ def exc_traceback(self) -> TracebackType | None: ...
+ @property
+ def thread(self) -> Thread | None: ...
+
+ class ShowExceptions(Protocol):
+ def __call__(self) -> int: ...
+ def add(self, s: str) -> None: ...
+
+ from .reader import Reader
+
+
+def install_threading_hook(reader: Reader) -> None:
+ import threading
+
+ @dataclass
+ class ExceptHookHandler:
+ lock: threading.Lock = field(default_factory=threading.Lock)
+ messages: list[str] = field(default_factory=list)
+
+ def show(self) -> int:
+ count = 0
+ with self.lock:
+ if not self.messages:
+ return 0
+ reader.restore()
+ for tb in self.messages:
+ count += 1
+ if tb:
+ print(tb)
+ self.messages.clear()
+ reader.scheduled_commands.append("ctrl-c")
+ reader.prepare()
+ return count
+
+ def add(self, s: str) -> None:
+ with self.lock:
+ self.messages.append(s)
+
+ def exception(self, args: ExceptHookArgs) -> None:
+ lines = traceback.format_exception(
+ args.exc_type,
+ args.exc_value,
+ args.exc_traceback,
+ colorize=reader.can_colorize,
+ ) # type: ignore[call-overload]
+ pre = f"\nException in {args.thread.name}:\n" if args.thread else "\n"
+ tb = pre + "".join(lines)
+ self.add(tb)
+
+ def __call__(self) -> int:
+ return self.show()
+
+
+ handler = ExceptHookHandler()
+ reader.threading_hook = handler
+ threading.excepthook = handler.exception
diff --git a/contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py b/contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py
new file mode 100644
index 00000000000..0589a0f437e
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/base_eventqueue.py
@@ -0,0 +1,110 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+OS-independent base for an event and VT sequence scanner
+
+See unix_eventqueue and windows_eventqueue for subclasses.
+"""
+
+from collections import deque
+
+from . import keymap
+from .console import Event
+from .trace import trace
+
+class BaseEventQueue:
+ def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
+ self.compiled_keymap = keymap.compile_keymap(keymap_dict)
+ self.keymap = self.compiled_keymap
+ trace("keymap {k!r}", k=self.keymap)
+ self.encoding = encoding
+ self.events: deque[Event] = deque()
+ self.buf = bytearray()
+
+ def get(self) -> Event | None:
+ """
+ Retrieves the next event from the queue.
+ """
+ if self.events:
+ return self.events.popleft()
+ else:
+ return None
+
+ def empty(self) -> bool:
+ """
+ Checks if the queue is empty.
+ """
+ return not self.events
+
+ def flush_buf(self) -> bytearray:
+ """
+ Flushes the buffer and returns its contents.
+ """
+ old = self.buf
+ self.buf = bytearray()
+ return old
+
+ def insert(self, event: Event) -> None:
+ """
+ Inserts an event into the queue.
+ """
+ trace('added event {event}', event=event)
+ self.events.append(event)
+
+ def push(self, char: int | bytes) -> None:
+ """
+ Processes a character by updating the buffer and handling special key mappings.
+ """
+ assert isinstance(char, (int, bytes))
+ ord_char = char if isinstance(char, int) else ord(char)
+ char = ord_char.to_bytes()
+ self.buf.append(ord_char)
+
+ if char in self.keymap:
+ if self.keymap is self.compiled_keymap:
+ # sanity check, buffer is empty when a special key comes
+ assert len(self.buf) == 1
+ k = self.keymap[char]
+ trace('found map {k!r}', k=k)
+ if isinstance(k, dict):
+ self.keymap = k
+ else:
+ self.insert(Event('key', k, bytes(self.flush_buf())))
+ self.keymap = self.compiled_keymap
+
+ elif self.buf and self.buf[0] == 27: # escape
+ # escape sequence not recognized by our keymap: propagate it
+ # outside so that i can be recognized as an M-... key (see also
+ # the docstring in keymap.py
+ trace('unrecognized escape sequence, propagating...')
+ self.keymap = self.compiled_keymap
+ self.insert(Event('key', '\033', b'\033'))
+ for _c in self.flush_buf()[1:]:
+ self.push(_c)
+
+ else:
+ try:
+ decoded = bytes(self.buf).decode(self.encoding)
+ except UnicodeError:
+ return
+ else:
+ self.insert(Event('key', decoded, bytes(self.flush_buf())))
+ self.keymap = self.compiled_keymap
diff --git a/contrib/tools/python3/Lib/_pyrepl/commands.py b/contrib/tools/python3/Lib/_pyrepl/commands.py
new file mode 100644
index 00000000000..f3bd6ead46c
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/commands.py
@@ -0,0 +1,492 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]>
+# Antonio Cuni
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+import os
+
+# Categories of actions:
+# killing
+# yanking
+# motion
+# editing
+# history
+# finishing
+# [completion]
+
+
+# types
+if False:
+ from .historical_reader import HistoricalReader
+
+
+class Command:
+ finish: bool = False
+ kills_digit_arg: bool = True
+
+ def __init__(
+ self, reader: HistoricalReader, event_name: str, event: list[str]
+ ) -> None:
+ # Reader should really be "any reader" but there's too much usage of
+ # HistoricalReader methods and fields in the code below for us to
+ # refactor at the moment.
+
+ self.reader = reader
+ self.event = event
+ self.event_name = event_name
+
+ def do(self) -> None:
+ pass
+
+
+class KillCommand(Command):
+ def kill_range(self, start: int, end: int) -> None:
+ if start == end:
+ return
+ r = self.reader
+ b = r.buffer
+ text = b[start:end]
+ del b[start:end]
+ if is_kill(r.last_command):
+ if start < r.pos:
+ r.kill_ring[-1] = text + r.kill_ring[-1]
+ else:
+ r.kill_ring[-1] = r.kill_ring[-1] + text
+ else:
+ r.kill_ring.append(text)
+ r.pos = start
+ r.dirty = True
+
+
+class YankCommand(Command):
+ pass
+
+
+class MotionCommand(Command):
+ pass
+
+
+class EditCommand(Command):
+ pass
+
+
+class FinishCommand(Command):
+ finish = True
+ pass
+
+
+def is_kill(command: type[Command] | None) -> bool:
+ return command is not None and issubclass(command, KillCommand)
+
+
+def is_yank(command: type[Command] | None) -> bool:
+ return command is not None and issubclass(command, YankCommand)
+
+
+# etc
+
+
+class digit_arg(Command):
+ kills_digit_arg = False
+
+ def do(self) -> None:
+ r = self.reader
+ c = self.event[-1]
+ if c == "-":
+ if r.arg is not None:
+ r.arg = -r.arg
+ else:
+ r.arg = -1
+ else:
+ d = int(c)
+ if r.arg is None:
+ r.arg = d
+ else:
+ if r.arg < 0:
+ r.arg = 10 * r.arg - d
+ else:
+ r.arg = 10 * r.arg + d
+ r.dirty = True
+
+
+class clear_screen(Command):
+ def do(self) -> None:
+ r = self.reader
+ r.console.clear()
+ r.dirty = True
+
+
+class refresh(Command):
+ def do(self) -> None:
+ self.reader.dirty = True
+
+
+class repaint(Command):
+ def do(self) -> None:
+ self.reader.dirty = True
+ self.reader.console.repaint()
+
+
+class kill_line(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ eol = r.eol()
+ for c in b[r.pos : eol]:
+ if not c.isspace():
+ self.kill_range(r.pos, eol)
+ return
+ else:
+ self.kill_range(r.pos, eol + 1)
+
+
+class unix_line_discard(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ self.kill_range(r.bol(), r.pos)
+
+
+class unix_word_rubout(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ self.kill_range(r.bow(), r.pos)
+
+
+class kill_word(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ self.kill_range(r.pos, r.eow())
+
+
+class backward_kill_word(KillCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ self.kill_range(r.bow(), r.pos)
+
+
+class yank(YankCommand):
+ def do(self) -> None:
+ r = self.reader
+ if not r.kill_ring:
+ r.error("nothing to yank")
+ return
+ r.insert(r.kill_ring[-1])
+
+
+class yank_pop(YankCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ if not r.kill_ring:
+ r.error("nothing to yank")
+ return
+ if not is_yank(r.last_command):
+ r.error("previous command was not a yank")
+ return
+ repl = len(r.kill_ring[-1])
+ r.kill_ring.insert(0, r.kill_ring.pop())
+ t = r.kill_ring[-1]
+ b[r.pos - repl : r.pos] = t
+ r.pos = r.pos - repl + len(t)
+ r.dirty = True
+
+
+class interrupt(FinishCommand):
+ def do(self) -> None:
+ import signal
+
+ self.reader.console.finish()
+ self.reader.finish()
+ os.kill(os.getpid(), signal.SIGINT)
+
+
+class ctrl_c(Command):
+ def do(self) -> None:
+ self.reader.console.finish()
+ self.reader.finish()
+ raise KeyboardInterrupt
+
+
+class suspend(Command):
+ def do(self) -> None:
+ import signal
+
+ r = self.reader
+ p = r.pos
+ r.console.finish()
+ os.kill(os.getpid(), signal.SIGSTOP)
+ ## this should probably be done
+ ## in a handler for SIGCONT?
+ r.console.prepare()
+ r.pos = p
+ # r.posxy = 0, 0 # XXX this is invalid
+ r.dirty = True
+ r.console.screen = []
+
+
+class up(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for _ in range(r.get_arg()):
+ x, y = r.pos2xy()
+ new_y = y - 1
+
+ if r.bol() == 0:
+ if r.historyi > 0:
+ r.select_item(r.historyi - 1)
+ return
+ r.pos = 0
+ r.error("start of buffer")
+ return
+
+ if (
+ x
+ > (
+ new_x := r.max_column(new_y)
+ ) # we're past the end of the previous line
+ or x == r.max_column(y)
+ and any(
+ not i.isspace() for i in r.buffer[r.bol() :]
+ ) # move between eols
+ ):
+ x = new_x
+
+ r.setpos_from_xy(x, new_y)
+
+
+class down(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ for _ in range(r.get_arg()):
+ x, y = r.pos2xy()
+ new_y = y + 1
+
+ if r.eol() == len(b):
+ if r.historyi < len(r.history):
+ r.select_item(r.historyi + 1)
+ r.pos = r.eol(0)
+ return
+ r.pos = len(b)
+ r.error("end of buffer")
+ return
+
+ if (
+ x
+ > (
+ new_x := r.max_column(new_y)
+ ) # we're past the end of the previous line
+ or x == r.max_column(y)
+ and any(
+ not i.isspace() for i in r.buffer[r.bol() :]
+ ) # move between eols
+ ):
+ x = new_x
+
+ r.setpos_from_xy(x, new_y)
+
+
+class left(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for _ in range(r.get_arg()):
+ p = r.pos - 1
+ if p >= 0:
+ r.pos = p
+ else:
+ self.reader.error("start of buffer")
+
+
+class right(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ for _ in range(r.get_arg()):
+ p = r.pos + 1
+ if p <= len(b):
+ r.pos = p
+ else:
+ self.reader.error("end of buffer")
+
+
+class beginning_of_line(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = self.reader.bol()
+
+
+class end_of_line(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = self.reader.eol()
+
+
+class home(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = 0
+
+
+class end(MotionCommand):
+ def do(self) -> None:
+ self.reader.pos = len(self.reader.buffer)
+
+
+class forward_word(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ r.pos = r.eow()
+
+
+class backward_word(MotionCommand):
+ def do(self) -> None:
+ r = self.reader
+ for i in range(r.get_arg()):
+ r.pos = r.bow()
+
+
+class self_insert(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ text = self.event * r.get_arg()
+ r.insert(text)
+
+
+class insert_nl(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ r.insert("\n" * r.get_arg())
+
+
+class transpose_characters(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ s = r.pos - 1
+ if s < 0:
+ r.error("cannot transpose at start of buffer")
+ else:
+ if s == len(b):
+ s -= 1
+ t = min(s + r.get_arg(), len(b) - 1)
+ c = b[s]
+ del b[s]
+ b.insert(t, c)
+ r.pos = t
+ r.dirty = True
+
+
+class backspace(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ for i in range(r.get_arg()):
+ if r.pos > 0:
+ r.pos -= 1
+ del b[r.pos]
+ r.dirty = True
+ else:
+ self.reader.error("can't backspace at start")
+
+
+class delete(EditCommand):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ if self.event[-1] == "\004":
+ if b and b[-1].endswith("\n"):
+ self.finish = True
+ elif (
+ r.pos == 0
+ and len(b) == 0 # this is something of a hack
+ ):
+ r.update_screen()
+ r.console.finish()
+ raise EOFError
+
+ for i in range(r.get_arg()):
+ if r.pos != len(b):
+ del b[r.pos]
+ r.dirty = True
+ else:
+ self.reader.error("end of buffer")
+
+
+class accept(FinishCommand):
+ def do(self) -> None:
+ pass
+
+
+class help(Command):
+ def do(self) -> None:
+ import _sitebuiltins
+
+ with self.reader.suspend():
+ self.reader.msg = _sitebuiltins._Helper()() # type: ignore[assignment]
+
+
+class invalid_key(Command):
+ def do(self) -> None:
+ pending = self.reader.console.getpending()
+ s = "".join(self.event) + pending.data
+ self.reader.error("`%r' not bound" % s)
+
+
+class invalid_command(Command):
+ def do(self) -> None:
+ s = self.event_name
+ self.reader.error("command `%s' not known" % s)
+
+
+class show_history(Command):
+ def do(self) -> None:
+ from .pager import get_pager
+ from site import gethistoryfile
+
+ history = os.linesep.join(self.reader.history[:])
+ self.reader.console.restore()
+ pager = get_pager()
+ pager(history, gethistoryfile())
+ self.reader.console.prepare()
+
+ # We need to copy over the state so that it's consistent between
+ # console and reader, and console does not overwrite/append stuff
+ self.reader.console.screen = self.reader.screen.copy()
+ self.reader.console.posxy = self.reader.cxy
+
+
+class paste_mode(Command):
+
+ def do(self) -> None:
+ self.reader.paste_mode = not self.reader.paste_mode
+ self.reader.dirty = True
+
+
+class enable_bracketed_paste(Command):
+ def do(self) -> None:
+ self.reader.paste_mode = True
+ self.reader.in_bracketed_paste = True
+
+class disable_bracketed_paste(Command):
+ def do(self) -> None:
+ self.reader.paste_mode = False
+ self.reader.in_bracketed_paste = False
+ self.reader.dirty = True
diff --git a/contrib/tools/python3/Lib/_pyrepl/completing_reader.py b/contrib/tools/python3/Lib/_pyrepl/completing_reader.py
new file mode 100644
index 00000000000..9a005281dab
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/completing_reader.py
@@ -0,0 +1,295 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]>
+# Antonio Cuni
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+
+import re
+from . import commands, console, reader
+from .reader import Reader
+
+
+# types
+Command = commands.Command
+if False:
+ from .types import KeySpec, CommandName
+
+
+def prefix(wordlist: list[str], j: int = 0) -> str:
+ d = {}
+ i = j
+ try:
+ while 1:
+ for word in wordlist:
+ d[word[i]] = 1
+ if len(d) > 1:
+ return wordlist[0][j:i]
+ i += 1
+ d = {}
+ except IndexError:
+ return wordlist[0][j:i]
+ return ""
+
+
+STRIPCOLOR_REGEX = re.compile(r"\x1B\[([0-9]{1,3}(;[0-9]{1,2})?)?[m|K]")
+
+def stripcolor(s: str) -> str:
+ return STRIPCOLOR_REGEX.sub('', s)
+
+
+def real_len(s: str) -> int:
+ return len(stripcolor(s))
+
+
+def left_align(s: str, maxlen: int) -> str:
+ stripped = stripcolor(s)
+ if len(stripped) > maxlen:
+ # too bad, we remove the color
+ return stripped[:maxlen]
+ padding = maxlen - len(stripped)
+ return s + ' '*padding
+
+
+def build_menu(
+ cons: console.Console,
+ wordlist: list[str],
+ start: int,
+ use_brackets: bool,
+ sort_in_column: bool,
+) -> tuple[list[str], int]:
+ if use_brackets:
+ item = "[ %s ]"
+ padding = 4
+ else:
+ item = "%s "
+ padding = 2
+ maxlen = min(max(map(real_len, wordlist)), cons.width - padding)
+ cols = int(cons.width / (maxlen + padding))
+ rows = int((len(wordlist) - 1)/cols + 1)
+
+ if sort_in_column:
+ # sort_in_column=False (default) sort_in_column=True
+ # A B C A D G
+ # D E F B E
+ # G C F
+ #
+ # "fill" the table with empty words, so we always have the same amout
+ # of rows for each column
+ missing = cols*rows - len(wordlist)
+ wordlist = wordlist + ['']*missing
+ indexes = [(i % cols) * rows + i // cols for i in range(len(wordlist))]
+ wordlist = [wordlist[i] for i in indexes]
+ menu = []
+ i = start
+ for r in range(rows):
+ row = []
+ for col in range(cols):
+ row.append(item % left_align(wordlist[i], maxlen))
+ i += 1
+ if i >= len(wordlist):
+ break
+ menu.append(''.join(row))
+ if i >= len(wordlist):
+ i = 0
+ break
+ if r + 5 > cons.height:
+ menu.append(" %d more... " % (len(wordlist) - i))
+ break
+ return menu, i
+
+# this gets somewhat user interface-y, and as a result the logic gets
+# very convoluted.
+#
+# To summarise the summary of the summary:- people are a problem.
+# -- The Hitch-Hikers Guide to the Galaxy, Episode 12
+
+#### Desired behaviour of the completions commands.
+# the considerations are:
+# (1) how many completions are possible
+# (2) whether the last command was a completion
+# (3) if we can assume that the completer is going to return the same set of
+# completions: this is controlled by the ``assume_immutable_completions``
+# variable on the reader, which is True by default to match the historical
+# behaviour of pyrepl, but e.g. False in the ReadlineAlikeReader to match
+# more closely readline's semantics (this is needed e.g. by
+# fancycompleter)
+#
+# if there's no possible completion, beep at the user and point this out.
+# this is easy.
+#
+# if there's only one possible completion, stick it in. if the last thing
+# user did was a completion, point out that he isn't getting anywhere, but
+# only if the ``assume_immutable_completions`` is True.
+#
+# now it gets complicated.
+#
+# for the first press of a completion key:
+# if there's a common prefix, stick it in.
+
+# irrespective of whether anything got stuck in, if the word is now
+# complete, show the "complete but not unique" message
+
+# if there's no common prefix and if the word is not now complete,
+# beep.
+
+# common prefix -> yes no
+# word complete \/
+# yes "cbnu" "cbnu"
+# no - beep
+
+# for the second bang on the completion key
+# there will necessarily be no common prefix
+# show a menu of the choices.
+
+# for subsequent bangs, rotate the menu around (if there are sufficient
+# choices).
+
+
+class complete(commands.Command):
+ def do(self) -> None:
+ r: CompletingReader
+ r = self.reader # type: ignore[assignment]
+ last_is_completer = r.last_command_is(self.__class__)
+ immutable_completions = r.assume_immutable_completions
+ completions_unchangable = last_is_completer and immutable_completions
+ stem = r.get_stem()
+ if not completions_unchangable:
+ r.cmpltn_menu_choices = r.get_completions(stem)
+
+ completions = r.cmpltn_menu_choices
+ if not completions:
+ r.error("no matches")
+ elif len(completions) == 1:
+ if completions_unchangable and len(completions[0]) == len(stem):
+ r.msg = "[ sole completion ]"
+ r.dirty = True
+ r.insert(completions[0][len(stem):])
+ else:
+ p = prefix(completions, len(stem))
+ if p:
+ r.insert(p)
+ if last_is_completer:
+ r.cmpltn_menu_visible = True
+ r.cmpltn_message_visible = False
+ r.cmpltn_menu, r.cmpltn_menu_end = build_menu(
+ r.console, completions, r.cmpltn_menu_end,
+ r.use_brackets, r.sort_in_column)
+ r.dirty = True
+ elif not r.cmpltn_menu_visible:
+ r.cmpltn_message_visible = True
+ if stem + p in completions:
+ r.msg = "[ complete but not unique ]"
+ r.dirty = True
+ else:
+ r.msg = "[ not unique ]"
+ r.dirty = True
+
+
+class self_insert(commands.self_insert):
+ def do(self) -> None:
+ r: CompletingReader
+ r = self.reader # type: ignore[assignment]
+
+ commands.self_insert.do(self)
+ if r.cmpltn_menu_visible:
+ stem = r.get_stem()
+ if len(stem) < 1:
+ r.cmpltn_reset()
+ else:
+ completions = [w for w in r.cmpltn_menu_choices
+ if w.startswith(stem)]
+ if completions:
+ r.cmpltn_menu, r.cmpltn_menu_end = build_menu(
+ r.console, completions, 0,
+ r.use_brackets, r.sort_in_column)
+ else:
+ r.cmpltn_reset()
+
+
+@dataclass
+class CompletingReader(Reader):
+ """Adds completion support"""
+
+ ### Class variables
+ # see the comment for the complete command
+ assume_immutable_completions = True
+ use_brackets = True # display completions inside []
+ sort_in_column = False
+
+ ### Instance variables
+ cmpltn_menu: list[str] = field(init=False)
+ cmpltn_menu_visible: bool = field(init=False)
+ cmpltn_message_visible: bool = field(init=False)
+ cmpltn_menu_end: int = field(init=False)
+ cmpltn_menu_choices: list[str] = field(init=False)
+
+ def __post_init__(self) -> None:
+ super().__post_init__()
+ self.cmpltn_reset()
+ for c in (complete, self_insert):
+ self.commands[c.__name__] = c
+ self.commands[c.__name__.replace('_', '-')] = c
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return super().collect_keymap() + (
+ (r'\t', 'complete'),)
+
+ def after_command(self, cmd: Command) -> None:
+ super().after_command(cmd)
+ if not isinstance(cmd, (complete, self_insert)):
+ self.cmpltn_reset()
+
+ def calc_screen(self) -> list[str]:
+ screen = super().calc_screen()
+ if self.cmpltn_menu_visible:
+ # We display the completions menu below the current prompt
+ ly = self.lxy[1] + 1
+ screen[ly:ly] = self.cmpltn_menu
+ # If we're not in the middle of multiline edit, don't append to screeninfo
+ # since that screws up the position calculation in pos2xy function.
+ # This is a hack to prevent the cursor jumping
+ # into the completions menu when pressing left or down arrow.
+ if self.pos != len(self.buffer):
+ self.screeninfo[ly:ly] = [(0, [])]*len(self.cmpltn_menu)
+ return screen
+
+ def finish(self) -> None:
+ super().finish()
+ self.cmpltn_reset()
+
+ def cmpltn_reset(self) -> None:
+ self.cmpltn_menu = []
+ self.cmpltn_menu_visible = False
+ self.cmpltn_message_visible = False
+ self.cmpltn_menu_end = 0
+ self.cmpltn_menu_choices = []
+
+ def get_stem(self) -> str:
+ st = self.syntax_table
+ SW = reader.SYNTAX_WORD
+ b = self.buffer
+ p = self.pos - 1
+ while p >= 0 and st.get(b[p], SW) == SW:
+ p -= 1
+ return ''.join(b[p+1:self.pos])
+
+ def get_completions(self, stem: str) -> list[str]:
+ return []
diff --git a/contrib/tools/python3/Lib/_pyrepl/console.py b/contrib/tools/python3/Lib/_pyrepl/console.py
new file mode 100644
index 00000000000..8956fb1242e
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/console.py
@@ -0,0 +1,229 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import _colorize
+
+from abc import ABC, abstractmethod
+import ast
+import code
+import linecache
+from dataclasses import dataclass, field
+import os.path
+import sys
+
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import IO
+ from typing import Callable
+
+
+@dataclass
+class Event:
+ evt: str
+ data: str
+ raw: bytes = b""
+
+
+@dataclass
+class Console(ABC):
+ posxy: tuple[int, int]
+ screen: list[str] = field(default_factory=list)
+ height: int = 25
+ width: int = 80
+
+ def __init__(
+ self,
+ f_in: IO[bytes] | int = 0,
+ f_out: IO[bytes] | int = 1,
+ term: str = "",
+ encoding: str = "",
+ ):
+ self.encoding = encoding or sys.getdefaultencoding()
+
+ if isinstance(f_in, int):
+ self.input_fd = f_in
+ else:
+ self.input_fd = f_in.fileno()
+
+ if isinstance(f_out, int):
+ self.output_fd = f_out
+ else:
+ self.output_fd = f_out.fileno()
+
+ @abstractmethod
+ def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
+
+ @abstractmethod
+ def prepare(self) -> None: ...
+
+ @abstractmethod
+ def restore(self) -> None: ...
+
+ @abstractmethod
+ def move_cursor(self, x: int, y: int) -> None: ...
+
+ @abstractmethod
+ def set_cursor_vis(self, visible: bool) -> None: ...
+
+ @abstractmethod
+ def getheightwidth(self) -> tuple[int, int]:
+ """Return (height, width) where height and width are the height
+ and width of the terminal window in characters."""
+ ...
+
+ @abstractmethod
+ def get_event(self, block: bool = True) -> Event | None:
+ """Return an Event instance. Returns None if |block| is false
+ and there is no event pending, otherwise waits for the
+ completion of an event."""
+ ...
+
+ @abstractmethod
+ def push_char(self, char: int | bytes) -> None:
+ """
+ Push a character to the console event queue.
+ """
+ ...
+
+ @abstractmethod
+ def beep(self) -> None: ...
+
+ @abstractmethod
+ def clear(self) -> None:
+ """Wipe the screen"""
+ ...
+
+ @abstractmethod
+ def finish(self) -> None:
+ """Move the cursor to the end of the display and otherwise get
+ ready for end. XXX could be merged with restore? Hmm."""
+ ...
+
+ @abstractmethod
+ def flushoutput(self) -> None:
+ """Flush all output to the screen (assuming there's some
+ buffering going on somewhere)."""
+ ...
+
+ @abstractmethod
+ def forgetinput(self) -> None:
+ """Forget all pending, but not yet processed input."""
+ ...
+
+ @abstractmethod
+ def getpending(self) -> Event:
+ """Return the characters that have been typed but not yet
+ processed."""
+ ...
+
+ @abstractmethod
+ def wait(self, timeout: float | None) -> bool:
+ """Wait for an event. The return value is True if an event is
+ available, False if the timeout has been reached. If timeout is
+ None, wait forever. The timeout is in milliseconds."""
+ ...
+
+ @property
+ def input_hook(self) -> Callable[[], int] | None:
+ """Returns the current input hook."""
+ ...
+
+ @abstractmethod
+ def repaint(self) -> None: ...
+
+
+class InteractiveColoredConsole(code.InteractiveConsole):
+ STATEMENT_FAILED = object()
+
+ def __init__(
+ self,
+ locals: dict[str, object] | None = None,
+ filename: str = "<console>",
+ *,
+ local_exit: bool = False,
+ ) -> None:
+ super().__init__(locals=locals, filename=filename, local_exit=local_exit)
+ self.can_colorize = _colorize.can_colorize()
+
+ def showsyntaxerror(self, filename=None, **kwargs):
+ super().showsyntaxerror(filename=filename, **kwargs)
+
+ def _excepthook(self, typ, value, tb):
+ import traceback
+ lines = traceback.format_exception(
+ typ, value, tb,
+ colorize=self.can_colorize,
+ limit=traceback.BUILTIN_EXCEPTION_LIMIT)
+ self.write(''.join(lines))
+
+ def runcode(self, code):
+ try:
+ exec(code, self.locals)
+ except SystemExit:
+ raise
+ except BaseException:
+ self.showtraceback()
+ return self.STATEMENT_FAILED
+ return None
+
+ def runsource(self, source, filename="<input>", symbol="single"):
+ try:
+ tree = self.compile.compiler(
+ source,
+ filename,
+ "exec",
+ ast.PyCF_ONLY_AST,
+ incomplete_input=False,
+ )
+ except (SyntaxError, OverflowError, ValueError):
+ self.showsyntaxerror(filename, source=source)
+ return False
+ if tree.body:
+ *_, last_stmt = tree.body
+ for stmt in tree.body:
+ wrapper = ast.Interactive if stmt is last_stmt else ast.Module
+ the_symbol = symbol if stmt is last_stmt else "exec"
+ item = wrapper([stmt])
+ try:
+ code = self.compile.compiler(item, filename, the_symbol)
+ linecache._register_code(code, source, filename)
+ except SyntaxError as e:
+ if e.args[0] == "'await' outside function":
+ python = os.path.basename(sys.executable)
+ e.add_note(
+ f"Try the asyncio REPL ({python} -m asyncio) to use"
+ f" top-level 'await' and run background asyncio tasks."
+ )
+ self.showsyntaxerror(filename, source=source)
+ return False
+ except (OverflowError, ValueError):
+ self.showsyntaxerror(filename, source=source)
+ return False
+
+ if code is None:
+ return True
+
+ result = self.runcode(code)
+ if result is self.STATEMENT_FAILED:
+ break
+ return False
diff --git a/contrib/tools/python3/Lib/_pyrepl/fancy_termios.py b/contrib/tools/python3/Lib/_pyrepl/fancy_termios.py
new file mode 100644
index 00000000000..8d5bd183f21
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/fancy_termios.py
@@ -0,0 +1,82 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import termios
+
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import cast
+else:
+ cast = lambda typ, val: val
+
+
+class TermState:
+ def __init__(self, attrs: list[int | list[bytes]]) -> None:
+ self.iflag = cast(int, attrs[0])
+ self.oflag = cast(int, attrs[1])
+ self.cflag = cast(int, attrs[2])
+ self.lflag = cast(int, attrs[3])
+ self.ispeed = cast(int, attrs[4])
+ self.ospeed = cast(int, attrs[5])
+ self.cc = cast(list[bytes], attrs[6])
+
+ def as_list(self) -> list[int | list[bytes]]:
+ return [
+ self.iflag,
+ self.oflag,
+ self.cflag,
+ self.lflag,
+ self.ispeed,
+ self.ospeed,
+ # Always return a copy of the control characters list to ensure
+ # there are not any additional references to self.cc
+ self.cc[:],
+ ]
+
+ def copy(self) -> "TermState":
+ return self.__class__(self.as_list())
+
+
+def tcgetattr(fd: int) -> TermState:
+ return TermState(termios.tcgetattr(fd))
+
+
+def tcsetattr(fd: int, when: int, attrs: TermState) -> None:
+ termios.tcsetattr(fd, when, attrs.as_list())
+
+
+class Term(TermState):
+ TS__init__ = TermState.__init__
+
+ def __init__(self, fd: int = 0) -> None:
+ self.TS__init__(termios.tcgetattr(fd))
+ self.fd = fd
+ self.stack: list[list[int | list[bytes]]] = []
+
+ def save(self) -> None:
+ self.stack.append(self.as_list())
+
+ def set(self, when: int = termios.TCSANOW) -> None:
+ termios.tcsetattr(self.fd, when, self.as_list())
+
+ def restore(self) -> None:
+ self.TS__init__(self.stack.pop())
+ self.set()
diff --git a/contrib/tools/python3/Lib/_pyrepl/historical_reader.py b/contrib/tools/python3/Lib/_pyrepl/historical_reader.py
new file mode 100644
index 00000000000..c4b95fa2e81
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/historical_reader.py
@@ -0,0 +1,419 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from dataclasses import dataclass, field
+
+from . import commands, input
+from .reader import Reader
+
+
+if False:
+ from .types import SimpleContextManager, KeySpec, CommandName
+
+
+isearch_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
+ [("\\%03o" % c, "isearch-end") for c in range(256) if chr(c) != "\\"]
+ + [(c, "isearch-add-character") for c in map(chr, range(32, 127)) if c != "\\"]
+ + [
+ ("\\%03o" % c, "isearch-add-character")
+ for c in range(256)
+ if chr(c).isalpha() and chr(c) != "\\"
+ ]
+ + [
+ ("\\\\", "self-insert"),
+ (r"\C-r", "isearch-backwards"),
+ (r"\C-s", "isearch-forwards"),
+ (r"\C-c", "isearch-cancel"),
+ (r"\C-g", "isearch-cancel"),
+ (r"\<backspace>", "isearch-backspace"),
+ ]
+)
+
+ISEARCH_DIRECTION_NONE = ""
+ISEARCH_DIRECTION_BACKWARDS = "r"
+ISEARCH_DIRECTION_FORWARDS = "f"
+
+
+class next_history(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.historyi == len(r.history):
+ r.error("end of history list")
+ return
+ r.select_item(r.historyi + 1)
+
+
+class previous_history(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.historyi == 0:
+ r.error("start of history list")
+ return
+ r.select_item(r.historyi - 1)
+
+
+class history_search_backward(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.search_next(forwards=False)
+
+
+class history_search_forward(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.search_next(forwards=True)
+
+
+class restore_history(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.historyi != len(r.history):
+ if r.get_unicode() != r.history[r.historyi]:
+ r.buffer = list(r.history[r.historyi])
+ r.pos = len(r.buffer)
+ r.dirty = True
+
+
+class first_history(commands.Command):
+ def do(self) -> None:
+ self.reader.select_item(0)
+
+
+class last_history(commands.Command):
+ def do(self) -> None:
+ self.reader.select_item(len(self.reader.history))
+
+
+class operate_and_get_next(commands.FinishCommand):
+ def do(self) -> None:
+ self.reader.next_history = self.reader.historyi + 1
+
+
+class yank_arg(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if r.last_command is self.__class__:
+ r.yank_arg_i += 1
+ else:
+ r.yank_arg_i = 0
+ if r.historyi < r.yank_arg_i:
+ r.error("beginning of history list")
+ return
+ a = r.get_arg(-1)
+ # XXX how to split?
+ words = r.get_item(r.historyi - r.yank_arg_i - 1).split()
+ if a < -len(words) or a >= len(words):
+ r.error("no such arg")
+ return
+ w = words[a]
+ b = r.buffer
+ if r.yank_arg_i > 0:
+ o = len(r.yank_arg_yanked)
+ else:
+ o = 0
+ b[r.pos - o : r.pos] = list(w)
+ r.yank_arg_yanked = w
+ r.pos += len(w) - o
+ r.dirty = True
+
+
+class forward_history_isearch(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_FORWARDS
+ r.isearch_start = r.historyi, r.pos
+ r.isearch_term = ""
+ r.dirty = True
+ r.push_input_trans(r.isearch_trans)
+
+
+class reverse_history_isearch(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS
+ r.dirty = True
+ r.isearch_term = ""
+ r.push_input_trans(r.isearch_trans)
+ r.isearch_start = r.historyi, r.pos
+
+
+class isearch_cancel(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_NONE
+ r.pop_input_trans()
+ r.select_item(r.isearch_start[0])
+ r.pos = r.isearch_start[1]
+ r.dirty = True
+
+
+class isearch_add_character(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ r.isearch_term += self.event[-1]
+ r.dirty = True
+ p = r.pos + len(r.isearch_term) - 1
+ if b[p : p + 1] != [r.isearch_term[-1]]:
+ r.isearch_next()
+
+
+class isearch_backspace(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ if len(r.isearch_term) > 0:
+ r.isearch_term = r.isearch_term[:-1]
+ r.dirty = True
+ else:
+ r.error("nothing to rubout")
+
+
+class isearch_forwards(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_FORWARDS
+ r.isearch_next()
+
+
+class isearch_backwards(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_BACKWARDS
+ r.isearch_next()
+
+
+class isearch_end(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ r.isearch_direction = ISEARCH_DIRECTION_NONE
+ r.console.forgetinput()
+ r.pop_input_trans()
+ r.dirty = True
+
+
+@dataclass
+class HistoricalReader(Reader):
+ """Adds history support (with incremental history searching) to the
+ Reader class.
+ """
+
+ history: list[str] = field(default_factory=list)
+ historyi: int = 0
+ next_history: int | None = None
+ transient_history: dict[int, str] = field(default_factory=dict)
+ isearch_term: str = ""
+ isearch_direction: str = ISEARCH_DIRECTION_NONE
+ isearch_start: tuple[int, int] = field(init=False)
+ isearch_trans: input.KeymapTranslator = field(init=False)
+ yank_arg_i: int = 0
+ yank_arg_yanked: str = ""
+
+ def __post_init__(self) -> None:
+ super().__post_init__()
+ for c in [
+ next_history,
+ previous_history,
+ restore_history,
+ first_history,
+ last_history,
+ yank_arg,
+ forward_history_isearch,
+ reverse_history_isearch,
+ isearch_end,
+ isearch_add_character,
+ isearch_cancel,
+ isearch_add_character,
+ isearch_backspace,
+ isearch_forwards,
+ isearch_backwards,
+ operate_and_get_next,
+ history_search_backward,
+ history_search_forward,
+ ]:
+ self.commands[c.__name__] = c
+ self.commands[c.__name__.replace("_", "-")] = c
+ self.isearch_start = self.historyi, self.pos
+ self.isearch_trans = input.KeymapTranslator(
+ isearch_keymap, invalid_cls=isearch_end, character_cls=isearch_add_character
+ )
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return super().collect_keymap() + (
+ (r"\C-n", "next-history"),
+ (r"\C-p", "previous-history"),
+ (r"\C-o", "operate-and-get-next"),
+ (r"\C-r", "reverse-history-isearch"),
+ (r"\C-s", "forward-history-isearch"),
+ (r"\M-r", "restore-history"),
+ (r"\M-.", "yank-arg"),
+ (r"\<page down>", "history-search-forward"),
+ (r"\x1b[6~", "history-search-forward"),
+ (r"\<page up>", "history-search-backward"),
+ (r"\x1b[5~", "history-search-backward"),
+ )
+
+ def select_item(self, i: int) -> None:
+ self.transient_history[self.historyi] = self.get_unicode()
+ buf = self.transient_history.get(i)
+ if buf is None:
+ buf = self.history[i].rstrip()
+ self.buffer = list(buf)
+ self.historyi = i
+ self.pos = len(self.buffer)
+ self.dirty = True
+ self.last_refresh_cache.invalidated = True
+
+ def get_item(self, i: int) -> str:
+ if i != len(self.history):
+ return self.transient_history.get(i, self.history[i])
+ else:
+ return self.transient_history.get(i, self.get_unicode())
+
+ @contextmanager
+ def suspend(self) -> SimpleContextManager:
+ with super().suspend(), self.suspend_history():
+ yield
+
+ @contextmanager
+ def suspend_history(self) -> SimpleContextManager:
+ try:
+ old_history = self.history[:]
+ del self.history[:]
+ yield
+ finally:
+ self.history[:] = old_history
+
+ def prepare(self) -> None:
+ super().prepare()
+ try:
+ self.transient_history = {}
+ if self.next_history is not None and self.next_history < len(self.history):
+ self.historyi = self.next_history
+ self.buffer[:] = list(self.history[self.next_history])
+ self.pos = len(self.buffer)
+ self.transient_history[len(self.history)] = ""
+ else:
+ self.historyi = len(self.history)
+ self.next_history = None
+ except:
+ self.restore()
+ raise
+
+ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
+ if cursor_on_line and self.isearch_direction != ISEARCH_DIRECTION_NONE:
+ d = "rf"[self.isearch_direction == ISEARCH_DIRECTION_FORWARDS]
+ return "(%s-search `%s') " % (d, self.isearch_term)
+ else:
+ return super().get_prompt(lineno, cursor_on_line)
+
+ def search_next(self, *, forwards: bool) -> None:
+ """Search history for the current line contents up to the cursor.
+
+ Selects the first item found. If nothing is under the cursor, any next
+ item in history is selected.
+ """
+ pos = self.pos
+ s = self.get_unicode()
+ history_index = self.historyi
+
+ # In multiline contexts, we're only interested in the current line.
+ nl_index = s.rfind('\n', 0, pos)
+ prefix = s[nl_index + 1:pos]
+ pos = len(prefix)
+
+ match_prefix = len(prefix)
+ len_item = 0
+ if history_index < len(self.history):
+ len_item = len(self.get_item(history_index))
+ if len_item and pos == len_item:
+ match_prefix = False
+ elif not pos:
+ match_prefix = False
+
+ while 1:
+ if forwards:
+ out_of_bounds = history_index >= len(self.history) - 1
+ else:
+ out_of_bounds = history_index == 0
+ if out_of_bounds:
+ if forwards and not match_prefix:
+ self.pos = 0
+ self.buffer = []
+ self.dirty = True
+ else:
+ self.error("not found")
+ return
+
+ history_index += 1 if forwards else -1
+ s = self.get_item(history_index)
+
+ if not match_prefix:
+ self.select_item(history_index)
+ return
+
+ len_acc = 0
+ for i, line in enumerate(s.splitlines(keepends=True)):
+ if line.startswith(prefix):
+ self.select_item(history_index)
+ self.pos = pos + len_acc
+ return
+ len_acc += len(line)
+
+ def isearch_next(self) -> None:
+ st = self.isearch_term
+ p = self.pos
+ i = self.historyi
+ s = self.get_unicode()
+ forwards = self.isearch_direction == ISEARCH_DIRECTION_FORWARDS
+ while 1:
+ if forwards:
+ p = s.find(st, p + 1)
+ else:
+ p = s.rfind(st, 0, p + len(st) - 1)
+ if p != -1:
+ self.select_item(i)
+ self.pos = p
+ return
+ elif (forwards and i >= len(self.history) - 1) or (not forwards and i == 0):
+ self.error("not found")
+ return
+ else:
+ if forwards:
+ i += 1
+ s = self.get_item(i)
+ p = -1
+ else:
+ i -= 1
+ s = self.get_item(i)
+ p = len(s)
+
+ def finish(self) -> None:
+ super().finish()
+ ret = self.get_unicode()
+ for i, t in self.transient_history.items():
+ if i < len(self.history) and i != self.historyi:
+ self.history[i] = t
+ if ret and should_auto_add_history:
+ self.history.append(ret)
+
+
+should_auto_add_history = True
diff --git a/contrib/tools/python3/Lib/_pyrepl/input.py b/contrib/tools/python3/Lib/_pyrepl/input.py
new file mode 100644
index 00000000000..21c24eb5cde
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/input.py
@@ -0,0 +1,114 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+# (naming modules after builtin functions is not such a hot idea...)
+
+# an KeyTrans instance translates Event objects into Command objects
+
+# hmm, at what level do we want [C-i] and [tab] to be equivalent?
+# [meta-a] and [esc a]? obviously, these are going to be equivalent
+# for the UnixConsole, but should they be for PygameConsole?
+
+# it would in any situation seem to be a bad idea to bind, say, [tab]
+# and [C-i] to *different* things... but should binding one bind the
+# other?
+
+# executive, temporary decision: [tab] and [C-i] are distinct, but
+# [meta-key] is identified with [esc key]. We demand that any console
+# class does quite a lot towards emulating a unix terminal.
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+import unicodedata
+from collections import deque
+
+
+# types
+if False:
+ from .types import EventTuple
+
+
+class InputTranslator(ABC):
+ @abstractmethod
+ def push(self, evt: EventTuple) -> None:
+ pass
+
+ @abstractmethod
+ def get(self) -> EventTuple | None:
+ return None
+
+ @abstractmethod
+ def empty(self) -> bool:
+ return True
+
+
+class KeymapTranslator(InputTranslator):
+ def __init__(self, keymap, verbose=False, invalid_cls=None, character_cls=None):
+ self.verbose = verbose
+ from .keymap import compile_keymap, parse_keys
+
+ self.keymap = keymap
+ self.invalid_cls = invalid_cls
+ self.character_cls = character_cls
+ d = {}
+ for keyspec, command in keymap:
+ keyseq = tuple(parse_keys(keyspec))
+ d[keyseq] = command
+ if self.verbose:
+ print(d)
+ self.k = self.ck = compile_keymap(d, ())
+ self.results = deque()
+ self.stack = []
+
+ def push(self, evt):
+ if self.verbose:
+ print("pushed", evt.data, end="")
+ key = evt.data
+ d = self.k.get(key)
+ if isinstance(d, dict):
+ if self.verbose:
+ print("transition")
+ self.stack.append(key)
+ self.k = d
+ else:
+ if d is None:
+ if self.verbose:
+ print("invalid")
+ if self.stack or len(key) > 1 or unicodedata.category(key) == "C":
+ self.results.append((self.invalid_cls, self.stack + [key]))
+ else:
+ # small optimization:
+ self.k[key] = self.character_cls
+ self.results.append((self.character_cls, [key]))
+ else:
+ if self.verbose:
+ print("matched", d)
+ self.results.append((d, self.stack + [key]))
+ self.stack = []
+ self.k = self.ck
+
+ def get(self):
+ if self.results:
+ return self.results.popleft()
+ else:
+ return None
+
+ def empty(self) -> bool:
+ return not self.results
diff --git a/contrib/tools/python3/Lib/_pyrepl/keymap.py b/contrib/tools/python3/Lib/_pyrepl/keymap.py
new file mode 100644
index 00000000000..2fb03d19523
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/keymap.py
@@ -0,0 +1,213 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+Keymap contains functions for parsing keyspecs and turning keyspecs into
+appropriate sequences.
+
+A keyspec is a string representing a sequence of key presses that can
+be bound to a command. All characters other than the backslash represent
+themselves. In the traditional manner, a backslash introduces an escape
+sequence.
+
+pyrepl uses its own keyspec format that is meant to be a strict superset of
+readline's KEYSEQ format. This means that if a spec is found that readline
+accepts that this doesn't, it should be logged as a bug. Note that this means
+we're using the `\\C-o' style of readline's keyspec, not the `Control-o' sort.
+
+The extension to readline is that the sequence \\<KEY> denotes the
+sequence of characters produced by hitting KEY.
+
+Examples:
+`a' - what you get when you hit the `a' key
+`\\EOA' - Escape - O - A (up, on my terminal)
+`\\<UP>' - the up arrow key
+`\\<up>' - ditto (keynames are case-insensitive)
+`\\C-o', `\\c-o' - control-o
+`\\M-.' - meta-period
+`\\E.' - ditto (that's how meta works for pyrepl)
+`\\<tab>', `\\<TAB>', `\\t', `\\011', '\\x09', '\\X09', '\\C-i', '\\C-I'
+ - all of these are the tab character.
+"""
+
+_escapes = {
+ "\\": "\\",
+ "'": "'",
+ '"': '"',
+ "a": "\a",
+ "b": "\b",
+ "e": "\033",
+ "f": "\f",
+ "n": "\n",
+ "r": "\r",
+ "t": "\t",
+ "v": "\v",
+}
+
+_keynames = {
+ "backspace": "backspace",
+ "delete": "delete",
+ "down": "down",
+ "end": "end",
+ "enter": "\r",
+ "escape": "\033",
+ "f1": "f1",
+ "f2": "f2",
+ "f3": "f3",
+ "f4": "f4",
+ "f5": "f5",
+ "f6": "f6",
+ "f7": "f7",
+ "f8": "f8",
+ "f9": "f9",
+ "f10": "f10",
+ "f11": "f11",
+ "f12": "f12",
+ "f13": "f13",
+ "f14": "f14",
+ "f15": "f15",
+ "f16": "f16",
+ "f17": "f17",
+ "f18": "f18",
+ "f19": "f19",
+ "f20": "f20",
+ "home": "home",
+ "insert": "insert",
+ "left": "left",
+ "page down": "page down",
+ "page up": "page up",
+ "return": "\r",
+ "right": "right",
+ "space": " ",
+ "tab": "\t",
+ "up": "up",
+}
+
+
+class KeySpecError(Exception):
+ pass
+
+
+def parse_keys(keys: str) -> list[str]:
+ """Parse keys in keyspec format to a sequence of keys."""
+ s = 0
+ r: list[str] = []
+ while s < len(keys):
+ k, s = _parse_single_key_sequence(keys, s)
+ r.extend(k)
+ return r
+
+
+def _parse_single_key_sequence(key: str, s: int) -> tuple[list[str], int]:
+ ctrl = 0
+ meta = 0
+ ret = ""
+ while not ret and s < len(key):
+ if key[s] == "\\":
+ c = key[s + 1].lower()
+ if c in _escapes:
+ ret = _escapes[c]
+ s += 2
+ elif c == "c":
+ if key[s + 2] != "-":
+ raise KeySpecError(
+ "\\C must be followed by `-' (char %d of %s)"
+ % (s + 2, repr(key))
+ )
+ if ctrl:
+ raise KeySpecError(
+ "doubled \\C- (char %d of %s)" % (s + 1, repr(key))
+ )
+ ctrl = 1
+ s += 3
+ elif c == "m":
+ if key[s + 2] != "-":
+ raise KeySpecError(
+ "\\M must be followed by `-' (char %d of %s)"
+ % (s + 2, repr(key))
+ )
+ if meta:
+ raise KeySpecError(
+ "doubled \\M- (char %d of %s)" % (s + 1, repr(key))
+ )
+ meta = 1
+ s += 3
+ elif c.isdigit():
+ n = key[s + 1 : s + 4]
+ ret = chr(int(n, 8))
+ s += 4
+ elif c == "x":
+ n = key[s + 2 : s + 4]
+ ret = chr(int(n, 16))
+ s += 4
+ elif c == "<":
+ t = key.find(">", s)
+ if t == -1:
+ raise KeySpecError(
+ "unterminated \\< starting at char %d of %s"
+ % (s + 1, repr(key))
+ )
+ ret = key[s + 2 : t].lower()
+ if ret not in _keynames:
+ raise KeySpecError(
+ "unrecognised keyname `%s' at char %d of %s"
+ % (ret, s + 2, repr(key))
+ )
+ ret = _keynames[ret]
+ s = t + 1
+ else:
+ raise KeySpecError(
+ "unknown backslash escape %s at char %d of %s"
+ % (repr(c), s + 2, repr(key))
+ )
+ else:
+ ret = key[s]
+ s += 1
+ if ctrl:
+ if len(ret) == 1:
+ ret = chr(ord(ret) & 0x1F) # curses.ascii.ctrl()
+ elif ret in {"left", "right"}:
+ ret = f"ctrl {ret}"
+ else:
+ raise KeySpecError("\\C- followed by invalid key")
+
+ result = [ret], s
+ if meta:
+ result[0].insert(0, "\033")
+ return result
+
+
+def compile_keymap(keymap, empty=b""):
+ r = {}
+ for key, value in keymap.items():
+ if isinstance(key, bytes):
+ first = key[:1]
+ else:
+ first = key[0]
+ r.setdefault(first, {})[key[1:]] = value
+ for key, value in r.items():
+ if empty in value:
+ if len(value) != 1:
+ raise KeySpecError("key definitions for %s clash" % (value.values(),))
+ else:
+ r[key] = value[empty]
+ else:
+ r[key] = compile_keymap(value, empty)
+ return r
diff --git a/contrib/tools/python3/Lib/_pyrepl/main.py b/contrib/tools/python3/Lib/_pyrepl/main.py
new file mode 100644
index 00000000000..a6f824dcc4a
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/main.py
@@ -0,0 +1,59 @@
+import errno
+import os
+import sys
+
+
+CAN_USE_PYREPL: bool
+FAIL_REASON: str
+try:
+ if sys.platform == "win32" and sys.getwindowsversion().build < 10586:
+ raise RuntimeError("Windows 10 TH2 or later required")
+ if not os.isatty(sys.stdin.fileno()):
+ raise OSError(errno.ENOTTY, "tty required", "stdin")
+ from .simple_interact import check
+ if err := check():
+ raise RuntimeError(err)
+except Exception as e:
+ CAN_USE_PYREPL = False
+ FAIL_REASON = f"warning: can't use pyrepl: {e}"
+else:
+ CAN_USE_PYREPL = True
+ FAIL_REASON = ""
+
+
+def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
+ if not CAN_USE_PYREPL:
+ if not os.getenv('PYTHON_BASIC_REPL') and FAIL_REASON:
+ from .trace import trace
+ trace(FAIL_REASON)
+ print(FAIL_REASON, file=sys.stderr)
+ return sys._baserepl()
+
+ if mainmodule:
+ namespace = mainmodule.__dict__
+ else:
+ import __main__
+ namespace = __main__.__dict__
+ namespace.pop("__pyrepl_interactive_console", None)
+
+ # sys._baserepl() above does this internally, we do it here
+ startup_path = os.getenv("PYTHONSTARTUP")
+ if pythonstartup and startup_path:
+ sys.audit("cpython.run_startup", startup_path)
+
+ import tokenize
+ with tokenize.open(startup_path) as f:
+ startup_code = compile(f.read(), startup_path, "exec")
+ exec(startup_code, namespace)
+
+ # set sys.{ps1,ps2} just before invoking the interactive interpreter. This
+ # mimics what CPython does in pythonrun.c
+ if not hasattr(sys, "ps1"):
+ sys.ps1 = ">>> "
+ if not hasattr(sys, "ps2"):
+ sys.ps2 = "... "
+
+ from .console import InteractiveColoredConsole
+ from .simple_interact import run_multiline_interactive_console
+ console = InteractiveColoredConsole(namespace, filename="<stdin>")
+ run_multiline_interactive_console(console)
diff --git a/contrib/tools/python3/Lib/_pyrepl/pager.py b/contrib/tools/python3/Lib/_pyrepl/pager.py
new file mode 100644
index 00000000000..1fddc63e3ee
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/pager.py
@@ -0,0 +1,175 @@
+from __future__ import annotations
+
+import io
+import os
+import re
+import sys
+
+
+# types
+if False:
+ from typing import Protocol
+ class Pager(Protocol):
+ def __call__(self, text: str, title: str = "") -> None:
+ ...
+
+
+def get_pager() -> Pager:
+ """Decide what method to use for paging through text."""
+ if not hasattr(sys.stdin, "isatty"):
+ return plain_pager
+ if not hasattr(sys.stdout, "isatty"):
+ return plain_pager
+ if not sys.stdin.isatty() or not sys.stdout.isatty():
+ return plain_pager
+ if sys.platform == "emscripten":
+ return plain_pager
+ use_pager = os.environ.get('MANPAGER') or os.environ.get('PAGER')
+ if use_pager:
+ if sys.platform == 'win32': # pipes completely broken in Windows
+ return lambda text, title='': tempfile_pager(plain(text), use_pager)
+ elif os.environ.get('TERM') in ('dumb', 'emacs'):
+ return lambda text, title='': pipe_pager(plain(text), use_pager, title)
+ else:
+ return lambda text, title='': pipe_pager(text, use_pager, title)
+ if os.environ.get('TERM') in ('dumb', 'emacs'):
+ return plain_pager
+ if sys.platform == 'win32':
+ return lambda text, title='': tempfile_pager(plain(text), 'more <')
+ if hasattr(os, 'system') and os.system('(pager) 2>/dev/null') == 0:
+ return lambda text, title='': pipe_pager(text, 'pager', title)
+ if hasattr(os, 'system') and os.system('(less) 2>/dev/null') == 0:
+ return lambda text, title='': pipe_pager(text, 'less', title)
+
+ import tempfile
+ (fd, filename) = tempfile.mkstemp()
+ os.close(fd)
+ try:
+ if hasattr(os, 'system') and os.system('more "%s"' % filename) == 0:
+ return lambda text, title='': pipe_pager(text, 'more', title)
+ else:
+ return tty_pager
+ finally:
+ os.unlink(filename)
+
+
+def escape_stdout(text: str) -> str:
+ # Escape non-encodable characters to avoid encoding errors later
+ encoding = getattr(sys.stdout, 'encoding', None) or 'utf-8'
+ return text.encode(encoding, 'backslashreplace').decode(encoding)
+
+
+def escape_less(s: str) -> str:
+ return re.sub(r'([?:.%\\])', r'\\\1', s)
+
+
+def plain(text: str) -> str:
+ """Remove boldface formatting from text."""
+ return re.sub('.\b', '', text)
+
+
+def tty_pager(text: str, title: str = '') -> None:
+ """Page through text on a text terminal."""
+ lines = plain(escape_stdout(text)).split('\n')
+ has_tty = False
+ try:
+ import tty
+ import termios
+ fd = sys.stdin.fileno()
+ old = termios.tcgetattr(fd)
+ tty.setcbreak(fd)
+ has_tty = True
+
+ def getchar() -> str:
+ return sys.stdin.read(1)
+
+ except (ImportError, AttributeError, io.UnsupportedOperation):
+ def getchar() -> str:
+ return sys.stdin.readline()[:-1][:1]
+
+ try:
+ try:
+ h = int(os.environ.get('LINES', 0))
+ except ValueError:
+ h = 0
+ if h <= 1:
+ h = 25
+ r = inc = h - 1
+ sys.stdout.write('\n'.join(lines[:inc]) + '\n')
+ while lines[r:]:
+ sys.stdout.write('-- more --')
+ sys.stdout.flush()
+ c = getchar()
+
+ if c in ('q', 'Q'):
+ sys.stdout.write('\r \r')
+ break
+ elif c in ('\r', '\n'):
+ sys.stdout.write('\r \r' + lines[r] + '\n')
+ r = r + 1
+ continue
+ if c in ('b', 'B', '\x1b'):
+ r = r - inc - inc
+ if r < 0: r = 0
+ sys.stdout.write('\n' + '\n'.join(lines[r:r+inc]) + '\n')
+ r = r + inc
+
+ finally:
+ if has_tty:
+ termios.tcsetattr(fd, termios.TCSAFLUSH, old)
+
+
+def plain_pager(text: str, title: str = '') -> None:
+ """Simply print unformatted text. This is the ultimate fallback."""
+ sys.stdout.write(plain(escape_stdout(text)))
+
+
+def pipe_pager(text: str, cmd: str, title: str = '') -> None:
+ """Page through text by feeding it to another program."""
+ import subprocess
+ env = os.environ.copy()
+ if title:
+ title += ' '
+ esc_title = escape_less(title)
+ prompt_string = (
+ f' {esc_title}' +
+ '?ltline %lt?L/%L.'
+ ':byte %bB?s/%s.'
+ '.'
+ '?e (END):?pB %pB\\%..'
+ ' (press h for help or q to quit)')
+ env['LESS'] = '-RmPm{0}$PM{0}$'.format(prompt_string)
+ proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
+ errors='backslashreplace', env=env)
+ assert proc.stdin is not None
+ try:
+ with proc.stdin as pipe:
+ try:
+ pipe.write(text)
+ except KeyboardInterrupt:
+ # We've hereby abandoned whatever text hasn't been written,
+ # but the pager is still in control of the terminal.
+ pass
+ except OSError:
+ pass # Ignore broken pipes caused by quitting the pager program.
+ while True:
+ try:
+ proc.wait()
+ break
+ except KeyboardInterrupt:
+ # Ignore ctl-c like the pager itself does. Otherwise the pager is
+ # left running and the terminal is in raw mode and unusable.
+ pass
+
+
+def tempfile_pager(text: str, cmd: str, title: str = '') -> None:
+ """Page through text by invoking a program on a temporary file."""
+ import tempfile
+ with tempfile.TemporaryDirectory() as tempdir:
+ filename = os.path.join(tempdir, 'pydoc.out')
+ with open(filename, 'w', errors='backslashreplace',
+ encoding=os.device_encoding(0) if
+ sys.platform == 'win32' else None
+ ) as file:
+ file.write(text)
+ os.system(cmd + ' "' + filename + '"')
diff --git a/contrib/tools/python3/Lib/_pyrepl/reader.py b/contrib/tools/python3/Lib/_pyrepl/reader.py
new file mode 100644
index 00000000000..012871e7650
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/reader.py
@@ -0,0 +1,764 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]>
+# Antonio Cuni
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import sys
+
+from contextlib import contextmanager
+from dataclasses import dataclass, field, fields
+from _colorize import can_colorize, ANSIColors
+
+
+from . import commands, console, input
+from .utils import wlen, unbracket, disp_str
+from .trace import trace
+
+
+# types
+Command = commands.Command
+from .types import Callback, SimpleContextManager, KeySpec, CommandName
+
+
+# syntax classes:
+
+SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)
+
+
+def make_default_syntax_table() -> dict[str, int]:
+ # XXX perhaps should use some unicodedata here?
+ st: dict[str, int] = {}
+ for c in map(chr, range(256)):
+ st[c] = SYNTAX_SYMBOL
+ for c in [a for a in map(chr, range(256)) if a.isalnum()]:
+ st[c] = SYNTAX_WORD
+ st["\n"] = st[" "] = SYNTAX_WHITESPACE
+ return st
+
+
+def make_default_commands() -> dict[CommandName, type[Command]]:
+ result: dict[CommandName, type[Command]] = {}
+ for v in vars(commands).values():
+ if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower():
+ result[v.__name__] = v
+ result[v.__name__.replace("_", "-")] = v
+ return result
+
+
+default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
+ [
+ (r"\C-a", "beginning-of-line"),
+ (r"\C-b", "left"),
+ (r"\C-c", "interrupt"),
+ (r"\C-d", "delete"),
+ (r"\C-e", "end-of-line"),
+ (r"\C-f", "right"),
+ (r"\C-g", "cancel"),
+ (r"\C-h", "backspace"),
+ (r"\C-j", "accept"),
+ (r"\<return>", "accept"),
+ (r"\C-k", "kill-line"),
+ (r"\C-l", "clear-screen"),
+ (r"\C-m", "accept"),
+ (r"\C-t", "transpose-characters"),
+ (r"\C-u", "unix-line-discard"),
+ (r"\C-w", "unix-word-rubout"),
+ (r"\C-x\C-u", "upcase-region"),
+ (r"\C-y", "yank"),
+ *(() if sys.platform == "win32" else ((r"\C-z", "suspend"), )),
+ (r"\M-b", "backward-word"),
+ (r"\M-c", "capitalize-word"),
+ (r"\M-d", "kill-word"),
+ (r"\M-f", "forward-word"),
+ (r"\M-l", "downcase-word"),
+ (r"\M-t", "transpose-words"),
+ (r"\M-u", "upcase-word"),
+ (r"\M-y", "yank-pop"),
+ (r"\M--", "digit-arg"),
+ (r"\M-0", "digit-arg"),
+ (r"\M-1", "digit-arg"),
+ (r"\M-2", "digit-arg"),
+ (r"\M-3", "digit-arg"),
+ (r"\M-4", "digit-arg"),
+ (r"\M-5", "digit-arg"),
+ (r"\M-6", "digit-arg"),
+ (r"\M-7", "digit-arg"),
+ (r"\M-8", "digit-arg"),
+ (r"\M-9", "digit-arg"),
+ (r"\M-\n", "accept"),
+ ("\\\\", "self-insert"),
+ (r"\x1b[200~", "enable_bracketed_paste"),
+ (r"\x1b[201~", "disable_bracketed_paste"),
+ (r"\x03", "ctrl-c"),
+ ]
+ + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
+ + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()]
+ + [
+ (r"\<up>", "up"),
+ (r"\<down>", "down"),
+ (r"\<left>", "left"),
+ (r"\C-\<left>", "backward-word"),
+ (r"\<right>", "right"),
+ (r"\C-\<right>", "forward-word"),
+ (r"\<delete>", "delete"),
+ (r"\x1b[3~", "delete"),
+ (r"\<backspace>", "backspace"),
+ (r"\M-\<backspace>", "backward-kill-word"),
+ (r"\<end>", "end-of-line"), # was 'end'
+ (r"\<home>", "beginning-of-line"), # was 'home'
+ (r"\<f1>", "help"),
+ (r"\<f2>", "show-history"),
+ (r"\<f3>", "paste-mode"),
+ (r"\EOF", "end"), # the entries in the terminfo database for xterms
+ (r"\EOH", "home"), # seem to be wrong. this is a less than ideal
+ # workaround
+ ]
+)
+
+
+@dataclass(slots=True)
+class Reader:
+ """The Reader class implements the bare bones of a command reader,
+ handling such details as editing and cursor motion. What it does
+ not support are such things as completion or history support -
+ these are implemented elsewhere.
+
+ Instance variables of note include:
+
+ * buffer:
+ A *list* (*not* a string at the moment :-) containing all the
+ characters that have been entered.
+ * console:
+ Hopefully encapsulates the OS dependent stuff.
+ * pos:
+ A 0-based index into `buffer' for where the insertion point
+ is.
+ * screeninfo:
+ Ahem. This list contains some info needed to move the
+ insertion point around reasonably efficiently.
+ * cxy, lxy:
+ the position of the insertion point in screen ...
+ * syntax_table:
+ Dictionary mapping characters to `syntax class'; read the
+ emacs docs to see what this means :-)
+ * commands:
+ Dictionary mapping command names to command classes.
+ * arg:
+ The emacs-style prefix argument. It will be None if no such
+ argument has been provided.
+ * dirty:
+ True if we need to refresh the display.
+ * kill_ring:
+ The emacs-style kill-ring; manipulated with yank & yank-pop
+ * ps1, ps2, ps3, ps4:
+ prompts. ps1 is the prompt for a one-line input; for a
+ multiline input it looks like:
+ ps2> first line of input goes here
+ ps3> second and further
+ ps3> lines get ps3
+ ...
+ ps4> and the last one gets ps4
+ As with the usual top-level, you can set these to instances if
+ you like; str() will be called on them (once) at the beginning
+ of each command. Don't put really long or newline containing
+ strings here, please!
+ This is just the default policy; you can change it freely by
+ overriding get_prompt() (and indeed some standard subclasses
+ do).
+ * finished:
+ handle1 will set this to a true value if a command signals
+ that we're done.
+ """
+
+ console: console.Console
+
+ ## state
+ buffer: list[str] = field(default_factory=list)
+ pos: int = 0
+ ps1: str = "->> "
+ ps2: str = "/>> "
+ ps3: str = "|.. "
+ ps4: str = R"\__ "
+ kill_ring: list[list[str]] = field(default_factory=list)
+ msg: str = ""
+ arg: int | None = None
+ dirty: bool = False
+ finished: bool = False
+ paste_mode: bool = False
+ in_bracketed_paste: bool = False
+ commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
+ last_command: type[Command] | None = None
+ syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
+ keymap: tuple[tuple[str, str], ...] = ()
+ input_trans: input.KeymapTranslator = field(init=False)
+ input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list)
+ screen: list[str] = field(default_factory=list)
+ screeninfo: list[tuple[int, list[int]]] = field(init=False)
+ cxy: tuple[int, int] = field(init=False)
+ lxy: tuple[int, int] = field(init=False)
+ scheduled_commands: list[str] = field(default_factory=list)
+ can_colorize: bool = False
+ threading_hook: Callback | None = None
+
+ ## cached metadata to speed up screen refreshes
+ @dataclass
+ class RefreshCache:
+ in_bracketed_paste: bool = False
+ screen: list[str] = field(default_factory=list)
+ screeninfo: list[tuple[int, list[int]]] = field(init=False)
+ line_end_offsets: list[int] = field(default_factory=list)
+ pos: int = field(init=False)
+ cxy: tuple[int, int] = field(init=False)
+ dimensions: tuple[int, int] = field(init=False)
+ invalidated: bool = False
+
+ def update_cache(self,
+ reader: Reader,
+ screen: list[str],
+ screeninfo: list[tuple[int, list[int]]],
+ ) -> None:
+ self.in_bracketed_paste = reader.in_bracketed_paste
+ self.screen = screen.copy()
+ self.screeninfo = screeninfo.copy()
+ self.pos = reader.pos
+ self.cxy = reader.cxy
+ self.dimensions = reader.console.width, reader.console.height
+ self.invalidated = False
+
+ def valid(self, reader: Reader) -> bool:
+ if self.invalidated:
+ return False
+ dimensions = reader.console.width, reader.console.height
+ dimensions_changed = dimensions != self.dimensions
+ paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
+ return not (dimensions_changed or paste_changed)
+
+ def get_cached_location(self, reader: Reader) -> tuple[int, int]:
+ if self.invalidated:
+ raise ValueError("Cache is invalidated")
+ offset = 0
+ earliest_common_pos = min(reader.pos, self.pos)
+ num_common_lines = len(self.line_end_offsets)
+ while num_common_lines > 0:
+ offset = self.line_end_offsets[num_common_lines - 1]
+ if earliest_common_pos > offset:
+ break
+ num_common_lines -= 1
+ else:
+ offset = 0
+ return offset, num_common_lines
+
+ last_refresh_cache: RefreshCache = field(default_factory=RefreshCache)
+
+ def __post_init__(self) -> None:
+ # Enable the use of `insert` without a `prepare` call - necessary to
+ # facilitate the tab completion hack implemented for
+ # <https://bugs.python.org/issue25660>.
+ self.keymap = self.collect_keymap()
+ self.input_trans = input.KeymapTranslator(
+ self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
+ )
+ self.screeninfo = [(0, [])]
+ self.cxy = self.pos2xy()
+ self.lxy = (self.pos, 0)
+ self.can_colorize = can_colorize()
+
+ self.last_refresh_cache.screeninfo = self.screeninfo
+ self.last_refresh_cache.pos = self.pos
+ self.last_refresh_cache.cxy = self.cxy
+ self.last_refresh_cache.dimensions = (0, 0)
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return default_keymap
+
+ def calc_screen(self) -> list[str]:
+ """Translate changes in self.buffer into changes in self.console.screen."""
+ # Since the last call to calc_screen:
+ # screen and screeninfo may differ due to a completion menu being shown
+ # pos and cxy may differ due to edits, cursor movements, or completion menus
+
+ # Lines that are above both the old and new cursor position can't have changed,
+ # unless the terminal has been resized (which might cause reflowing) or we've
+ # entered or left paste mode (which changes prompts, causing reflowing).
+ num_common_lines = 0
+ offset = 0
+ if self.last_refresh_cache.valid(self):
+ offset, num_common_lines = self.last_refresh_cache.get_cached_location(self)
+
+ screen = self.last_refresh_cache.screen
+ del screen[num_common_lines:]
+
+ screeninfo = self.last_refresh_cache.screeninfo
+ del screeninfo[num_common_lines:]
+
+ last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets
+ del last_refresh_line_end_offsets[num_common_lines:]
+
+ pos = self.pos
+ pos -= offset
+
+ prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")
+ lines = "".join(self.buffer[offset:]).split("\n")
+ cursor_found = False
+ lines_beyond_cursor = 0
+ for ln, line in enumerate(lines, num_common_lines):
+ line_len = len(line)
+ if 0 <= pos <= line_len:
+ self.lxy = pos, ln
+ cursor_found = True
+ elif cursor_found:
+ lines_beyond_cursor += 1
+ if lines_beyond_cursor > self.console.height:
+ # No need to keep formatting lines.
+ # The console can't show them.
+ break
+ if prompt_from_cache:
+ # Only the first line's prompt can come from the cache
+ prompt_from_cache = False
+ prompt = ""
+ else:
+ prompt = self.get_prompt(ln, line_len >= pos >= 0)
+ while "\n" in prompt:
+ pre_prompt, _, prompt = prompt.partition("\n")
+ last_refresh_line_end_offsets.append(offset)
+ screen.append(pre_prompt)
+ screeninfo.append((0, []))
+ pos -= line_len + 1
+ prompt, prompt_len = self.process_prompt(prompt)
+ chars, char_widths = disp_str(line)
+ wrapcount = (sum(char_widths) + prompt_len) // self.console.width
+ trace("wrapcount = {wrapcount}", wrapcount=wrapcount)
+ if wrapcount == 0 or not char_widths:
+ offset += line_len + 1 # Takes all of the line plus the newline
+ last_refresh_line_end_offsets.append(offset)
+ screen.append(prompt + "".join(chars))
+ screeninfo.append((prompt_len, char_widths))
+ else:
+ pre = prompt
+ prelen = prompt_len
+ for wrap in range(wrapcount + 1):
+ index_to_wrap_before = 0
+ column = 0
+ for char_width in char_widths:
+ if column + char_width + prelen >= self.console.width:
+ break
+ index_to_wrap_before += 1
+ column += char_width
+ if len(chars) > index_to_wrap_before:
+ offset += index_to_wrap_before
+ post = "\\"
+ after = [1]
+ else:
+ offset += index_to_wrap_before + 1 # Takes the newline
+ post = ""
+ after = []
+ last_refresh_line_end_offsets.append(offset)
+ render = pre + "".join(chars[:index_to_wrap_before]) + post
+ render_widths = char_widths[:index_to_wrap_before] + after
+ screen.append(render)
+ screeninfo.append((prelen, render_widths))
+ chars = chars[index_to_wrap_before:]
+ char_widths = char_widths[index_to_wrap_before:]
+ pre = ""
+ prelen = 0
+ self.screeninfo = screeninfo
+ self.cxy = self.pos2xy()
+ if self.msg:
+ for mline in self.msg.split("\n"):
+ screen.append(mline)
+ screeninfo.append((0, []))
+
+ self.last_refresh_cache.update_cache(self, screen, screeninfo)
+ return screen
+
+ @staticmethod
+ def process_prompt(prompt: str) -> tuple[str, int]:
+ r"""Return a tuple with the prompt string and its visible length.
+
+ The prompt string has the zero-width brackets recognized by shells
+ (\x01 and \x02) removed. The length ignores anything between those
+ brackets as well as any ANSI escape sequences.
+ """
+ out_prompt = unbracket(prompt, including_content=False)
+ visible_prompt = unbracket(prompt, including_content=True)
+ return out_prompt, wlen(visible_prompt)
+
+ def bow(self, p: int | None = None) -> int:
+ """Return the 0-based index of the word break preceding p most
+ immediately.
+
+ p defaults to self.pos; word boundaries are determined using
+ self.syntax_table."""
+ if p is None:
+ p = self.pos
+ st = self.syntax_table
+ b = self.buffer
+ p -= 1
+ while p >= 0 and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD:
+ p -= 1
+ while p >= 0 and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD:
+ p -= 1
+ return p + 1
+
+ def eow(self, p: int | None = None) -> int:
+ """Return the 0-based index of the word break following p most
+ immediately.
+
+ p defaults to self.pos; word boundaries are determined using
+ self.syntax_table."""
+ if p is None:
+ p = self.pos
+ st = self.syntax_table
+ b = self.buffer
+ while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD:
+ p += 1
+ while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD:
+ p += 1
+ return p
+
+ def bol(self, p: int | None = None) -> int:
+ """Return the 0-based index of the line break preceding p most
+ immediately.
+
+ p defaults to self.pos."""
+ if p is None:
+ p = self.pos
+ b = self.buffer
+ p -= 1
+ while p >= 0 and b[p] != "\n":
+ p -= 1
+ return p + 1
+
+ def eol(self, p: int | None = None) -> int:
+ """Return the 0-based index of the line break following p most
+ immediately.
+
+ p defaults to self.pos."""
+ if p is None:
+ p = self.pos
+ b = self.buffer
+ while p < len(b) and b[p] != "\n":
+ p += 1
+ return p
+
+ def max_column(self, y: int) -> int:
+ """Return the last x-offset for line y"""
+ return self.screeninfo[y][0] + sum(self.screeninfo[y][1])
+
+ def max_row(self) -> int:
+ return len(self.screeninfo) - 1
+
+ def get_arg(self, default: int = 1) -> int:
+ """Return any prefix argument that the user has supplied,
+ returning `default' if there is None. Defaults to 1.
+ """
+ if self.arg is None:
+ return default
+ return self.arg
+
+ def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
+ """Return what should be in the left-hand margin for line
+ `lineno'."""
+ if self.arg is not None and cursor_on_line:
+ prompt = f"(arg: {self.arg}) "
+ elif self.paste_mode and not self.in_bracketed_paste:
+ prompt = "(paste) "
+ elif "\n" in self.buffer:
+ if lineno == 0:
+ prompt = self.ps2
+ elif self.ps4 and lineno == self.buffer.count("\n"):
+ prompt = self.ps4
+ else:
+ prompt = self.ps3
+ else:
+ prompt = self.ps1
+
+ if self.can_colorize:
+ prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
+ return prompt
+
+ def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
+ self.input_trans_stack.append(self.input_trans)
+ self.input_trans = itrans
+
+ def pop_input_trans(self) -> None:
+ self.input_trans = self.input_trans_stack.pop()
+
+ def setpos_from_xy(self, x: int, y: int) -> None:
+ """Set pos according to coordinates x, y"""
+ pos = 0
+ i = 0
+ while i < y:
+ prompt_len, char_widths = self.screeninfo[i]
+ offset = len(char_widths)
+ in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width
+ if in_wrapped_line:
+ pos += offset - 1 # -1 cause backslash is not in buffer
+ else:
+ pos += offset + 1 # +1 cause newline is in buffer
+ i += 1
+
+ j = 0
+ cur_x = self.screeninfo[i][0]
+ while cur_x < x:
+ if self.screeninfo[i][1][j] == 0:
+ j += 1 # prevent potential future infinite loop
+ continue
+ cur_x += self.screeninfo[i][1][j]
+ j += 1
+ pos += 1
+
+ self.pos = pos
+
+ def pos2xy(self) -> tuple[int, int]:
+ """Return the x, y coordinates of position 'pos'."""
+
+ prompt_len, y = 0, 0
+ char_widths: list[int] = []
+ pos = self.pos
+ assert 0 <= pos <= len(self.buffer)
+
+ # optimize for the common case: typing at the end of the buffer
+ if pos == len(self.buffer) and len(self.screeninfo) > 0:
+ y = len(self.screeninfo) - 1
+ prompt_len, char_widths = self.screeninfo[y]
+ return prompt_len + sum(char_widths), y
+
+ for prompt_len, char_widths in self.screeninfo:
+ offset = len(char_widths)
+ in_wrapped_line = prompt_len + sum(char_widths) >= self.console.width
+ if in_wrapped_line:
+ offset -= 1 # need to remove line-wrapping backslash
+
+ if offset >= pos:
+ break
+
+ if not in_wrapped_line:
+ offset += 1 # there's a newline in buffer
+
+ pos -= offset
+ y += 1
+ return prompt_len + sum(char_widths[:pos]), y
+
+ def insert(self, text: str | list[str]) -> None:
+ """Insert 'text' at the insertion point."""
+ self.buffer[self.pos : self.pos] = list(text)
+ self.pos += len(text)
+ self.dirty = True
+
+ def update_cursor(self) -> None:
+ """Move the cursor to reflect changes in self.pos"""
+ self.cxy = self.pos2xy()
+ self.console.move_cursor(*self.cxy)
+
+ def after_command(self, cmd: Command) -> None:
+ """This function is called to allow post command cleanup."""
+ if getattr(cmd, "kills_digit_arg", True):
+ if self.arg is not None:
+ self.dirty = True
+ self.arg = None
+
+ def prepare(self) -> None:
+ """Get ready to run. Call restore when finished. You must not
+ write to the console in between the calls to prepare and
+ restore."""
+ try:
+ self.console.prepare()
+ self.arg = None
+ self.finished = False
+ del self.buffer[:]
+ self.pos = 0
+ self.dirty = True
+ self.last_command = None
+ self.calc_screen()
+ except BaseException:
+ self.restore()
+ raise
+
+ while self.scheduled_commands:
+ cmd = self.scheduled_commands.pop()
+ self.do_cmd((cmd, []))
+
+ def last_command_is(self, cls: type) -> bool:
+ if not self.last_command:
+ return False
+ return issubclass(cls, self.last_command)
+
+ def restore(self) -> None:
+ """Clean up after a run."""
+ self.console.restore()
+
+ @contextmanager
+ def suspend(self) -> SimpleContextManager:
+ """A context manager to delegate to another reader."""
+ prev_state = {f.name: getattr(self, f.name) for f in fields(self)}
+ try:
+ self.restore()
+ yield
+ finally:
+ for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
+ setattr(self, arg, prev_state[arg])
+ self.prepare()
+
+ def finish(self) -> None:
+ """Called when a command signals that we're finished."""
+ pass
+
+ def error(self, msg: str = "none") -> None:
+ self.msg = "! " + msg + " "
+ self.dirty = True
+ self.console.beep()
+
+ def update_screen(self) -> None:
+ if self.dirty:
+ self.refresh()
+
+ def refresh(self) -> None:
+ """Recalculate and refresh the screen."""
+ if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
+ return
+
+ # this call sets up self.cxy, so call it first.
+ self.screen = self.calc_screen()
+ self.console.refresh(self.screen, self.cxy)
+ self.dirty = False
+
+ def do_cmd(self, cmd: tuple[str, list[str]]) -> None:
+ """`cmd` is a tuple of "event_name" and "event", which in the current
+ implementation is always just the "buffer" which happens to be a list
+ of single-character strings."""
+
+ trace("received command {cmd}", cmd=cmd)
+ if isinstance(cmd[0], str):
+ command_type = self.commands.get(cmd[0], commands.invalid_command)
+ elif isinstance(cmd[0], type):
+ command_type = cmd[0]
+ else:
+ return # nothing to do
+
+ command = command_type(self, *cmd) # type: ignore[arg-type]
+ command.do()
+
+ self.after_command(command)
+
+ if self.dirty:
+ self.refresh()
+ else:
+ self.update_cursor()
+
+ if not isinstance(cmd, commands.digit_arg):
+ self.last_command = command_type
+
+ self.finished = bool(command.finish)
+ if self.finished:
+ self.console.finish()
+ self.finish()
+
+ def run_hooks(self) -> None:
+ threading_hook = self.threading_hook
+ if threading_hook is None and 'threading' in sys.modules:
+ from ._threading_handler import install_threading_hook
+ install_threading_hook(self)
+ if threading_hook is not None:
+ try:
+ threading_hook()
+ except Exception:
+ pass
+
+ input_hook = self.console.input_hook
+ if input_hook:
+ try:
+ input_hook()
+ except Exception:
+ pass
+
+ def handle1(self, block: bool = True) -> bool:
+ """Handle a single event. Wait as long as it takes if block
+ is true (the default), otherwise return False if no event is
+ pending."""
+
+ if self.msg:
+ self.msg = ""
+ self.dirty = True
+
+ while True:
+ # We use the same timeout as in readline.c: 100ms
+ self.run_hooks()
+ self.console.wait(100)
+ event = self.console.get_event(block=False)
+ if not event:
+ if block:
+ continue
+ return False
+
+ translate = True
+
+ if event.evt == "key":
+ self.input_trans.push(event)
+ elif event.evt == "scroll":
+ self.refresh()
+ elif event.evt == "resize":
+ self.refresh()
+ else:
+ translate = False
+
+ if translate:
+ cmd = self.input_trans.get()
+ else:
+ cmd = [event.evt, event.data]
+
+ if cmd is None:
+ if block:
+ continue
+ return False
+
+ self.do_cmd(cmd)
+ return True
+
+ def push_char(self, char: int | bytes) -> None:
+ self.console.push_char(char)
+ self.handle1(block=False)
+
+ def readline(self, startup_hook: Callback | None = None) -> str:
+ """Read a line. The implementation of this method also shows
+ how to drive Reader if you want more control over the event
+ loop."""
+ self.prepare()
+ try:
+ if startup_hook is not None:
+ startup_hook()
+ self.refresh()
+ while not self.finished:
+ self.handle1()
+ return self.get_unicode()
+
+ finally:
+ self.restore()
+
+ def bind(self, spec: KeySpec, command: CommandName) -> None:
+ self.keymap = self.keymap + ((spec, command),)
+ self.input_trans = input.KeymapTranslator(
+ self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
+ )
+
+ def get_unicode(self) -> str:
+ """Return the current buffer as a unicode string."""
+ return "".join(self.buffer)
diff --git a/contrib/tools/python3/Lib/_pyrepl/readline.py b/contrib/tools/python3/Lib/_pyrepl/readline.py
new file mode 100644
index 00000000000..f802a952690
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/readline.py
@@ -0,0 +1,599 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]>
+# Alex Gaynor
+# Antonio Cuni
+# Armin Rigo
+# Holger Krekel
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""A compatibility wrapper reimplementing the 'readline' standard module
+on top of pyrepl. Not all functionalities are supported. Contains
+extensions for multiline input.
+"""
+
+from __future__ import annotations
+
+import warnings
+from dataclasses import dataclass, field
+
+import os
+from site import gethistoryfile
+import sys
+from rlcompleter import Completer as RLCompleter
+
+from . import commands, historical_reader
+from .completing_reader import CompletingReader
+from .console import Console as ConsoleType
+
+Console: type[ConsoleType]
+_error: tuple[type[Exception], ...] | type[Exception]
+
+if os.name == "nt":
+ from .windows_console import WindowsConsole as Console, _error
+else:
+ from .unix_console import UnixConsole as Console, _error
+
+ENCODING = sys.getdefaultencoding() or "latin1"
+
+
+# types
+Command = commands.Command
+from collections.abc import Callable, Collection
+from .types import Callback, Completer, KeySpec, CommandName
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import Any, Mapping
+
+
+MoreLinesCallable = Callable[[str], bool]
+
+
+__all__ = [
+ "add_history",
+ "clear_history",
+ "get_begidx",
+ "get_completer",
+ "get_completer_delims",
+ "get_current_history_length",
+ "get_endidx",
+ "get_history_item",
+ "get_history_length",
+ "get_line_buffer",
+ "insert_text",
+ "parse_and_bind",
+ "read_history_file",
+ # "read_init_file",
+ # "redisplay",
+ "remove_history_item",
+ "replace_history_item",
+ "set_auto_history",
+ "set_completer",
+ "set_completer_delims",
+ "set_history_length",
+ # "set_pre_input_hook",
+ "set_startup_hook",
+ "write_history_file",
+ # ---- multiline extensions ----
+ "multiline_input",
+]
+
+# ____________________________________________________________
+
+@dataclass
+class ReadlineConfig:
+ readline_completer: Completer | None = None
+ completer_delims: frozenset[str] = frozenset(" \t\n`~!@#$%^&*()-=+[{]}\\|;:'\",<>/?")
+
+
+@dataclass(kw_only=True)
+class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader):
+ # Class fields
+ assume_immutable_completions = False
+ use_brackets = False
+ sort_in_column = True
+
+ # Instance fields
+ config: ReadlineConfig
+ more_lines: MoreLinesCallable | None = None
+ last_used_indentation: str | None = None
+
+ def __post_init__(self) -> None:
+ super().__post_init__()
+ self.commands["maybe_accept"] = maybe_accept
+ self.commands["maybe-accept"] = maybe_accept
+ self.commands["backspace_dedent"] = backspace_dedent
+ self.commands["backspace-dedent"] = backspace_dedent
+
+ def error(self, msg: str = "none") -> None:
+ pass # don't show error messages by default
+
+ def get_stem(self) -> str:
+ b = self.buffer
+ p = self.pos - 1
+ completer_delims = self.config.completer_delims
+ while p >= 0 and b[p] not in completer_delims:
+ p -= 1
+ return "".join(b[p + 1 : self.pos])
+
+ def get_completions(self, stem: str) -> list[str]:
+ if len(stem) == 0 and self.more_lines is not None:
+ b = self.buffer
+ p = self.pos
+ while p > 0 and b[p - 1] != "\n":
+ p -= 1
+ num_spaces = 4 - ((self.pos - p) % 4)
+ return [" " * num_spaces]
+ result = []
+ function = self.config.readline_completer
+ if function is not None:
+ try:
+ stem = str(stem) # rlcompleter.py seems to not like unicode
+ except UnicodeEncodeError:
+ pass # but feed unicode anyway if we have no choice
+ state = 0
+ while True:
+ try:
+ next = function(stem, state)
+ except Exception:
+ break
+ if not isinstance(next, str):
+ break
+ result.append(next)
+ state += 1
+ # emulate the behavior of the standard readline that sorts
+ # the completions before displaying them.
+ result.sort()
+ return result
+
+ def get_trimmed_history(self, maxlength: int) -> list[str]:
+ if maxlength >= 0:
+ cut = len(self.history) - maxlength
+ if cut < 0:
+ cut = 0
+ else:
+ cut = 0
+ return self.history[cut:]
+
+ def update_last_used_indentation(self) -> None:
+ indentation = _get_first_indentation(self.buffer)
+ if indentation is not None:
+ self.last_used_indentation = indentation
+
+ # --- simplified support for reading multiline Python statements ---
+
+ def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
+ return super().collect_keymap() + (
+ (r"\n", "maybe-accept"),
+ (r"\<backspace>", "backspace-dedent"),
+ )
+
+ def after_command(self, cmd: Command) -> None:
+ super().after_command(cmd)
+ if self.more_lines is None:
+ # Force single-line input if we are in raw_input() mode.
+ # Although there is no direct way to add a \n in this mode,
+ # multiline buffers can still show up using various
+ # commands, e.g. navigating the history.
+ try:
+ index = self.buffer.index("\n")
+ except ValueError:
+ pass
+ else:
+ self.buffer = self.buffer[:index]
+ if self.pos > len(self.buffer):
+ self.pos = len(self.buffer)
+
+
+def set_auto_history(_should_auto_add_history: bool) -> None:
+ """Enable or disable automatic history"""
+ historical_reader.should_auto_add_history = bool(_should_auto_add_history)
+
+
+def _get_this_line_indent(buffer: list[str], pos: int) -> int:
+ indent = 0
+ while pos > 0 and buffer[pos - 1] in " \t":
+ indent += 1
+ pos -= 1
+ if pos > 0 and buffer[pos - 1] == "\n":
+ return indent
+ return 0
+
+
+def _get_previous_line_indent(buffer: list[str], pos: int) -> tuple[int, int | None]:
+ prevlinestart = pos
+ while prevlinestart > 0 and buffer[prevlinestart - 1] != "\n":
+ prevlinestart -= 1
+ prevlinetext = prevlinestart
+ while prevlinetext < pos and buffer[prevlinetext] in " \t":
+ prevlinetext += 1
+ if prevlinetext == pos:
+ indent = None
+ else:
+ indent = prevlinetext - prevlinestart
+ return prevlinestart, indent
+
+
+def _get_first_indentation(buffer: list[str]) -> str | None:
+ indented_line_start = None
+ for i in range(len(buffer)):
+ if (i < len(buffer) - 1
+ and buffer[i] == "\n"
+ and buffer[i + 1] in " \t"
+ ):
+ indented_line_start = i + 1
+ elif indented_line_start is not None and buffer[i] not in " \t\n":
+ return ''.join(buffer[indented_line_start : i])
+ return None
+
+
+def _should_auto_indent(buffer: list[str], pos: int) -> bool:
+ # check if last character before "pos" is a colon, ignoring
+ # whitespaces and comments.
+ last_char = None
+ while pos > 0:
+ pos -= 1
+ if last_char is None:
+ if buffer[pos] not in " \t\n#": # ignore whitespaces and comments
+ last_char = buffer[pos]
+ else:
+ # even if we found a non-whitespace character before
+ # original pos, we keep going back until newline is reached
+ # to make sure we ignore comments
+ if buffer[pos] == "\n":
+ break
+ if buffer[pos] == "#":
+ last_char = None
+ return last_char == ":"
+
+
+class maybe_accept(commands.Command):
+ def do(self) -> None:
+ r: ReadlineAlikeReader
+ r = self.reader # type: ignore[assignment]
+ r.dirty = True # this is needed to hide the completion menu, if visible
+
+ if self.reader.in_bracketed_paste:
+ r.insert("\n")
+ return
+
+ # if there are already several lines and the cursor
+ # is not on the last one, always insert a new \n.
+ text = r.get_unicode()
+
+ if "\n" in r.buffer[r.pos :] or (
+ r.more_lines is not None and r.more_lines(text)
+ ):
+ def _newline_before_pos():
+ before_idx = r.pos - 1
+ while before_idx > 0 and text[before_idx].isspace():
+ before_idx -= 1
+ return text[before_idx : r.pos].count("\n") > 0
+
+ # if there's already a new line before the cursor then
+ # even if the cursor is followed by whitespace, we assume
+ # the user is trying to terminate the block
+ if _newline_before_pos() and text[r.pos:].isspace():
+ self.finish = True
+ return
+
+ # auto-indent the next line like the previous line
+ prevlinestart, indent = _get_previous_line_indent(r.buffer, r.pos)
+ r.insert("\n")
+ if not self.reader.paste_mode:
+ if indent:
+ for i in range(prevlinestart, prevlinestart + indent):
+ r.insert(r.buffer[i])
+ r.update_last_used_indentation()
+ if _should_auto_indent(r.buffer, r.pos):
+ if r.last_used_indentation is not None:
+ indentation = r.last_used_indentation
+ else:
+ # default
+ indentation = " " * 4
+ r.insert(indentation)
+ elif not self.reader.paste_mode:
+ self.finish = True
+ else:
+ r.insert("\n")
+
+
+class backspace_dedent(commands.Command):
+ def do(self) -> None:
+ r = self.reader
+ b = r.buffer
+ if r.pos > 0:
+ repeat = 1
+ if b[r.pos - 1] != "\n":
+ indent = _get_this_line_indent(b, r.pos)
+ if indent > 0:
+ ls = r.pos - indent
+ while ls > 0:
+ ls, pi = _get_previous_line_indent(b, ls - 1)
+ if pi is not None and pi < indent:
+ repeat = indent - pi
+ break
+ r.pos -= repeat
+ del b[r.pos : r.pos + repeat]
+ r.dirty = True
+ else:
+ self.reader.error("can't backspace at start")
+
+
+# ____________________________________________________________
+
+
+@dataclass(slots=True)
+class _ReadlineWrapper:
+ f_in: int = -1
+ f_out: int = -1
+ reader: ReadlineAlikeReader | None = field(default=None, repr=False)
+ saved_history_length: int = -1
+ startup_hook: Callback | None = None
+ config: ReadlineConfig = field(default_factory=ReadlineConfig, repr=False)
+
+ def __post_init__(self) -> None:
+ if self.f_in == -1:
+ self.f_in = os.dup(0)
+ if self.f_out == -1:
+ self.f_out = os.dup(1)
+
+ def get_reader(self) -> ReadlineAlikeReader:
+ if self.reader is None:
+ console = Console(self.f_in, self.f_out, encoding=ENCODING)
+ self.reader = ReadlineAlikeReader(console=console, config=self.config)
+ return self.reader
+
+ def input(self, prompt: object = "") -> str:
+ try:
+ reader = self.get_reader()
+ except _error:
+ assert raw_input is not None
+ return raw_input(prompt)
+ prompt_str = str(prompt)
+ reader.ps1 = prompt_str
+ sys.audit("builtins.input", prompt_str)
+ result = reader.readline(startup_hook=self.startup_hook)
+ sys.audit("builtins.input/result", result)
+ return result
+
+ def multiline_input(self, more_lines: MoreLinesCallable, ps1: str, ps2: str) -> str:
+ """Read an input on possibly multiple lines, asking for more
+ lines as long as 'more_lines(unicodetext)' returns an object whose
+ boolean value is true.
+ """
+ reader = self.get_reader()
+ saved = reader.more_lines
+ try:
+ reader.more_lines = more_lines
+ reader.ps1 = ps1
+ reader.ps2 = ps1
+ reader.ps3 = ps2
+ reader.ps4 = ""
+ with warnings.catch_warnings(action="ignore"):
+ return reader.readline()
+ finally:
+ reader.more_lines = saved
+ reader.paste_mode = False
+
+ def parse_and_bind(self, string: str) -> None:
+ pass # XXX we don't support parsing GNU-readline-style init files
+
+ def set_completer(self, function: Completer | None = None) -> None:
+ self.config.readline_completer = function
+
+ def get_completer(self) -> Completer | None:
+ return self.config.readline_completer
+
+ def set_completer_delims(self, delimiters: Collection[str]) -> None:
+ self.config.completer_delims = frozenset(delimiters)
+
+ def get_completer_delims(self) -> str:
+ return "".join(sorted(self.config.completer_delims))
+
+ def _histline(self, line: str) -> str:
+ line = line.rstrip("\n")
+ return line
+
+ def get_history_length(self) -> int:
+ return self.saved_history_length
+
+ def set_history_length(self, length: int) -> None:
+ self.saved_history_length = length
+
+ def get_current_history_length(self) -> int:
+ return len(self.get_reader().history)
+
+ def read_history_file(self, filename: str = gethistoryfile()) -> None:
+ # multiline extension (really a hack) for the end of lines that
+ # are actually continuations inside a single multiline_input()
+ # history item: we use \r\n instead of just \n. If the history
+ # file is passed to GNU readline, the extra \r are just ignored.
+ history = self.get_reader().history
+
+ with open(os.path.expanduser(filename), 'rb') as f:
+ is_editline = f.readline().startswith(b"_HiStOrY_V2_")
+ if is_editline:
+ encoding = "unicode-escape"
+ else:
+ f.seek(0)
+ encoding = "utf-8"
+
+ lines = [line.decode(encoding, errors='replace') for line in f.read().split(b'\n')]
+ buffer = []
+ for line in lines:
+ if line.endswith("\r"):
+ buffer.append(line+'\n')
+ else:
+ line = self._histline(line)
+ if buffer:
+ line = self._histline("".join(buffer).replace("\r", "") + line)
+ del buffer[:]
+ if line:
+ history.append(line)
+
+ def write_history_file(self, filename: str = gethistoryfile()) -> None:
+ maxlength = self.saved_history_length
+ history = self.get_reader().get_trimmed_history(maxlength)
+ f = open(os.path.expanduser(filename), "w",
+ encoding="utf-8", newline="\n")
+ with f:
+ for entry in history:
+ entry = entry.replace("\n", "\r\n") # multiline history support
+ f.write(entry + "\n")
+
+ def clear_history(self) -> None:
+ del self.get_reader().history[:]
+
+ def get_history_item(self, index: int) -> str | None:
+ history = self.get_reader().history
+ if 1 <= index <= len(history):
+ return history[index - 1]
+ else:
+ return None # like readline.c
+
+ def remove_history_item(self, index: int) -> None:
+ history = self.get_reader().history
+ if 0 <= index < len(history):
+ del history[index]
+ else:
+ raise ValueError("No history item at position %d" % index)
+ # like readline.c
+
+ def replace_history_item(self, index: int, line: str) -> None:
+ history = self.get_reader().history
+ if 0 <= index < len(history):
+ history[index] = self._histline(line)
+ else:
+ raise ValueError("No history item at position %d" % index)
+ # like readline.c
+
+ def add_history(self, line: str) -> None:
+ self.get_reader().history.append(self._histline(line))
+
+ def set_startup_hook(self, function: Callback | None = None) -> None:
+ self.startup_hook = function
+
+ def get_line_buffer(self) -> str:
+ return self.get_reader().get_unicode()
+
+ def _get_idxs(self) -> tuple[int, int]:
+ start = cursor = self.get_reader().pos
+ buf = self.get_line_buffer()
+ for i in range(cursor - 1, -1, -1):
+ if buf[i] in self.get_completer_delims():
+ break
+ start = i
+ return start, cursor
+
+ def get_begidx(self) -> int:
+ return self._get_idxs()[0]
+
+ def get_endidx(self) -> int:
+ return self._get_idxs()[1]
+
+ def insert_text(self, text: str) -> None:
+ self.get_reader().insert(text)
+
+
+_wrapper = _ReadlineWrapper()
+
+# ____________________________________________________________
+# Public API
+
+parse_and_bind = _wrapper.parse_and_bind
+set_completer = _wrapper.set_completer
+get_completer = _wrapper.get_completer
+set_completer_delims = _wrapper.set_completer_delims
+get_completer_delims = _wrapper.get_completer_delims
+get_history_length = _wrapper.get_history_length
+set_history_length = _wrapper.set_history_length
+get_current_history_length = _wrapper.get_current_history_length
+read_history_file = _wrapper.read_history_file
+write_history_file = _wrapper.write_history_file
+clear_history = _wrapper.clear_history
+get_history_item = _wrapper.get_history_item
+remove_history_item = _wrapper.remove_history_item
+replace_history_item = _wrapper.replace_history_item
+add_history = _wrapper.add_history
+set_startup_hook = _wrapper.set_startup_hook
+get_line_buffer = _wrapper.get_line_buffer
+get_begidx = _wrapper.get_begidx
+get_endidx = _wrapper.get_endidx
+insert_text = _wrapper.insert_text
+
+# Extension
+multiline_input = _wrapper.multiline_input
+
+# Internal hook
+_get_reader = _wrapper.get_reader
+
+# ____________________________________________________________
+# Stubs
+
+
+def _make_stub(_name: str, _ret: object) -> None:
+ def stub(*args: object, **kwds: object) -> None:
+ import warnings
+
+ warnings.warn("readline.%s() not implemented" % _name, stacklevel=2)
+
+ stub.__name__ = _name
+ globals()[_name] = stub
+
+
+for _name, _ret in [
+ ("read_init_file", None),
+ ("redisplay", None),
+ ("set_pre_input_hook", None),
+]:
+ assert _name not in globals(), _name
+ _make_stub(_name, _ret)
+
+# ____________________________________________________________
+
+
+def _setup(namespace: Mapping[str, Any]) -> None:
+ global raw_input
+ if raw_input is not None:
+ return # don't run _setup twice
+
+ try:
+ f_in = sys.stdin.fileno()
+ f_out = sys.stdout.fileno()
+ except (AttributeError, ValueError):
+ return
+ if not os.isatty(f_in) or not os.isatty(f_out):
+ return
+
+ _wrapper.f_in = f_in
+ _wrapper.f_out = f_out
+
+ # set up namespace in rlcompleter, which requires it to be a bona fide dict
+ if not isinstance(namespace, dict):
+ namespace = dict(namespace)
+ _wrapper.config.readline_completer = RLCompleter(namespace).complete
+
+ # this is not really what readline.c does. Better than nothing I guess
+ import builtins
+ raw_input = builtins.input
+ builtins.input = _wrapper.input
+
+
+raw_input: Callable[[object], str] | None = None
diff --git a/contrib/tools/python3/Lib/_pyrepl/simple_interact.py b/contrib/tools/python3/Lib/_pyrepl/simple_interact.py
new file mode 100644
index 00000000000..8fb2359fb51
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/simple_interact.py
@@ -0,0 +1,180 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""This is an alternative to python_reader which tries to emulate
+the CPython prompt as closely as possible, with the exception of
+allowing multiline input and multiline history entries.
+"""
+
+from __future__ import annotations
+
+import _sitebuiltins
+import functools
+import os
+import sys
+import code
+
+from .readline import _get_reader, multiline_input
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import Any
+
+
+_error: tuple[type[Exception], ...] | type[Exception]
+try:
+ from .unix_console import _error
+except ModuleNotFoundError:
+ from .windows_console import _error
+
+def check() -> str:
+ """Returns the error message if there is a problem initializing the state."""
+ try:
+ _get_reader()
+ except _error as e:
+ if term := os.environ.get("TERM", ""):
+ term = f"; TERM={term}"
+ return str(str(e) or repr(e) or "unknown error") + term
+ return ""
+
+
+def _strip_final_indent(text: str) -> str:
+ # kill spaces and tabs at the end, but only if they follow '\n'.
+ # meant to remove the auto-indentation only (although it would of
+ # course also remove explicitly-added indentation).
+ short = text.rstrip(" \t")
+ n = len(short)
+ if n > 0 and text[n - 1] == "\n":
+ return short
+ return text
+
+
+def _clear_screen():
+ reader = _get_reader()
+ reader.scheduled_commands.append("clear_screen")
+
+
+REPL_COMMANDS = {
+ "exit": _sitebuiltins.Quitter('exit', ''),
+ "quit": _sitebuiltins.Quitter('quit' ,''),
+ "copyright": _sitebuiltins._Printer('copyright', sys.copyright),
+ "help": _sitebuiltins._Helper(),
+ "clear": _clear_screen,
+ "\x1a": _sitebuiltins.Quitter('\x1a', ''),
+}
+
+
+def _more_lines(console: code.InteractiveConsole, unicodetext: str) -> bool:
+ # ooh, look at the hack:
+ src = _strip_final_indent(unicodetext)
+ try:
+ code = console.compile(src, "<stdin>", "single")
+ except (OverflowError, SyntaxError, ValueError):
+ lines = src.splitlines(keepends=True)
+ if len(lines) == 1:
+ return False
+
+ last_line = lines[-1]
+ was_indented = last_line.startswith((" ", "\t"))
+ not_empty = last_line.strip() != ""
+ incomplete = not last_line.endswith("\n")
+ return (was_indented or not_empty) and incomplete
+ else:
+ return code is None
+
+
+def run_multiline_interactive_console(
+ console: code.InteractiveConsole,
+ *,
+ future_flags: int = 0,
+) -> None:
+ from .readline import _setup
+ _setup(console.locals)
+ if future_flags:
+ console.compile.compiler.flags |= future_flags
+
+ more_lines = functools.partial(_more_lines, console)
+ input_n = 0
+
+ _is_x_showrefcount_set = sys._xoptions.get("showrefcount")
+ _is_pydebug_build = hasattr(sys, "gettotalrefcount")
+ show_ref_count = _is_x_showrefcount_set and _is_pydebug_build
+
+ def maybe_run_command(statement: str) -> bool:
+ statement = statement.strip()
+ if statement in console.locals or statement not in REPL_COMMANDS:
+ return False
+
+ reader = _get_reader()
+ reader.history.pop() # skip internal commands in history
+ command = REPL_COMMANDS[statement]
+ if callable(command):
+ # Make sure that history does not change because of commands
+ with reader.suspend_history():
+ command()
+ return True
+ return False
+
+ while 1:
+ try:
+ try:
+ sys.stdout.flush()
+ except Exception:
+ pass
+
+ ps1 = getattr(sys, "ps1", ">>> ")
+ ps2 = getattr(sys, "ps2", "... ")
+ try:
+ statement = multiline_input(more_lines, ps1, ps2)
+ except EOFError:
+ break
+
+ if maybe_run_command(statement):
+ continue
+
+ input_name = f"<python-input-{input_n}>"
+ more = console.push(_strip_final_indent(statement), filename=input_name, _symbol="single") # type: ignore[call-arg]
+ assert not more
+ input_n += 1
+ except KeyboardInterrupt:
+ r = _get_reader()
+ r.cmpltn_reset()
+ if r.input_trans is r.isearch_trans:
+ r.do_cmd(("isearch-end", [""]))
+ r.pos = len(r.get_unicode())
+ r.dirty = True
+ r.refresh()
+ r.in_bracketed_paste = False
+ console.write("\nKeyboardInterrupt\n")
+ console.resetbuffer()
+ except MemoryError:
+ console.write("\nMemoryError\n")
+ console.resetbuffer()
+ except SystemExit:
+ raise
+ except:
+ console.showtraceback()
+ console.resetbuffer()
+ if show_ref_count:
+ console.write(
+ f"[{sys.gettotalrefcount()} refs,"
+ f" {sys.getallocatedblocks()} blocks]\n"
+ )
diff --git a/contrib/tools/python3/Lib/_pyrepl/terminfo.py b/contrib/tools/python3/Lib/_pyrepl/terminfo.py
new file mode 100644
index 00000000000..063a285bb99
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/terminfo.py
@@ -0,0 +1,530 @@
+"""Pure Python curses-like terminal capability queries."""
+
+from dataclasses import dataclass, field
+import errno
+import os
+from pathlib import Path
+import re
+import struct
+
+
+# Terminfo constants
+MAGIC16 = 0o432 # Magic number for 16-bit terminfo format
+MAGIC32 = 0o1036 # Magic number for 32-bit terminfo format
+
+# Special values for absent/cancelled capabilities
+ABSENT_BOOLEAN = -1
+ABSENT_NUMERIC = -1
+CANCELLED_NUMERIC = -2
+ABSENT_STRING = None
+CANCELLED_STRING = None
+
+
+# Standard string capability names from ncurses Caps file
+# This matches the order used by ncurses when compiling terminfo
+# fmt: off
+_STRING_NAMES: tuple[str, ...] = (
+ "cbt", "bel", "cr", "csr", "tbc", "clear", "el", "ed", "hpa", "cmdch",
+ "cup", "cud1", "home", "civis", "cub1", "mrcup", "cnorm", "cuf1", "ll",
+ "cuu1", "cvvis", "dch1", "dl1", "dsl", "hd", "smacs", "blink", "bold",
+ "smcup", "smdc", "dim", "smir", "invis", "prot", "rev", "smso", "smul",
+ "ech", "rmacs", "sgr0", "rmcup", "rmdc", "rmir", "rmso", "rmul", "flash",
+ "ff", "fsl", "is1", "is2", "is3", "if", "ich1", "il1", "ip", "kbs", "ktbc",
+ "kclr", "kctab", "kdch1", "kdl1", "kcud1", "krmir", "kel", "ked", "kf0",
+ "kf1", "kf10", "kf2", "kf3", "kf4", "kf5", "kf6", "kf7", "kf8", "kf9",
+ "khome", "kich1", "kil1", "kcub1", "kll", "knp", "kpp", "kcuf1", "kind",
+ "kri", "khts", "kcuu1", "rmkx", "smkx", "lf0", "lf1", "lf10", "lf2", "lf3",
+ "lf4", "lf5", "lf6", "lf7", "lf8", "lf9", "rmm", "smm", "nel", "pad", "dch",
+ "dl", "cud", "ich", "indn", "il", "cub", "cuf", "rin", "cuu", "pfkey",
+ "pfloc", "pfx", "mc0", "mc4", "mc5", "rep", "rs1", "rs2", "rs3", "rf", "rc",
+ "vpa", "sc", "ind", "ri", "sgr", "hts", "wind", "ht", "tsl", "uc", "hu",
+ "iprog", "ka1", "ka3", "kb2", "kc1", "kc3", "mc5p", "rmp", "acsc", "pln",
+ "kcbt", "smxon", "rmxon", "smam", "rmam", "xonc", "xoffc", "enacs", "smln",
+ "rmln", "kbeg", "kcan", "kclo", "kcmd", "kcpy", "kcrt", "kend", "kent",
+ "kext", "kfnd", "khlp", "kmrk", "kmsg", "kmov", "knxt", "kopn", "kopt",
+ "kprv", "kprt", "krdo", "kref", "krfr", "krpl", "krst", "kres", "ksav",
+ "kspd", "kund", "kBEG", "kCAN", "kCMD", "kCPY", "kCRT", "kDC", "kDL",
+ "kslt", "kEND", "kEOL", "kEXT", "kFND", "kHLP", "kHOM", "kIC", "kLFT",
+ "kMSG", "kMOV", "kNXT", "kOPT", "kPRV", "kPRT", "kRDO", "kRPL", "kRIT",
+ "kRES", "kSAV", "kSPD", "kUND", "rfi", "kf11", "kf12", "kf13", "kf14",
+ "kf15", "kf16", "kf17", "kf18", "kf19", "kf20", "kf21", "kf22", "kf23",
+ "kf24", "kf25", "kf26", "kf27", "kf28", "kf29", "kf30", "kf31", "kf32",
+ "kf33", "kf34", "kf35", "kf36", "kf37", "kf38", "kf39", "kf40", "kf41",
+ "kf42", "kf43", "kf44", "kf45", "kf46", "kf47", "kf48", "kf49", "kf50",
+ "kf51", "kf52", "kf53", "kf54", "kf55", "kf56", "kf57", "kf58", "kf59",
+ "kf60", "kf61", "kf62", "kf63", "el1", "mgc", "smgl", "smgr", "fln", "sclk",
+ "dclk", "rmclk", "cwin", "wingo", "hup","dial", "qdial", "tone", "pulse",
+ "hook", "pause", "wait", "u0", "u1", "u2", "u3", "u4", "u5", "u6", "u7",
+ "u8", "u9", "op", "oc", "initc", "initp", "scp", "setf", "setb", "cpi",
+ "lpi", "chr", "cvr", "defc", "swidm", "sdrfq", "sitm", "slm", "smicm",
+ "snlq", "snrmq", "sshm", "ssubm", "ssupm", "sum", "rwidm", "ritm", "rlm",
+ "rmicm", "rshm", "rsubm", "rsupm", "rum", "mhpa", "mcud1", "mcub1", "mcuf1",
+ "mvpa", "mcuu1", "porder", "mcud", "mcub", "mcuf", "mcuu", "scs", "smgb",
+ "smgbp", "smglp", "smgrp", "smgt", "smgtp", "sbim", "scsd", "rbim", "rcsd",
+ "subcs", "supcs", "docr", "zerom", "csnm", "kmous", "minfo", "reqmp",
+ "getm", "setaf", "setab", "pfxl", "devt", "csin", "s0ds", "s1ds", "s2ds",
+ "s3ds", "smglr", "smgtb", "birep", "binel", "bicr", "colornm", "defbi",
+ "endbi", "setcolor", "slines", "dispc", "smpch", "rmpch", "smsc", "rmsc",
+ "pctrm", "scesc", "scesa", "ehhlm", "elhlm", "elohlm", "erhlm", "ethlm",
+ "evhlm", "sgr1", "slength", "OTi2", "OTrs", "OTnl", "OTbc", "OTko", "OTma",
+ "OTG2", "OTG3", "OTG1", "OTG4", "OTGR", "OTGL", "OTGU", "OTGD", "OTGH",
+ "OTGV", "OTGC","meml", "memu", "box1"
+)
+# fmt: on
+_STRING_CAPABILITY_NAMES = {name: i for i, name in enumerate(_STRING_NAMES)}
+
+
+def _get_terminfo_dirs() -> list[Path]:
+ """Get list of directories to search for terminfo files.
+
+ Based on ncurses behavior in:
+ - ncurses/tinfo/db_iterator.c:_nc_next_db()
+ - ncurses/tinfo/read_entry.c:_nc_read_entry()
+ """
+ dirs = []
+
+ terminfo = os.environ.get("TERMINFO")
+ if terminfo:
+ dirs.append(terminfo)
+
+ try:
+ home = Path.home()
+ dirs.append(str(home / ".terminfo"))
+ except RuntimeError:
+ pass
+
+ # Check TERMINFO_DIRS
+ terminfo_dirs = os.environ.get("TERMINFO_DIRS", "")
+ if terminfo_dirs:
+ for d in terminfo_dirs.split(":"):
+ if d:
+ dirs.append(d)
+
+ dirs.extend(
+ [
+ "/etc/terminfo",
+ "/lib/terminfo",
+ "/usr/lib/terminfo",
+ "/usr/share/terminfo",
+ "/usr/share/lib/terminfo",
+ "/usr/share/misc/terminfo",
+ "/usr/local/lib/terminfo",
+ "/usr/local/share/terminfo",
+ ]
+ )
+
+ return [Path(d) for d in dirs if Path(d).is_dir()]
+
+
+def _validate_terminal_name_or_raise(terminal_name: str) -> None:
+ if not isinstance(terminal_name, str):
+ raise TypeError("`terminal_name` must be a string")
+
+ if not terminal_name:
+ raise ValueError("`terminal_name` cannot be empty")
+
+ if "\x00" in terminal_name:
+ raise ValueError("NUL character found in `terminal_name`")
+
+ t = Path(terminal_name)
+ if len(t.parts) > 1:
+ raise ValueError("`terminal_name` cannot contain path separators")
+
+
+def _read_terminfo_file(terminal_name: str) -> bytes:
+ """Find and read terminfo file for given terminal name.
+
+ Terminfo files are stored in directories using the first character
+ of the terminal name as a subdirectory.
+ """
+ _validate_terminal_name_or_raise(terminal_name)
+ first_char = terminal_name[0].lower()
+ filename = terminal_name
+
+ for directory in _get_terminfo_dirs():
+ path = directory / first_char / filename
+ if path.is_file():
+ return path.read_bytes()
+
+ # Try with hex encoding of first char (for special chars)
+ hex_dir = "%02x" % ord(first_char)
+ path = directory / hex_dir / filename
+ if path.is_file():
+ return path.read_bytes()
+
+ raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
+
+
+# Hard-coded terminal capabilities for common terminals
+# This is a minimal subset needed by PyREPL
+_TERMINAL_CAPABILITIES = {
+ # ANSI/xterm-compatible terminals
+ "ansi": {
+ # Bell
+ "bel": b"\x07",
+ # Cursor movement
+ "cub": b"\x1b[%p1%dD", # Move cursor left N columns
+ "cud": b"\x1b[%p1%dB", # Move cursor down N rows
+ "cuf": b"\x1b[%p1%dC", # Move cursor right N columns
+ "cuu": b"\x1b[%p1%dA", # Move cursor up N rows
+ "cub1": b"\x08", # Move cursor left 1 column
+ "cud1": b"\n", # Move cursor down 1 row
+ "cuf1": b"\x1b[C", # Move cursor right 1 column
+ "cuu1": b"\x1b[A", # Move cursor up 1 row
+ "cup": b"\x1b[%i%p1%d;%p2%dH", # Move cursor to row, column
+ "hpa": b"\x1b[%i%p1%dG", # Move cursor to column
+ # Clear operations
+ "clear": b"\x1b[H\x1b[2J", # Clear screen and home cursor
+ "el": b"\x1b[K", # Clear to end of line
+ # Insert/delete
+ "dch": b"\x1b[%p1%dP", # Delete N characters
+ "dch1": b"\x1b[P", # Delete 1 character
+ "ich": b"\x1b[%p1%d@", # Insert N characters
+ "ich1": b"", # Insert 1 character
+ # Cursor visibility
+ "civis": b"\x1b[?25l", # Make cursor invisible
+ "cnorm": b"\x1b[?12l\x1b[?25h", # Make cursor normal (visible)
+ # Scrolling
+ "ind": b"\n", # Scroll up one line
+ "ri": b"\x1bM", # Scroll down one line
+ # Keypad mode
+ "smkx": b"\x1b[?1h\x1b=", # Enable keypad mode
+ "rmkx": b"\x1b[?1l\x1b>", # Disable keypad mode
+ # Padding (not used in modern terminals)
+ "pad": b"",
+ # Function keys and special keys
+ "kdch1": b"\x1b[3~", # Delete key
+ "kcud1": b"\x1bOB", # Down arrow
+ "kend": b"\x1bOF", # End key
+ "kent": b"\x1bOM", # Enter key
+ "khome": b"\x1bOH", # Home key
+ "kich1": b"\x1b[2~", # Insert key
+ "kcub1": b"\x1bOD", # Left arrow
+ "knp": b"\x1b[6~", # Page down
+ "kpp": b"\x1b[5~", # Page up
+ "kcuf1": b"\x1bOC", # Right arrow
+ "kcuu1": b"\x1bOA", # Up arrow
+ # Function keys F1-F20
+ "kf1": b"\x1bOP",
+ "kf2": b"\x1bOQ",
+ "kf3": b"\x1bOR",
+ "kf4": b"\x1bOS",
+ "kf5": b"\x1b[15~",
+ "kf6": b"\x1b[17~",
+ "kf7": b"\x1b[18~",
+ "kf8": b"\x1b[19~",
+ "kf9": b"\x1b[20~",
+ "kf10": b"\x1b[21~",
+ "kf11": b"\x1b[23~",
+ "kf12": b"\x1b[24~",
+ "kf13": b"\x1b[1;2P",
+ "kf14": b"\x1b[1;2Q",
+ "kf15": b"\x1b[1;2R",
+ "kf16": b"\x1b[1;2S",
+ "kf17": b"\x1b[15;2~",
+ "kf18": b"\x1b[17;2~",
+ "kf19": b"\x1b[18;2~",
+ "kf20": b"\x1b[19;2~",
+ },
+ # Dumb terminal - minimal capabilities
+ "dumb": {
+ "bel": b"\x07", # Bell
+ "cud1": b"\n", # Move down 1 row (newline)
+ "ind": b"\n", # Scroll up one line (newline)
+ },
+ # Linux console
+ "linux": {
+ # Bell
+ "bel": b"\x07",
+ # Cursor movement
+ "cub": b"\x1b[%p1%dD", # Move cursor left N columns
+ "cud": b"\x1b[%p1%dB", # Move cursor down N rows
+ "cuf": b"\x1b[%p1%dC", # Move cursor right N columns
+ "cuu": b"\x1b[%p1%dA", # Move cursor up N rows
+ "cub1": b"\x08", # Move cursor left 1 column (backspace)
+ "cud1": b"\n", # Move cursor down 1 row (newline)
+ "cuf1": b"\x1b[C", # Move cursor right 1 column
+ "cuu1": b"\x1b[A", # Move cursor up 1 row
+ "cup": b"\x1b[%i%p1%d;%p2%dH", # Move cursor to row, column
+ "hpa": b"\x1b[%i%p1%dG", # Move cursor to column
+ # Clear operations
+ "clear": b"\x1b[H\x1b[J", # Clear screen and home cursor (different from ansi!)
+ "el": b"\x1b[K", # Clear to end of line
+ # Insert/delete
+ "dch": b"\x1b[%p1%dP", # Delete N characters
+ "dch1": b"\x1b[P", # Delete 1 character
+ "ich": b"\x1b[%p1%d@", # Insert N characters
+ "ich1": b"\x1b[@", # Insert 1 character
+ # Cursor visibility
+ "civis": b"\x1b[?25l\x1b[?1c", # Make cursor invisible
+ "cnorm": b"\x1b[?25h\x1b[?0c", # Make cursor normal
+ # Scrolling
+ "ind": b"\n", # Scroll up one line
+ "ri": b"\x1bM", # Scroll down one line
+ # Keypad mode
+ "smkx": b"\x1b[?1h\x1b=", # Enable keypad mode
+ "rmkx": b"\x1b[?1l\x1b>", # Disable keypad mode
+ # Function keys and special keys
+ "kdch1": b"\x1b[3~", # Delete key
+ "kcud1": b"\x1b[B", # Down arrow
+ "kend": b"\x1b[4~", # End key (different from ansi!)
+ "khome": b"\x1b[1~", # Home key (different from ansi!)
+ "kich1": b"\x1b[2~", # Insert key
+ "kcub1": b"\x1b[D", # Left arrow
+ "knp": b"\x1b[6~", # Page down
+ "kpp": b"\x1b[5~", # Page up
+ "kcuf1": b"\x1b[C", # Right arrow
+ "kcuu1": b"\x1b[A", # Up arrow
+ # Function keys
+ "kf1": b"\x1b[[A",
+ "kf2": b"\x1b[[B",
+ "kf3": b"\x1b[[C",
+ "kf4": b"\x1b[[D",
+ "kf5": b"\x1b[[E",
+ "kf6": b"\x1b[17~",
+ "kf7": b"\x1b[18~",
+ "kf8": b"\x1b[19~",
+ "kf9": b"\x1b[20~",
+ "kf10": b"\x1b[21~",
+ "kf11": b"\x1b[23~",
+ "kf12": b"\x1b[24~",
+ "kf13": b"\x1b[25~",
+ "kf14": b"\x1b[26~",
+ "kf15": b"\x1b[28~",
+ "kf16": b"\x1b[29~",
+ "kf17": b"\x1b[31~",
+ "kf18": b"\x1b[32~",
+ "kf19": b"\x1b[33~",
+ "kf20": b"\x1b[34~",
+ },
+}
+
+# Map common TERM values to capability sets
+_TERM_ALIASES = {
+ "xterm": "ansi",
+ "xterm-color": "ansi",
+ "xterm-256color": "ansi",
+ "screen": "ansi",
+ "screen-256color": "ansi",
+ "tmux": "ansi",
+ "tmux-256color": "ansi",
+ "vt100": "ansi",
+ "vt220": "ansi",
+ "rxvt": "ansi",
+ "rxvt-unicode": "ansi",
+ "rxvt-unicode-256color": "ansi",
+ "unknown": "dumb",
+}
+
+
+@dataclass
+class TermInfo:
+ terminal_name: str | bytes | None
+ fallback: bool = True
+
+ _names: list[str] = field(default_factory=list)
+ _booleans: list[int] = field(default_factory=list)
+ _numbers: list[int] = field(default_factory=list)
+ _strings: list[bytes | None] = field(default_factory=list)
+ _capabilities: dict[str, bytes] = field(default_factory=dict)
+
+ def __post_init__(self) -> None:
+ """Initialize terminal capabilities for the given terminal type.
+
+ Based on ncurses implementation in:
+ - ncurses/tinfo/lib_setup.c:setupterm() and _nc_setupterm()
+ - ncurses/tinfo/lib_setup.c:TINFO_SETUP_TERM()
+
+ This version first attempts to read terminfo database files like ncurses,
+ then, if `fallback` is True, falls back to hardcoded capabilities for
+ common terminal types.
+ """
+ # If termstr is None or empty, try to get from environment
+ if not self.terminal_name:
+ self.terminal_name = os.environ.get("TERM") or "ANSI"
+
+ if isinstance(self.terminal_name, bytes):
+ self.terminal_name = self.terminal_name.decode("ascii")
+
+ try:
+ self._parse_terminfo_file(self.terminal_name)
+ except (OSError, ValueError):
+ if not self.fallback:
+ raise
+
+ term_type = _TERM_ALIASES.get(
+ self.terminal_name, self.terminal_name
+ )
+ if term_type not in _TERMINAL_CAPABILITIES:
+ term_type = "dumb"
+ self._capabilities = _TERMINAL_CAPABILITIES[term_type].copy()
+
+ def _parse_terminfo_file(self, terminal_name: str) -> None:
+ """Parse a terminfo file.
+
+ Based on ncurses implementation in:
+ - ncurses/tinfo/read_entry.c:_nc_read_termtype()
+ - ncurses/tinfo/read_entry.c:_nc_read_file_entry()
+ """
+ data = _read_terminfo_file(terminal_name)
+ too_short = f"TermInfo file for {terminal_name!r} too short"
+ offset = 12
+ if len(data) < offset:
+ raise ValueError(too_short)
+
+ magic, name_size, bool_count, num_count, str_count, str_size = (
+ struct.unpack("<Hhhhhh", data[:offset])
+ )
+
+ if magic == MAGIC16:
+ number_format = "<h" # 16-bit signed
+ number_size = 2
+ elif magic == MAGIC32:
+ number_format = "<i" # 32-bit signed
+ number_size = 4
+ else:
+ raise ValueError(
+ f"TermInfo file for {terminal_name!r} uses unknown magic"
+ )
+
+ # Read terminal names
+ if offset + name_size > len(data):
+ raise ValueError(too_short)
+ names = data[offset : offset + name_size - 1].decode(
+ "ascii", errors="ignore"
+ )
+ offset += name_size
+
+ # Read boolean capabilities
+ if offset + bool_count > len(data):
+ raise ValueError(too_short)
+ booleans = list(data[offset : offset + bool_count])
+ offset += bool_count
+
+ # Align to even byte boundary for numbers
+ if offset % 2:
+ offset += 1
+
+ # Read numeric capabilities
+ numbers = []
+ for i in range(num_count):
+ if offset + number_size > len(data):
+ raise ValueError(too_short)
+ num = struct.unpack(
+ number_format, data[offset : offset + number_size]
+ )[0]
+ numbers.append(num)
+ offset += number_size
+
+ # Read string offsets
+ string_offsets = []
+ for i in range(str_count):
+ if offset + 2 > len(data):
+ raise ValueError(too_short)
+ off = struct.unpack("<h", data[offset : offset + 2])[0]
+ string_offsets.append(off)
+ offset += 2
+
+ # Read string table
+ if offset + str_size > len(data):
+ raise ValueError(too_short)
+ string_table = data[offset : offset + str_size]
+
+ # Extract strings from string table
+ strings: list[bytes | None] = []
+ for off in string_offsets:
+ if off < 0:
+ strings.append(CANCELLED_STRING)
+ elif off < len(string_table):
+ # Find null terminator
+ end = off
+ while end < len(string_table) and string_table[end] != 0:
+ end += 1
+ if end <= len(string_table):
+ strings.append(string_table[off:end])
+ else:
+ strings.append(ABSENT_STRING)
+ else:
+ strings.append(ABSENT_STRING)
+
+ self._names = names.split("|")
+ self._booleans = booleans
+ self._numbers = numbers
+ self._strings = strings
+
+ def get(self, cap: str) -> bytes | None:
+ """Get terminal capability string by name.
+
+ Based on ncurses implementation in:
+ - ncurses/tinfo/lib_ti.c:tigetstr()
+
+ The ncurses version searches through compiled terminfo data structures.
+ This version first checks parsed terminfo data, then falls back to
+ hardcoded capabilities.
+ """
+ if not isinstance(cap, str):
+ raise TypeError(f"`cap` must be a string, not {type(cap)}")
+
+ if self._capabilities:
+ # Fallbacks populated, use them
+ return self._capabilities.get(cap)
+
+ # Look up in standard capabilities first
+ if cap in _STRING_CAPABILITY_NAMES:
+ index = _STRING_CAPABILITY_NAMES[cap]
+ if index < len(self._strings):
+ return self._strings[index]
+
+ # Note: we don't support extended capabilities since PyREPL doesn't
+ # need them.
+ return None
+
+
+def tparm(cap_bytes: bytes, *params: int) -> bytes:
+ """Parameterize a terminal capability string.
+
+ Based on ncurses implementation in:
+ - ncurses/tinfo/lib_tparm.c:tparm()
+ - ncurses/tinfo/lib_tparm.c:tparam_internal()
+
+ The ncurses version implements a full stack-based interpreter for
+ terminfo parameter strings. This pure Python version implements only
+ the subset of parameter substitution operations needed by PyREPL:
+ - %i (increment parameters for 1-based indexing)
+ - %p[1-9]%d (parameter substitution)
+ - %p[1-9]%{n}%+%d (parameter plus constant)
+ """
+ if not isinstance(cap_bytes, bytes):
+ raise TypeError(f"`cap` must be bytes, not {type(cap_bytes)}")
+
+ result = cap_bytes
+
+ # %i - increment parameters (1-based instead of 0-based)
+ increment = b"%i" in result
+ if increment:
+ result = result.replace(b"%i", b"")
+
+ # Replace %p1%d, %p2%d, etc. with actual parameter values
+ for i in range(len(params)):
+ pattern = b"%%p%d%%d" % (i + 1)
+ if pattern in result:
+ value = params[i]
+ if increment:
+ value += 1
+ result = result.replace(pattern, str(value).encode("ascii"))
+
+ # Handle %p1%{1}%+%d (parameter plus constant)
+ # Used in some cursor positioning sequences
+ pattern_re = re.compile(rb"%p(\d)%\{(\d+)\}%\+%d")
+ matches = list(pattern_re.finditer(result))
+ for match in reversed(matches): # reversed to maintain positions
+ param_idx = int(match.group(1))
+ constant = int(match.group(2))
+ value = params[param_idx] + constant
+ result = (
+ result[: match.start()]
+ + str(value).encode("ascii")
+ + result[match.end() :]
+ )
+
+ return result
diff --git a/contrib/tools/python3/Lib/_pyrepl/trace.py b/contrib/tools/python3/Lib/_pyrepl/trace.py
new file mode 100644
index 00000000000..a8eb2433cd3
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/trace.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+import os
+
+# types
+if False:
+ from typing import IO
+
+
+trace_file: IO[str] | None = None
+if trace_filename := os.environ.get("PYREPL_TRACE"):
+ trace_file = open(trace_filename, "a")
+
+
+def trace(line: str, *k: object, **kw: object) -> None:
+ if trace_file is None:
+ return
+ if k or kw:
+ line = line.format(*k, **kw)
+ trace_file.write(line + "\n")
+ trace_file.flush()
diff --git a/contrib/tools/python3/Lib/_pyrepl/types.py b/contrib/tools/python3/Lib/_pyrepl/types.py
new file mode 100644
index 00000000000..c5b7ebc1a40
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/types.py
@@ -0,0 +1,10 @@
+from collections.abc import Callable, Iterator
+
+type Callback = Callable[[], object]
+type SimpleContextManager = Iterator[None]
+type KeySpec = str # like r"\C-c"
+type CommandName = str # like "interrupt"
+type EventTuple = tuple[CommandName, str]
+type Completer = Callable[[str, int], str | None]
+type CharBuffer = list[str]
+type CharWidths = list[int]
diff --git a/contrib/tools/python3/Lib/_pyrepl/unix_console.py b/contrib/tools/python3/Lib/_pyrepl/unix_console.py
new file mode 100644
index 00000000000..db5b8ddadce
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/unix_console.py
@@ -0,0 +1,852 @@
+# Copyright 2000-2010 Michael Hudson-Doyle <[email protected]>
+# Antonio Cuni
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import errno
+import os
+import re
+import select
+import signal
+import struct
+import termios
+import time
+import platform
+from fcntl import ioctl
+
+from . import terminfo
+from .console import Console, Event
+from .fancy_termios import tcgetattr, tcsetattr, TermState
+from .trace import trace
+from .unix_eventqueue import EventQueue
+from .utils import wlen
+
+
+TYPE_CHECKING = False
+
+# types
+if TYPE_CHECKING:
+ from typing import AbstractSet, IO, Literal, overload, cast
+else:
+ overload = lambda func: None
+ cast = lambda typ, val: val
+
+
+class InvalidTerminal(RuntimeError):
+ def __init__(self, message: str) -> None:
+ super().__init__(errno.EIO, message)
+
+
+_error = (termios.error, InvalidTerminal)
+_error_codes_to_ignore = frozenset([errno.EIO, errno.ENXIO, errno.EPERM])
+
+SIGWINCH_EVENT = "repaint"
+
+FIONREAD = getattr(termios, "FIONREAD", None)
+TIOCGWINSZ = getattr(termios, "TIOCGWINSZ", None)
+
+# ------------ start of baudrate definitions ------------
+
+# Add (possibly) missing baudrates (check termios man page) to termios
+
+
+def add_baudrate_if_supported(dictionary: dict[int, int], rate: int) -> None:
+ baudrate_name = "B%d" % rate
+ if hasattr(termios, baudrate_name):
+ dictionary[getattr(termios, baudrate_name)] = rate
+
+
+# Check the termios man page (Line speed) to know where these
+# values come from.
+potential_baudrates = [
+ 0,
+ 110,
+ 115200,
+ 1200,
+ 134,
+ 150,
+ 1800,
+ 19200,
+ 200,
+ 230400,
+ 2400,
+ 300,
+ 38400,
+ 460800,
+ 4800,
+ 50,
+ 57600,
+ 600,
+ 75,
+ 9600,
+]
+
+ratedict: dict[int, int] = {}
+for rate in potential_baudrates:
+ add_baudrate_if_supported(ratedict, rate)
+
+# Clean up variables to avoid unintended usage
+del rate, add_baudrate_if_supported
+
+# ------------ end of baudrate definitions ------------
+
+delayprog = re.compile(b"\\$<([0-9]+)((?:/|\\*){0,2})>")
+
+try:
+ poll: type[select.poll] = select.poll
+except AttributeError:
+ # this is exactly the minumum necessary to support what we
+ # do with poll objects
+ class MinimalPoll:
+ def __init__(self):
+ pass
+
+ def register(self, fd, flag):
+ self.fd = fd
+
+ # note: The 'timeout' argument is received as *milliseconds*
+ def poll(self, timeout: float | None = None) -> list[int]:
+ if timeout is None:
+ r, w, e = select.select([self.fd], [], [])
+ else:
+ r, w, e = select.select([self.fd], [], [], timeout / 1000)
+ return r
+
+ poll = MinimalPoll # type: ignore[assignment]
+
+
+class UnixConsole(Console):
+ def __init__(
+ self,
+ f_in: IO[bytes] | int = 0,
+ f_out: IO[bytes] | int = 1,
+ term: str = "",
+ encoding: str = "",
+ ):
+ """
+ Initialize the UnixConsole.
+
+ Parameters:
+ - f_in (int or file-like object): Input file descriptor or object.
+ - f_out (int or file-like object): Output file descriptor or object.
+ - term (str): Terminal name.
+ - encoding (str): Encoding to use for I/O operations.
+ """
+ super().__init__(f_in, f_out, term, encoding)
+
+ self.pollob = poll()
+ self.pollob.register(self.input_fd, select.POLLIN)
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ self.terminfo = terminfo.TermInfo(term or None)
+ self.term = term
+ self.is_apple_terminal = (
+ platform.system() == "Darwin"
+ and os.getenv("TERM_PROGRAM") == "Apple_Terminal"
+ )
+
+ try:
+ self.__input_fd_set(tcgetattr(self.input_fd), ignore=frozenset())
+ except _error as e:
+ raise RuntimeError(f"termios failure ({e.args[1]})")
+
+ @overload
+ def _my_getstr(
+ cap: str, optional: Literal[False] = False
+ ) -> bytes: ...
+
+ @overload
+ def _my_getstr(cap: str, optional: bool) -> bytes | None: ...
+
+ def _my_getstr(cap: str, optional: bool = False) -> bytes | None:
+ r = self.terminfo.get(cap)
+ if not optional and r is None:
+ raise InvalidTerminal(
+ f"terminal doesn't have the required {cap} capability"
+ )
+ return r
+
+ self._bel = _my_getstr("bel")
+ self._civis = _my_getstr("civis", optional=True)
+ self._clear = _my_getstr("clear")
+ self._cnorm = _my_getstr("cnorm", optional=True)
+ self._cub = _my_getstr("cub", optional=True)
+ self._cub1 = _my_getstr("cub1", optional=True)
+ self._cud = _my_getstr("cud", optional=True)
+ self._cud1 = _my_getstr("cud1", optional=True)
+ self._cuf = _my_getstr("cuf", optional=True)
+ self._cuf1 = _my_getstr("cuf1", optional=True)
+ self._cup = _my_getstr("cup")
+ self._cuu = _my_getstr("cuu", optional=True)
+ self._cuu1 = _my_getstr("cuu1", optional=True)
+ self._dch1 = _my_getstr("dch1", optional=True)
+ self._dch = _my_getstr("dch", optional=True)
+ self._el = _my_getstr("el")
+ self._hpa = _my_getstr("hpa", optional=True)
+ self._ich = _my_getstr("ich", optional=True)
+ self._ich1 = _my_getstr("ich1", optional=True)
+ self._ind = _my_getstr("ind", optional=True)
+ self._pad = _my_getstr("pad", optional=True)
+ self._ri = _my_getstr("ri", optional=True)
+ self._rmkx = _my_getstr("rmkx", optional=True)
+ self._smkx = _my_getstr("smkx", optional=True)
+
+ self.__setup_movement()
+
+ self.event_queue = EventQueue(self.input_fd, self.encoding, self.terminfo)
+ self.cursor_visible = 1
+
+ signal.signal(signal.SIGCONT, self._sigcont_handler)
+
+ def _sigcont_handler(self, signum, frame):
+ self.restore()
+ self.prepare()
+
+ def more_in_buffer(self) -> bool:
+ return bool(
+ self.input_buffer
+ and self.input_buffer_pos < len(self.input_buffer)
+ )
+
+ def __read(self, n: int) -> bytes:
+ if not self.more_in_buffer():
+ self.input_buffer = os.read(self.input_fd, 10000)
+
+ ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
+ self.input_buffer_pos += len(ret)
+ if self.input_buffer_pos >= len(self.input_buffer):
+ self.input_buffer = b""
+ self.input_buffer_pos = 0
+ return ret
+
+ def change_encoding(self, encoding: str) -> None:
+ """
+ Change the encoding used for I/O operations.
+
+ Parameters:
+ - encoding (str): New encoding to use.
+ """
+ self.encoding = encoding
+
+ def refresh(self, screen, c_xy):
+ """
+ Refresh the console screen.
+
+ Parameters:
+ - screen (list): List of strings representing the screen contents.
+ - c_xy (tuple): Cursor position (x, y) on the screen.
+ """
+ cx, cy = c_xy
+ if not self.__gone_tall:
+ while len(self.screen) < min(len(screen), self.height):
+ self.__hide_cursor()
+ if self.screen:
+ self.__move(0, len(self.screen) - 1)
+ self.__write("\n")
+ self.posxy = 0, len(self.screen)
+ self.screen.append("")
+ else:
+ while len(self.screen) < len(screen):
+ self.screen.append("")
+
+ if len(screen) > self.height:
+ self.__gone_tall = 1
+ self.__move = self.__move_tall
+
+ px, py = self.posxy
+ old_offset = offset = self.__offset
+ height = self.height
+
+ # we make sure the cursor is on the screen, and that we're
+ # using all of the screen if we can
+ if cy < offset:
+ offset = cy
+ elif cy >= offset + height:
+ offset = cy - height + 1
+ elif offset > 0 and len(screen) < offset + height:
+ offset = max(len(screen) - height, 0)
+ screen.append("")
+
+ oldscr = self.screen[old_offset : old_offset + height]
+ newscr = screen[offset : offset + height]
+
+ # use hardware scrolling if we have it.
+ if old_offset > offset and self._ri:
+ self.__hide_cursor()
+ self.__write_code(self._cup, 0, 0)
+ self.posxy = 0, old_offset
+ for i in range(old_offset - offset):
+ self.__write_code(self._ri)
+ oldscr.pop(-1)
+ oldscr.insert(0, "")
+ elif old_offset < offset and self._ind:
+ self.__hide_cursor()
+ self.__write_code(self._cup, self.height - 1, 0)
+ self.posxy = 0, old_offset + self.height - 1
+ for i in range(offset - old_offset):
+ self.__write_code(self._ind)
+ oldscr.pop(0)
+ oldscr.append("")
+
+ self.__offset = offset
+
+ for (
+ y,
+ oldline,
+ newline,
+ ) in zip(range(offset, offset + height), oldscr, newscr):
+ if oldline != newline:
+ self.__write_changed_line(y, oldline, newline, px)
+
+ y = len(newscr)
+ while y < len(oldscr):
+ self.__hide_cursor()
+ self.__move(0, y)
+ self.posxy = 0, y
+ self.__write_code(self._el)
+ y += 1
+
+ self.__show_cursor()
+
+ self.screen = screen.copy()
+ self.move_cursor(cx, cy)
+ self.flushoutput()
+
+ def move_cursor(self, x, y):
+ """
+ Move the cursor to the specified position on the screen.
+
+ Parameters:
+ - x (int): X coordinate.
+ - y (int): Y coordinate.
+ """
+ if y < self.__offset or y >= self.__offset + self.height:
+ self.event_queue.insert(Event("scroll", None))
+ else:
+ self.__move(x, y)
+ self.posxy = x, y
+ self.flushoutput()
+
+ def prepare(self):
+ """
+ Prepare the console for input/output operations.
+ """
+ self.__buffer = []
+
+ self.__svtermstate = tcgetattr(self.input_fd)
+ raw = self.__svtermstate.copy()
+ raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON)
+ raw.oflag &= ~(termios.OPOST)
+ raw.cflag &= ~(termios.CSIZE | termios.PARENB)
+ raw.cflag |= termios.CS8
+ raw.iflag |= termios.BRKINT
+ raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN)
+ raw.lflag |= termios.ISIG
+ raw.cc[termios.VMIN] = 1
+ raw.cc[termios.VTIME] = 0
+ self.__input_fd_set(raw)
+
+ # In macOS terminal we need to deactivate line wrap via ANSI escape code
+ if self.is_apple_terminal:
+ os.write(self.output_fd, b"\033[?7l")
+
+ self.screen = []
+ self.height, self.width = self.getheightwidth()
+
+ self.posxy = 0, 0
+ self.__gone_tall = 0
+ self.__move = self.__move_short
+ self.__offset = 0
+
+ self.__maybe_write_code(self._smkx)
+
+ try:
+ self.old_sigwinch = signal.signal(signal.SIGWINCH, self.__sigwinch)
+ except ValueError:
+ pass
+
+ self.__enable_bracketed_paste()
+
+ def restore(self):
+ """
+ Restore the console to the default state
+ """
+ self.__disable_bracketed_paste()
+ self.__maybe_write_code(self._rmkx)
+ self.flushoutput()
+ self.__input_fd_set(self.__svtermstate)
+
+ if self.is_apple_terminal:
+ os.write(self.output_fd, b"\033[?7h")
+
+ if hasattr(self, "old_sigwinch"):
+ try:
+ signal.signal(signal.SIGWINCH, self.old_sigwinch)
+ except ValueError as e:
+ import threading
+ if threading.current_thread() is threading.main_thread():
+ raise e
+ del self.old_sigwinch
+
+ def push_char(self, char: int | bytes) -> None:
+ """
+ Push a character to the console event queue.
+ """
+ trace("push char {char!r}", char=char)
+ self.event_queue.push(char)
+
+ def get_event(self, block: bool = True) -> Event | None:
+ """
+ Get an event from the console event queue.
+
+ Parameters:
+ - block (bool): Whether to block until an event is available.
+
+ Returns:
+ - Event: Event object from the event queue.
+ """
+ if not block and not self.wait(timeout=0):
+ return None
+
+ while self.event_queue.empty():
+ while True:
+ try:
+ self.push_char(self.__read(1))
+ except OSError as err:
+ if err.errno == errno.EINTR:
+ if not self.event_queue.empty():
+ return self.event_queue.get()
+ else:
+ continue
+ elif err.errno == errno.EIO:
+ raise SystemExit(errno.EIO)
+ else:
+ raise
+ else:
+ break
+ return self.event_queue.get()
+
+ def wait(self, timeout: float | None = None) -> bool:
+ """
+ Wait for events on the console.
+ """
+ return (
+ not self.event_queue.empty()
+ or self.more_in_buffer()
+ or bool(self.pollob.poll(timeout))
+ )
+
+ def set_cursor_vis(self, visible):
+ """
+ Set the visibility of the cursor.
+
+ Parameters:
+ - visible (bool): Visibility flag.
+ """
+ if visible:
+ self.__show_cursor()
+ else:
+ self.__hide_cursor()
+
+ if TIOCGWINSZ:
+
+ def getheightwidth(self):
+ """
+ Get the height and width of the console.
+
+ Returns:
+ - tuple: Height and width of the console.
+ """
+ try:
+ return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
+ except (KeyError, TypeError, ValueError):
+ try:
+ size = ioctl(self.input_fd, TIOCGWINSZ, b"\000" * 8)
+ except OSError:
+ return 25, 80
+ height, width = struct.unpack("hhhh", size)[0:2]
+ if not height:
+ return 25, 80
+ return height, width
+
+ else:
+
+ def getheightwidth(self):
+ """
+ Get the height and width of the console.
+
+ Returns:
+ - tuple: Height and width of the console.
+ """
+ try:
+ return int(os.environ["LINES"]), int(os.environ["COLUMNS"])
+ except (KeyError, TypeError, ValueError):
+ return 25, 80
+
+ def forgetinput(self):
+ """
+ Discard any pending input on the console.
+ """
+ termios.tcflush(self.input_fd, termios.TCIFLUSH)
+
+ def flushoutput(self):
+ """
+ Flush the output buffer.
+ """
+ for text, iscode in self.__buffer:
+ if iscode:
+ self.__tputs(text)
+ else:
+ os.write(self.output_fd, text.encode(self.encoding, "replace"))
+ del self.__buffer[:]
+
+ def finish(self):
+ """
+ Finish console operations and flush the output buffer.
+ """
+ y = len(self.screen) - 1
+ while y >= 0 and not self.screen[y]:
+ y -= 1
+ self.__move(0, min(y, self.height + self.__offset - 1))
+ self.__write("\n\r")
+ self.flushoutput()
+
+ def beep(self):
+ """
+ Emit a beep sound.
+ """
+ self.__maybe_write_code(self._bel)
+ self.flushoutput()
+
+ if FIONREAD:
+
+ def getpending(self):
+ """
+ Get pending events from the console event queue.
+
+ Returns:
+ - Event: Pending event from the event queue.
+ """
+ e = Event("key", "", b"")
+
+ while not self.event_queue.empty():
+ e2 = self.event_queue.get()
+ e.data += e2.data
+ e.raw += e.raw
+
+ amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
+ raw = self.__read(amount)
+ data = str(raw, self.encoding, "replace")
+ e.data += data
+ e.raw += raw
+ return e
+
+ else:
+
+ def getpending(self):
+ """
+ Get pending events from the console event queue.
+
+ Returns:
+ - Event: Pending event from the event queue.
+ """
+ e = Event("key", "", b"")
+
+ while not self.event_queue.empty():
+ e2 = self.event_queue.get()
+ e.data += e2.data
+ e.raw += e.raw
+
+ amount = 10000
+ raw = self.__read(amount)
+ data = str(raw, self.encoding, "replace")
+ e.data += data
+ e.raw += raw
+ return e
+
+ def clear(self):
+ """
+ Clear the console screen.
+ """
+ self.__write_code(self._clear)
+ self.__gone_tall = 1
+ self.__move = self.__move_tall
+ self.posxy = 0, 0
+ self.screen = []
+
+ @property
+ def input_hook(self):
+ try:
+ import posix
+ except ImportError:
+ return None
+ if posix._is_inputhook_installed():
+ return posix._inputhook
+
+ def __enable_bracketed_paste(self) -> None:
+ os.write(self.output_fd, b"\x1b[?2004h")
+
+ def __disable_bracketed_paste(self) -> None:
+ os.write(self.output_fd, b"\x1b[?2004l")
+
+ def __setup_movement(self):
+ """
+ Set up the movement functions based on the terminal capabilities.
+ """
+ if 0 and self._hpa: # hpa don't work in windows telnet :-(
+ self.__move_x = self.__move_x_hpa
+ elif self._cub and self._cuf:
+ self.__move_x = self.__move_x_cub_cuf
+ elif self._cub1 and self._cuf1:
+ self.__move_x = self.__move_x_cub1_cuf1
+ else:
+ raise RuntimeError("insufficient terminal (horizontal)")
+
+ if self._cuu and self._cud:
+ self.__move_y = self.__move_y_cuu_cud
+ elif self._cuu1 and self._cud1:
+ self.__move_y = self.__move_y_cuu1_cud1
+ else:
+ raise RuntimeError("insufficient terminal (vertical)")
+
+ if self._dch1:
+ self.dch1 = self._dch1
+ elif self._dch:
+ self.dch1 = terminfo.tparm(self._dch, 1)
+ else:
+ self.dch1 = None
+
+ if self._ich1:
+ self.ich1 = self._ich1
+ elif self._ich:
+ self.ich1 = terminfo.tparm(self._ich, 1)
+ else:
+ self.ich1 = None
+
+ self.__move = self.__move_short
+
+ def __write_changed_line(self, y, oldline, newline, px_coord):
+ # this is frustrating; there's no reason to test (say)
+ # self.dch1 inside the loop -- but alternative ways of
+ # structuring this function are equally painful (I'm trying to
+ # avoid writing code generators these days...)
+ minlen = min(wlen(oldline), wlen(newline))
+ x_pos = 0
+ x_coord = 0
+
+ px_pos = 0
+ j = 0
+ for c in oldline:
+ if j >= px_coord:
+ break
+ j += wlen(c)
+ px_pos += 1
+
+ # reuse the oldline as much as possible, but stop as soon as we
+ # encounter an ESCAPE, because it might be the start of an escape
+ # sequene
+ while (
+ x_coord < minlen
+ and oldline[x_pos] == newline[x_pos]
+ and newline[x_pos] != "\x1b"
+ ):
+ x_coord += wlen(newline[x_pos])
+ x_pos += 1
+
+ # if we need to insert a single character right after the first detected change
+ if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1:
+ if (
+ y == self.posxy[1]
+ and x_coord > self.posxy[0]
+ and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1]
+ ):
+ x_pos = px_pos
+ x_coord = px_coord
+ character_width = wlen(newline[x_pos])
+ self.__move(x_coord, y)
+ self.__write_code(self.ich1)
+ self.__write(newline[x_pos])
+ self.posxy = x_coord + character_width, y
+
+ # if it's a single character change in the middle of the line
+ elif (
+ x_coord < minlen
+ and oldline[x_pos + 1 :] == newline[x_pos + 1 :]
+ and wlen(oldline[x_pos]) == wlen(newline[x_pos])
+ ):
+ character_width = wlen(newline[x_pos])
+ self.__move(x_coord, y)
+ self.__write(newline[x_pos])
+ self.posxy = x_coord + character_width, y
+
+ # if this is the last character to fit in the line and we edit in the middle of the line
+ elif (
+ self.dch1
+ and self.ich1
+ and wlen(newline) == self.width
+ and x_coord < wlen(newline) - 2
+ and newline[x_pos + 1 : -1] == oldline[x_pos:-2]
+ ):
+ self.__hide_cursor()
+ self.__move(self.width - 2, y)
+ self.posxy = self.width - 2, y
+ self.__write_code(self.dch1)
+
+ character_width = wlen(newline[x_pos])
+ self.__move(x_coord, y)
+ self.__write_code(self.ich1)
+ self.__write(newline[x_pos])
+ self.posxy = character_width + 1, y
+
+ else:
+ self.__hide_cursor()
+ self.__move(x_coord, y)
+ if wlen(oldline) > wlen(newline):
+ self.__write_code(self._el)
+ self.__write(newline[x_pos:])
+ self.posxy = wlen(newline), y
+
+ if "\x1b" in newline:
+ # ANSI escape characters are present, so we can't assume
+ # anything about the position of the cursor. Moving the cursor
+ # to the left margin should work to get to a known position.
+ self.move_cursor(0, y)
+
+ def __write(self, text):
+ self.__buffer.append((text, 0))
+
+ def __write_code(self, fmt, *args):
+ self.__buffer.append((terminfo.tparm(fmt, *args), 1))
+
+ def __maybe_write_code(self, fmt, *args):
+ if fmt:
+ self.__write_code(fmt, *args)
+
+ def __move_y_cuu1_cud1(self, y):
+ assert self._cud1 is not None
+ assert self._cuu1 is not None
+ dy = y - self.posxy[1]
+ if dy > 0:
+ self.__write_code(dy * self._cud1)
+ elif dy < 0:
+ self.__write_code((-dy) * self._cuu1)
+
+ def __move_y_cuu_cud(self, y):
+ dy = y - self.posxy[1]
+ if dy > 0:
+ self.__write_code(self._cud, dy)
+ elif dy < 0:
+ self.__write_code(self._cuu, -dy)
+
+ def __move_x_hpa(self, x: int) -> None:
+ if x != self.posxy[0]:
+ self.__write_code(self._hpa, x)
+
+ def __move_x_cub1_cuf1(self, x: int) -> None:
+ assert self._cuf1 is not None
+ assert self._cub1 is not None
+ dx = x - self.posxy[0]
+ if dx > 0:
+ self.__write_code(self._cuf1 * dx)
+ elif dx < 0:
+ self.__write_code(self._cub1 * (-dx))
+
+ def __move_x_cub_cuf(self, x: int) -> None:
+ dx = x - self.posxy[0]
+ if dx > 0:
+ self.__write_code(self._cuf, dx)
+ elif dx < 0:
+ self.__write_code(self._cub, -dx)
+
+ def __move_short(self, x, y):
+ self.__move_x(x)
+ self.__move_y(y)
+
+ def __move_tall(self, x, y):
+ assert 0 <= y - self.__offset < self.height, y - self.__offset
+ self.__write_code(self._cup, y - self.__offset, x)
+
+ def __sigwinch(self, signum, frame):
+ self.height, self.width = self.getheightwidth()
+ self.event_queue.insert(Event("resize", None))
+
+ def __hide_cursor(self):
+ if self.cursor_visible:
+ self.__maybe_write_code(self._civis)
+ self.cursor_visible = 0
+
+ def __show_cursor(self):
+ if not self.cursor_visible:
+ self.__maybe_write_code(self._cnorm)
+ self.cursor_visible = 1
+
+ def repaint(self):
+ if not self.__gone_tall:
+ self.posxy = 0, self.posxy[1]
+ self.__write("\r")
+ ns = len(self.screen) * ["\000" * self.width]
+ self.screen = ns
+ else:
+ self.posxy = 0, self.__offset
+ self.__move(0, self.__offset)
+ ns = self.height * ["\000" * self.width]
+ self.screen = ns
+
+ def __tputs(self, fmt, prog=delayprog):
+ """A Python implementation of the curses tputs function; the
+ curses one can't really be wrapped in a sane manner.
+
+ I have the strong suspicion that this is complexity that
+ will never do anyone any good."""
+ # using .get() means that things will blow up
+ # only if the bps is actually needed (which I'm
+ # betting is pretty unlikely)
+ bps = ratedict.get(self.__svtermstate.ospeed)
+ while 1:
+ m = prog.search(fmt)
+ if not m:
+ os.write(self.output_fd, fmt)
+ break
+ x, y = m.span()
+ os.write(self.output_fd, fmt[:x])
+ fmt = fmt[y:]
+ delay = int(m.group(1))
+ if b"*" in m.group(2):
+ delay *= self.height
+ if self._pad and bps is not None:
+ nchars = (bps * delay) / 1000
+ os.write(self.output_fd, self._pad * nchars)
+ else:
+ time.sleep(float(delay) / 1000.0)
+
+ def __input_fd_set(
+ self,
+ state: TermState,
+ ignore: AbstractSet[int] = _error_codes_to_ignore,
+ ) -> bool:
+ try:
+ tcsetattr(self.input_fd, termios.TCSADRAIN, state)
+ except termios.error as te:
+ if te.args[0] not in ignore:
+ raise
+ return False
+ else:
+ return True
diff --git a/contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py b/contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py
new file mode 100644
index 00000000000..2a9cca59e74
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/unix_eventqueue.py
@@ -0,0 +1,77 @@
+# Copyright 2000-2008 Michael Hudson-Doyle <[email protected]>
+# Armin Rigo
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from .terminfo import TermInfo
+from .trace import trace
+from .base_eventqueue import BaseEventQueue
+from termios import tcgetattr, VERASE
+import os
+
+
+# Mapping of human-readable key names to their terminal-specific codes
+TERMINAL_KEYNAMES = {
+ "delete": "kdch1",
+ "down": "kcud1",
+ "end": "kend",
+ "enter": "kent",
+ "home": "khome",
+ "insert": "kich1",
+ "left": "kcub1",
+ "page down": "knp",
+ "page up": "kpp",
+ "right": "kcuf1",
+ "up": "kcuu1",
+}
+
+
+# Function keys F1-F20 mapping
+TERMINAL_KEYNAMES.update(("f%d" % i, "kf%d" % i) for i in range(1, 21))
+
+# Known CTRL-arrow keycodes
+CTRL_ARROW_KEYCODES= {
+ # for xterm, gnome-terminal, xfce terminal, etc.
+ b'\033[1;5D': 'ctrl left',
+ b'\033[1;5C': 'ctrl right',
+ # for rxvt
+ b'\033Od': 'ctrl left',
+ b'\033Oc': 'ctrl right',
+}
+
+def get_terminal_keycodes(ti: TermInfo) -> dict[bytes, str]:
+ """
+ Generates a dictionary mapping terminal keycodes to human-readable names.
+ """
+ keycodes = {}
+ for key, terminal_code in TERMINAL_KEYNAMES.items():
+ keycode = ti.get(terminal_code)
+ trace('key {key} tiname {terminal_code} keycode {keycode!r}', **locals())
+ if keycode:
+ keycodes[keycode] = key
+ keycodes.update(CTRL_ARROW_KEYCODES)
+ return keycodes
+
+
+class EventQueue(BaseEventQueue):
+ def __init__(self, fd: int, encoding: str, ti: TermInfo) -> None:
+ keycodes = get_terminal_keycodes(ti)
+ if os.isatty(fd):
+ backspace = tcgetattr(fd)[6][VERASE]
+ keycodes[backspace] = "backspace"
+ BaseEventQueue.__init__(self, encoding, keycodes)
diff --git a/contrib/tools/python3/Lib/_pyrepl/utils.py b/contrib/tools/python3/Lib/_pyrepl/utils.py
new file mode 100644
index 00000000000..a30fbdee3a4
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/utils.py
@@ -0,0 +1,83 @@
+import re
+import unicodedata
+import functools
+
+from .types import CharBuffer, CharWidths
+from .trace import trace
+
+ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")
+ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02")
+ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""})
+
+
+def str_width(c: str) -> int:
+ if ord(c) < 128:
+ return 1
+ # gh-139246 for zero-width joiner and combining characters
+ if unicodedata.combining(c):
+ return 0
+ category = unicodedata.category(c)
+ if category == "Cf" and c != "\u00ad":
+ return 0
+ w = unicodedata.east_asian_width(c)
+ if w in ("N", "Na", "H", "A"):
+ return 1
+ return 2
+
+
+def wlen(s: str) -> int:
+ if len(s) == 1 and s != "\x1a":
+ return str_width(s)
+ length = sum(str_width(i) for i in s)
+ # remove lengths of any escape sequences
+ sequence = ANSI_ESCAPE_SEQUENCE.findall(s)
+ ctrl_z_cnt = s.count("\x1a")
+ return length - sum(len(i) for i in sequence) + ctrl_z_cnt
+
+
+def unbracket(s: str, including_content: bool = False) -> str:
+ r"""Return `s` with \001 and \002 characters removed.
+
+ If `including_content` is True, content between \001 and \002 is also
+ stripped.
+ """
+ if including_content:
+ return ZERO_WIDTH_BRACKET.sub("", s)
+ return s.translate(ZERO_WIDTH_TRANS)
+
+
+def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
+ r"""Decompose the input buffer into a printable variant.
+
+ Returns a tuple of two lists:
+ - the first list is the input buffer, character by character;
+ - the second list is the visible width of each character in the input
+ buffer.
+
+ Examples:
+ >>> utils.disp_str("a = 9")
+ (['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1])
+ """
+ chars: CharBuffer = []
+ char_widths: CharWidths = []
+
+ if not buffer:
+ return chars, char_widths
+
+ for c in buffer:
+ if c == "\x1a": # CTRL-Z on Windows
+ chars.append(c)
+ char_widths.append(2)
+ elif ord(c) < 128:
+ chars.append(c)
+ char_widths.append(1)
+ elif unicodedata.category(c).startswith("C"):
+ c = r"\u%04x" % ord(c)
+ chars.append(c)
+ char_widths.append(len(c))
+ else:
+ chars.append(c)
+ char_widths.append(str_width(c))
+ trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths)
+ return chars, char_widths
diff --git a/contrib/tools/python3/Lib/_pyrepl/windows_console.py b/contrib/tools/python3/Lib/_pyrepl/windows_console.py
new file mode 100644
index 00000000000..bb2ef1aa51f
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/windows_console.py
@@ -0,0 +1,687 @@
+# Copyright 2000-2004 Michael Hudson-Doyle <[email protected]>
+#
+# All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import io
+import os
+import sys
+import time
+import msvcrt
+
+from collections import deque
+import ctypes
+from ctypes.wintypes import (
+ _COORD,
+ WORD,
+ SMALL_RECT,
+ BOOL,
+ HANDLE,
+ CHAR,
+ DWORD,
+ WCHAR,
+ SHORT,
+)
+from ctypes import Structure, POINTER, Union
+from .console import Event, Console
+from .trace import trace
+from .utils import wlen
+from .windows_eventqueue import EventQueue
+
+try:
+ from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined]
+except:
+ # Keep MyPy happy off Windows
+ from ctypes import CDLL as WinDLL, cdll as windll
+
+ def GetLastError() -> int:
+ return 42
+
+ class WinError(OSError): # type: ignore[no-redef]
+ def __init__(self, err: int | None, descr: str | None = None) -> None:
+ self.err = err
+ self.descr = descr
+
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+ from typing import IO
+
+# Virtual-Key Codes: https://learn.microsoft.com/en-us/windows/win32/inputdev/virtual-key-codes
+VK_MAP: dict[int, str] = {
+ 0x23: "end", # VK_END
+ 0x24: "home", # VK_HOME
+ 0x25: "left", # VK_LEFT
+ 0x26: "up", # VK_UP
+ 0x27: "right", # VK_RIGHT
+ 0x28: "down", # VK_DOWN
+ 0x2E: "delete", # VK_DELETE
+ 0x70: "f1", # VK_F1
+ 0x71: "f2", # VK_F2
+ 0x72: "f3", # VK_F3
+ 0x73: "f4", # VK_F4
+ 0x74: "f5", # VK_F5
+ 0x75: "f6", # VK_F6
+ 0x76: "f7", # VK_F7
+ 0x77: "f8", # VK_F8
+ 0x78: "f9", # VK_F9
+ 0x79: "f10", # VK_F10
+ 0x7A: "f11", # VK_F11
+ 0x7B: "f12", # VK_F12
+ 0x7C: "f13", # VK_F13
+ 0x7D: "f14", # VK_F14
+ 0x7E: "f15", # VK_F15
+ 0x7F: "f16", # VK_F16
+ 0x80: "f17", # VK_F17
+ 0x81: "f18", # VK_F18
+ 0x82: "f19", # VK_F19
+ 0x83: "f20", # VK_F20
+}
+
+# Virtual terminal output sequences
+# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences
+# Check `windows_eventqueue.py` for input sequences
+ERASE_IN_LINE = "\x1b[K"
+MOVE_LEFT = "\x1b[{}D"
+MOVE_RIGHT = "\x1b[{}C"
+MOVE_UP = "\x1b[{}A"
+MOVE_DOWN = "\x1b[{}B"
+CLEAR = "\x1b[H\x1b[J"
+
+# State of control keys: https://learn.microsoft.com/en-us/windows/console/key-event-record-str
+ALT_ACTIVE = 0x01 | 0x02
+CTRL_ACTIVE = 0x04 | 0x08
+
+
+class _error(Exception):
+ pass
+
+def _supports_vt():
+ try:
+ import nt
+ return nt._supports_virtual_terminal()
+ except (ImportError, AttributeError):
+ return False
+
+class WindowsConsole(Console):
+ def __init__(
+ self,
+ f_in: IO[bytes] | int = 0,
+ f_out: IO[bytes] | int = 1,
+ term: str = "",
+ encoding: str = "",
+ ):
+ super().__init__(f_in, f_out, term, encoding)
+
+ self.__vt_support = _supports_vt()
+
+ if self.__vt_support:
+ trace('console supports virtual terminal')
+
+ # Save original console modes so we can recover on cleanup.
+ original_input_mode = DWORD()
+ GetConsoleMode(InHandle, original_input_mode)
+ trace(f'saved original input mode 0x{original_input_mode.value:x}')
+ self.__original_input_mode = original_input_mode.value
+
+ SetConsoleMode(
+ OutHandle,
+ ENABLE_WRAP_AT_EOL_OUTPUT
+ | ENABLE_PROCESSED_OUTPUT
+ | ENABLE_VIRTUAL_TERMINAL_PROCESSING,
+ )
+
+ self.screen: list[str] = []
+ self.width = 80
+ self.height = 25
+ self.__offset = 0
+ self.event_queue = EventQueue(encoding)
+ try:
+ self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined]
+ except ValueError:
+ # Console I/O is redirected, fallback...
+ self.out = None
+
+ def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None:
+ """
+ Refresh the console screen.
+
+ Parameters:
+ - screen (list): List of strings representing the screen contents.
+ - c_xy (tuple): Cursor position (x, y) on the screen.
+ """
+ cx, cy = c_xy
+
+ while len(self.screen) < min(len(screen), self.height):
+ self._hide_cursor()
+ if self.screen:
+ self._move_relative(0, len(self.screen) - 1)
+ self.__write("\n")
+ self.posxy = 0, len(self.screen)
+ self.screen.append("")
+
+ px, py = self.posxy
+ old_offset = offset = self.__offset
+ height = self.height
+
+ # we make sure the cursor is on the screen, and that we're
+ # using all of the screen if we can
+ if cy < offset:
+ offset = cy
+ elif cy >= offset + height:
+ offset = cy - height + 1
+ scroll_lines = offset - old_offset
+
+ # Scrolling the buffer as the current input is greater than the visible
+ # portion of the window. We need to scroll the visible portion and the
+ # entire history
+ self._scroll(scroll_lines, self._getscrollbacksize())
+ self.posxy = self.posxy[0], self.posxy[1] + scroll_lines
+ self.__offset += scroll_lines
+
+ for i in range(scroll_lines):
+ self.screen.append("")
+ elif offset > 0 and len(screen) < offset + height:
+ offset = max(len(screen) - height, 0)
+ screen.append("")
+
+ oldscr = self.screen[old_offset : old_offset + height]
+ newscr = screen[offset : offset + height]
+
+ self.__offset = offset
+
+ self._hide_cursor()
+ for (
+ y,
+ oldline,
+ newline,
+ ) in zip(range(offset, offset + height), oldscr, newscr):
+ if oldline != newline:
+ self.__write_changed_line(y, oldline, newline, px)
+
+ y = len(newscr)
+ while y < len(oldscr):
+ self._move_relative(0, y)
+ self.posxy = 0, y
+ self._erase_to_end()
+ y += 1
+
+ self._show_cursor()
+
+ self.screen = screen
+ self.move_cursor(cx, cy)
+
+ @property
+ def input_hook(self):
+ try:
+ import nt
+ except ImportError:
+ return None
+ if nt._is_inputhook_installed():
+ return nt._inputhook
+
+ def __write_changed_line(
+ self, y: int, oldline: str, newline: str, px_coord: int
+ ) -> None:
+ # this is frustrating; there's no reason to test (say)
+ # self.dch1 inside the loop -- but alternative ways of
+ # structuring this function are equally painful (I'm trying to
+ # avoid writing code generators these days...)
+ minlen = min(wlen(oldline), wlen(newline))
+ x_pos = 0
+ x_coord = 0
+
+ px_pos = 0
+ j = 0
+ for c in oldline:
+ if j >= px_coord:
+ break
+ j += wlen(c)
+ px_pos += 1
+
+ # reuse the oldline as much as possible, but stop as soon as we
+ # encounter an ESCAPE, because it might be the start of an escape
+ # sequene
+ while (
+ x_coord < minlen
+ and oldline[x_pos] == newline[x_pos]
+ and newline[x_pos] != "\x1b"
+ ):
+ x_coord += wlen(newline[x_pos])
+ x_pos += 1
+
+ self._hide_cursor()
+ self._move_relative(x_coord, y)
+ if wlen(oldline) > wlen(newline):
+ self._erase_to_end()
+
+ self.__write(newline[x_pos:])
+ self.posxy = min(wlen(newline), self.width - 1), y
+
+ if "\x1b" in newline or y != self.posxy[1] or '\x1a' in newline:
+ # ANSI escape characters are present, so we can't assume
+ # anything about the position of the cursor. Moving the cursor
+ # to the left margin should work to get to a known position.
+ self.move_cursor(0, y)
+
+ def _scroll(
+ self, top: int, bottom: int, left: int | None = None, right: int | None = None
+ ) -> None:
+ scroll_rect = SMALL_RECT()
+ scroll_rect.Top = SHORT(top)
+ scroll_rect.Bottom = SHORT(bottom)
+ scroll_rect.Left = SHORT(0 if left is None else left)
+ scroll_rect.Right = SHORT(
+ self.getheightwidth()[1] - 1 if right is None else right
+ )
+ destination_origin = _COORD()
+ fill_info = CHAR_INFO()
+ fill_info.UnicodeChar = " "
+
+ if not ScrollConsoleScreenBuffer(
+ OutHandle, scroll_rect, None, destination_origin, fill_info
+ ):
+ raise WinError(GetLastError())
+
+ def _hide_cursor(self):
+ self.__write("\x1b[?25l")
+
+ def _show_cursor(self):
+ self.__write("\x1b[?25h")
+
+ def _enable_blinking(self):
+ self.__write("\x1b[?12h")
+
+ def _disable_blinking(self):
+ self.__write("\x1b[?12l")
+
+ def _enable_bracketed_paste(self) -> None:
+ self.__write("\x1b[?2004h")
+
+ def _disable_bracketed_paste(self) -> None:
+ self.__write("\x1b[?2004l")
+
+ def __write(self, text: str) -> None:
+ if "\x1a" in text:
+ text = ''.join(["^Z" if x == '\x1a' else x for x in text])
+
+ if self.out is not None:
+ self.out.write(text.encode(self.encoding, "replace"))
+ self.out.flush()
+ else:
+ os.write(self.output_fd, text.encode(self.encoding, "replace"))
+
+ @property
+ def screen_xy(self) -> tuple[int, int]:
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ if not GetConsoleScreenBufferInfo(OutHandle, info):
+ raise WinError(GetLastError())
+ return info.dwCursorPosition.X, info.dwCursorPosition.Y
+
+ def _erase_to_end(self) -> None:
+ self.__write(ERASE_IN_LINE)
+
+ def prepare(self) -> None:
+ trace("prepare")
+ self.screen = []
+ self.height, self.width = self.getheightwidth()
+
+ self.posxy = 0, 0
+ self.__gone_tall = 0
+ self.__offset = 0
+
+ if self.__vt_support:
+ SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT)
+ self._enable_bracketed_paste()
+
+ def restore(self) -> None:
+ if self.__vt_support:
+ # Recover to original mode before running REPL
+ self._disable_bracketed_paste()
+ SetConsoleMode(InHandle, self.__original_input_mode)
+
+ def _move_relative(self, x: int, y: int) -> None:
+ """Moves relative to the current posxy"""
+ dx = x - self.posxy[0]
+ dy = y - self.posxy[1]
+ if dx < 0:
+ self.__write(MOVE_LEFT.format(-dx))
+ elif dx > 0:
+ self.__write(MOVE_RIGHT.format(dx))
+
+ if dy < 0:
+ self.__write(MOVE_UP.format(-dy))
+ elif dy > 0:
+ self.__write(MOVE_DOWN.format(dy))
+
+ def move_cursor(self, x: int, y: int) -> None:
+ if x < 0 or y < 0:
+ raise ValueError(f"Bad cursor position {x}, {y}")
+
+ if y < self.__offset or y >= self.__offset + self.height:
+ self.event_queue.insert(Event("scroll", ""))
+ else:
+ self._move_relative(x, y)
+ self.posxy = x, y
+
+ def set_cursor_vis(self, visible: bool) -> None:
+ if visible:
+ self._show_cursor()
+ else:
+ self._hide_cursor()
+
+ def getheightwidth(self) -> tuple[int, int]:
+ """Return (height, width) where height and width are the height
+ and width of the terminal window in characters."""
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ if not GetConsoleScreenBufferInfo(OutHandle, info):
+ raise WinError(GetLastError())
+ return (
+ info.srWindow.Bottom - info.srWindow.Top + 1,
+ info.srWindow.Right - info.srWindow.Left + 1,
+ )
+
+ def _getscrollbacksize(self) -> int:
+ info = CONSOLE_SCREEN_BUFFER_INFO()
+ if not GetConsoleScreenBufferInfo(OutHandle, info):
+ raise WinError(GetLastError())
+
+ return info.srWindow.Bottom # type: ignore[no-any-return]
+
+ def _read_input(self, block: bool = True) -> INPUT_RECORD | None:
+ if not block:
+ events = DWORD()
+ if not GetNumberOfConsoleInputEvents(InHandle, events):
+ raise WinError(GetLastError())
+ if not events.value:
+ return None
+
+ rec = INPUT_RECORD()
+ read = DWORD()
+ if not ReadConsoleInput(InHandle, rec, 1, read):
+ raise WinError(GetLastError())
+
+ return rec
+
+ def get_event(self, block: bool = True) -> Event | None:
+ """Return an Event instance. Returns None if |block| is false
+ and there is no event pending, otherwise waits for the
+ completion of an event."""
+
+ while self.event_queue.empty():
+ rec = self._read_input(block)
+ if rec is None:
+ return None
+
+ if rec.EventType == WINDOW_BUFFER_SIZE_EVENT:
+ return Event("resize", "")
+
+ if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown:
+ # Only process keys and keydown events
+ if block:
+ continue
+ return None
+
+ key_event = rec.Event.KeyEvent
+ raw_key = key = key_event.uChar.UnicodeChar
+
+ if key == "\r":
+ # Make enter unix-like
+ return Event(evt="key", data="\n")
+ elif key_event.wVirtualKeyCode == 8:
+ # Turn backspace directly into the command
+ key = "backspace"
+ elif key == "\x00":
+ # Handle special keys like arrow keys and translate them into the appropriate command
+ key = VK_MAP.get(key_event.wVirtualKeyCode)
+ if key:
+ if key_event.dwControlKeyState & CTRL_ACTIVE:
+ key = f"ctrl {key}"
+ elif key_event.dwControlKeyState & ALT_ACTIVE:
+ # queue the key, return the meta command
+ self.event_queue.insert(Event(evt="key", data=key))
+ return Event(evt="key", data="\033") # keymap.py uses this for meta
+ return Event(evt="key", data=key)
+ if block:
+ continue
+
+ return None
+ elif self.__vt_support:
+ # If virtual terminal is enabled, scanning VT sequences
+ for char in raw_key.encode(self.event_queue.encoding, "replace"):
+ self.event_queue.push(char)
+ continue
+
+ if key_event.dwControlKeyState & ALT_ACTIVE:
+ # Do not swallow characters that have been entered via AltGr:
+ # Windows internally converts AltGr to CTRL+ALT, see
+ # https://learn.microsoft.com/en-us/windows/win32/api/winuser/nf-winuser-vkkeyscanw
+ if not key_event.dwControlKeyState & CTRL_ACTIVE:
+ # queue the key, return the meta command
+ self.event_queue.insert(Event(evt="key", data=key))
+ return Event(evt="key", data="\033") # keymap.py uses this for meta
+
+ return Event(evt="key", data=key)
+ return self.event_queue.get()
+
+ def push_char(self, char: int | bytes) -> None:
+ """
+ Push a character to the console event queue.
+ """
+ raise NotImplementedError("push_char not supported on Windows")
+
+ def beep(self) -> None:
+ self.__write("\x07")
+
+ def clear(self) -> None:
+ """Wipe the screen"""
+ self.__write(CLEAR)
+ self.posxy = 0, 0
+ self.screen = []
+
+ def finish(self) -> None:
+ """Move the cursor to the end of the display and otherwise get
+ ready for end. XXX could be merged with restore? Hmm."""
+ y = len(self.screen) - 1
+ while y >= 0 and not self.screen[y]:
+ y -= 1
+ self._move_relative(0, min(y, self.height + self.__offset - 1))
+ self.__write("\r\n")
+
+ def flushoutput(self) -> None:
+ """Flush all output to the screen (assuming there's some
+ buffering going on somewhere).
+
+ All output on Windows is unbuffered so this is a nop"""
+ pass
+
+ def forgetinput(self) -> None:
+ """Forget all pending, but not yet processed input."""
+ if not FlushConsoleInputBuffer(InHandle):
+ raise WinError(GetLastError())
+
+ def getpending(self) -> Event:
+ """Return the characters that have been typed but not yet
+ processed."""
+ return Event("key", "", b"")
+
+ def wait_for_event(self, timeout: float | None) -> bool:
+ """Wait for an event."""
+ # Poor man's Windows select loop
+ start_time = time.time()
+ while True:
+ if msvcrt.kbhit(): # type: ignore[attr-defined]
+ return True
+ if timeout and time.time() - start_time > timeout / 1000:
+ return False
+ time.sleep(0.01)
+
+ def wait(self, timeout: float | None) -> bool:
+ """
+ Wait for events on the console.
+ """
+ return (
+ not self.event_queue.empty()
+ or self.wait_for_event(timeout)
+ )
+
+ def repaint(self) -> None:
+ raise NotImplementedError("No repaint support")
+
+
+# Windows interop
+class CONSOLE_SCREEN_BUFFER_INFO(Structure):
+ _fields_ = [
+ ("dwSize", _COORD),
+ ("dwCursorPosition", _COORD),
+ ("wAttributes", WORD),
+ ("srWindow", SMALL_RECT),
+ ("dwMaximumWindowSize", _COORD),
+ ]
+
+
+class CONSOLE_CURSOR_INFO(Structure):
+ _fields_ = [
+ ("dwSize", DWORD),
+ ("bVisible", BOOL),
+ ]
+
+
+class CHAR_INFO(Structure):
+ _fields_ = [
+ ("UnicodeChar", WCHAR),
+ ("Attributes", WORD),
+ ]
+
+
+class Char(Union):
+ _fields_ = [
+ ("UnicodeChar", WCHAR),
+ ("Char", CHAR),
+ ]
+
+
+class KeyEvent(ctypes.Structure):
+ _fields_ = [
+ ("bKeyDown", BOOL),
+ ("wRepeatCount", WORD),
+ ("wVirtualKeyCode", WORD),
+ ("wVirtualScanCode", WORD),
+ ("uChar", Char),
+ ("dwControlKeyState", DWORD),
+ ]
+
+
+class WindowsBufferSizeEvent(ctypes.Structure):
+ _fields_ = [("dwSize", _COORD)]
+
+
+class ConsoleEvent(ctypes.Union):
+ _fields_ = [
+ ("KeyEvent", KeyEvent),
+ ("WindowsBufferSizeEvent", WindowsBufferSizeEvent),
+ ]
+
+
+class INPUT_RECORD(Structure):
+ _fields_ = [("EventType", WORD), ("Event", ConsoleEvent)]
+
+
+KEY_EVENT = 0x01
+FOCUS_EVENT = 0x10
+MENU_EVENT = 0x08
+MOUSE_EVENT = 0x02
+WINDOW_BUFFER_SIZE_EVENT = 0x04
+
+ENABLE_PROCESSED_INPUT = 0x0001
+ENABLE_LINE_INPUT = 0x0002
+ENABLE_ECHO_INPUT = 0x0004
+ENABLE_MOUSE_INPUT = 0x0010
+ENABLE_INSERT_MODE = 0x0020
+ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
+
+ENABLE_PROCESSED_OUTPUT = 0x01
+ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
+ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
+
+STD_INPUT_HANDLE = -10
+STD_OUTPUT_HANDLE = -11
+
+if sys.platform == "win32":
+ _KERNEL32 = WinDLL("kernel32", use_last_error=True)
+
+ GetStdHandle = windll.kernel32.GetStdHandle
+ GetStdHandle.argtypes = [DWORD]
+ GetStdHandle.restype = HANDLE
+
+ GetConsoleScreenBufferInfo = _KERNEL32.GetConsoleScreenBufferInfo
+ GetConsoleScreenBufferInfo.argtypes = [
+ HANDLE,
+ ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO),
+ ]
+ GetConsoleScreenBufferInfo.restype = BOOL
+
+ ScrollConsoleScreenBuffer = _KERNEL32.ScrollConsoleScreenBufferW
+ ScrollConsoleScreenBuffer.argtypes = [
+ HANDLE,
+ POINTER(SMALL_RECT),
+ POINTER(SMALL_RECT),
+ _COORD,
+ POINTER(CHAR_INFO),
+ ]
+ ScrollConsoleScreenBuffer.restype = BOOL
+
+ GetConsoleMode = _KERNEL32.GetConsoleMode
+ GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)]
+ GetConsoleMode.restype = BOOL
+
+ SetConsoleMode = _KERNEL32.SetConsoleMode
+ SetConsoleMode.argtypes = [HANDLE, DWORD]
+ SetConsoleMode.restype = BOOL
+
+ ReadConsoleInput = _KERNEL32.ReadConsoleInputW
+ ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)]
+ ReadConsoleInput.restype = BOOL
+
+ GetNumberOfConsoleInputEvents = _KERNEL32.GetNumberOfConsoleInputEvents
+ GetNumberOfConsoleInputEvents.argtypes = [HANDLE, POINTER(DWORD)]
+ GetNumberOfConsoleInputEvents.restype = BOOL
+
+ FlushConsoleInputBuffer = _KERNEL32.FlushConsoleInputBuffer
+ FlushConsoleInputBuffer.argtypes = [HANDLE]
+ FlushConsoleInputBuffer.restype = BOOL
+
+ OutHandle = GetStdHandle(STD_OUTPUT_HANDLE)
+ InHandle = GetStdHandle(STD_INPUT_HANDLE)
+else:
+
+ def _win_only(*args, **kwargs):
+ raise NotImplementedError("Windows only")
+
+ GetStdHandle = _win_only
+ GetConsoleScreenBufferInfo = _win_only
+ ScrollConsoleScreenBuffer = _win_only
+ GetConsoleMode = _win_only
+ SetConsoleMode = _win_only
+ ReadConsoleInput = _win_only
+ GetNumberOfConsoleInputEvents = _win_only
+ FlushConsoleInputBuffer = _win_only
+ OutHandle = 0
+ InHandle = 0
diff --git a/contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py b/contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py
new file mode 100644
index 00000000000..d99722f9a16
--- /dev/null
+++ b/contrib/tools/python3/Lib/_pyrepl/windows_eventqueue.py
@@ -0,0 +1,42 @@
+"""
+Windows event and VT sequence scanner
+"""
+
+from .base_eventqueue import BaseEventQueue
+
+
+# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences
+VT_MAP: dict[bytes, str] = {
+ b'\x1b[A': 'up',
+ b'\x1b[B': 'down',
+ b'\x1b[C': 'right',
+ b'\x1b[D': 'left',
+ b'\x1b[1;5D': 'ctrl left',
+ b'\x1b[1;5C': 'ctrl right',
+
+ b'\x1b[H': 'home',
+ b'\x1b[F': 'end',
+
+ b'\x7f': 'backspace',
+ b'\x1b[2~': 'insert',
+ b'\x1b[3~': 'delete',
+ b'\x1b[5~': 'page up',
+ b'\x1b[6~': 'page down',
+
+ b'\x1bOP': 'f1',
+ b'\x1bOQ': 'f2',
+ b'\x1bOR': 'f3',
+ b'\x1bOS': 'f4',
+ b'\x1b[15~': 'f5',
+ b'\x1b[17~': 'f6',
+ b'\x1b[18~': 'f7',
+ b'\x1b[19~': 'f8',
+ b'\x1b[20~': 'f9',
+ b'\x1b[21~': 'f10',
+ b'\x1b[23~': 'f11',
+ b'\x1b[24~': 'f12',
+}
+
+class EventQueue(BaseEventQueue):
+ def __init__(self, encoding: str) -> None:
+ BaseEventQueue.__init__(self, encoding, VT_MAP)