aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/prompt-toolkit/py3/prompt_toolkit/input
diff options
context:
space:
mode:
authorshadchin <shadchin@yandex-team.ru>2022-02-10 16:44:39 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:44:39 +0300
commite9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (patch)
tree64175d5cadab313b3e7039ebaa06c5bc3295e274 /contrib/python/prompt-toolkit/py3/prompt_toolkit/input
parent2598ef1d0aee359b4b6d5fdd1758916d5907d04f (diff)
downloadydb-e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0.tar.gz
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/input')
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py22
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py670
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py274
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py114
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py144
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py190
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py152
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py610
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py494
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py1506
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py270
11 files changed, 2223 insertions, 2223 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py
index b3a8219d814..421d4ccdf40 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py
@@ -1,11 +1,11 @@
-from .base import DummyInput, Input
-from .defaults import create_input, create_pipe_input
-
-__all__ = [
- # Base.
- "Input",
- "DummyInput",
- # Defaults.
- "create_input",
- "create_pipe_input",
-]
+from .base import DummyInput, Input
+from .defaults import create_input, create_pipe_input
+
+__all__ = [
+ # Base.
+ "Input",
+ "DummyInput",
+ # Defaults.
+ "create_input",
+ "create_pipe_input",
+]
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py
index 22006fdb5c1..2e6c5b9b286 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py
@@ -1,126 +1,126 @@
-"""
-Mappings from VT100 (ANSI) escape sequences to the corresponding prompt_toolkit
-keys.
-
-We are not using the terminfo/termcap databases to detect the ANSI escape
-sequences for the input. Instead, we recognize 99% of the most common
-sequences. This works well, because in practice, every modern terminal is
-mostly Xterm compatible.
-
-Some useful docs:
-- Mintty: https://github.com/mintty/mintty/blob/master/wiki/Keycodes.md
-"""
-from typing import Dict, Tuple, Union
-
-from ..keys import Keys
-
-__all__ = [
- "ANSI_SEQUENCES",
- "REVERSE_ANSI_SEQUENCES",
-]
-
-# Mapping of vt100 escape codes to Keys.
-ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = {
- # Control keys.
- "\x00": Keys.ControlAt, # Control-At (Also for Ctrl-Space)
- "\x01": Keys.ControlA, # Control-A (home)
- "\x02": Keys.ControlB, # Control-B (emacs cursor left)
- "\x03": Keys.ControlC, # Control-C (interrupt)
- "\x04": Keys.ControlD, # Control-D (exit)
- "\x05": Keys.ControlE, # Control-E (end)
- "\x06": Keys.ControlF, # Control-F (cursor forward)
- "\x07": Keys.ControlG, # Control-G
- "\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
- "\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
- "\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
- "\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
- "\x0c": Keys.ControlL, # Control-L (clear; form feed)
- "\x0d": Keys.ControlM, # Control-M (13) (Identical to '\r')
- "\x0e": Keys.ControlN, # Control-N (14) (history forward)
- "\x0f": Keys.ControlO, # Control-O (15)
- "\x10": Keys.ControlP, # Control-P (16) (history back)
- "\x11": Keys.ControlQ, # Control-Q
- "\x12": Keys.ControlR, # Control-R (18) (reverse search)
- "\x13": Keys.ControlS, # Control-S (19) (forward search)
- "\x14": Keys.ControlT, # Control-T
- "\x15": Keys.ControlU, # Control-U
- "\x16": Keys.ControlV, # Control-V
- "\x17": Keys.ControlW, # Control-W
- "\x18": Keys.ControlX, # Control-X
- "\x19": Keys.ControlY, # Control-Y (25)
- "\x1a": Keys.ControlZ, # Control-Z
- "\x1b": Keys.Escape, # Also Control-[
- "\x9b": Keys.ShiftEscape,
- "\x1c": Keys.ControlBackslash, # Both Control-\ (also Ctrl-| )
- "\x1d": Keys.ControlSquareClose, # Control-]
- "\x1e": Keys.ControlCircumflex, # Control-^
- "\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
- # ASCII Delete (0x7f)
- # Vt220 (and Linux terminal) send this when pressing backspace. We map this
- # to ControlH, because that will make it easier to create key bindings that
- # work everywhere, with the trade-off that it's no longer possible to
- # handle backspace and control-h individually for the few terminals that
- # support it. (Most terminals send ControlH when backspace is pressed.)
- # See: http://www.ibb.net/~anne/keyboard.html
- "\x7f": Keys.ControlH,
- # --
- # Various
- "\x1b[1~": Keys.Home, # tmux
- "\x1b[2~": Keys.Insert,
- "\x1b[3~": Keys.Delete,
- "\x1b[4~": Keys.End, # tmux
- "\x1b[5~": Keys.PageUp,
- "\x1b[6~": Keys.PageDown,
- "\x1b[7~": Keys.Home, # xrvt
- "\x1b[8~": Keys.End, # xrvt
- "\x1b[Z": Keys.BackTab, # shift + tab
- "\x1b\x09": Keys.BackTab, # Linux console
- "\x1b[~": Keys.BackTab, # Windows console
- # --
- # Function keys.
- "\x1bOP": Keys.F1,
- "\x1bOQ": Keys.F2,
- "\x1bOR": Keys.F3,
- "\x1bOS": Keys.F4,
- "\x1b[[A": Keys.F1, # Linux console.
- "\x1b[[B": Keys.F2, # Linux console.
- "\x1b[[C": Keys.F3, # Linux console.
- "\x1b[[D": Keys.F4, # Linux console.
- "\x1b[[E": Keys.F5, # Linux console.
- "\x1b[11~": Keys.F1, # rxvt-unicode
- "\x1b[12~": Keys.F2, # rxvt-unicode
- "\x1b[13~": Keys.F3, # rxvt-unicode
- "\x1b[14~": Keys.F4, # rxvt-unicode
- "\x1b[15~": Keys.F5,
- "\x1b[17~": Keys.F6,
- "\x1b[18~": Keys.F7,
- "\x1b[19~": Keys.F8,
- "\x1b[20~": Keys.F9,
- "\x1b[21~": Keys.F10,
- "\x1b[23~": Keys.F11,
- "\x1b[24~": Keys.F12,
- "\x1b[25~": Keys.F13,
- "\x1b[26~": Keys.F14,
- "\x1b[28~": Keys.F15,
- "\x1b[29~": Keys.F16,
- "\x1b[31~": Keys.F17,
- "\x1b[32~": Keys.F18,
- "\x1b[33~": Keys.F19,
- "\x1b[34~": Keys.F20,
- # Xterm
- "\x1b[1;2P": Keys.F13,
- "\x1b[1;2Q": Keys.F14,
- # '\x1b[1;2R': Keys.F15, # Conflicts with CPR response.
- "\x1b[1;2S": Keys.F16,
- "\x1b[15;2~": Keys.F17,
- "\x1b[17;2~": Keys.F18,
- "\x1b[18;2~": Keys.F19,
- "\x1b[19;2~": Keys.F20,
- "\x1b[20;2~": Keys.F21,
- "\x1b[21;2~": Keys.F22,
- "\x1b[23;2~": Keys.F23,
- "\x1b[24;2~": Keys.F24,
- # --
+"""
+Mappings from VT100 (ANSI) escape sequences to the corresponding prompt_toolkit
+keys.
+
+We are not using the terminfo/termcap databases to detect the ANSI escape
+sequences for the input. Instead, we recognize 99% of the most common
+sequences. This works well, because in practice, every modern terminal is
+mostly Xterm compatible.
+
+Some useful docs:
+- Mintty: https://github.com/mintty/mintty/blob/master/wiki/Keycodes.md
+"""
+from typing import Dict, Tuple, Union
+
+from ..keys import Keys
+
+__all__ = [
+ "ANSI_SEQUENCES",
+ "REVERSE_ANSI_SEQUENCES",
+]
+
+# Mapping of vt100 escape codes to Keys.
+ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = {
+ # Control keys.
+ "\x00": Keys.ControlAt, # Control-At (Also for Ctrl-Space)
+ "\x01": Keys.ControlA, # Control-A (home)
+ "\x02": Keys.ControlB, # Control-B (emacs cursor left)
+ "\x03": Keys.ControlC, # Control-C (interrupt)
+ "\x04": Keys.ControlD, # Control-D (exit)
+ "\x05": Keys.ControlE, # Control-E (end)
+ "\x06": Keys.ControlF, # Control-F (cursor forward)
+ "\x07": Keys.ControlG, # Control-G
+ "\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
+ "\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
+ "\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
+ "\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
+ "\x0c": Keys.ControlL, # Control-L (clear; form feed)
+ "\x0d": Keys.ControlM, # Control-M (13) (Identical to '\r')
+ "\x0e": Keys.ControlN, # Control-N (14) (history forward)
+ "\x0f": Keys.ControlO, # Control-O (15)
+ "\x10": Keys.ControlP, # Control-P (16) (history back)
+ "\x11": Keys.ControlQ, # Control-Q
+ "\x12": Keys.ControlR, # Control-R (18) (reverse search)
+ "\x13": Keys.ControlS, # Control-S (19) (forward search)
+ "\x14": Keys.ControlT, # Control-T
+ "\x15": Keys.ControlU, # Control-U
+ "\x16": Keys.ControlV, # Control-V
+ "\x17": Keys.ControlW, # Control-W
+ "\x18": Keys.ControlX, # Control-X
+ "\x19": Keys.ControlY, # Control-Y (25)
+ "\x1a": Keys.ControlZ, # Control-Z
+ "\x1b": Keys.Escape, # Also Control-[
+ "\x9b": Keys.ShiftEscape,
+ "\x1c": Keys.ControlBackslash, # Both Control-\ (also Ctrl-| )
+ "\x1d": Keys.ControlSquareClose, # Control-]
+ "\x1e": Keys.ControlCircumflex, # Control-^
+ "\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
+ # ASCII Delete (0x7f)
+ # Vt220 (and Linux terminal) send this when pressing backspace. We map this
+ # to ControlH, because that will make it easier to create key bindings that
+ # work everywhere, with the trade-off that it's no longer possible to
+ # handle backspace and control-h individually for the few terminals that
+ # support it. (Most terminals send ControlH when backspace is pressed.)
+ # See: http://www.ibb.net/~anne/keyboard.html
+ "\x7f": Keys.ControlH,
+ # --
+ # Various
+ "\x1b[1~": Keys.Home, # tmux
+ "\x1b[2~": Keys.Insert,
+ "\x1b[3~": Keys.Delete,
+ "\x1b[4~": Keys.End, # tmux
+ "\x1b[5~": Keys.PageUp,
+ "\x1b[6~": Keys.PageDown,
+ "\x1b[7~": Keys.Home, # xrvt
+ "\x1b[8~": Keys.End, # xrvt
+ "\x1b[Z": Keys.BackTab, # shift + tab
+ "\x1b\x09": Keys.BackTab, # Linux console
+ "\x1b[~": Keys.BackTab, # Windows console
+ # --
+ # Function keys.
+ "\x1bOP": Keys.F1,
+ "\x1bOQ": Keys.F2,
+ "\x1bOR": Keys.F3,
+ "\x1bOS": Keys.F4,
+ "\x1b[[A": Keys.F1, # Linux console.
+ "\x1b[[B": Keys.F2, # Linux console.
+ "\x1b[[C": Keys.F3, # Linux console.
+ "\x1b[[D": Keys.F4, # Linux console.
+ "\x1b[[E": Keys.F5, # Linux console.
+ "\x1b[11~": Keys.F1, # rxvt-unicode
+ "\x1b[12~": Keys.F2, # rxvt-unicode
+ "\x1b[13~": Keys.F3, # rxvt-unicode
+ "\x1b[14~": Keys.F4, # rxvt-unicode
+ "\x1b[15~": Keys.F5,
+ "\x1b[17~": Keys.F6,
+ "\x1b[18~": Keys.F7,
+ "\x1b[19~": Keys.F8,
+ "\x1b[20~": Keys.F9,
+ "\x1b[21~": Keys.F10,
+ "\x1b[23~": Keys.F11,
+ "\x1b[24~": Keys.F12,
+ "\x1b[25~": Keys.F13,
+ "\x1b[26~": Keys.F14,
+ "\x1b[28~": Keys.F15,
+ "\x1b[29~": Keys.F16,
+ "\x1b[31~": Keys.F17,
+ "\x1b[32~": Keys.F18,
+ "\x1b[33~": Keys.F19,
+ "\x1b[34~": Keys.F20,
+ # Xterm
+ "\x1b[1;2P": Keys.F13,
+ "\x1b[1;2Q": Keys.F14,
+ # '\x1b[1;2R': Keys.F15, # Conflicts with CPR response.
+ "\x1b[1;2S": Keys.F16,
+ "\x1b[15;2~": Keys.F17,
+ "\x1b[17;2~": Keys.F18,
+ "\x1b[18;2~": Keys.F19,
+ "\x1b[19;2~": Keys.F20,
+ "\x1b[20;2~": Keys.F21,
+ "\x1b[21;2~": Keys.F22,
+ "\x1b[23;2~": Keys.F23,
+ "\x1b[24;2~": Keys.F24,
+ # --
# CSI 27 disambiguated modified "other" keys (xterm)
# Ref: https://invisible-island.net/xterm/modified-keys.html
# These are currently unsupported, so just re-map some common ones to the
@@ -129,215 +129,215 @@ ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = {
"\x1b[27;5;13~": Keys.ControlM, # Ctrl + Enter
"\x1b[27;6;13~": Keys.ControlM, # Ctrl + Shift + Enter
# --
- # Control + function keys.
- "\x1b[1;5P": Keys.ControlF1,
- "\x1b[1;5Q": Keys.ControlF2,
- # "\x1b[1;5R": Keys.ControlF3, # Conflicts with CPR response.
- "\x1b[1;5S": Keys.ControlF4,
- "\x1b[15;5~": Keys.ControlF5,
- "\x1b[17;5~": Keys.ControlF6,
- "\x1b[18;5~": Keys.ControlF7,
- "\x1b[19;5~": Keys.ControlF8,
- "\x1b[20;5~": Keys.ControlF9,
- "\x1b[21;5~": Keys.ControlF10,
- "\x1b[23;5~": Keys.ControlF11,
- "\x1b[24;5~": Keys.ControlF12,
- "\x1b[1;6P": Keys.ControlF13,
- "\x1b[1;6Q": Keys.ControlF14,
- # "\x1b[1;6R": Keys.ControlF15, # Conflicts with CPR response.
- "\x1b[1;6S": Keys.ControlF16,
- "\x1b[15;6~": Keys.ControlF17,
- "\x1b[17;6~": Keys.ControlF18,
- "\x1b[18;6~": Keys.ControlF19,
- "\x1b[19;6~": Keys.ControlF20,
- "\x1b[20;6~": Keys.ControlF21,
- "\x1b[21;6~": Keys.ControlF22,
- "\x1b[23;6~": Keys.ControlF23,
- "\x1b[24;6~": Keys.ControlF24,
- # --
- # Tmux (Win32 subsystem) sends the following scroll events.
- "\x1b[62~": Keys.ScrollUp,
- "\x1b[63~": Keys.ScrollDown,
- "\x1b[200~": Keys.BracketedPaste, # Start of bracketed paste.
- # --
- # Sequences generated by numpad 5. Not sure what it means. (It doesn't
- # appear in 'infocmp'. Just ignore.
- "\x1b[E": Keys.Ignore, # Xterm.
- "\x1b[G": Keys.Ignore, # Linux console.
- # --
- # Meta/control/escape + pageup/pagedown/insert/delete.
- "\x1b[3;2~": Keys.ShiftDelete, # xterm, gnome-terminal.
- "\x1b[5;2~": Keys.ShiftPageUp,
- "\x1b[6;2~": Keys.ShiftPageDown,
- "\x1b[2;3~": (Keys.Escape, Keys.Insert),
- "\x1b[3;3~": (Keys.Escape, Keys.Delete),
- "\x1b[5;3~": (Keys.Escape, Keys.PageUp),
- "\x1b[6;3~": (Keys.Escape, Keys.PageDown),
- "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert),
- "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete),
- "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp),
- "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown),
- "\x1b[3;5~": Keys.ControlDelete, # xterm, gnome-terminal.
- "\x1b[5;5~": Keys.ControlPageUp,
- "\x1b[6;5~": Keys.ControlPageDown,
- "\x1b[3;6~": Keys.ControlShiftDelete,
- "\x1b[5;6~": Keys.ControlShiftPageUp,
- "\x1b[6;6~": Keys.ControlShiftPageDown,
- "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert),
- "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown),
- "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown),
- "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert),
- "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown),
- "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown),
- # --
- # Arrows.
- # (Normal cursor mode).
- "\x1b[A": Keys.Up,
- "\x1b[B": Keys.Down,
- "\x1b[C": Keys.Right,
- "\x1b[D": Keys.Left,
- "\x1b[H": Keys.Home,
- "\x1b[F": Keys.End,
- # Tmux sends following keystrokes when control+arrow is pressed, but for
- # Emacs ansi-term sends the same sequences for normal arrow keys. Consider
- # it a normal arrow press, because that's more important.
- # (Application cursor mode).
- "\x1bOA": Keys.Up,
- "\x1bOB": Keys.Down,
- "\x1bOC": Keys.Right,
- "\x1bOD": Keys.Left,
- "\x1bOF": Keys.End,
- "\x1bOH": Keys.Home,
- # Shift + arrows.
- "\x1b[1;2A": Keys.ShiftUp,
- "\x1b[1;2B": Keys.ShiftDown,
- "\x1b[1;2C": Keys.ShiftRight,
- "\x1b[1;2D": Keys.ShiftLeft,
- "\x1b[1;2F": Keys.ShiftEnd,
- "\x1b[1;2H": Keys.ShiftHome,
- # Meta + arrow keys. Several terminals handle this differently.
- # The following sequences are for xterm and gnome-terminal.
- # (Iterm sends ESC followed by the normal arrow_up/down/left/right
- # sequences, and the OSX Terminal sends ESCb and ESCf for "alt
- # arrow_left" and "alt arrow_right." We don't handle these
- # explicitly, in here, because would could not distinguish between
- # pressing ESC (to go to Vi navigation mode), followed by just the
- # 'b' or 'f' key. These combinations are handled in
- # the input processor.)
- "\x1b[1;3A": (Keys.Escape, Keys.Up),
- "\x1b[1;3B": (Keys.Escape, Keys.Down),
- "\x1b[1;3C": (Keys.Escape, Keys.Right),
- "\x1b[1;3D": (Keys.Escape, Keys.Left),
- "\x1b[1;3F": (Keys.Escape, Keys.End),
- "\x1b[1;3H": (Keys.Escape, Keys.Home),
- # Alt+shift+number.
- "\x1b[1;4A": (Keys.Escape, Keys.ShiftDown),
- "\x1b[1;4B": (Keys.Escape, Keys.ShiftUp),
- "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight),
- "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft),
- "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd),
- "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome),
- # Control + arrows.
- "\x1b[1;5A": Keys.ControlUp, # Cursor Mode
- "\x1b[1;5B": Keys.ControlDown, # Cursor Mode
- "\x1b[1;5C": Keys.ControlRight, # Cursor Mode
- "\x1b[1;5D": Keys.ControlLeft, # Cursor Mode
- "\x1b[1;5F": Keys.ControlEnd,
- "\x1b[1;5H": Keys.ControlHome,
- # Tmux sends following keystrokes when control+arrow is pressed, but for
- # Emacs ansi-term sends the same sequences for normal arrow keys. Consider
- # it a normal arrow press, because that's more important.
- "\x1b[5A": Keys.ControlUp,
- "\x1b[5B": Keys.ControlDown,
- "\x1b[5C": Keys.ControlRight,
- "\x1b[5D": Keys.ControlLeft,
- "\x1bOc": Keys.ControlRight, # rxvt
- "\x1bOd": Keys.ControlLeft, # rxvt
- # Control + shift + arrows.
- "\x1b[1;6A": Keys.ControlShiftDown,
- "\x1b[1;6B": Keys.ControlShiftUp,
- "\x1b[1;6C": Keys.ControlShiftRight,
- "\x1b[1;6D": Keys.ControlShiftLeft,
- "\x1b[1;6F": Keys.ControlShiftEnd,
- "\x1b[1;6H": Keys.ControlShiftHome,
- # Control + Meta + arrows.
- "\x1b[1;7A": (Keys.Escape, Keys.ControlDown),
- "\x1b[1;7B": (Keys.Escape, Keys.ControlUp),
- "\x1b[1;7C": (Keys.Escape, Keys.ControlRight),
- "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft),
- "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd),
- "\x1b[1;7H": (Keys.Escape, Keys.ControlHome),
- # Meta + Shift + arrows.
- "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftDown),
- "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftUp),
- "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight),
- "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft),
- "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd),
- "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome),
- # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483).
- "\x1b[1;9A": (Keys.Escape, Keys.Up),
- "\x1b[1;9B": (Keys.Escape, Keys.Down),
- "\x1b[1;9C": (Keys.Escape, Keys.Right),
- "\x1b[1;9D": (Keys.Escape, Keys.Left),
- # --
- # Control/shift/meta + number in mintty.
- # (c-2 will actually send c-@ and c-6 will send c-^.)
- "\x1b[1;5p": Keys.Control0,
- "\x1b[1;5q": Keys.Control1,
- "\x1b[1;5r": Keys.Control2,
- "\x1b[1;5s": Keys.Control3,
- "\x1b[1;5t": Keys.Control4,
- "\x1b[1;5u": Keys.Control5,
- "\x1b[1;5v": Keys.Control6,
- "\x1b[1;5w": Keys.Control7,
- "\x1b[1;5x": Keys.Control8,
- "\x1b[1;5y": Keys.Control9,
- "\x1b[1;6p": Keys.ControlShift0,
- "\x1b[1;6q": Keys.ControlShift1,
- "\x1b[1;6r": Keys.ControlShift2,
- "\x1b[1;6s": Keys.ControlShift3,
- "\x1b[1;6t": Keys.ControlShift4,
- "\x1b[1;6u": Keys.ControlShift5,
- "\x1b[1;6v": Keys.ControlShift6,
- "\x1b[1;6w": Keys.ControlShift7,
- "\x1b[1;6x": Keys.ControlShift8,
- "\x1b[1;6y": Keys.ControlShift9,
- "\x1b[1;7p": (Keys.Escape, Keys.Control0),
- "\x1b[1;7q": (Keys.Escape, Keys.Control1),
- "\x1b[1;7r": (Keys.Escape, Keys.Control2),
- "\x1b[1;7s": (Keys.Escape, Keys.Control3),
- "\x1b[1;7t": (Keys.Escape, Keys.Control4),
- "\x1b[1;7u": (Keys.Escape, Keys.Control5),
- "\x1b[1;7v": (Keys.Escape, Keys.Control6),
- "\x1b[1;7w": (Keys.Escape, Keys.Control7),
- "\x1b[1;7x": (Keys.Escape, Keys.Control8),
- "\x1b[1;7y": (Keys.Escape, Keys.Control9),
- "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0),
- "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1),
- "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2),
- "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3),
- "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4),
- "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5),
- "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6),
- "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7),
- "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8),
- "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9),
-}
-
-
-def _get_reverse_ansi_sequences() -> Dict[Keys, str]:
- """
- Create a dictionary that maps prompt_toolkit keys back to the VT100 escape
- sequences.
- """
- result: Dict[Keys, str] = {}
-
- for sequence, key in ANSI_SEQUENCES.items():
- if not isinstance(key, tuple):
- if key not in result:
- result[key] = sequence
-
- return result
-
-
-REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences()
+ # Control + function keys.
+ "\x1b[1;5P": Keys.ControlF1,
+ "\x1b[1;5Q": Keys.ControlF2,
+ # "\x1b[1;5R": Keys.ControlF3, # Conflicts with CPR response.
+ "\x1b[1;5S": Keys.ControlF4,
+ "\x1b[15;5~": Keys.ControlF5,
+ "\x1b[17;5~": Keys.ControlF6,
+ "\x1b[18;5~": Keys.ControlF7,
+ "\x1b[19;5~": Keys.ControlF8,
+ "\x1b[20;5~": Keys.ControlF9,
+ "\x1b[21;5~": Keys.ControlF10,
+ "\x1b[23;5~": Keys.ControlF11,
+ "\x1b[24;5~": Keys.ControlF12,
+ "\x1b[1;6P": Keys.ControlF13,
+ "\x1b[1;6Q": Keys.ControlF14,
+ # "\x1b[1;6R": Keys.ControlF15, # Conflicts with CPR response.
+ "\x1b[1;6S": Keys.ControlF16,
+ "\x1b[15;6~": Keys.ControlF17,
+ "\x1b[17;6~": Keys.ControlF18,
+ "\x1b[18;6~": Keys.ControlF19,
+ "\x1b[19;6~": Keys.ControlF20,
+ "\x1b[20;6~": Keys.ControlF21,
+ "\x1b[21;6~": Keys.ControlF22,
+ "\x1b[23;6~": Keys.ControlF23,
+ "\x1b[24;6~": Keys.ControlF24,
+ # --
+ # Tmux (Win32 subsystem) sends the following scroll events.
+ "\x1b[62~": Keys.ScrollUp,
+ "\x1b[63~": Keys.ScrollDown,
+ "\x1b[200~": Keys.BracketedPaste, # Start of bracketed paste.
+ # --
+ # Sequences generated by numpad 5. Not sure what it means. (It doesn't
+ # appear in 'infocmp'. Just ignore.
+ "\x1b[E": Keys.Ignore, # Xterm.
+ "\x1b[G": Keys.Ignore, # Linux console.
+ # --
+ # Meta/control/escape + pageup/pagedown/insert/delete.
+ "\x1b[3;2~": Keys.ShiftDelete, # xterm, gnome-terminal.
+ "\x1b[5;2~": Keys.ShiftPageUp,
+ "\x1b[6;2~": Keys.ShiftPageDown,
+ "\x1b[2;3~": (Keys.Escape, Keys.Insert),
+ "\x1b[3;3~": (Keys.Escape, Keys.Delete),
+ "\x1b[5;3~": (Keys.Escape, Keys.PageUp),
+ "\x1b[6;3~": (Keys.Escape, Keys.PageDown),
+ "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert),
+ "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete),
+ "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp),
+ "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown),
+ "\x1b[3;5~": Keys.ControlDelete, # xterm, gnome-terminal.
+ "\x1b[5;5~": Keys.ControlPageUp,
+ "\x1b[6;5~": Keys.ControlPageDown,
+ "\x1b[3;6~": Keys.ControlShiftDelete,
+ "\x1b[5;6~": Keys.ControlShiftPageUp,
+ "\x1b[6;6~": Keys.ControlShiftPageDown,
+ "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert),
+ "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown),
+ "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown),
+ "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert),
+ "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown),
+ "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown),
+ # --
+ # Arrows.
+ # (Normal cursor mode).
+ "\x1b[A": Keys.Up,
+ "\x1b[B": Keys.Down,
+ "\x1b[C": Keys.Right,
+ "\x1b[D": Keys.Left,
+ "\x1b[H": Keys.Home,
+ "\x1b[F": Keys.End,
+ # Tmux sends following keystrokes when control+arrow is pressed, but for
+ # Emacs ansi-term sends the same sequences for normal arrow keys. Consider
+ # it a normal arrow press, because that's more important.
+ # (Application cursor mode).
+ "\x1bOA": Keys.Up,
+ "\x1bOB": Keys.Down,
+ "\x1bOC": Keys.Right,
+ "\x1bOD": Keys.Left,
+ "\x1bOF": Keys.End,
+ "\x1bOH": Keys.Home,
+ # Shift + arrows.
+ "\x1b[1;2A": Keys.ShiftUp,
+ "\x1b[1;2B": Keys.ShiftDown,
+ "\x1b[1;2C": Keys.ShiftRight,
+ "\x1b[1;2D": Keys.ShiftLeft,
+ "\x1b[1;2F": Keys.ShiftEnd,
+ "\x1b[1;2H": Keys.ShiftHome,
+ # Meta + arrow keys. Several terminals handle this differently.
+ # The following sequences are for xterm and gnome-terminal.
+ # (Iterm sends ESC followed by the normal arrow_up/down/left/right
+ # sequences, and the OSX Terminal sends ESCb and ESCf for "alt
+ # arrow_left" and "alt arrow_right." We don't handle these
+ # explicitly, in here, because would could not distinguish between
+ # pressing ESC (to go to Vi navigation mode), followed by just the
+ # 'b' or 'f' key. These combinations are handled in
+ # the input processor.)
+ "\x1b[1;3A": (Keys.Escape, Keys.Up),
+ "\x1b[1;3B": (Keys.Escape, Keys.Down),
+ "\x1b[1;3C": (Keys.Escape, Keys.Right),
+ "\x1b[1;3D": (Keys.Escape, Keys.Left),
+ "\x1b[1;3F": (Keys.Escape, Keys.End),
+ "\x1b[1;3H": (Keys.Escape, Keys.Home),
+ # Alt+shift+number.
+ "\x1b[1;4A": (Keys.Escape, Keys.ShiftDown),
+ "\x1b[1;4B": (Keys.Escape, Keys.ShiftUp),
+ "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight),
+ "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft),
+ "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd),
+ "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome),
+ # Control + arrows.
+ "\x1b[1;5A": Keys.ControlUp, # Cursor Mode
+ "\x1b[1;5B": Keys.ControlDown, # Cursor Mode
+ "\x1b[1;5C": Keys.ControlRight, # Cursor Mode
+ "\x1b[1;5D": Keys.ControlLeft, # Cursor Mode
+ "\x1b[1;5F": Keys.ControlEnd,
+ "\x1b[1;5H": Keys.ControlHome,
+ # Tmux sends following keystrokes when control+arrow is pressed, but for
+ # Emacs ansi-term sends the same sequences for normal arrow keys. Consider
+ # it a normal arrow press, because that's more important.
+ "\x1b[5A": Keys.ControlUp,
+ "\x1b[5B": Keys.ControlDown,
+ "\x1b[5C": Keys.ControlRight,
+ "\x1b[5D": Keys.ControlLeft,
+ "\x1bOc": Keys.ControlRight, # rxvt
+ "\x1bOd": Keys.ControlLeft, # rxvt
+ # Control + shift + arrows.
+ "\x1b[1;6A": Keys.ControlShiftDown,
+ "\x1b[1;6B": Keys.ControlShiftUp,
+ "\x1b[1;6C": Keys.ControlShiftRight,
+ "\x1b[1;6D": Keys.ControlShiftLeft,
+ "\x1b[1;6F": Keys.ControlShiftEnd,
+ "\x1b[1;6H": Keys.ControlShiftHome,
+ # Control + Meta + arrows.
+ "\x1b[1;7A": (Keys.Escape, Keys.ControlDown),
+ "\x1b[1;7B": (Keys.Escape, Keys.ControlUp),
+ "\x1b[1;7C": (Keys.Escape, Keys.ControlRight),
+ "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft),
+ "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd),
+ "\x1b[1;7H": (Keys.Escape, Keys.ControlHome),
+ # Meta + Shift + arrows.
+ "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftDown),
+ "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftUp),
+ "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight),
+ "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft),
+ "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd),
+ "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome),
+ # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483).
+ "\x1b[1;9A": (Keys.Escape, Keys.Up),
+ "\x1b[1;9B": (Keys.Escape, Keys.Down),
+ "\x1b[1;9C": (Keys.Escape, Keys.Right),
+ "\x1b[1;9D": (Keys.Escape, Keys.Left),
+ # --
+ # Control/shift/meta + number in mintty.
+ # (c-2 will actually send c-@ and c-6 will send c-^.)
+ "\x1b[1;5p": Keys.Control0,
+ "\x1b[1;5q": Keys.Control1,
+ "\x1b[1;5r": Keys.Control2,
+ "\x1b[1;5s": Keys.Control3,
+ "\x1b[1;5t": Keys.Control4,
+ "\x1b[1;5u": Keys.Control5,
+ "\x1b[1;5v": Keys.Control6,
+ "\x1b[1;5w": Keys.Control7,
+ "\x1b[1;5x": Keys.Control8,
+ "\x1b[1;5y": Keys.Control9,
+ "\x1b[1;6p": Keys.ControlShift0,
+ "\x1b[1;6q": Keys.ControlShift1,
+ "\x1b[1;6r": Keys.ControlShift2,
+ "\x1b[1;6s": Keys.ControlShift3,
+ "\x1b[1;6t": Keys.ControlShift4,
+ "\x1b[1;6u": Keys.ControlShift5,
+ "\x1b[1;6v": Keys.ControlShift6,
+ "\x1b[1;6w": Keys.ControlShift7,
+ "\x1b[1;6x": Keys.ControlShift8,
+ "\x1b[1;6y": Keys.ControlShift9,
+ "\x1b[1;7p": (Keys.Escape, Keys.Control0),
+ "\x1b[1;7q": (Keys.Escape, Keys.Control1),
+ "\x1b[1;7r": (Keys.Escape, Keys.Control2),
+ "\x1b[1;7s": (Keys.Escape, Keys.Control3),
+ "\x1b[1;7t": (Keys.Escape, Keys.Control4),
+ "\x1b[1;7u": (Keys.Escape, Keys.Control5),
+ "\x1b[1;7v": (Keys.Escape, Keys.Control6),
+ "\x1b[1;7w": (Keys.Escape, Keys.Control7),
+ "\x1b[1;7x": (Keys.Escape, Keys.Control8),
+ "\x1b[1;7y": (Keys.Escape, Keys.Control9),
+ "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0),
+ "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1),
+ "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2),
+ "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3),
+ "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4),
+ "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5),
+ "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6),
+ "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7),
+ "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8),
+ "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9),
+}
+
+
+def _get_reverse_ansi_sequences() -> Dict[Keys, str]:
+ """
+ Create a dictionary that maps prompt_toolkit keys back to the VT100 escape
+ sequences.
+ """
+ result: Dict[Keys, str] = {}
+
+ for sequence, key in ANSI_SEQUENCES.items():
+ if not isinstance(key, tuple):
+ if key not in result:
+ result[key] = sequence
+
+ return result
+
+
+REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences()
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py
index e72f03a1e46..9885a37bc2c 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py
@@ -1,131 +1,131 @@
-"""
-Abstraction of CLI Input.
-"""
-from abc import ABCMeta, abstractmethod, abstractproperty
-from contextlib import contextmanager
-from typing import Callable, ContextManager, Generator, List
-
-from prompt_toolkit.key_binding import KeyPress
-
-__all__ = [
- "Input",
- "DummyInput",
-]
-
-
-class Input(metaclass=ABCMeta):
- """
- Abstraction for any input.
-
- An instance of this class can be given to the constructor of a
- :class:`~prompt_toolkit.application.Application` and will also be
- passed to the :class:`~prompt_toolkit.eventloop.base.EventLoop`.
- """
-
- @abstractmethod
- def fileno(self) -> int:
- """
- Fileno for putting this in an event loop.
- """
-
- @abstractmethod
- def typeahead_hash(self) -> str:
- """
- Identifier for storing type ahead key presses.
- """
-
- @abstractmethod
- def read_keys(self) -> List[KeyPress]:
- """
- Return a list of Key objects which are read/parsed from the input.
- """
-
- def flush_keys(self) -> List[KeyPress]:
- """
- Flush the underlying parser. and return the pending keys.
- (Used for vt100 input.)
- """
- return []
-
- def flush(self) -> None:
- "The event loop can call this when the input has to be flushed."
- pass
-
- @abstractproperty
- def closed(self) -> bool:
- "Should be true when the input stream is closed."
- return False
-
- @abstractmethod
- def raw_mode(self) -> ContextManager[None]:
- """
- Context manager that turns the input into raw mode.
- """
-
- @abstractmethod
- def cooked_mode(self) -> ContextManager[None]:
- """
- Context manager that turns the input into cooked mode.
- """
-
- @abstractmethod
- def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
- """
- Return a context manager that makes this input active in the current
- event loop.
- """
-
- @abstractmethod
- def detach(self) -> ContextManager[None]:
- """
- Return a context manager that makes sure that this input is not active
- in the current event loop.
- """
-
- def close(self) -> None:
- "Close input."
- pass
-
-
-class PipeInput(Input):
- """
- Abstraction for pipe input.
- """
-
- @abstractmethod
- def send_bytes(self, data: bytes) -> None:
- """Feed byte string into the pipe"""
-
- @abstractmethod
- def send_text(self, data: str) -> None:
- """Feed a text string into the pipe"""
-
-
-class DummyInput(Input):
- """
- Input for use in a `DummyApplication`
- """
-
- def fileno(self) -> int:
- raise NotImplementedError
-
- def typeahead_hash(self) -> str:
- return "dummy-%s" % id(self)
-
- def read_keys(self) -> List[KeyPress]:
- return []
-
- @property
- def closed(self) -> bool:
- return True
-
- def raw_mode(self) -> ContextManager[None]:
- return _dummy_context_manager()
-
- def cooked_mode(self) -> ContextManager[None]:
- return _dummy_context_manager()
-
- def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+"""
+Abstraction of CLI Input.
+"""
+from abc import ABCMeta, abstractmethod, abstractproperty
+from contextlib import contextmanager
+from typing import Callable, ContextManager, Generator, List
+
+from prompt_toolkit.key_binding import KeyPress
+
+__all__ = [
+ "Input",
+ "DummyInput",
+]
+
+
+class Input(metaclass=ABCMeta):
+ """
+ Abstraction for any input.
+
+ An instance of this class can be given to the constructor of a
+ :class:`~prompt_toolkit.application.Application` and will also be
+ passed to the :class:`~prompt_toolkit.eventloop.base.EventLoop`.
+ """
+
+ @abstractmethod
+ def fileno(self) -> int:
+ """
+ Fileno for putting this in an event loop.
+ """
+
+ @abstractmethod
+ def typeahead_hash(self) -> str:
+ """
+ Identifier for storing type ahead key presses.
+ """
+
+ @abstractmethod
+ def read_keys(self) -> List[KeyPress]:
+ """
+ Return a list of Key objects which are read/parsed from the input.
+ """
+
+ def flush_keys(self) -> List[KeyPress]:
+ """
+ Flush the underlying parser. and return the pending keys.
+ (Used for vt100 input.)
+ """
+ return []
+
+ def flush(self) -> None:
+ "The event loop can call this when the input has to be flushed."
+ pass
+
+ @abstractproperty
+ def closed(self) -> bool:
+ "Should be true when the input stream is closed."
+ return False
+
+ @abstractmethod
+ def raw_mode(self) -> ContextManager[None]:
+ """
+ Context manager that turns the input into raw mode.
+ """
+
+ @abstractmethod
+ def cooked_mode(self) -> ContextManager[None]:
+ """
+ Context manager that turns the input into cooked mode.
+ """
+
+ @abstractmethod
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+
+ @abstractmethod
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+
+ def close(self) -> None:
+ "Close input."
+ pass
+
+
+class PipeInput(Input):
+ """
+ Abstraction for pipe input.
+ """
+
+ @abstractmethod
+ def send_bytes(self, data: bytes) -> None:
+ """Feed byte string into the pipe"""
+
+ @abstractmethod
+ def send_text(self, data: str) -> None:
+ """Feed a text string into the pipe"""
+
+
+class DummyInput(Input):
+ """
+ Input for use in a `DummyApplication`
+ """
+
+ def fileno(self) -> int:
+ raise NotImplementedError
+
+ def typeahead_hash(self) -> str:
+ return "dummy-%s" % id(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ return []
+
+ @property
+ def closed(self) -> bool:
+ return True
+
+ def raw_mode(self) -> ContextManager[None]:
+ return _dummy_context_manager()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return _dummy_context_manager()
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
# Call the callback immediately once after attaching.
# This tells the callback to call `read_keys` and check the
# `input.closed` flag, after which it won't receive any keys, but knows
@@ -133,12 +133,12 @@ class DummyInput(Input):
# `application.py`.
input_ready_callback()
- return _dummy_context_manager()
-
- def detach(self) -> ContextManager[None]:
- return _dummy_context_manager()
-
-
-@contextmanager
-def _dummy_context_manager() -> Generator[None, None, None]:
- yield
+ return _dummy_context_manager()
+
+ def detach(self) -> ContextManager[None]:
+ return _dummy_context_manager()
+
+
+@contextmanager
+def _dummy_context_manager() -> Generator[None, None, None]:
+ yield
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py
index f771d4721ca..347f8c6ad34 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py
@@ -1,63 +1,63 @@
-import sys
-from typing import Optional, TextIO
-
-from prompt_toolkit.utils import is_windows
-
+import sys
+from typing import Optional, TextIO
+
+from prompt_toolkit.utils import is_windows
+
from .base import DummyInput, Input, PipeInput
-
-__all__ = [
- "create_input",
- "create_pipe_input",
-]
-
-
-def create_input(
- stdin: Optional[TextIO] = None, always_prefer_tty: bool = False
-) -> Input:
- """
- Create the appropriate `Input` object for the current os/environment.
-
- :param always_prefer_tty: When set, if `sys.stdin` is connected to a Unix
- `pipe`, check whether `sys.stdout` or `sys.stderr` are connected to a
- pseudo terminal. If so, open the tty for reading instead of reading for
- `sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how
- a `$PAGER` works.)
- """
- if is_windows():
- from .win32 import Win32Input
-
+
+__all__ = [
+ "create_input",
+ "create_pipe_input",
+]
+
+
+def create_input(
+ stdin: Optional[TextIO] = None, always_prefer_tty: bool = False
+) -> Input:
+ """
+ Create the appropriate `Input` object for the current os/environment.
+
+ :param always_prefer_tty: When set, if `sys.stdin` is connected to a Unix
+ `pipe`, check whether `sys.stdout` or `sys.stderr` are connected to a
+ pseudo terminal. If so, open the tty for reading instead of reading for
+ `sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how
+ a `$PAGER` works.)
+ """
+ if is_windows():
+ from .win32 import Win32Input
+
# If `stdin` was assigned `None` (which happens with pythonw.exe), use
# a `DummyInput`. This triggers `EOFError` in the application code.
if stdin is None and sys.stdin is None:
return DummyInput()
- return Win32Input(stdin or sys.stdin)
- else:
- from .vt100 import Vt100Input
-
- # If no input TextIO is given, use stdin/stdout.
- if stdin is None:
- stdin = sys.stdin
-
- if always_prefer_tty:
- for io in [sys.stdin, sys.stdout, sys.stderr]:
- if io.isatty():
- stdin = io
- break
-
- return Vt100Input(stdin)
-
-
-def create_pipe_input() -> PipeInput:
- """
- Create an input pipe.
- This is mostly useful for unit testing.
- """
- if is_windows():
- from .win32_pipe import Win32PipeInput
-
- return Win32PipeInput()
- else:
- from .posix_pipe import PosixPipeInput
-
- return PosixPipeInput()
+ return Win32Input(stdin or sys.stdin)
+ else:
+ from .vt100 import Vt100Input
+
+ # If no input TextIO is given, use stdin/stdout.
+ if stdin is None:
+ stdin = sys.stdin
+
+ if always_prefer_tty:
+ for io in [sys.stdin, sys.stdout, sys.stderr]:
+ if io.isatty():
+ stdin = io
+ break
+
+ return Vt100Input(stdin)
+
+
+def create_pipe_input() -> PipeInput:
+ """
+ Create an input pipe.
+ This is mostly useful for unit testing.
+ """
+ if is_windows():
+ from .win32_pipe import Win32PipeInput
+
+ return Win32PipeInput()
+ else:
+ from .posix_pipe import PosixPipeInput
+
+ return PosixPipeInput()
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py
index 61a798e869b..de47c649330 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py
@@ -1,72 +1,72 @@
-import os
-from typing import ContextManager, TextIO, cast
-
-from ..utils import DummyContext
-from .base import PipeInput
-from .vt100 import Vt100Input
-
-__all__ = [
- "PosixPipeInput",
-]
-
-
-class PosixPipeInput(Vt100Input, PipeInput):
- """
- Input that is send through a pipe.
- This is useful if we want to send the input programmatically into the
- application. Mostly useful for unit testing.
-
- Usage::
-
- input = PosixPipeInput()
- input.send_text('inputdata')
- """
-
- _id = 0
-
- def __init__(self, text: str = "") -> None:
- self._r, self._w = os.pipe()
-
- class Stdin:
- encoding = "utf-8"
-
- def isatty(stdin) -> bool:
- return True
-
- def fileno(stdin) -> int:
- return self._r
-
- super().__init__(cast(TextIO, Stdin()))
- self.send_text(text)
-
- # Identifier for every PipeInput for the hash.
- self.__class__._id += 1
- self._id = self.__class__._id
-
- def send_bytes(self, data: bytes) -> None:
- os.write(self._w, data)
-
- def send_text(self, data: str) -> None:
- "Send text to the input."
- os.write(self._w, data.encode("utf-8"))
-
- def raw_mode(self) -> ContextManager[None]:
- return DummyContext()
-
- def cooked_mode(self) -> ContextManager[None]:
- return DummyContext()
-
- def close(self) -> None:
- "Close pipe fds."
- os.close(self._r)
- os.close(self._w)
-
- # We should assign `None` to 'self._r` and 'self._w',
- # The event loop still needs to know the the fileno for this input in order
- # to properly remove it from the selectors.
-
- def typeahead_hash(self) -> str:
- """
- This needs to be unique for every `PipeInput`.
- """
- return "pipe-input-%s" % (self._id,)
+import os
+from typing import ContextManager, TextIO, cast
+
+from ..utils import DummyContext
+from .base import PipeInput
+from .vt100 import Vt100Input
+
+__all__ = [
+ "PosixPipeInput",
+]
+
+
+class PosixPipeInput(Vt100Input, PipeInput):
+ """
+ Input that is send through a pipe.
+ This is useful if we want to send the input programmatically into the
+ application. Mostly useful for unit testing.
+
+ Usage::
+
+ input = PosixPipeInput()
+ input.send_text('inputdata')
+ """
+
+ _id = 0
+
+ def __init__(self, text: str = "") -> None:
+ self._r, self._w = os.pipe()
+
+ class Stdin:
+ encoding = "utf-8"
+
+ def isatty(stdin) -> bool:
+ return True
+
+ def fileno(stdin) -> int:
+ return self._r
+
+ super().__init__(cast(TextIO, Stdin()))
+ self.send_text(text)
+
+ # Identifier for every PipeInput for the hash.
+ self.__class__._id += 1
+ self._id = self.__class__._id
+
+ def send_bytes(self, data: bytes) -> None:
+ os.write(self._w, data)
+
+ def send_text(self, data: str) -> None:
+ "Send text to the input."
+ os.write(self._w, data.encode("utf-8"))
+
+ def raw_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def close(self) -> None:
+ "Close pipe fds."
+ os.close(self._r)
+ os.close(self._w)
+
+ # We should assign `None` to 'self._r` and 'self._w',
+ # The event loop still needs to know the the fileno for this input in order
+ # to properly remove it from the selectors.
+
+ def typeahead_hash(self) -> str:
+ """
+ This needs to be unique for every `PipeInput`.
+ """
+ return "pipe-input-%s" % (self._id,)
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py
index c0893239a3e..f32f683f735 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py
@@ -1,95 +1,95 @@
-import os
-import select
-from codecs import getincrementaldecoder
-
-__all__ = [
- "PosixStdinReader",
-]
-
-
-class PosixStdinReader:
- """
- Wrapper around stdin which reads (nonblocking) the next available 1024
- bytes and decodes it.
-
- Note that you can't be sure that the input file is closed if the ``read``
- function returns an empty string. When ``errors=ignore`` is passed,
- ``read`` can return an empty string if all malformed input was replaced by
- an empty string. (We can't block here and wait for more input.) So, because
- of that, check the ``closed`` attribute, to be sure that the file has been
- closed.
-
- :param stdin_fd: File descriptor from which we read.
- :param errors: Can be 'ignore', 'strict' or 'replace'.
- On Python3, this can be 'surrogateescape', which is the default.
-
- 'surrogateescape' is preferred, because this allows us to transfer
- unrecognised bytes to the key bindings. Some terminals, like lxterminal
- and Guake, use the 'Mxx' notation to send mouse events, where each 'x'
- can be any possible byte.
- """
-
- # By default, we want to 'ignore' errors here. The input stream can be full
- # of junk. One occurrence of this that I had was when using iTerm2 on OS X,
- # with "Option as Meta" checked (You should choose "Option as +Esc".)
-
- def __init__(
- self, stdin_fd: int, errors: str = "surrogateescape", encoding: str = "utf-8"
- ) -> None:
- self.stdin_fd = stdin_fd
- self.errors = errors
-
- # Create incremental decoder for decoding stdin.
- # We can not just do `os.read(stdin.fileno(), 1024).decode('utf-8')`, because
- # it could be that we are in the middle of a utf-8 byte sequence.
- self._stdin_decoder_cls = getincrementaldecoder(encoding)
- self._stdin_decoder = self._stdin_decoder_cls(errors=errors)
-
- #: True when there is nothing anymore to read.
- self.closed = False
-
- def read(self, count: int = 1024) -> str:
- # By default we choose a rather small chunk size, because reading
- # big amounts of input at once, causes the event loop to process
- # all these key bindings also at once without going back to the
- # loop. This will make the application feel unresponsive.
- """
- Read the input and return it as a string.
-
- Return the text. Note that this can return an empty string, even when
- the input stream was not yet closed. This means that something went
- wrong during the decoding.
- """
- if self.closed:
- return ""
-
- # Check whether there is some input to read. `os.read` would block
- # otherwise.
- # (Actually, the event loop is responsible to make sure that this
- # function is only called when there is something to read, but for some
- # reason this happens in certain situations.)
- try:
- if not select.select([self.stdin_fd], [], [], 0)[0]:
- return ""
- except IOError:
- # Happens for instance when the file descriptor was closed.
- # (We had this in ptterm, where the FD became ready, a callback was
- # scheduled, but in the meantime another callback closed it already.)
- self.closed = True
-
- # Note: the following works better than wrapping `self.stdin` like
- # `codecs.getreader('utf-8')(stdin)` and doing `read(1)`.
- # Somehow that causes some latency when the escape
- # character is pressed. (Especially on combination with the `select`.)
- try:
- data = os.read(self.stdin_fd, count)
-
- # Nothing more to read, stream is closed.
- if data == b"":
- self.closed = True
- return ""
- except OSError:
- # In case of SIGWINCH
- data = b""
-
- return self._stdin_decoder.decode(data)
+import os
+import select
+from codecs import getincrementaldecoder
+
+__all__ = [
+ "PosixStdinReader",
+]
+
+
+class PosixStdinReader:
+ """
+ Wrapper around stdin which reads (nonblocking) the next available 1024
+ bytes and decodes it.
+
+ Note that you can't be sure that the input file is closed if the ``read``
+ function returns an empty string. When ``errors=ignore`` is passed,
+ ``read`` can return an empty string if all malformed input was replaced by
+ an empty string. (We can't block here and wait for more input.) So, because
+ of that, check the ``closed`` attribute, to be sure that the file has been
+ closed.
+
+ :param stdin_fd: File descriptor from which we read.
+ :param errors: Can be 'ignore', 'strict' or 'replace'.
+ On Python3, this can be 'surrogateescape', which is the default.
+
+ 'surrogateescape' is preferred, because this allows us to transfer
+ unrecognised bytes to the key bindings. Some terminals, like lxterminal
+ and Guake, use the 'Mxx' notation to send mouse events, where each 'x'
+ can be any possible byte.
+ """
+
+ # By default, we want to 'ignore' errors here. The input stream can be full
+ # of junk. One occurrence of this that I had was when using iTerm2 on OS X,
+ # with "Option as Meta" checked (You should choose "Option as +Esc".)
+
+ def __init__(
+ self, stdin_fd: int, errors: str = "surrogateescape", encoding: str = "utf-8"
+ ) -> None:
+ self.stdin_fd = stdin_fd
+ self.errors = errors
+
+ # Create incremental decoder for decoding stdin.
+ # We can not just do `os.read(stdin.fileno(), 1024).decode('utf-8')`, because
+ # it could be that we are in the middle of a utf-8 byte sequence.
+ self._stdin_decoder_cls = getincrementaldecoder(encoding)
+ self._stdin_decoder = self._stdin_decoder_cls(errors=errors)
+
+ #: True when there is nothing anymore to read.
+ self.closed = False
+
+ def read(self, count: int = 1024) -> str:
+ # By default we choose a rather small chunk size, because reading
+ # big amounts of input at once, causes the event loop to process
+ # all these key bindings also at once without going back to the
+ # loop. This will make the application feel unresponsive.
+ """
+ Read the input and return it as a string.
+
+ Return the text. Note that this can return an empty string, even when
+ the input stream was not yet closed. This means that something went
+ wrong during the decoding.
+ """
+ if self.closed:
+ return ""
+
+ # Check whether there is some input to read. `os.read` would block
+ # otherwise.
+ # (Actually, the event loop is responsible to make sure that this
+ # function is only called when there is something to read, but for some
+ # reason this happens in certain situations.)
+ try:
+ if not select.select([self.stdin_fd], [], [], 0)[0]:
+ return ""
+ except IOError:
+ # Happens for instance when the file descriptor was closed.
+ # (We had this in ptterm, where the FD became ready, a callback was
+ # scheduled, but in the meantime another callback closed it already.)
+ self.closed = True
+
+ # Note: the following works better than wrapping `self.stdin` like
+ # `codecs.getreader('utf-8')(stdin)` and doing `read(1)`.
+ # Somehow that causes some latency when the escape
+ # character is pressed. (Especially on combination with the `select`.)
+ try:
+ data = os.read(self.stdin_fd, count)
+
+ # Nothing more to read, stream is closed.
+ if data == b"":
+ self.closed = True
+ return ""
+ except OSError:
+ # In case of SIGWINCH
+ data = b""
+
+ return self._stdin_decoder.decode(data)
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py
index df53afc3741..a3d78664493 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py
@@ -1,76 +1,76 @@
-r"""
-Store input key strokes if we did read more than was required.
-
-The input classes `Vt100Input` and `Win32Input` read the input text in chunks
-of a few kilobytes. This means that if we read input from stdin, it could be
-that we read a couple of lines (with newlines in between) at once.
-
-This creates a problem: potentially, we read too much from stdin. Sometimes
-people paste several lines at once because they paste input in a REPL and
-expect each input() call to process one line. Or they rely on type ahead
-because the application can't keep up with the processing.
-
-However, we need to read input in bigger chunks. We need this mostly to support
-pasting of larger chunks of text. We don't want everything to become
-unresponsive because we:
- - read one character;
- - parse one character;
- - call the key binding, which does a string operation with one character;
- - and render the user interface.
-Doing text operations on single characters is very inefficient in Python, so we
-prefer to work on bigger chunks of text. This is why we have to read the input
-in bigger chunks.
-
-Further, line buffering is also not an option, because it doesn't work well in
-the architecture. We use lower level Posix APIs, that work better with the
-event loop and so on. In fact, there is also nothing that defines that only \n
-can accept the input, you could create a key binding for any key to accept the
-input.
-
-To support type ahead, this module will store all the key strokes that were
-read too early, so that they can be feed into to the next `prompt()` call or to
-the next prompt_toolkit `Application`.
-"""
-from collections import defaultdict
-from typing import Dict, List
-
-from ..key_binding import KeyPress
-from .base import Input
-
-__all__ = [
- "store_typeahead",
- "get_typeahead",
- "clear_typeahead",
-]
-
-_buffer: Dict[str, List[KeyPress]] = defaultdict(list)
-
-
-def store_typeahead(input_obj: Input, key_presses: List[KeyPress]) -> None:
- """
- Insert typeahead key presses for the given input.
- """
- global _buffer
- key = input_obj.typeahead_hash()
- _buffer[key].extend(key_presses)
-
-
-def get_typeahead(input_obj: Input) -> List[KeyPress]:
- """
- Retrieve typeahead and reset the buffer for this input.
- """
- global _buffer
-
- key = input_obj.typeahead_hash()
- result = _buffer[key]
- _buffer[key] = []
- return result
-
-
-def clear_typeahead(input_obj: Input) -> None:
- """
- Clear typeahead buffer.
- """
- global _buffer
- key = input_obj.typeahead_hash()
- _buffer[key] = []
+r"""
+Store input key strokes if we did read more than was required.
+
+The input classes `Vt100Input` and `Win32Input` read the input text in chunks
+of a few kilobytes. This means that if we read input from stdin, it could be
+that we read a couple of lines (with newlines in between) at once.
+
+This creates a problem: potentially, we read too much from stdin. Sometimes
+people paste several lines at once because they paste input in a REPL and
+expect each input() call to process one line. Or they rely on type ahead
+because the application can't keep up with the processing.
+
+However, we need to read input in bigger chunks. We need this mostly to support
+pasting of larger chunks of text. We don't want everything to become
+unresponsive because we:
+ - read one character;
+ - parse one character;
+ - call the key binding, which does a string operation with one character;
+ - and render the user interface.
+Doing text operations on single characters is very inefficient in Python, so we
+prefer to work on bigger chunks of text. This is why we have to read the input
+in bigger chunks.
+
+Further, line buffering is also not an option, because it doesn't work well in
+the architecture. We use lower level Posix APIs, that work better with the
+event loop and so on. In fact, there is also nothing that defines that only \n
+can accept the input, you could create a key binding for any key to accept the
+input.
+
+To support type ahead, this module will store all the key strokes that were
+read too early, so that they can be feed into to the next `prompt()` call or to
+the next prompt_toolkit `Application`.
+"""
+from collections import defaultdict
+from typing import Dict, List
+
+from ..key_binding import KeyPress
+from .base import Input
+
+__all__ = [
+ "store_typeahead",
+ "get_typeahead",
+ "clear_typeahead",
+]
+
+_buffer: Dict[str, List[KeyPress]] = defaultdict(list)
+
+
+def store_typeahead(input_obj: Input, key_presses: List[KeyPress]) -> None:
+ """
+ Insert typeahead key presses for the given input.
+ """
+ global _buffer
+ key = input_obj.typeahead_hash()
+ _buffer[key].extend(key_presses)
+
+
+def get_typeahead(input_obj: Input) -> List[KeyPress]:
+ """
+ Retrieve typeahead and reset the buffer for this input.
+ """
+ global _buffer
+
+ key = input_obj.typeahead_hash()
+ result = _buffer[key]
+ _buffer[key] = []
+ return result
+
+
+def clear_typeahead(input_obj: Input) -> None:
+ """
+ Clear typeahead buffer.
+ """
+ global _buffer
+ key = input_obj.typeahead_hash()
+ _buffer[key] = []
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py
index 1f32a7d58e4..455cf8efd1f 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py
@@ -1,174 +1,174 @@
-import contextlib
-import io
-import sys
-import termios
-import tty
-from asyncio import AbstractEventLoop
-from typing import (
- Callable,
- ContextManager,
- Dict,
- Generator,
- List,
- Optional,
- Set,
- TextIO,
- Tuple,
- Union,
-)
-
-from prompt_toolkit.eventloop import get_event_loop
-
-from ..key_binding import KeyPress
-from .base import Input
-from .posix_utils import PosixStdinReader
-from .vt100_parser import Vt100Parser
-
-__all__ = [
- "Vt100Input",
- "raw_mode",
- "cooked_mode",
-]
-
-
-class Vt100Input(Input):
- """
- Vt100 input for Posix systems.
- (This uses a posix file descriptor that can be registered in the event loop.)
- """
-
- # For the error messages. Only display "Input is not a terminal" once per
- # file descriptor.
- _fds_not_a_terminal: Set[int] = set()
-
- def __init__(self, stdin: TextIO) -> None:
- # Test whether the given input object has a file descriptor.
- # (Idle reports stdin to be a TTY, but fileno() is not implemented.)
- try:
- # This should not raise, but can return 0.
- stdin.fileno()
- except io.UnsupportedOperation as e:
- if "idlelib.run" in sys.modules:
- raise io.UnsupportedOperation(
- "Stdin is not a terminal. Running from Idle is not supported."
- ) from e
- else:
- raise io.UnsupportedOperation("Stdin is not a terminal.") from e
-
- # Even when we have a file descriptor, it doesn't mean it's a TTY.
- # Normally, this requires a real TTY device, but people instantiate
- # this class often during unit tests as well. They use for instance
- # pexpect to pipe data into an application. For convenience, we print
- # an error message and go on.
- isatty = stdin.isatty()
- fd = stdin.fileno()
-
- if not isatty and fd not in Vt100Input._fds_not_a_terminal:
- msg = "Warning: Input is not a terminal (fd=%r).\n"
- sys.stderr.write(msg % fd)
- sys.stderr.flush()
- Vt100Input._fds_not_a_terminal.add(fd)
-
- #
- self.stdin = stdin
-
- # Create a backup of the fileno(). We want this to work even if the
- # underlying file is closed, so that `typeahead_hash()` keeps working.
- self._fileno = stdin.fileno()
-
- self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects.
- self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding)
- self.vt100_parser = Vt100Parser(
- lambda key_press: self._buffer.append(key_press)
- )
-
- def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
- """
- Return a context manager that makes this input active in the current
- event loop.
- """
- return _attached_input(self, input_ready_callback)
-
- def detach(self) -> ContextManager[None]:
- """
- Return a context manager that makes sure that this input is not active
- in the current event loop.
- """
- return _detached_input(self)
-
- def read_keys(self) -> List[KeyPress]:
- "Read list of KeyPress."
- # Read text from stdin.
- data = self.stdin_reader.read()
-
- # Pass it through our vt100 parser.
- self.vt100_parser.feed(data)
-
- # Return result.
- result = self._buffer
- self._buffer = []
- return result
-
- def flush_keys(self) -> List[KeyPress]:
- """
- Flush pending keys and return them.
- (Used for flushing the 'escape' key.)
- """
- # Flush all pending keys. (This is most important to flush the vt100
- # 'Escape' key early when nothing else follows.)
- self.vt100_parser.flush()
-
- # Return result.
- result = self._buffer
- self._buffer = []
- return result
-
- @property
- def closed(self) -> bool:
- return self.stdin_reader.closed
-
- def raw_mode(self) -> ContextManager[None]:
- return raw_mode(self.stdin.fileno())
-
- def cooked_mode(self) -> ContextManager[None]:
- return cooked_mode(self.stdin.fileno())
-
- def fileno(self) -> int:
- return self.stdin.fileno()
-
- def typeahead_hash(self) -> str:
- return "fd-%s" % (self._fileno,)
-
-
-_current_callbacks: Dict[
- Tuple[AbstractEventLoop, int], Optional[Callable[[], None]]
-] = {} # (loop, fd) -> current callback
-
-
-@contextlib.contextmanager
-def _attached_input(
- input: Vt100Input, callback: Callable[[], None]
-) -> Generator[None, None, None]:
- """
- Context manager that makes this input active in the current event loop.
-
- :param input: :class:`~prompt_toolkit.input.Input` object.
- :param callback: Called when the input is ready to read.
- """
- loop = get_event_loop()
- fd = input.fileno()
- previous = _current_callbacks.get((loop, fd))
-
- def callback_wrapper() -> None:
- """Wrapper around the callback that already removes the reader when
- the input is closed. Otherwise, we keep continuously calling this
- callback, until we leave the context manager (which can happen a bit
- later). This fixes issues when piping /dev/null into a prompt_toolkit
- application."""
- if input.closed:
- loop.remove_reader(fd)
- callback()
-
+import contextlib
+import io
+import sys
+import termios
+import tty
+from asyncio import AbstractEventLoop
+from typing import (
+ Callable,
+ ContextManager,
+ Dict,
+ Generator,
+ List,
+ Optional,
+ Set,
+ TextIO,
+ Tuple,
+ Union,
+)
+
+from prompt_toolkit.eventloop import get_event_loop
+
+from ..key_binding import KeyPress
+from .base import Input
+from .posix_utils import PosixStdinReader
+from .vt100_parser import Vt100Parser
+
+__all__ = [
+ "Vt100Input",
+ "raw_mode",
+ "cooked_mode",
+]
+
+
+class Vt100Input(Input):
+ """
+ Vt100 input for Posix systems.
+ (This uses a posix file descriptor that can be registered in the event loop.)
+ """
+
+ # For the error messages. Only display "Input is not a terminal" once per
+ # file descriptor.
+ _fds_not_a_terminal: Set[int] = set()
+
+ def __init__(self, stdin: TextIO) -> None:
+ # Test whether the given input object has a file descriptor.
+ # (Idle reports stdin to be a TTY, but fileno() is not implemented.)
+ try:
+ # This should not raise, but can return 0.
+ stdin.fileno()
+ except io.UnsupportedOperation as e:
+ if "idlelib.run" in sys.modules:
+ raise io.UnsupportedOperation(
+ "Stdin is not a terminal. Running from Idle is not supported."
+ ) from e
+ else:
+ raise io.UnsupportedOperation("Stdin is not a terminal.") from e
+
+ # Even when we have a file descriptor, it doesn't mean it's a TTY.
+ # Normally, this requires a real TTY device, but people instantiate
+ # this class often during unit tests as well. They use for instance
+ # pexpect to pipe data into an application. For convenience, we print
+ # an error message and go on.
+ isatty = stdin.isatty()
+ fd = stdin.fileno()
+
+ if not isatty and fd not in Vt100Input._fds_not_a_terminal:
+ msg = "Warning: Input is not a terminal (fd=%r).\n"
+ sys.stderr.write(msg % fd)
+ sys.stderr.flush()
+ Vt100Input._fds_not_a_terminal.add(fd)
+
+ #
+ self.stdin = stdin
+
+ # Create a backup of the fileno(). We want this to work even if the
+ # underlying file is closed, so that `typeahead_hash()` keeps working.
+ self._fileno = stdin.fileno()
+
+ self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects.
+ self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding)
+ self.vt100_parser = Vt100Parser(
+ lambda key_press: self._buffer.append(key_press)
+ )
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+ return _attached_input(self, input_ready_callback)
+
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+ return _detached_input(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ "Read list of KeyPress."
+ # Read text from stdin.
+ data = self.stdin_reader.read()
+
+ # Pass it through our vt100 parser.
+ self.vt100_parser.feed(data)
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ def flush_keys(self) -> List[KeyPress]:
+ """
+ Flush pending keys and return them.
+ (Used for flushing the 'escape' key.)
+ """
+ # Flush all pending keys. (This is most important to flush the vt100
+ # 'Escape' key early when nothing else follows.)
+ self.vt100_parser.flush()
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ @property
+ def closed(self) -> bool:
+ return self.stdin_reader.closed
+
+ def raw_mode(self) -> ContextManager[None]:
+ return raw_mode(self.stdin.fileno())
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return cooked_mode(self.stdin.fileno())
+
+ def fileno(self) -> int:
+ return self.stdin.fileno()
+
+ def typeahead_hash(self) -> str:
+ return "fd-%s" % (self._fileno,)
+
+
+_current_callbacks: Dict[
+ Tuple[AbstractEventLoop, int], Optional[Callable[[], None]]
+] = {} # (loop, fd) -> current callback
+
+
+@contextlib.contextmanager
+def _attached_input(
+ input: Vt100Input, callback: Callable[[], None]
+) -> Generator[None, None, None]:
+ """
+ Context manager that makes this input active in the current event loop.
+
+ :param input: :class:`~prompt_toolkit.input.Input` object.
+ :param callback: Called when the input is ready to read.
+ """
+ loop = get_event_loop()
+ fd = input.fileno()
+ previous = _current_callbacks.get((loop, fd))
+
+ def callback_wrapper() -> None:
+ """Wrapper around the callback that already removes the reader when
+ the input is closed. Otherwise, we keep continuously calling this
+ callback, until we leave the context manager (which can happen a bit
+ later). This fixes issues when piping /dev/null into a prompt_toolkit
+ application."""
+ if input.closed:
+ loop.remove_reader(fd)
+ callback()
+
try:
loop.add_reader(fd, callback_wrapper)
except PermissionError:
@@ -181,137 +181,137 @@ def _attached_input(
# To reproduce, do: `ptpython 0< /dev/null 1< /dev/null`
raise EOFError
- _current_callbacks[loop, fd] = callback
-
- try:
- yield
- finally:
- loop.remove_reader(fd)
-
- if previous:
- loop.add_reader(fd, previous)
- _current_callbacks[loop, fd] = previous
- else:
- del _current_callbacks[loop, fd]
-
-
-@contextlib.contextmanager
-def _detached_input(input: Vt100Input) -> Generator[None, None, None]:
- loop = get_event_loop()
- fd = input.fileno()
- previous = _current_callbacks.get((loop, fd))
-
- if previous:
- loop.remove_reader(fd)
- _current_callbacks[loop, fd] = None
-
- try:
- yield
- finally:
- if previous:
- loop.add_reader(fd, previous)
- _current_callbacks[loop, fd] = previous
-
-
-class raw_mode:
- """
- ::
-
- with raw_mode(stdin):
- ''' the pseudo-terminal stdin is now used in raw mode '''
-
- We ignore errors when executing `tcgetattr` fails.
- """
-
- # There are several reasons for ignoring errors:
- # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would
- # execute this code (In a Python REPL, for instance):
- #
- # import os; f = open(os.devnull); os.dup2(f.fileno(), 0)
- #
- # The result is that the eventloop will stop correctly, because it has
- # to logic to quit when stdin is closed. However, we should not fail at
- # this point. See:
- # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393
- # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392
-
- # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated.
- # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165
- def __init__(self, fileno: int) -> None:
- self.fileno = fileno
- self.attrs_before: Optional[List[Union[int, List[Union[bytes, int]]]]]
- try:
- self.attrs_before = termios.tcgetattr(fileno)
- except termios.error:
- # Ignore attribute errors.
- self.attrs_before = None
-
- def __enter__(self) -> None:
- # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this:
- try:
- newattr = termios.tcgetattr(self.fileno)
- except termios.error:
- pass
- else:
- newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
- newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])
-
- # VMIN defines the number of characters read at a time in
- # non-canonical mode. It seems to default to 1 on Linux, but on
- # Solaris and derived operating systems it defaults to 4. (This is
- # because the VMIN slot is the same as the VEOF slot, which
- # defaults to ASCII EOT = Ctrl-D = 4.)
- newattr[tty.CC][termios.VMIN] = 1
-
- termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)
-
- @classmethod
- def _patch_lflag(cls, attrs: int) -> int:
- return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
-
- @classmethod
- def _patch_iflag(cls, attrs: int) -> int:
- return attrs & ~(
- # Disable XON/XOFF flow control on output and input.
- # (Don't capture Ctrl-S and Ctrl-Q.)
- # Like executing: "stty -ixon."
- termios.IXON
- | termios.IXOFF
- |
- # Don't translate carriage return into newline on input.
- termios.ICRNL
- | termios.INLCR
- | termios.IGNCR
- )
-
- def __exit__(self, *a: object) -> None:
- if self.attrs_before is not None:
- try:
- termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
- except termios.error:
- pass
-
- # # Put the terminal in application mode.
- # self._stdout.write('\x1b[?1h')
-
-
-class cooked_mode(raw_mode):
- """
- The opposite of ``raw_mode``, used when we need cooked mode inside a
- `raw_mode` block. Used in `Application.run_in_terminal`.::
-
- with cooked_mode(stdin):
- ''' the pseudo-terminal stdin is now used in cooked mode. '''
- """
-
- @classmethod
- def _patch_lflag(cls, attrs: int) -> int:
- return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
-
- @classmethod
- def _patch_iflag(cls, attrs: int) -> int:
- # Turn the ICRNL flag back on. (Without this, calling `input()` in
- # run_in_terminal doesn't work and displays ^M instead. Ptpython
- # evaluates commands using `run_in_terminal`, so it's important that
- # they translate ^M back into ^J.)
- return attrs | termios.ICRNL
+ _current_callbacks[loop, fd] = callback
+
+ try:
+ yield
+ finally:
+ loop.remove_reader(fd)
+
+ if previous:
+ loop.add_reader(fd, previous)
+ _current_callbacks[loop, fd] = previous
+ else:
+ del _current_callbacks[loop, fd]
+
+
+@contextlib.contextmanager
+def _detached_input(input: Vt100Input) -> Generator[None, None, None]:
+ loop = get_event_loop()
+ fd = input.fileno()
+ previous = _current_callbacks.get((loop, fd))
+
+ if previous:
+ loop.remove_reader(fd)
+ _current_callbacks[loop, fd] = None
+
+ try:
+ yield
+ finally:
+ if previous:
+ loop.add_reader(fd, previous)
+ _current_callbacks[loop, fd] = previous
+
+
+class raw_mode:
+ """
+ ::
+
+ with raw_mode(stdin):
+ ''' the pseudo-terminal stdin is now used in raw mode '''
+
+ We ignore errors when executing `tcgetattr` fails.
+ """
+
+ # There are several reasons for ignoring errors:
+ # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would
+ # execute this code (In a Python REPL, for instance):
+ #
+ # import os; f = open(os.devnull); os.dup2(f.fileno(), 0)
+ #
+ # The result is that the eventloop will stop correctly, because it has
+ # to logic to quit when stdin is closed. However, we should not fail at
+ # this point. See:
+ # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393
+ # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392
+
+ # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated.
+ # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165
+ def __init__(self, fileno: int) -> None:
+ self.fileno = fileno
+ self.attrs_before: Optional[List[Union[int, List[Union[bytes, int]]]]]
+ try:
+ self.attrs_before = termios.tcgetattr(fileno)
+ except termios.error:
+ # Ignore attribute errors.
+ self.attrs_before = None
+
+ def __enter__(self) -> None:
+ # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this:
+ try:
+ newattr = termios.tcgetattr(self.fileno)
+ except termios.error:
+ pass
+ else:
+ newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
+ newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])
+
+ # VMIN defines the number of characters read at a time in
+ # non-canonical mode. It seems to default to 1 on Linux, but on
+ # Solaris and derived operating systems it defaults to 4. (This is
+ # because the VMIN slot is the same as the VEOF slot, which
+ # defaults to ASCII EOT = Ctrl-D = 4.)
+ newattr[tty.CC][termios.VMIN] = 1
+
+ termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)
+
+ @classmethod
+ def _patch_lflag(cls, attrs: int) -> int:
+ return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+
+ @classmethod
+ def _patch_iflag(cls, attrs: int) -> int:
+ return attrs & ~(
+ # Disable XON/XOFF flow control on output and input.
+ # (Don't capture Ctrl-S and Ctrl-Q.)
+ # Like executing: "stty -ixon."
+ termios.IXON
+ | termios.IXOFF
+ |
+ # Don't translate carriage return into newline on input.
+ termios.ICRNL
+ | termios.INLCR
+ | termios.IGNCR
+ )
+
+ def __exit__(self, *a: object) -> None:
+ if self.attrs_before is not None:
+ try:
+ termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
+ except termios.error:
+ pass
+
+ # # Put the terminal in application mode.
+ # self._stdout.write('\x1b[?1h')
+
+
+class cooked_mode(raw_mode):
+ """
+ The opposite of ``raw_mode``, used when we need cooked mode inside a
+ `raw_mode` block. Used in `Application.run_in_terminal`.::
+
+ with cooked_mode(stdin):
+ ''' the pseudo-terminal stdin is now used in cooked mode. '''
+ """
+
+ @classmethod
+ def _patch_lflag(cls, attrs: int) -> int:
+ return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
+
+ @classmethod
+ def _patch_iflag(cls, attrs: int) -> int:
+ # Turn the ICRNL flag back on. (Without this, calling `input()` in
+ # run_in_terminal doesn't work and displays ^M instead. Ptpython
+ # evaluates commands using `run_in_terminal`, so it's important that
+ # they translate ^M back into ^J.)
+ return attrs | termios.ICRNL
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py
index 09e4cc91bf9..3ee1e14fdda 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py
@@ -1,247 +1,247 @@
-"""
-Parser for VT100 input stream.
-"""
-import re
-from typing import Callable, Dict, Generator, Tuple, Union
-
-from ..key_binding.key_processor import KeyPress
-from ..keys import Keys
-from .ansi_escape_sequences import ANSI_SEQUENCES
-
-__all__ = [
- "Vt100Parser",
-]
-
-
-# Regex matching any CPR response
-# (Note that we use '\Z' instead of '$', because '$' could include a trailing
-# newline.)
-_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")
-
-# Mouse events:
-# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
-_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
-
-# Regex matching any valid prefix of a CPR response.
-# (Note that it doesn't contain the last character, the 'R'. The prefix has to
-# be shorter.)
-_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")
-
-_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
-
-
-class _Flush:
- """Helper object to indicate flush operation to the parser."""
-
- pass
-
-
-class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
- """
- Dictionary that maps input sequences to a boolean indicating whether there is
- any key that start with this characters.
- """
-
- def __missing__(self, prefix: str) -> bool:
- # (hard coded) If this could be a prefix of a CPR response, return
- # True.
- if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match(
- prefix
- ):
- result = True
- else:
- # If this could be a prefix of anything else, also return True.
- result = any(
- v
- for k, v in ANSI_SEQUENCES.items()
- if k.startswith(prefix) and k != prefix
- )
-
- self[prefix] = result
- return result
-
-
-_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
-
-
-class Vt100Parser:
- """
- Parser for VT100 input stream.
- Data can be fed through the `feed` method and the given callback will be
- called with KeyPress objects.
-
- ::
-
- def callback(key):
- pass
- i = Vt100Parser(callback)
- i.feed('data\x01...')
-
- :attr feed_key_callback: Function that will be called when a key is parsed.
- """
-
- # Lookup table of ANSI escape sequences for a VT100 terminal
- # Hint: in order to know what sequences your terminal writes to stdin, run
- # "od -c" and start typing.
- def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
- self.feed_key_callback = feed_key_callback
- self.reset()
-
- def reset(self, request: bool = False) -> None:
- self._in_bracketed_paste = False
- self._start_parser()
-
- def _start_parser(self) -> None:
- """
- Start the parser coroutine.
- """
- self._input_parser = self._input_parser_generator()
- self._input_parser.send(None) # type: ignore
-
- def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]:
- """
- Return the key (or keys) that maps to this prefix.
- """
- # (hard coded) If we match a CPR response, return Keys.CPRResponse.
- # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
- # integer variables.)
- if _cpr_response_re.match(prefix):
- return Keys.CPRResponse
-
- elif _mouse_event_re.match(prefix):
- return Keys.Vt100MouseEvent
-
- # Otherwise, use the mappings.
- try:
- return ANSI_SEQUENCES[prefix]
- except KeyError:
- return None
-
- def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]:
- """
- Coroutine (state machine) for the input parser.
- """
- prefix = ""
- retry = False
- flush = False
-
- while True:
- flush = False
-
- if retry:
- retry = False
- else:
- # Get next character.
- c = yield
-
- if isinstance(c, _Flush):
- flush = True
- else:
- prefix += c
-
- # If we have some data, check for matches.
- if prefix:
- is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
- match = self._get_match(prefix)
-
- # Exact matches found, call handlers..
- if (flush or not is_prefix_of_longer_match) and match:
- self._call_handler(match, prefix)
- prefix = ""
-
- # No exact match found.
- elif (flush or not is_prefix_of_longer_match) and not match:
- found = False
- retry = True
-
- # Loop over the input, try the longest match first and
- # shift.
- for i in range(len(prefix), 0, -1):
- match = self._get_match(prefix[:i])
- if match:
- self._call_handler(match, prefix[:i])
- prefix = prefix[i:]
- found = True
-
- if not found:
- self._call_handler(prefix[0], prefix[0])
- prefix = prefix[1:]
-
- def _call_handler(
- self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str
- ) -> None:
- """
- Callback to handler.
- """
- if isinstance(key, tuple):
- # Received ANSI sequence that corresponds with multiple keys
- # (probably alt+something). Handle keys individually, but only pass
- # data payload to first KeyPress (so that we won't insert it
- # multiple times).
- for i, k in enumerate(key):
- self._call_handler(k, insert_text if i == 0 else "")
- else:
- if key == Keys.BracketedPaste:
- self._in_bracketed_paste = True
- self._paste_buffer = ""
- else:
- self.feed_key_callback(KeyPress(key, insert_text))
-
- def feed(self, data: str) -> None:
- """
- Feed the input stream.
-
- :param data: Input string (unicode).
- """
- # Handle bracketed paste. (We bypass the parser that matches all other
- # key presses and keep reading input until we see the end mark.)
- # This is much faster then parsing character by character.
- if self._in_bracketed_paste:
- self._paste_buffer += data
- end_mark = "\x1b[201~"
-
- if end_mark in self._paste_buffer:
- end_index = self._paste_buffer.index(end_mark)
-
- # Feed content to key bindings.
- paste_content = self._paste_buffer[:end_index]
- self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))
-
- # Quit bracketed paste mode and handle remaining input.
- self._in_bracketed_paste = False
- remaining = self._paste_buffer[end_index + len(end_mark) :]
- self._paste_buffer = ""
-
- self.feed(remaining)
-
- # Handle normal input character by character.
- else:
- for i, c in enumerate(data):
- if self._in_bracketed_paste:
- # Quit loop and process from this position when the parser
- # entered bracketed paste.
- self.feed(data[i:])
- break
- else:
- self._input_parser.send(c)
-
- def flush(self) -> None:
- """
- Flush the buffer of the input stream.
-
- This will allow us to handle the escape key (or maybe meta) sooner.
- The input received by the escape key is actually the same as the first
- characters of e.g. Arrow-Up, so without knowing what follows the escape
- sequence, we don't know whether escape has been pressed, or whether
- it's something else. This flush function should be called after a
- timeout, and processes everything that's still in the buffer as-is, so
- without assuming any characters will follow.
- """
- self._input_parser.send(_Flush())
-
- def feed_and_flush(self, data: str) -> None:
- """
- Wrapper around ``feed`` and ``flush``.
- """
- self.feed(data)
- self.flush()
+"""
+Parser for VT100 input stream.
+"""
+import re
+from typing import Callable, Dict, Generator, Tuple, Union
+
+from ..key_binding.key_processor import KeyPress
+from ..keys import Keys
+from .ansi_escape_sequences import ANSI_SEQUENCES
+
+__all__ = [
+ "Vt100Parser",
+]
+
+
+# Regex matching any CPR response
+# (Note that we use '\Z' instead of '$', because '$' could include a trailing
+# newline.)
+_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z")
+
+# Mouse events:
+# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
+_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z")
+
+# Regex matching any valid prefix of a CPR response.
+# (Note that it doesn't contain the last character, the 'R'. The prefix has to
+# be shorter.)
+_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z")
+
+_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z")
+
+
+class _Flush:
+ """Helper object to indicate flush operation to the parser."""
+
+ pass
+
+
+class _IsPrefixOfLongerMatchCache(Dict[str, bool]):
+ """
+ Dictionary that maps input sequences to a boolean indicating whether there is
+ any key that start with this characters.
+ """
+
+ def __missing__(self, prefix: str) -> bool:
+ # (hard coded) If this could be a prefix of a CPR response, return
+ # True.
+ if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match(
+ prefix
+ ):
+ result = True
+ else:
+ # If this could be a prefix of anything else, also return True.
+ result = any(
+ v
+ for k, v in ANSI_SEQUENCES.items()
+ if k.startswith(prefix) and k != prefix
+ )
+
+ self[prefix] = result
+ return result
+
+
+_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
+
+
+class Vt100Parser:
+ """
+ Parser for VT100 input stream.
+ Data can be fed through the `feed` method and the given callback will be
+ called with KeyPress objects.
+
+ ::
+
+ def callback(key):
+ pass
+ i = Vt100Parser(callback)
+ i.feed('data\x01...')
+
+ :attr feed_key_callback: Function that will be called when a key is parsed.
+ """
+
+ # Lookup table of ANSI escape sequences for a VT100 terminal
+ # Hint: in order to know what sequences your terminal writes to stdin, run
+ # "od -c" and start typing.
+ def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None:
+ self.feed_key_callback = feed_key_callback
+ self.reset()
+
+ def reset(self, request: bool = False) -> None:
+ self._in_bracketed_paste = False
+ self._start_parser()
+
+ def _start_parser(self) -> None:
+ """
+ Start the parser coroutine.
+ """
+ self._input_parser = self._input_parser_generator()
+ self._input_parser.send(None) # type: ignore
+
+ def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]:
+ """
+ Return the key (or keys) that maps to this prefix.
+ """
+ # (hard coded) If we match a CPR response, return Keys.CPRResponse.
+ # (This one doesn't fit in the ANSI_SEQUENCES, because it contains
+ # integer variables.)
+ if _cpr_response_re.match(prefix):
+ return Keys.CPRResponse
+
+ elif _mouse_event_re.match(prefix):
+ return Keys.Vt100MouseEvent
+
+ # Otherwise, use the mappings.
+ try:
+ return ANSI_SEQUENCES[prefix]
+ except KeyError:
+ return None
+
+ def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]:
+ """
+ Coroutine (state machine) for the input parser.
+ """
+ prefix = ""
+ retry = False
+ flush = False
+
+ while True:
+ flush = False
+
+ if retry:
+ retry = False
+ else:
+ # Get next character.
+ c = yield
+
+ if isinstance(c, _Flush):
+ flush = True
+ else:
+ prefix += c
+
+ # If we have some data, check for matches.
+ if prefix:
+ is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
+ match = self._get_match(prefix)
+
+ # Exact matches found, call handlers..
+ if (flush or not is_prefix_of_longer_match) and match:
+ self._call_handler(match, prefix)
+ prefix = ""
+
+ # No exact match found.
+ elif (flush or not is_prefix_of_longer_match) and not match:
+ found = False
+ retry = True
+
+ # Loop over the input, try the longest match first and
+ # shift.
+ for i in range(len(prefix), 0, -1):
+ match = self._get_match(prefix[:i])
+ if match:
+ self._call_handler(match, prefix[:i])
+ prefix = prefix[i:]
+ found = True
+
+ if not found:
+ self._call_handler(prefix[0], prefix[0])
+ prefix = prefix[1:]
+
+ def _call_handler(
+ self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str
+ ) -> None:
+ """
+ Callback to handler.
+ """
+ if isinstance(key, tuple):
+ # Received ANSI sequence that corresponds with multiple keys
+ # (probably alt+something). Handle keys individually, but only pass
+ # data payload to first KeyPress (so that we won't insert it
+ # multiple times).
+ for i, k in enumerate(key):
+ self._call_handler(k, insert_text if i == 0 else "")
+ else:
+ if key == Keys.BracketedPaste:
+ self._in_bracketed_paste = True
+ self._paste_buffer = ""
+ else:
+ self.feed_key_callback(KeyPress(key, insert_text))
+
+ def feed(self, data: str) -> None:
+ """
+ Feed the input stream.
+
+ :param data: Input string (unicode).
+ """
+ # Handle bracketed paste. (We bypass the parser that matches all other
+ # key presses and keep reading input until we see the end mark.)
+ # This is much faster then parsing character by character.
+ if self._in_bracketed_paste:
+ self._paste_buffer += data
+ end_mark = "\x1b[201~"
+
+ if end_mark in self._paste_buffer:
+ end_index = self._paste_buffer.index(end_mark)
+
+ # Feed content to key bindings.
+ paste_content = self._paste_buffer[:end_index]
+ self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))
+
+ # Quit bracketed paste mode and handle remaining input.
+ self._in_bracketed_paste = False
+ remaining = self._paste_buffer[end_index + len(end_mark) :]
+ self._paste_buffer = ""
+
+ self.feed(remaining)
+
+ # Handle normal input character by character.
+ else:
+ for i, c in enumerate(data):
+ if self._in_bracketed_paste:
+ # Quit loop and process from this position when the parser
+ # entered bracketed paste.
+ self.feed(data[i:])
+ break
+ else:
+ self._input_parser.send(c)
+
+ def flush(self) -> None:
+ """
+ Flush the buffer of the input stream.
+
+ This will allow us to handle the escape key (or maybe meta) sooner.
+ The input received by the escape key is actually the same as the first
+ characters of e.g. Arrow-Up, so without knowing what follows the escape
+ sequence, we don't know whether escape has been pressed, or whether
+ it's something else. This flush function should be called after a
+ timeout, and processes everything that's still in the buffer as-is, so
+ without assuming any characters will follow.
+ """
+ self._input_parser.send(_Flush())
+
+ def feed_and_flush(self, data: str) -> None:
+ """
+ Wrapper around ``feed`` and ``flush``.
+ """
+ self.feed(data)
+ self.flush()
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py
index 437affaa5dd..97699e19b24 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py
@@ -1,758 +1,758 @@
-import os
-import sys
-from abc import abstractmethod
-from contextlib import contextmanager
-
-from prompt_toolkit.eventloop import get_event_loop
-
-from ..utils import SPHINX_AUTODOC_RUNNING
-
-# Do not import win32-specific stuff when generating documentation.
-# Otherwise RTD would be unable to generate docs for this module.
-if not SPHINX_AUTODOC_RUNNING:
- import msvcrt
- from ctypes import windll
-
-from ctypes import Array, pointer
-from ctypes.wintypes import DWORD, HANDLE
-from typing import (
- Callable,
- ContextManager,
- Dict,
- Iterable,
- Iterator,
- List,
- Optional,
- TextIO,
-)
-
-from prompt_toolkit.eventloop import run_in_executor_with_context
-from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles
-from prompt_toolkit.key_binding.key_processor import KeyPress
-from prompt_toolkit.keys import Keys
-from prompt_toolkit.mouse_events import MouseButton, MouseEventType
-from prompt_toolkit.win32_types import (
- INPUT_RECORD,
- KEY_EVENT_RECORD,
- MOUSE_EVENT_RECORD,
- STD_INPUT_HANDLE,
- EventTypes,
-)
-
-from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
-from .base import Input
-
-__all__ = [
- "Win32Input",
- "ConsoleInputReader",
- "raw_mode",
- "cooked_mode",
- "attach_win32_input",
- "detach_win32_input",
-]
-
-# Win32 Constants for MOUSE_EVENT_RECORD.
-# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str
-FROM_LEFT_1ST_BUTTON_PRESSED = 0x1
-RIGHTMOST_BUTTON_PRESSED = 0x2
-MOUSE_MOVED = 0x0001
-MOUSE_WHEELED = 0x0004
-
-
-class _Win32InputBase(Input):
- """
- Base class for `Win32Input` and `Win32PipeInput`.
- """
-
- def __init__(self) -> None:
- self.win32_handles = _Win32Handles()
-
- @property
- @abstractmethod
- def handle(self) -> HANDLE:
- pass
-
-
-class Win32Input(_Win32InputBase):
- """
- `Input` class that reads from the Windows console.
- """
-
- def __init__(self, stdin: Optional[TextIO] = None) -> None:
- super().__init__()
- self.console_input_reader = ConsoleInputReader()
-
- def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
- """
- Return a context manager that makes this input active in the current
- event loop.
- """
- return attach_win32_input(self, input_ready_callback)
-
- def detach(self) -> ContextManager[None]:
- """
- Return a context manager that makes sure that this input is not active
- in the current event loop.
- """
- return detach_win32_input(self)
-
- def read_keys(self) -> List[KeyPress]:
- return list(self.console_input_reader.read())
-
- def flush(self) -> None:
- pass
-
- @property
- def closed(self) -> bool:
- return False
-
- def raw_mode(self) -> ContextManager[None]:
- return raw_mode()
-
- def cooked_mode(self) -> ContextManager[None]:
- return cooked_mode()
-
- def fileno(self) -> int:
- # The windows console doesn't depend on the file handle, so
- # this is not used for the event loop (which uses the
- # handle instead). But it's used in `Application.run_system_command`
- # which opens a subprocess with a given stdin/stdout.
- return sys.stdin.fileno()
-
- def typeahead_hash(self) -> str:
- return "win32-input"
-
- def close(self) -> None:
- self.console_input_reader.close()
-
- @property
- def handle(self) -> HANDLE:
- return self.console_input_reader.handle
-
-
-class ConsoleInputReader:
- """
- :param recognize_paste: When True, try to discover paste actions and turn
- the event into a BracketedPaste.
- """
-
- # Keys with character data.
- mappings = {
- b"\x1b": Keys.Escape,
- b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@)
- b"\x01": Keys.ControlA, # Control-A (home)
- b"\x02": Keys.ControlB, # Control-B (emacs cursor left)
- b"\x03": Keys.ControlC, # Control-C (interrupt)
- b"\x04": Keys.ControlD, # Control-D (exit)
- b"\x05": Keys.ControlE, # Control-E (end)
- b"\x06": Keys.ControlF, # Control-F (cursor forward)
- b"\x07": Keys.ControlG, # Control-G
- b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
- b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
- b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
- b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
- b"\x0c": Keys.ControlL, # Control-L (clear; form feed)
- b"\x0d": Keys.ControlM, # Control-M (enter)
- b"\x0e": Keys.ControlN, # Control-N (14) (history forward)
- b"\x0f": Keys.ControlO, # Control-O (15)
- b"\x10": Keys.ControlP, # Control-P (16) (history back)
- b"\x11": Keys.ControlQ, # Control-Q
- b"\x12": Keys.ControlR, # Control-R (18) (reverse search)
- b"\x13": Keys.ControlS, # Control-S (19) (forward search)
- b"\x14": Keys.ControlT, # Control-T
- b"\x15": Keys.ControlU, # Control-U
- b"\x16": Keys.ControlV, # Control-V
- b"\x17": Keys.ControlW, # Control-W
- b"\x18": Keys.ControlX, # Control-X
- b"\x19": Keys.ControlY, # Control-Y (25)
- b"\x1a": Keys.ControlZ, # Control-Z
- b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-|
- b"\x1d": Keys.ControlSquareClose, # Control-]
- b"\x1e": Keys.ControlCircumflex, # Control-^
- b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
- b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.)
- }
-
- # Keys that don't carry character data.
- keycodes = {
- # Home/End
- 33: Keys.PageUp,
- 34: Keys.PageDown,
- 35: Keys.End,
- 36: Keys.Home,
- # Arrows
- 37: Keys.Left,
- 38: Keys.Up,
- 39: Keys.Right,
- 40: Keys.Down,
- 45: Keys.Insert,
- 46: Keys.Delete,
- # F-keys.
- 112: Keys.F1,
- 113: Keys.F2,
- 114: Keys.F3,
- 115: Keys.F4,
- 116: Keys.F5,
- 117: Keys.F6,
- 118: Keys.F7,
- 119: Keys.F8,
- 120: Keys.F9,
- 121: Keys.F10,
- 122: Keys.F11,
- 123: Keys.F12,
- }
-
- LEFT_ALT_PRESSED = 0x0002
- RIGHT_ALT_PRESSED = 0x0001
- SHIFT_PRESSED = 0x0010
- LEFT_CTRL_PRESSED = 0x0008
- RIGHT_CTRL_PRESSED = 0x0004
-
- def __init__(self, recognize_paste: bool = True) -> None:
- self._fdcon = None
- self.recognize_paste = recognize_paste
-
- # When stdin is a tty, use that handle, otherwise, create a handle from
- # CONIN$.
- self.handle: HANDLE
- if sys.stdin.isatty():
- self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
- else:
- self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
- self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
-
- def close(self) -> None:
- "Close fdcon."
- if self._fdcon is not None:
- os.close(self._fdcon)
-
- def read(self) -> Iterable[KeyPress]:
- """
- Return a list of `KeyPress` instances. It won't return anything when
- there was nothing to read. (This function doesn't block.)
-
- http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
- """
- max_count = 2048 # Max events to read at the same time.
-
- read = DWORD(0)
- arrtype = INPUT_RECORD * max_count
- input_records = arrtype()
-
- # Check whether there is some input to read. `ReadConsoleInputW` would
- # block otherwise.
- # (Actually, the event loop is responsible to make sure that this
- # function is only called when there is something to read, but for some
- # reason this happened in the asyncio_win32 loop, and it's better to be
- # safe anyway.)
- if not wait_for_handles([self.handle], timeout=0):
- return
-
- # Get next batch of input event.
- windll.kernel32.ReadConsoleInputW(
- self.handle, pointer(input_records), max_count, pointer(read)
- )
-
- # First, get all the keys from the input buffer, in order to determine
- # whether we should consider this a paste event or not.
- all_keys = list(self._get_keys(read, input_records))
-
- # Fill in 'data' for key presses.
- all_keys = [self._insert_key_data(key) for key in all_keys]
-
- # Correct non-bmp characters that are passed as separate surrogate codes
- all_keys = list(self._merge_paired_surrogates(all_keys))
-
- if self.recognize_paste and self._is_paste(all_keys):
- gen = iter(all_keys)
- k: Optional[KeyPress]
-
- for k in gen:
- # Pasting: if the current key consists of text or \n, turn it
- # into a BracketedPaste.
- data = []
+import os
+import sys
+from abc import abstractmethod
+from contextlib import contextmanager
+
+from prompt_toolkit.eventloop import get_event_loop
+
+from ..utils import SPHINX_AUTODOC_RUNNING
+
+# Do not import win32-specific stuff when generating documentation.
+# Otherwise RTD would be unable to generate docs for this module.
+if not SPHINX_AUTODOC_RUNNING:
+ import msvcrt
+ from ctypes import windll
+
+from ctypes import Array, pointer
+from ctypes.wintypes import DWORD, HANDLE
+from typing import (
+ Callable,
+ ContextManager,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ TextIO,
+)
+
+from prompt_toolkit.eventloop import run_in_executor_with_context
+from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles
+from prompt_toolkit.key_binding.key_processor import KeyPress
+from prompt_toolkit.keys import Keys
+from prompt_toolkit.mouse_events import MouseButton, MouseEventType
+from prompt_toolkit.win32_types import (
+ INPUT_RECORD,
+ KEY_EVENT_RECORD,
+ MOUSE_EVENT_RECORD,
+ STD_INPUT_HANDLE,
+ EventTypes,
+)
+
+from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
+from .base import Input
+
+__all__ = [
+ "Win32Input",
+ "ConsoleInputReader",
+ "raw_mode",
+ "cooked_mode",
+ "attach_win32_input",
+ "detach_win32_input",
+]
+
+# Win32 Constants for MOUSE_EVENT_RECORD.
+# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str
+FROM_LEFT_1ST_BUTTON_PRESSED = 0x1
+RIGHTMOST_BUTTON_PRESSED = 0x2
+MOUSE_MOVED = 0x0001
+MOUSE_WHEELED = 0x0004
+
+
+class _Win32InputBase(Input):
+ """
+ Base class for `Win32Input` and `Win32PipeInput`.
+ """
+
+ def __init__(self) -> None:
+ self.win32_handles = _Win32Handles()
+
+ @property
+ @abstractmethod
+ def handle(self) -> HANDLE:
+ pass
+
+
+class Win32Input(_Win32InputBase):
+ """
+ `Input` class that reads from the Windows console.
+ """
+
+ def __init__(self, stdin: Optional[TextIO] = None) -> None:
+ super().__init__()
+ self.console_input_reader = ConsoleInputReader()
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+ return attach_win32_input(self, input_ready_callback)
+
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+ return detach_win32_input(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ return list(self.console_input_reader.read())
+
+ def flush(self) -> None:
+ pass
+
+ @property
+ def closed(self) -> bool:
+ return False
+
+ def raw_mode(self) -> ContextManager[None]:
+ return raw_mode()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return cooked_mode()
+
+ def fileno(self) -> int:
+ # The windows console doesn't depend on the file handle, so
+ # this is not used for the event loop (which uses the
+ # handle instead). But it's used in `Application.run_system_command`
+ # which opens a subprocess with a given stdin/stdout.
+ return sys.stdin.fileno()
+
+ def typeahead_hash(self) -> str:
+ return "win32-input"
+
+ def close(self) -> None:
+ self.console_input_reader.close()
+
+ @property
+ def handle(self) -> HANDLE:
+ return self.console_input_reader.handle
+
+
+class ConsoleInputReader:
+ """
+ :param recognize_paste: When True, try to discover paste actions and turn
+ the event into a BracketedPaste.
+ """
+
+ # Keys with character data.
+ mappings = {
+ b"\x1b": Keys.Escape,
+ b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@)
+ b"\x01": Keys.ControlA, # Control-A (home)
+ b"\x02": Keys.ControlB, # Control-B (emacs cursor left)
+ b"\x03": Keys.ControlC, # Control-C (interrupt)
+ b"\x04": Keys.ControlD, # Control-D (exit)
+ b"\x05": Keys.ControlE, # Control-E (end)
+ b"\x06": Keys.ControlF, # Control-F (cursor forward)
+ b"\x07": Keys.ControlG, # Control-G
+ b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b')
+ b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t')
+ b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n')
+ b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab)
+ b"\x0c": Keys.ControlL, # Control-L (clear; form feed)
+ b"\x0d": Keys.ControlM, # Control-M (enter)
+ b"\x0e": Keys.ControlN, # Control-N (14) (history forward)
+ b"\x0f": Keys.ControlO, # Control-O (15)
+ b"\x10": Keys.ControlP, # Control-P (16) (history back)
+ b"\x11": Keys.ControlQ, # Control-Q
+ b"\x12": Keys.ControlR, # Control-R (18) (reverse search)
+ b"\x13": Keys.ControlS, # Control-S (19) (forward search)
+ b"\x14": Keys.ControlT, # Control-T
+ b"\x15": Keys.ControlU, # Control-U
+ b"\x16": Keys.ControlV, # Control-V
+ b"\x17": Keys.ControlW, # Control-W
+ b"\x18": Keys.ControlX, # Control-X
+ b"\x19": Keys.ControlY, # Control-Y (25)
+ b"\x1a": Keys.ControlZ, # Control-Z
+ b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-|
+ b"\x1d": Keys.ControlSquareClose, # Control-]
+ b"\x1e": Keys.ControlCircumflex, # Control-^
+ b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.)
+ b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.)
+ }
+
+ # Keys that don't carry character data.
+ keycodes = {
+ # Home/End
+ 33: Keys.PageUp,
+ 34: Keys.PageDown,
+ 35: Keys.End,
+ 36: Keys.Home,
+ # Arrows
+ 37: Keys.Left,
+ 38: Keys.Up,
+ 39: Keys.Right,
+ 40: Keys.Down,
+ 45: Keys.Insert,
+ 46: Keys.Delete,
+ # F-keys.
+ 112: Keys.F1,
+ 113: Keys.F2,
+ 114: Keys.F3,
+ 115: Keys.F4,
+ 116: Keys.F5,
+ 117: Keys.F6,
+ 118: Keys.F7,
+ 119: Keys.F8,
+ 120: Keys.F9,
+ 121: Keys.F10,
+ 122: Keys.F11,
+ 123: Keys.F12,
+ }
+
+ LEFT_ALT_PRESSED = 0x0002
+ RIGHT_ALT_PRESSED = 0x0001
+ SHIFT_PRESSED = 0x0010
+ LEFT_CTRL_PRESSED = 0x0008
+ RIGHT_CTRL_PRESSED = 0x0004
+
+ def __init__(self, recognize_paste: bool = True) -> None:
+ self._fdcon = None
+ self.recognize_paste = recognize_paste
+
+ # When stdin is a tty, use that handle, otherwise, create a handle from
+ # CONIN$.
+ self.handle: HANDLE
+ if sys.stdin.isatty():
+ self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+ else:
+ self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
+ self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
+
+ def close(self) -> None:
+ "Close fdcon."
+ if self._fdcon is not None:
+ os.close(self._fdcon)
+
+ def read(self) -> Iterable[KeyPress]:
+ """
+ Return a list of `KeyPress` instances. It won't return anything when
+ there was nothing to read. (This function doesn't block.)
+
+ http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
+ """
+ max_count = 2048 # Max events to read at the same time.
+
+ read = DWORD(0)
+ arrtype = INPUT_RECORD * max_count
+ input_records = arrtype()
+
+ # Check whether there is some input to read. `ReadConsoleInputW` would
+ # block otherwise.
+ # (Actually, the event loop is responsible to make sure that this
+ # function is only called when there is something to read, but for some
+ # reason this happened in the asyncio_win32 loop, and it's better to be
+ # safe anyway.)
+ if not wait_for_handles([self.handle], timeout=0):
+ return
+
+ # Get next batch of input event.
+ windll.kernel32.ReadConsoleInputW(
+ self.handle, pointer(input_records), max_count, pointer(read)
+ )
+
+ # First, get all the keys from the input buffer, in order to determine
+ # whether we should consider this a paste event or not.
+ all_keys = list(self._get_keys(read, input_records))
+
+ # Fill in 'data' for key presses.
+ all_keys = [self._insert_key_data(key) for key in all_keys]
+
+ # Correct non-bmp characters that are passed as separate surrogate codes
+ all_keys = list(self._merge_paired_surrogates(all_keys))
+
+ if self.recognize_paste and self._is_paste(all_keys):
+ gen = iter(all_keys)
+ k: Optional[KeyPress]
+
+ for k in gen:
+ # Pasting: if the current key consists of text or \n, turn it
+ # into a BracketedPaste.
+ data = []
while k and (
not isinstance(k.key, Keys)
or k.key in {Keys.ControlJ, Keys.ControlM}
):
- data.append(k.data)
- try:
- k = next(gen)
- except StopIteration:
- k = None
-
- if data:
- yield KeyPress(Keys.BracketedPaste, "".join(data))
- if k is not None:
- yield k
- else:
- for k2 in all_keys:
- yield k2
-
- def _insert_key_data(self, key_press: KeyPress) -> KeyPress:
- """
- Insert KeyPress data, for vt100 compatibility.
- """
- if key_press.data:
- return key_press
-
- if isinstance(key_press.key, Keys):
- data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "")
- else:
- data = ""
-
- return KeyPress(key_press.key, data)
-
- def _get_keys(
- self, read: DWORD, input_records: "Array[INPUT_RECORD]"
- ) -> Iterator[KeyPress]:
- """
- Generator that yields `KeyPress` objects from the input records.
- """
- for i in range(read.value):
- ir = input_records[i]
-
- # Get the right EventType from the EVENT_RECORD.
- # (For some reason the Windows console application 'cmder'
- # [http://gooseberrycreative.com/cmder/] can return '0' for
- # ir.EventType. -- Just ignore that.)
- if ir.EventType in EventTypes:
- ev = getattr(ir.Event, EventTypes[ir.EventType])
-
- # Process if this is a key event. (We also have mouse, menu and
- # focus events.)
- if type(ev) == KEY_EVENT_RECORD and ev.KeyDown:
- for key_press in self._event_to_key_presses(ev):
- yield key_press
-
- elif type(ev) == MOUSE_EVENT_RECORD:
- for key_press in self._handle_mouse(ev):
- yield key_press
-
- @staticmethod
- def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]:
- """
- Combines consecutive KeyPresses with high and low surrogates into
- single characters
- """
- buffered_high_surrogate = None
- for key in key_presses:
- is_text = not isinstance(key.key, Keys)
- is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF"
- is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF"
-
- if buffered_high_surrogate:
- if is_low_surrogate:
- # convert high surrogate + low surrogate to single character
- fullchar = (
- (buffered_high_surrogate.key + key.key)
- .encode("utf-16-le", "surrogatepass")
- .decode("utf-16-le")
- )
- key = KeyPress(fullchar, fullchar)
- else:
- yield buffered_high_surrogate
- buffered_high_surrogate = None
-
- if is_high_surrogate:
- buffered_high_surrogate = key
- else:
- yield key
-
- if buffered_high_surrogate:
- yield buffered_high_surrogate
-
- @staticmethod
- def _is_paste(keys: List[KeyPress]) -> bool:
- """
- Return `True` when we should consider this list of keys as a paste
- event. Pasted text on windows will be turned into a
- `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably
- the best possible way to detect pasting of text and handle that
- correctly.)
- """
- # Consider paste when it contains at least one newline and at least one
- # other character.
- text_count = 0
- newline_count = 0
-
- for k in keys:
- if not isinstance(k.key, Keys):
- text_count += 1
- if k.key == Keys.ControlM:
- newline_count += 1
-
+ data.append(k.data)
+ try:
+ k = next(gen)
+ except StopIteration:
+ k = None
+
+ if data:
+ yield KeyPress(Keys.BracketedPaste, "".join(data))
+ if k is not None:
+ yield k
+ else:
+ for k2 in all_keys:
+ yield k2
+
+ def _insert_key_data(self, key_press: KeyPress) -> KeyPress:
+ """
+ Insert KeyPress data, for vt100 compatibility.
+ """
+ if key_press.data:
+ return key_press
+
+ if isinstance(key_press.key, Keys):
+ data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "")
+ else:
+ data = ""
+
+ return KeyPress(key_press.key, data)
+
+ def _get_keys(
+ self, read: DWORD, input_records: "Array[INPUT_RECORD]"
+ ) -> Iterator[KeyPress]:
+ """
+ Generator that yields `KeyPress` objects from the input records.
+ """
+ for i in range(read.value):
+ ir = input_records[i]
+
+ # Get the right EventType from the EVENT_RECORD.
+ # (For some reason the Windows console application 'cmder'
+ # [http://gooseberrycreative.com/cmder/] can return '0' for
+ # ir.EventType. -- Just ignore that.)
+ if ir.EventType in EventTypes:
+ ev = getattr(ir.Event, EventTypes[ir.EventType])
+
+ # Process if this is a key event. (We also have mouse, menu and
+ # focus events.)
+ if type(ev) == KEY_EVENT_RECORD and ev.KeyDown:
+ for key_press in self._event_to_key_presses(ev):
+ yield key_press
+
+ elif type(ev) == MOUSE_EVENT_RECORD:
+ for key_press in self._handle_mouse(ev):
+ yield key_press
+
+ @staticmethod
+ def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]:
+ """
+ Combines consecutive KeyPresses with high and low surrogates into
+ single characters
+ """
+ buffered_high_surrogate = None
+ for key in key_presses:
+ is_text = not isinstance(key.key, Keys)
+ is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF"
+ is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF"
+
+ if buffered_high_surrogate:
+ if is_low_surrogate:
+ # convert high surrogate + low surrogate to single character
+ fullchar = (
+ (buffered_high_surrogate.key + key.key)
+ .encode("utf-16-le", "surrogatepass")
+ .decode("utf-16-le")
+ )
+ key = KeyPress(fullchar, fullchar)
+ else:
+ yield buffered_high_surrogate
+ buffered_high_surrogate = None
+
+ if is_high_surrogate:
+ buffered_high_surrogate = key
+ else:
+ yield key
+
+ if buffered_high_surrogate:
+ yield buffered_high_surrogate
+
+ @staticmethod
+ def _is_paste(keys: List[KeyPress]) -> bool:
+ """
+ Return `True` when we should consider this list of keys as a paste
+ event. Pasted text on windows will be turned into a
+ `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably
+ the best possible way to detect pasting of text and handle that
+ correctly.)
+ """
+ # Consider paste when it contains at least one newline and at least one
+ # other character.
+ text_count = 0
+ newline_count = 0
+
+ for k in keys:
+ if not isinstance(k.key, Keys):
+ text_count += 1
+ if k.key == Keys.ControlM:
+ newline_count += 1
+
return newline_count >= 1 and text_count >= 1
-
- def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]:
- """
- For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances.
- """
- assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown
-
- result: Optional[KeyPress] = None
-
- control_key_state = ev.ControlKeyState
- u_char = ev.uChar.UnicodeChar
- # Use surrogatepass because u_char may be an unmatched surrogate
- ascii_char = u_char.encode("utf-8", "surrogatepass")
-
- # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the
- # unicode code point truncated to 1 byte. See also:
- # https://github.com/ipython/ipython/issues/10004
- # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389
-
- if u_char == "\x00":
- if ev.VirtualKeyCode in self.keycodes:
- result = KeyPress(self.keycodes[ev.VirtualKeyCode], "")
- else:
- if ascii_char in self.mappings:
- if self.mappings[ascii_char] == Keys.ControlJ:
- u_char = (
- "\n" # Windows sends \n, turn into \r for unix compatibility.
- )
- result = KeyPress(self.mappings[ascii_char], u_char)
- else:
- result = KeyPress(u_char, u_char)
-
- # First we handle Shift-Control-Arrow/Home/End (need to do this first)
- if (
- (
- control_key_state & self.LEFT_CTRL_PRESSED
- or control_key_state & self.RIGHT_CTRL_PRESSED
- )
- and control_key_state & self.SHIFT_PRESSED
- and result
- ):
- mapping: Dict[str, str] = {
- Keys.Left: Keys.ControlShiftLeft,
- Keys.Right: Keys.ControlShiftRight,
- Keys.Up: Keys.ControlShiftUp,
- Keys.Down: Keys.ControlShiftDown,
- Keys.Home: Keys.ControlShiftHome,
- Keys.End: Keys.ControlShiftEnd,
- Keys.Insert: Keys.ControlShiftInsert,
- Keys.PageUp: Keys.ControlShiftPageUp,
- Keys.PageDown: Keys.ControlShiftPageDown,
- }
- result.key = mapping.get(result.key, result.key)
-
- # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys.
- if (
- control_key_state & self.LEFT_CTRL_PRESSED
- or control_key_state & self.RIGHT_CTRL_PRESSED
- ) and result:
- mapping = {
- Keys.Left: Keys.ControlLeft,
- Keys.Right: Keys.ControlRight,
- Keys.Up: Keys.ControlUp,
- Keys.Down: Keys.ControlDown,
- Keys.Home: Keys.ControlHome,
- Keys.End: Keys.ControlEnd,
- Keys.Insert: Keys.ControlInsert,
- Keys.Delete: Keys.ControlDelete,
- Keys.PageUp: Keys.ControlPageUp,
- Keys.PageDown: Keys.ControlPageDown,
- }
- result.key = mapping.get(result.key, result.key)
-
- # Turn 'Tab' into 'BackTab' when shift was pressed.
- # Also handle other shift-key combination
- if control_key_state & self.SHIFT_PRESSED and result:
- mapping = {
- Keys.Tab: Keys.BackTab,
- Keys.Left: Keys.ShiftLeft,
- Keys.Right: Keys.ShiftRight,
- Keys.Up: Keys.ShiftUp,
- Keys.Down: Keys.ShiftDown,
- Keys.Home: Keys.ShiftHome,
- Keys.End: Keys.ShiftEnd,
- Keys.Insert: Keys.ShiftInsert,
- Keys.Delete: Keys.ShiftDelete,
- Keys.PageUp: Keys.ShiftPageUp,
- Keys.PageDown: Keys.ShiftPageDown,
- }
- result.key = mapping.get(result.key, result.key)
-
- # Turn 'Space' into 'ControlSpace' when control was pressed.
- if (
- (
- control_key_state & self.LEFT_CTRL_PRESSED
- or control_key_state & self.RIGHT_CTRL_PRESSED
- )
- and result
- and result.data == " "
- ):
- result = KeyPress(Keys.ControlSpace, " ")
-
- # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot
- # detect this combination. But it's really practical on Windows.)
- if (
- (
- control_key_state & self.LEFT_CTRL_PRESSED
- or control_key_state & self.RIGHT_CTRL_PRESSED
- )
- and result
- and result.key == Keys.ControlJ
- ):
- return [KeyPress(Keys.Escape, ""), result]
-
- # Return result. If alt was pressed, prefix the result with an
- # 'Escape' key, just like unix VT100 terminals do.
-
- # NOTE: Only replace the left alt with escape. The right alt key often
- # acts as altgr and is used in many non US keyboard layouts for
- # typing some special characters, like a backslash. We don't want
- # all backslashes to be prefixed with escape. (Esc-\ has a
- # meaning in E-macs, for instance.)
- if result:
- meta_pressed = control_key_state & self.LEFT_ALT_PRESSED
-
- if meta_pressed:
- return [KeyPress(Keys.Escape, ""), result]
- else:
- return [result]
-
- else:
- return []
-
- def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]:
- """
- Handle mouse events. Return a list of KeyPress instances.
- """
- event_flags = ev.EventFlags
- button_state = ev.ButtonState
-
- event_type: Optional[MouseEventType] = None
- button: MouseButton = MouseButton.NONE
-
- # Scroll events.
- if event_flags & MOUSE_WHEELED:
- if button_state > 0:
- event_type = MouseEventType.SCROLL_UP
- else:
- event_type = MouseEventType.SCROLL_DOWN
- else:
- # Handle button state for non-scroll events.
- if button_state == FROM_LEFT_1ST_BUTTON_PRESSED:
- button = MouseButton.LEFT
-
- elif button_state == RIGHTMOST_BUTTON_PRESSED:
- button = MouseButton.RIGHT
-
- # Move events.
- if event_flags & MOUSE_MOVED:
- event_type = MouseEventType.MOUSE_MOVE
-
- # No key pressed anymore: mouse up.
- if event_type is None:
- if button_state > 0:
- # Some button pressed.
- event_type = MouseEventType.MOUSE_DOWN
- else:
- # No button pressed.
- event_type = MouseEventType.MOUSE_UP
-
- data = ";".join(
- [
- button.value,
- event_type.value,
- str(ev.MousePosition.X),
- str(ev.MousePosition.Y),
- ]
- )
- return [KeyPress(Keys.WindowsMouseEvent, data)]
-
-
-class _Win32Handles:
- """
- Utility to keep track of which handles are connectod to which callbacks.
-
- `add_win32_handle` starts a tiny event loop in another thread which waits
- for the Win32 handle to become ready. When this happens, the callback will
- be called in the current asyncio event loop using `call_soon_threadsafe`.
-
- `remove_win32_handle` will stop this tiny event loop.
-
- NOTE: We use this technique, so that we don't have to use the
- `ProactorEventLoop` on Windows and we can wait for things like stdin
- in a `SelectorEventLoop`. This is important, because our inputhook
- mechanism (used by IPython), only works with the `SelectorEventLoop`.
- """
-
- def __init__(self) -> None:
- self._handle_callbacks: Dict[int, Callable[[], None]] = {}
-
- # Windows Events that are triggered when we have to stop watching this
- # handle.
- self._remove_events: Dict[int, HANDLE] = {}
-
- def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None:
- """
- Add a Win32 handle to the event loop.
- """
- handle_value = handle.value
-
- if handle_value is None:
- raise ValueError("Invalid handle.")
-
- # Make sure to remove a previous registered handler first.
- self.remove_win32_handle(handle)
-
- loop = get_event_loop()
- self._handle_callbacks[handle_value] = callback
-
- # Create remove event.
- remove_event = create_win32_event()
- self._remove_events[handle_value] = remove_event
-
- # Add reader.
- def ready() -> None:
- # Tell the callback that input's ready.
- try:
- callback()
- finally:
- run_in_executor_with_context(wait, loop=loop)
-
- # Wait for the input to become ready.
- # (Use an executor for this, the Windows asyncio event loop doesn't
- # allow us to wait for handles like stdin.)
- def wait() -> None:
- # Wait until either the handle becomes ready, or the remove event
- # has been set.
- result = wait_for_handles([remove_event, handle])
-
- if result is remove_event:
- windll.kernel32.CloseHandle(remove_event)
- return
- else:
- loop.call_soon_threadsafe(ready)
-
- run_in_executor_with_context(wait, loop=loop)
-
- def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]:
- """
- Remove a Win32 handle from the event loop.
- Return either the registered handler or `None`.
- """
- if handle.value is None:
- return None # Ignore.
-
- # Trigger remove events, so that the reader knows to stop.
- try:
- event = self._remove_events.pop(handle.value)
- except KeyError:
- pass
- else:
- windll.kernel32.SetEvent(event)
-
- try:
- return self._handle_callbacks.pop(handle.value)
- except KeyError:
- return None
-
-
-@contextmanager
-def attach_win32_input(
- input: _Win32InputBase, callback: Callable[[], None]
-) -> Iterator[None]:
- """
- Context manager that makes this input active in the current event loop.
-
- :param input: :class:`~prompt_toolkit.input.Input` object.
- :param input_ready_callback: Called when the input is ready to read.
- """
- win32_handles = input.win32_handles
- handle = input.handle
-
- if handle.value is None:
- raise ValueError("Invalid handle.")
-
- # Add reader.
- previous_callback = win32_handles.remove_win32_handle(handle)
- win32_handles.add_win32_handle(handle, callback)
-
- try:
- yield
- finally:
- win32_handles.remove_win32_handle(handle)
-
- if previous_callback:
- win32_handles.add_win32_handle(handle, previous_callback)
-
-
-@contextmanager
-def detach_win32_input(input: _Win32InputBase) -> Iterator[None]:
- win32_handles = input.win32_handles
- handle = input.handle
-
- if handle.value is None:
- raise ValueError("Invalid handle.")
-
- previous_callback = win32_handles.remove_win32_handle(handle)
-
- try:
- yield
- finally:
- if previous_callback:
- win32_handles.add_win32_handle(handle, previous_callback)
-
-
-class raw_mode:
- """
- ::
-
- with raw_mode(stdin):
- ''' the windows terminal is now in 'raw' mode. '''
-
- The ``fileno`` attribute is ignored. This is to be compatible with the
- `raw_input` method of `.vt100_input`.
- """
-
- def __init__(self, fileno: Optional[int] = None) -> None:
- self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
-
- def __enter__(self) -> None:
- # Remember original mode.
- original_mode = DWORD()
- windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode))
- self.original_mode = original_mode
-
- self._patch()
-
- def _patch(self) -> None:
- # Set raw
- ENABLE_ECHO_INPUT = 0x0004
- ENABLE_LINE_INPUT = 0x0002
- ENABLE_PROCESSED_INPUT = 0x0001
-
- windll.kernel32.SetConsoleMode(
- self.handle,
- self.original_mode.value
- & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
- )
-
- def __exit__(self, *a: object) -> None:
- # Restore original mode
- windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
-
-
-class cooked_mode(raw_mode):
- """
- ::
-
- with cooked_mode(stdin):
- ''' The pseudo-terminal stdin is now used in cooked mode. '''
- """
-
- def _patch(self) -> None:
- # Set cooked.
- ENABLE_ECHO_INPUT = 0x0004
- ENABLE_LINE_INPUT = 0x0002
- ENABLE_PROCESSED_INPUT = 0x0001
-
- windll.kernel32.SetConsoleMode(
- self.handle,
- self.original_mode.value
- | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
- )
+
+ def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]:
+ """
+ For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances.
+ """
+ assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown
+
+ result: Optional[KeyPress] = None
+
+ control_key_state = ev.ControlKeyState
+ u_char = ev.uChar.UnicodeChar
+ # Use surrogatepass because u_char may be an unmatched surrogate
+ ascii_char = u_char.encode("utf-8", "surrogatepass")
+
+ # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the
+ # unicode code point truncated to 1 byte. See also:
+ # https://github.com/ipython/ipython/issues/10004
+ # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389
+
+ if u_char == "\x00":
+ if ev.VirtualKeyCode in self.keycodes:
+ result = KeyPress(self.keycodes[ev.VirtualKeyCode], "")
+ else:
+ if ascii_char in self.mappings:
+ if self.mappings[ascii_char] == Keys.ControlJ:
+ u_char = (
+ "\n" # Windows sends \n, turn into \r for unix compatibility.
+ )
+ result = KeyPress(self.mappings[ascii_char], u_char)
+ else:
+ result = KeyPress(u_char, u_char)
+
+ # First we handle Shift-Control-Arrow/Home/End (need to do this first)
+ if (
+ (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ )
+ and control_key_state & self.SHIFT_PRESSED
+ and result
+ ):
+ mapping: Dict[str, str] = {
+ Keys.Left: Keys.ControlShiftLeft,
+ Keys.Right: Keys.ControlShiftRight,
+ Keys.Up: Keys.ControlShiftUp,
+ Keys.Down: Keys.ControlShiftDown,
+ Keys.Home: Keys.ControlShiftHome,
+ Keys.End: Keys.ControlShiftEnd,
+ Keys.Insert: Keys.ControlShiftInsert,
+ Keys.PageUp: Keys.ControlShiftPageUp,
+ Keys.PageDown: Keys.ControlShiftPageDown,
+ }
+ result.key = mapping.get(result.key, result.key)
+
+ # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys.
+ if (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ ) and result:
+ mapping = {
+ Keys.Left: Keys.ControlLeft,
+ Keys.Right: Keys.ControlRight,
+ Keys.Up: Keys.ControlUp,
+ Keys.Down: Keys.ControlDown,
+ Keys.Home: Keys.ControlHome,
+ Keys.End: Keys.ControlEnd,
+ Keys.Insert: Keys.ControlInsert,
+ Keys.Delete: Keys.ControlDelete,
+ Keys.PageUp: Keys.ControlPageUp,
+ Keys.PageDown: Keys.ControlPageDown,
+ }
+ result.key = mapping.get(result.key, result.key)
+
+ # Turn 'Tab' into 'BackTab' when shift was pressed.
+ # Also handle other shift-key combination
+ if control_key_state & self.SHIFT_PRESSED and result:
+ mapping = {
+ Keys.Tab: Keys.BackTab,
+ Keys.Left: Keys.ShiftLeft,
+ Keys.Right: Keys.ShiftRight,
+ Keys.Up: Keys.ShiftUp,
+ Keys.Down: Keys.ShiftDown,
+ Keys.Home: Keys.ShiftHome,
+ Keys.End: Keys.ShiftEnd,
+ Keys.Insert: Keys.ShiftInsert,
+ Keys.Delete: Keys.ShiftDelete,
+ Keys.PageUp: Keys.ShiftPageUp,
+ Keys.PageDown: Keys.ShiftPageDown,
+ }
+ result.key = mapping.get(result.key, result.key)
+
+ # Turn 'Space' into 'ControlSpace' when control was pressed.
+ if (
+ (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ )
+ and result
+ and result.data == " "
+ ):
+ result = KeyPress(Keys.ControlSpace, " ")
+
+ # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot
+ # detect this combination. But it's really practical on Windows.)
+ if (
+ (
+ control_key_state & self.LEFT_CTRL_PRESSED
+ or control_key_state & self.RIGHT_CTRL_PRESSED
+ )
+ and result
+ and result.key == Keys.ControlJ
+ ):
+ return [KeyPress(Keys.Escape, ""), result]
+
+ # Return result. If alt was pressed, prefix the result with an
+ # 'Escape' key, just like unix VT100 terminals do.
+
+ # NOTE: Only replace the left alt with escape. The right alt key often
+ # acts as altgr and is used in many non US keyboard layouts for
+ # typing some special characters, like a backslash. We don't want
+ # all backslashes to be prefixed with escape. (Esc-\ has a
+ # meaning in E-macs, for instance.)
+ if result:
+ meta_pressed = control_key_state & self.LEFT_ALT_PRESSED
+
+ if meta_pressed:
+ return [KeyPress(Keys.Escape, ""), result]
+ else:
+ return [result]
+
+ else:
+ return []
+
+ def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]:
+ """
+ Handle mouse events. Return a list of KeyPress instances.
+ """
+ event_flags = ev.EventFlags
+ button_state = ev.ButtonState
+
+ event_type: Optional[MouseEventType] = None
+ button: MouseButton = MouseButton.NONE
+
+ # Scroll events.
+ if event_flags & MOUSE_WHEELED:
+ if button_state > 0:
+ event_type = MouseEventType.SCROLL_UP
+ else:
+ event_type = MouseEventType.SCROLL_DOWN
+ else:
+ # Handle button state for non-scroll events.
+ if button_state == FROM_LEFT_1ST_BUTTON_PRESSED:
+ button = MouseButton.LEFT
+
+ elif button_state == RIGHTMOST_BUTTON_PRESSED:
+ button = MouseButton.RIGHT
+
+ # Move events.
+ if event_flags & MOUSE_MOVED:
+ event_type = MouseEventType.MOUSE_MOVE
+
+ # No key pressed anymore: mouse up.
+ if event_type is None:
+ if button_state > 0:
+ # Some button pressed.
+ event_type = MouseEventType.MOUSE_DOWN
+ else:
+ # No button pressed.
+ event_type = MouseEventType.MOUSE_UP
+
+ data = ";".join(
+ [
+ button.value,
+ event_type.value,
+ str(ev.MousePosition.X),
+ str(ev.MousePosition.Y),
+ ]
+ )
+ return [KeyPress(Keys.WindowsMouseEvent, data)]
+
+
+class _Win32Handles:
+ """
+ Utility to keep track of which handles are connectod to which callbacks.
+
+ `add_win32_handle` starts a tiny event loop in another thread which waits
+ for the Win32 handle to become ready. When this happens, the callback will
+ be called in the current asyncio event loop using `call_soon_threadsafe`.
+
+ `remove_win32_handle` will stop this tiny event loop.
+
+ NOTE: We use this technique, so that we don't have to use the
+ `ProactorEventLoop` on Windows and we can wait for things like stdin
+ in a `SelectorEventLoop`. This is important, because our inputhook
+ mechanism (used by IPython), only works with the `SelectorEventLoop`.
+ """
+
+ def __init__(self) -> None:
+ self._handle_callbacks: Dict[int, Callable[[], None]] = {}
+
+ # Windows Events that are triggered when we have to stop watching this
+ # handle.
+ self._remove_events: Dict[int, HANDLE] = {}
+
+ def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None:
+ """
+ Add a Win32 handle to the event loop.
+ """
+ handle_value = handle.value
+
+ if handle_value is None:
+ raise ValueError("Invalid handle.")
+
+ # Make sure to remove a previous registered handler first.
+ self.remove_win32_handle(handle)
+
+ loop = get_event_loop()
+ self._handle_callbacks[handle_value] = callback
+
+ # Create remove event.
+ remove_event = create_win32_event()
+ self._remove_events[handle_value] = remove_event
+
+ # Add reader.
+ def ready() -> None:
+ # Tell the callback that input's ready.
+ try:
+ callback()
+ finally:
+ run_in_executor_with_context(wait, loop=loop)
+
+ # Wait for the input to become ready.
+ # (Use an executor for this, the Windows asyncio event loop doesn't
+ # allow us to wait for handles like stdin.)
+ def wait() -> None:
+ # Wait until either the handle becomes ready, or the remove event
+ # has been set.
+ result = wait_for_handles([remove_event, handle])
+
+ if result is remove_event:
+ windll.kernel32.CloseHandle(remove_event)
+ return
+ else:
+ loop.call_soon_threadsafe(ready)
+
+ run_in_executor_with_context(wait, loop=loop)
+
+ def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]:
+ """
+ Remove a Win32 handle from the event loop.
+ Return either the registered handler or `None`.
+ """
+ if handle.value is None:
+ return None # Ignore.
+
+ # Trigger remove events, so that the reader knows to stop.
+ try:
+ event = self._remove_events.pop(handle.value)
+ except KeyError:
+ pass
+ else:
+ windll.kernel32.SetEvent(event)
+
+ try:
+ return self._handle_callbacks.pop(handle.value)
+ except KeyError:
+ return None
+
+
+@contextmanager
+def attach_win32_input(
+ input: _Win32InputBase, callback: Callable[[], None]
+) -> Iterator[None]:
+ """
+ Context manager that makes this input active in the current event loop.
+
+ :param input: :class:`~prompt_toolkit.input.Input` object.
+ :param input_ready_callback: Called when the input is ready to read.
+ """
+ win32_handles = input.win32_handles
+ handle = input.handle
+
+ if handle.value is None:
+ raise ValueError("Invalid handle.")
+
+ # Add reader.
+ previous_callback = win32_handles.remove_win32_handle(handle)
+ win32_handles.add_win32_handle(handle, callback)
+
+ try:
+ yield
+ finally:
+ win32_handles.remove_win32_handle(handle)
+
+ if previous_callback:
+ win32_handles.add_win32_handle(handle, previous_callback)
+
+
+@contextmanager
+def detach_win32_input(input: _Win32InputBase) -> Iterator[None]:
+ win32_handles = input.win32_handles
+ handle = input.handle
+
+ if handle.value is None:
+ raise ValueError("Invalid handle.")
+
+ previous_callback = win32_handles.remove_win32_handle(handle)
+
+ try:
+ yield
+ finally:
+ if previous_callback:
+ win32_handles.add_win32_handle(handle, previous_callback)
+
+
+class raw_mode:
+ """
+ ::
+
+ with raw_mode(stdin):
+ ''' the windows terminal is now in 'raw' mode. '''
+
+ The ``fileno`` attribute is ignored. This is to be compatible with the
+ `raw_input` method of `.vt100_input`.
+ """
+
+ def __init__(self, fileno: Optional[int] = None) -> None:
+ self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+
+ def __enter__(self) -> None:
+ # Remember original mode.
+ original_mode = DWORD()
+ windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode))
+ self.original_mode = original_mode
+
+ self._patch()
+
+ def _patch(self) -> None:
+ # Set raw
+ ENABLE_ECHO_INPUT = 0x0004
+ ENABLE_LINE_INPUT = 0x0002
+ ENABLE_PROCESSED_INPUT = 0x0001
+
+ windll.kernel32.SetConsoleMode(
+ self.handle,
+ self.original_mode.value
+ & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
+ )
+
+ def __exit__(self, *a: object) -> None:
+ # Restore original mode
+ windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
+
+
+class cooked_mode(raw_mode):
+ """
+ ::
+
+ with cooked_mode(stdin):
+ ''' The pseudo-terminal stdin is now used in cooked mode. '''
+ """
+
+ def _patch(self) -> None:
+ # Set cooked.
+ ENABLE_ECHO_INPUT = 0x0004
+ ENABLE_LINE_INPUT = 0x0002
+ ENABLE_PROCESSED_INPUT = 0x0001
+
+ windll.kernel32.SetConsoleMode(
+ self.handle,
+ self.original_mode.value
+ | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
+ )
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py
index 67cf6f0ee3e..cdcf084de11 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py
@@ -1,135 +1,135 @@
-from ctypes import windll
-from ctypes.wintypes import HANDLE
-from typing import Callable, ContextManager, List
-
-from prompt_toolkit.eventloop.win32 import create_win32_event
-
-from ..key_binding import KeyPress
-from ..utils import DummyContext
-from .base import PipeInput
-from .vt100_parser import Vt100Parser
-from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input
-
-__all__ = ["Win32PipeInput"]
-
-
-class Win32PipeInput(_Win32InputBase, PipeInput):
- """
- This is an input pipe that works on Windows.
- Text or bytes can be feed into the pipe, and key strokes can be read from
- the pipe. This is useful if we want to send the input programmatically into
- the application. Mostly useful for unit testing.
-
- Notice that even though it's Windows, we use vt100 escape sequences over
- the pipe.
-
- Usage::
-
- input = Win32PipeInput()
- input.send_text('inputdata')
- """
-
- _id = 0
-
- def __init__(self) -> None:
- super().__init__()
- # Event (handle) for registering this input in the event loop.
- # This event is set when there is data available to read from the pipe.
- # Note: We use this approach instead of using a regular pipe, like
- # returned from `os.pipe()`, because making such a regular pipe
- # non-blocking is tricky and this works really well.
- self._event = create_win32_event()
-
- self._closed = False
-
- # Parser for incoming keys.
- self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects.
- self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key))
-
- # Identifier for every PipeInput for the hash.
- self.__class__._id += 1
- self._id = self.__class__._id
-
- @property
- def closed(self) -> bool:
- return self._closed
-
- def fileno(self) -> int:
- """
- The windows pipe doesn't depend on the file handle.
- """
- raise NotImplementedError
-
- @property
- def handle(self) -> HANDLE:
- "The handle used for registering this pipe in the event loop."
- return self._event
-
- def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
- """
- Return a context manager that makes this input active in the current
- event loop.
- """
- return attach_win32_input(self, input_ready_callback)
-
- def detach(self) -> ContextManager[None]:
- """
- Return a context manager that makes sure that this input is not active
- in the current event loop.
- """
- return detach_win32_input(self)
-
- def read_keys(self) -> List[KeyPress]:
- "Read list of KeyPress."
-
- # Return result.
- result = self._buffer
- self._buffer = []
-
- # Reset event.
- windll.kernel32.ResetEvent(self._event)
-
- return result
-
- def flush_keys(self) -> List[KeyPress]:
- """
- Flush pending keys and return them.
- (Used for flushing the 'escape' key.)
- """
- # Flush all pending keys. (This is most important to flush the vt100
- # 'Escape' key early when nothing else follows.)
- self.vt100_parser.flush()
-
- # Return result.
- result = self._buffer
- self._buffer = []
- return result
-
- def send_bytes(self, data: bytes) -> None:
- "Send bytes to the input."
- self.send_text(data.decode("utf-8", "ignore"))
-
- def send_text(self, text: str) -> None:
- "Send text to the input."
- # Pass it through our vt100 parser.
- self.vt100_parser.feed(text)
-
- # Set event.
- windll.kernel32.SetEvent(self._event)
-
- def raw_mode(self) -> ContextManager[None]:
- return DummyContext()
-
- def cooked_mode(self) -> ContextManager[None]:
- return DummyContext()
-
- def close(self) -> None:
- "Close pipe handles."
- windll.kernel32.CloseHandle(self._event)
- self._closed = True
-
- def typeahead_hash(self) -> str:
- """
- This needs to be unique for every `PipeInput`.
- """
- return "pipe-input-%s" % (self._id,)
+from ctypes import windll
+from ctypes.wintypes import HANDLE
+from typing import Callable, ContextManager, List
+
+from prompt_toolkit.eventloop.win32 import create_win32_event
+
+from ..key_binding import KeyPress
+from ..utils import DummyContext
+from .base import PipeInput
+from .vt100_parser import Vt100Parser
+from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input
+
+__all__ = ["Win32PipeInput"]
+
+
+class Win32PipeInput(_Win32InputBase, PipeInput):
+ """
+ This is an input pipe that works on Windows.
+ Text or bytes can be feed into the pipe, and key strokes can be read from
+ the pipe. This is useful if we want to send the input programmatically into
+ the application. Mostly useful for unit testing.
+
+ Notice that even though it's Windows, we use vt100 escape sequences over
+ the pipe.
+
+ Usage::
+
+ input = Win32PipeInput()
+ input.send_text('inputdata')
+ """
+
+ _id = 0
+
+ def __init__(self) -> None:
+ super().__init__()
+ # Event (handle) for registering this input in the event loop.
+ # This event is set when there is data available to read from the pipe.
+ # Note: We use this approach instead of using a regular pipe, like
+ # returned from `os.pipe()`, because making such a regular pipe
+ # non-blocking is tricky and this works really well.
+ self._event = create_win32_event()
+
+ self._closed = False
+
+ # Parser for incoming keys.
+ self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects.
+ self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key))
+
+ # Identifier for every PipeInput for the hash.
+ self.__class__._id += 1
+ self._id = self.__class__._id
+
+ @property
+ def closed(self) -> bool:
+ return self._closed
+
+ def fileno(self) -> int:
+ """
+ The windows pipe doesn't depend on the file handle.
+ """
+ raise NotImplementedError
+
+ @property
+ def handle(self) -> HANDLE:
+ "The handle used for registering this pipe in the event loop."
+ return self._event
+
+ def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
+ """
+ Return a context manager that makes this input active in the current
+ event loop.
+ """
+ return attach_win32_input(self, input_ready_callback)
+
+ def detach(self) -> ContextManager[None]:
+ """
+ Return a context manager that makes sure that this input is not active
+ in the current event loop.
+ """
+ return detach_win32_input(self)
+
+ def read_keys(self) -> List[KeyPress]:
+ "Read list of KeyPress."
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+
+ # Reset event.
+ windll.kernel32.ResetEvent(self._event)
+
+ return result
+
+ def flush_keys(self) -> List[KeyPress]:
+ """
+ Flush pending keys and return them.
+ (Used for flushing the 'escape' key.)
+ """
+ # Flush all pending keys. (This is most important to flush the vt100
+ # 'Escape' key early when nothing else follows.)
+ self.vt100_parser.flush()
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ def send_bytes(self, data: bytes) -> None:
+ "Send bytes to the input."
+ self.send_text(data.decode("utf-8", "ignore"))
+
+ def send_text(self, text: str) -> None:
+ "Send text to the input."
+ # Pass it through our vt100 parser.
+ self.vt100_parser.feed(text)
+
+ # Set event.
+ windll.kernel32.SetEvent(self._event)
+
+ def raw_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def cooked_mode(self) -> ContextManager[None]:
+ return DummyContext()
+
+ def close(self) -> None:
+ "Close pipe handles."
+ windll.kernel32.CloseHandle(self._event)
+ self._closed = True
+
+ def typeahead_hash(self) -> str:
+ """
+ This needs to be unique for every `PipeInput`.
+ """
+ return "pipe-input-%s" % (self._id,)