diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/prompt-toolkit/py3/prompt_toolkit/input | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/input')
11 files changed, 2223 insertions, 2223 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py index 421d4ccdf40..b3a8219d814 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/__init__.py @@ -1,11 +1,11 @@ -from .base import DummyInput, Input -from .defaults import create_input, create_pipe_input - -__all__ = [ - # Base. - "Input", - "DummyInput", - # Defaults. - "create_input", - "create_pipe_input", -] +from .base import DummyInput, Input +from .defaults import create_input, create_pipe_input + +__all__ = [ + # Base. + "Input", + "DummyInput", + # Defaults. + "create_input", + "create_pipe_input", +] diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py index 2e6c5b9b286..22006fdb5c1 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/ansi_escape_sequences.py @@ -1,126 +1,126 @@ -""" -Mappings from VT100 (ANSI) escape sequences to the corresponding prompt_toolkit -keys. - -We are not using the terminfo/termcap databases to detect the ANSI escape -sequences for the input. Instead, we recognize 99% of the most common -sequences. This works well, because in practice, every modern terminal is -mostly Xterm compatible. - -Some useful docs: -- Mintty: https://github.com/mintty/mintty/blob/master/wiki/Keycodes.md -""" -from typing import Dict, Tuple, Union - -from ..keys import Keys - -__all__ = [ - "ANSI_SEQUENCES", - "REVERSE_ANSI_SEQUENCES", -] - -# Mapping of vt100 escape codes to Keys. -ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = { - # Control keys. - "\x00": Keys.ControlAt, # Control-At (Also for Ctrl-Space) - "\x01": Keys.ControlA, # Control-A (home) - "\x02": Keys.ControlB, # Control-B (emacs cursor left) - "\x03": Keys.ControlC, # Control-C (interrupt) - "\x04": Keys.ControlD, # Control-D (exit) - "\x05": Keys.ControlE, # Control-E (end) - "\x06": Keys.ControlF, # Control-F (cursor forward) - "\x07": Keys.ControlG, # Control-G - "\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') - "\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') - "\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') - "\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) - "\x0c": Keys.ControlL, # Control-L (clear; form feed) - "\x0d": Keys.ControlM, # Control-M (13) (Identical to '\r') - "\x0e": Keys.ControlN, # Control-N (14) (history forward) - "\x0f": Keys.ControlO, # Control-O (15) - "\x10": Keys.ControlP, # Control-P (16) (history back) - "\x11": Keys.ControlQ, # Control-Q - "\x12": Keys.ControlR, # Control-R (18) (reverse search) - "\x13": Keys.ControlS, # Control-S (19) (forward search) - "\x14": Keys.ControlT, # Control-T - "\x15": Keys.ControlU, # Control-U - "\x16": Keys.ControlV, # Control-V - "\x17": Keys.ControlW, # Control-W - "\x18": Keys.ControlX, # Control-X - "\x19": Keys.ControlY, # Control-Y (25) - "\x1a": Keys.ControlZ, # Control-Z - "\x1b": Keys.Escape, # Also Control-[ - "\x9b": Keys.ShiftEscape, - "\x1c": Keys.ControlBackslash, # Both Control-\ (also Ctrl-| ) - "\x1d": Keys.ControlSquareClose, # Control-] - "\x1e": Keys.ControlCircumflex, # Control-^ - "\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) - # ASCII Delete (0x7f) - # Vt220 (and Linux terminal) send this when pressing backspace. We map this - # to ControlH, because that will make it easier to create key bindings that - # work everywhere, with the trade-off that it's no longer possible to - # handle backspace and control-h individually for the few terminals that - # support it. (Most terminals send ControlH when backspace is pressed.) - # See: http://www.ibb.net/~anne/keyboard.html - "\x7f": Keys.ControlH, - # -- - # Various - "\x1b[1~": Keys.Home, # tmux - "\x1b[2~": Keys.Insert, - "\x1b[3~": Keys.Delete, - "\x1b[4~": Keys.End, # tmux - "\x1b[5~": Keys.PageUp, - "\x1b[6~": Keys.PageDown, - "\x1b[7~": Keys.Home, # xrvt - "\x1b[8~": Keys.End, # xrvt - "\x1b[Z": Keys.BackTab, # shift + tab - "\x1b\x09": Keys.BackTab, # Linux console - "\x1b[~": Keys.BackTab, # Windows console - # -- - # Function keys. - "\x1bOP": Keys.F1, - "\x1bOQ": Keys.F2, - "\x1bOR": Keys.F3, - "\x1bOS": Keys.F4, - "\x1b[[A": Keys.F1, # Linux console. - "\x1b[[B": Keys.F2, # Linux console. - "\x1b[[C": Keys.F3, # Linux console. - "\x1b[[D": Keys.F4, # Linux console. - "\x1b[[E": Keys.F5, # Linux console. - "\x1b[11~": Keys.F1, # rxvt-unicode - "\x1b[12~": Keys.F2, # rxvt-unicode - "\x1b[13~": Keys.F3, # rxvt-unicode - "\x1b[14~": Keys.F4, # rxvt-unicode - "\x1b[15~": Keys.F5, - "\x1b[17~": Keys.F6, - "\x1b[18~": Keys.F7, - "\x1b[19~": Keys.F8, - "\x1b[20~": Keys.F9, - "\x1b[21~": Keys.F10, - "\x1b[23~": Keys.F11, - "\x1b[24~": Keys.F12, - "\x1b[25~": Keys.F13, - "\x1b[26~": Keys.F14, - "\x1b[28~": Keys.F15, - "\x1b[29~": Keys.F16, - "\x1b[31~": Keys.F17, - "\x1b[32~": Keys.F18, - "\x1b[33~": Keys.F19, - "\x1b[34~": Keys.F20, - # Xterm - "\x1b[1;2P": Keys.F13, - "\x1b[1;2Q": Keys.F14, - # '\x1b[1;2R': Keys.F15, # Conflicts with CPR response. - "\x1b[1;2S": Keys.F16, - "\x1b[15;2~": Keys.F17, - "\x1b[17;2~": Keys.F18, - "\x1b[18;2~": Keys.F19, - "\x1b[19;2~": Keys.F20, - "\x1b[20;2~": Keys.F21, - "\x1b[21;2~": Keys.F22, - "\x1b[23;2~": Keys.F23, - "\x1b[24;2~": Keys.F24, - # -- +""" +Mappings from VT100 (ANSI) escape sequences to the corresponding prompt_toolkit +keys. + +We are not using the terminfo/termcap databases to detect the ANSI escape +sequences for the input. Instead, we recognize 99% of the most common +sequences. This works well, because in practice, every modern terminal is +mostly Xterm compatible. + +Some useful docs: +- Mintty: https://github.com/mintty/mintty/blob/master/wiki/Keycodes.md +""" +from typing import Dict, Tuple, Union + +from ..keys import Keys + +__all__ = [ + "ANSI_SEQUENCES", + "REVERSE_ANSI_SEQUENCES", +] + +# Mapping of vt100 escape codes to Keys. +ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = { + # Control keys. + "\x00": Keys.ControlAt, # Control-At (Also for Ctrl-Space) + "\x01": Keys.ControlA, # Control-A (home) + "\x02": Keys.ControlB, # Control-B (emacs cursor left) + "\x03": Keys.ControlC, # Control-C (interrupt) + "\x04": Keys.ControlD, # Control-D (exit) + "\x05": Keys.ControlE, # Control-E (end) + "\x06": Keys.ControlF, # Control-F (cursor forward) + "\x07": Keys.ControlG, # Control-G + "\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') + "\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') + "\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') + "\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) + "\x0c": Keys.ControlL, # Control-L (clear; form feed) + "\x0d": Keys.ControlM, # Control-M (13) (Identical to '\r') + "\x0e": Keys.ControlN, # Control-N (14) (history forward) + "\x0f": Keys.ControlO, # Control-O (15) + "\x10": Keys.ControlP, # Control-P (16) (history back) + "\x11": Keys.ControlQ, # Control-Q + "\x12": Keys.ControlR, # Control-R (18) (reverse search) + "\x13": Keys.ControlS, # Control-S (19) (forward search) + "\x14": Keys.ControlT, # Control-T + "\x15": Keys.ControlU, # Control-U + "\x16": Keys.ControlV, # Control-V + "\x17": Keys.ControlW, # Control-W + "\x18": Keys.ControlX, # Control-X + "\x19": Keys.ControlY, # Control-Y (25) + "\x1a": Keys.ControlZ, # Control-Z + "\x1b": Keys.Escape, # Also Control-[ + "\x9b": Keys.ShiftEscape, + "\x1c": Keys.ControlBackslash, # Both Control-\ (also Ctrl-| ) + "\x1d": Keys.ControlSquareClose, # Control-] + "\x1e": Keys.ControlCircumflex, # Control-^ + "\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) + # ASCII Delete (0x7f) + # Vt220 (and Linux terminal) send this when pressing backspace. We map this + # to ControlH, because that will make it easier to create key bindings that + # work everywhere, with the trade-off that it's no longer possible to + # handle backspace and control-h individually for the few terminals that + # support it. (Most terminals send ControlH when backspace is pressed.) + # See: http://www.ibb.net/~anne/keyboard.html + "\x7f": Keys.ControlH, + # -- + # Various + "\x1b[1~": Keys.Home, # tmux + "\x1b[2~": Keys.Insert, + "\x1b[3~": Keys.Delete, + "\x1b[4~": Keys.End, # tmux + "\x1b[5~": Keys.PageUp, + "\x1b[6~": Keys.PageDown, + "\x1b[7~": Keys.Home, # xrvt + "\x1b[8~": Keys.End, # xrvt + "\x1b[Z": Keys.BackTab, # shift + tab + "\x1b\x09": Keys.BackTab, # Linux console + "\x1b[~": Keys.BackTab, # Windows console + # -- + # Function keys. + "\x1bOP": Keys.F1, + "\x1bOQ": Keys.F2, + "\x1bOR": Keys.F3, + "\x1bOS": Keys.F4, + "\x1b[[A": Keys.F1, # Linux console. + "\x1b[[B": Keys.F2, # Linux console. + "\x1b[[C": Keys.F3, # Linux console. + "\x1b[[D": Keys.F4, # Linux console. + "\x1b[[E": Keys.F5, # Linux console. + "\x1b[11~": Keys.F1, # rxvt-unicode + "\x1b[12~": Keys.F2, # rxvt-unicode + "\x1b[13~": Keys.F3, # rxvt-unicode + "\x1b[14~": Keys.F4, # rxvt-unicode + "\x1b[15~": Keys.F5, + "\x1b[17~": Keys.F6, + "\x1b[18~": Keys.F7, + "\x1b[19~": Keys.F8, + "\x1b[20~": Keys.F9, + "\x1b[21~": Keys.F10, + "\x1b[23~": Keys.F11, + "\x1b[24~": Keys.F12, + "\x1b[25~": Keys.F13, + "\x1b[26~": Keys.F14, + "\x1b[28~": Keys.F15, + "\x1b[29~": Keys.F16, + "\x1b[31~": Keys.F17, + "\x1b[32~": Keys.F18, + "\x1b[33~": Keys.F19, + "\x1b[34~": Keys.F20, + # Xterm + "\x1b[1;2P": Keys.F13, + "\x1b[1;2Q": Keys.F14, + # '\x1b[1;2R': Keys.F15, # Conflicts with CPR response. + "\x1b[1;2S": Keys.F16, + "\x1b[15;2~": Keys.F17, + "\x1b[17;2~": Keys.F18, + "\x1b[18;2~": Keys.F19, + "\x1b[19;2~": Keys.F20, + "\x1b[20;2~": Keys.F21, + "\x1b[21;2~": Keys.F22, + "\x1b[23;2~": Keys.F23, + "\x1b[24;2~": Keys.F24, + # -- # CSI 27 disambiguated modified "other" keys (xterm) # Ref: https://invisible-island.net/xterm/modified-keys.html # These are currently unsupported, so just re-map some common ones to the @@ -129,215 +129,215 @@ ANSI_SEQUENCES: Dict[str, Union[Keys, Tuple[Keys, ...]]] = { "\x1b[27;5;13~": Keys.ControlM, # Ctrl + Enter "\x1b[27;6;13~": Keys.ControlM, # Ctrl + Shift + Enter # -- - # Control + function keys. - "\x1b[1;5P": Keys.ControlF1, - "\x1b[1;5Q": Keys.ControlF2, - # "\x1b[1;5R": Keys.ControlF3, # Conflicts with CPR response. - "\x1b[1;5S": Keys.ControlF4, - "\x1b[15;5~": Keys.ControlF5, - "\x1b[17;5~": Keys.ControlF6, - "\x1b[18;5~": Keys.ControlF7, - "\x1b[19;5~": Keys.ControlF8, - "\x1b[20;5~": Keys.ControlF9, - "\x1b[21;5~": Keys.ControlF10, - "\x1b[23;5~": Keys.ControlF11, - "\x1b[24;5~": Keys.ControlF12, - "\x1b[1;6P": Keys.ControlF13, - "\x1b[1;6Q": Keys.ControlF14, - # "\x1b[1;6R": Keys.ControlF15, # Conflicts with CPR response. - "\x1b[1;6S": Keys.ControlF16, - "\x1b[15;6~": Keys.ControlF17, - "\x1b[17;6~": Keys.ControlF18, - "\x1b[18;6~": Keys.ControlF19, - "\x1b[19;6~": Keys.ControlF20, - "\x1b[20;6~": Keys.ControlF21, - "\x1b[21;6~": Keys.ControlF22, - "\x1b[23;6~": Keys.ControlF23, - "\x1b[24;6~": Keys.ControlF24, - # -- - # Tmux (Win32 subsystem) sends the following scroll events. - "\x1b[62~": Keys.ScrollUp, - "\x1b[63~": Keys.ScrollDown, - "\x1b[200~": Keys.BracketedPaste, # Start of bracketed paste. - # -- - # Sequences generated by numpad 5. Not sure what it means. (It doesn't - # appear in 'infocmp'. Just ignore. - "\x1b[E": Keys.Ignore, # Xterm. - "\x1b[G": Keys.Ignore, # Linux console. - # -- - # Meta/control/escape + pageup/pagedown/insert/delete. - "\x1b[3;2~": Keys.ShiftDelete, # xterm, gnome-terminal. - "\x1b[5;2~": Keys.ShiftPageUp, - "\x1b[6;2~": Keys.ShiftPageDown, - "\x1b[2;3~": (Keys.Escape, Keys.Insert), - "\x1b[3;3~": (Keys.Escape, Keys.Delete), - "\x1b[5;3~": (Keys.Escape, Keys.PageUp), - "\x1b[6;3~": (Keys.Escape, Keys.PageDown), - "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert), - "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete), - "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp), - "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown), - "\x1b[3;5~": Keys.ControlDelete, # xterm, gnome-terminal. - "\x1b[5;5~": Keys.ControlPageUp, - "\x1b[6;5~": Keys.ControlPageDown, - "\x1b[3;6~": Keys.ControlShiftDelete, - "\x1b[5;6~": Keys.ControlShiftPageUp, - "\x1b[6;6~": Keys.ControlShiftPageDown, - "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert), - "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown), - "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown), - "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert), - "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown), - "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown), - # -- - # Arrows. - # (Normal cursor mode). - "\x1b[A": Keys.Up, - "\x1b[B": Keys.Down, - "\x1b[C": Keys.Right, - "\x1b[D": Keys.Left, - "\x1b[H": Keys.Home, - "\x1b[F": Keys.End, - # Tmux sends following keystrokes when control+arrow is pressed, but for - # Emacs ansi-term sends the same sequences for normal arrow keys. Consider - # it a normal arrow press, because that's more important. - # (Application cursor mode). - "\x1bOA": Keys.Up, - "\x1bOB": Keys.Down, - "\x1bOC": Keys.Right, - "\x1bOD": Keys.Left, - "\x1bOF": Keys.End, - "\x1bOH": Keys.Home, - # Shift + arrows. - "\x1b[1;2A": Keys.ShiftUp, - "\x1b[1;2B": Keys.ShiftDown, - "\x1b[1;2C": Keys.ShiftRight, - "\x1b[1;2D": Keys.ShiftLeft, - "\x1b[1;2F": Keys.ShiftEnd, - "\x1b[1;2H": Keys.ShiftHome, - # Meta + arrow keys. Several terminals handle this differently. - # The following sequences are for xterm and gnome-terminal. - # (Iterm sends ESC followed by the normal arrow_up/down/left/right - # sequences, and the OSX Terminal sends ESCb and ESCf for "alt - # arrow_left" and "alt arrow_right." We don't handle these - # explicitly, in here, because would could not distinguish between - # pressing ESC (to go to Vi navigation mode), followed by just the - # 'b' or 'f' key. These combinations are handled in - # the input processor.) - "\x1b[1;3A": (Keys.Escape, Keys.Up), - "\x1b[1;3B": (Keys.Escape, Keys.Down), - "\x1b[1;3C": (Keys.Escape, Keys.Right), - "\x1b[1;3D": (Keys.Escape, Keys.Left), - "\x1b[1;3F": (Keys.Escape, Keys.End), - "\x1b[1;3H": (Keys.Escape, Keys.Home), - # Alt+shift+number. - "\x1b[1;4A": (Keys.Escape, Keys.ShiftDown), - "\x1b[1;4B": (Keys.Escape, Keys.ShiftUp), - "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight), - "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft), - "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd), - "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome), - # Control + arrows. - "\x1b[1;5A": Keys.ControlUp, # Cursor Mode - "\x1b[1;5B": Keys.ControlDown, # Cursor Mode - "\x1b[1;5C": Keys.ControlRight, # Cursor Mode - "\x1b[1;5D": Keys.ControlLeft, # Cursor Mode - "\x1b[1;5F": Keys.ControlEnd, - "\x1b[1;5H": Keys.ControlHome, - # Tmux sends following keystrokes when control+arrow is pressed, but for - # Emacs ansi-term sends the same sequences for normal arrow keys. Consider - # it a normal arrow press, because that's more important. - "\x1b[5A": Keys.ControlUp, - "\x1b[5B": Keys.ControlDown, - "\x1b[5C": Keys.ControlRight, - "\x1b[5D": Keys.ControlLeft, - "\x1bOc": Keys.ControlRight, # rxvt - "\x1bOd": Keys.ControlLeft, # rxvt - # Control + shift + arrows. - "\x1b[1;6A": Keys.ControlShiftDown, - "\x1b[1;6B": Keys.ControlShiftUp, - "\x1b[1;6C": Keys.ControlShiftRight, - "\x1b[1;6D": Keys.ControlShiftLeft, - "\x1b[1;6F": Keys.ControlShiftEnd, - "\x1b[1;6H": Keys.ControlShiftHome, - # Control + Meta + arrows. - "\x1b[1;7A": (Keys.Escape, Keys.ControlDown), - "\x1b[1;7B": (Keys.Escape, Keys.ControlUp), - "\x1b[1;7C": (Keys.Escape, Keys.ControlRight), - "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft), - "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd), - "\x1b[1;7H": (Keys.Escape, Keys.ControlHome), - # Meta + Shift + arrows. - "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftDown), - "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftUp), - "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight), - "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft), - "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd), - "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome), - # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483). - "\x1b[1;9A": (Keys.Escape, Keys.Up), - "\x1b[1;9B": (Keys.Escape, Keys.Down), - "\x1b[1;9C": (Keys.Escape, Keys.Right), - "\x1b[1;9D": (Keys.Escape, Keys.Left), - # -- - # Control/shift/meta + number in mintty. - # (c-2 will actually send c-@ and c-6 will send c-^.) - "\x1b[1;5p": Keys.Control0, - "\x1b[1;5q": Keys.Control1, - "\x1b[1;5r": Keys.Control2, - "\x1b[1;5s": Keys.Control3, - "\x1b[1;5t": Keys.Control4, - "\x1b[1;5u": Keys.Control5, - "\x1b[1;5v": Keys.Control6, - "\x1b[1;5w": Keys.Control7, - "\x1b[1;5x": Keys.Control8, - "\x1b[1;5y": Keys.Control9, - "\x1b[1;6p": Keys.ControlShift0, - "\x1b[1;6q": Keys.ControlShift1, - "\x1b[1;6r": Keys.ControlShift2, - "\x1b[1;6s": Keys.ControlShift3, - "\x1b[1;6t": Keys.ControlShift4, - "\x1b[1;6u": Keys.ControlShift5, - "\x1b[1;6v": Keys.ControlShift6, - "\x1b[1;6w": Keys.ControlShift7, - "\x1b[1;6x": Keys.ControlShift8, - "\x1b[1;6y": Keys.ControlShift9, - "\x1b[1;7p": (Keys.Escape, Keys.Control0), - "\x1b[1;7q": (Keys.Escape, Keys.Control1), - "\x1b[1;7r": (Keys.Escape, Keys.Control2), - "\x1b[1;7s": (Keys.Escape, Keys.Control3), - "\x1b[1;7t": (Keys.Escape, Keys.Control4), - "\x1b[1;7u": (Keys.Escape, Keys.Control5), - "\x1b[1;7v": (Keys.Escape, Keys.Control6), - "\x1b[1;7w": (Keys.Escape, Keys.Control7), - "\x1b[1;7x": (Keys.Escape, Keys.Control8), - "\x1b[1;7y": (Keys.Escape, Keys.Control9), - "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0), - "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1), - "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2), - "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3), - "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4), - "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5), - "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6), - "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7), - "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8), - "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9), -} - - -def _get_reverse_ansi_sequences() -> Dict[Keys, str]: - """ - Create a dictionary that maps prompt_toolkit keys back to the VT100 escape - sequences. - """ - result: Dict[Keys, str] = {} - - for sequence, key in ANSI_SEQUENCES.items(): - if not isinstance(key, tuple): - if key not in result: - result[key] = sequence - - return result - - -REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences() + # Control + function keys. + "\x1b[1;5P": Keys.ControlF1, + "\x1b[1;5Q": Keys.ControlF2, + # "\x1b[1;5R": Keys.ControlF3, # Conflicts with CPR response. + "\x1b[1;5S": Keys.ControlF4, + "\x1b[15;5~": Keys.ControlF5, + "\x1b[17;5~": Keys.ControlF6, + "\x1b[18;5~": Keys.ControlF7, + "\x1b[19;5~": Keys.ControlF8, + "\x1b[20;5~": Keys.ControlF9, + "\x1b[21;5~": Keys.ControlF10, + "\x1b[23;5~": Keys.ControlF11, + "\x1b[24;5~": Keys.ControlF12, + "\x1b[1;6P": Keys.ControlF13, + "\x1b[1;6Q": Keys.ControlF14, + # "\x1b[1;6R": Keys.ControlF15, # Conflicts with CPR response. + "\x1b[1;6S": Keys.ControlF16, + "\x1b[15;6~": Keys.ControlF17, + "\x1b[17;6~": Keys.ControlF18, + "\x1b[18;6~": Keys.ControlF19, + "\x1b[19;6~": Keys.ControlF20, + "\x1b[20;6~": Keys.ControlF21, + "\x1b[21;6~": Keys.ControlF22, + "\x1b[23;6~": Keys.ControlF23, + "\x1b[24;6~": Keys.ControlF24, + # -- + # Tmux (Win32 subsystem) sends the following scroll events. + "\x1b[62~": Keys.ScrollUp, + "\x1b[63~": Keys.ScrollDown, + "\x1b[200~": Keys.BracketedPaste, # Start of bracketed paste. + # -- + # Sequences generated by numpad 5. Not sure what it means. (It doesn't + # appear in 'infocmp'. Just ignore. + "\x1b[E": Keys.Ignore, # Xterm. + "\x1b[G": Keys.Ignore, # Linux console. + # -- + # Meta/control/escape + pageup/pagedown/insert/delete. + "\x1b[3;2~": Keys.ShiftDelete, # xterm, gnome-terminal. + "\x1b[5;2~": Keys.ShiftPageUp, + "\x1b[6;2~": Keys.ShiftPageDown, + "\x1b[2;3~": (Keys.Escape, Keys.Insert), + "\x1b[3;3~": (Keys.Escape, Keys.Delete), + "\x1b[5;3~": (Keys.Escape, Keys.PageUp), + "\x1b[6;3~": (Keys.Escape, Keys.PageDown), + "\x1b[2;4~": (Keys.Escape, Keys.ShiftInsert), + "\x1b[3;4~": (Keys.Escape, Keys.ShiftDelete), + "\x1b[5;4~": (Keys.Escape, Keys.ShiftPageUp), + "\x1b[6;4~": (Keys.Escape, Keys.ShiftPageDown), + "\x1b[3;5~": Keys.ControlDelete, # xterm, gnome-terminal. + "\x1b[5;5~": Keys.ControlPageUp, + "\x1b[6;5~": Keys.ControlPageDown, + "\x1b[3;6~": Keys.ControlShiftDelete, + "\x1b[5;6~": Keys.ControlShiftPageUp, + "\x1b[6;6~": Keys.ControlShiftPageDown, + "\x1b[2;7~": (Keys.Escape, Keys.ControlInsert), + "\x1b[5;7~": (Keys.Escape, Keys.ControlPageDown), + "\x1b[6;7~": (Keys.Escape, Keys.ControlPageDown), + "\x1b[2;8~": (Keys.Escape, Keys.ControlShiftInsert), + "\x1b[5;8~": (Keys.Escape, Keys.ControlShiftPageDown), + "\x1b[6;8~": (Keys.Escape, Keys.ControlShiftPageDown), + # -- + # Arrows. + # (Normal cursor mode). + "\x1b[A": Keys.Up, + "\x1b[B": Keys.Down, + "\x1b[C": Keys.Right, + "\x1b[D": Keys.Left, + "\x1b[H": Keys.Home, + "\x1b[F": Keys.End, + # Tmux sends following keystrokes when control+arrow is pressed, but for + # Emacs ansi-term sends the same sequences for normal arrow keys. Consider + # it a normal arrow press, because that's more important. + # (Application cursor mode). + "\x1bOA": Keys.Up, + "\x1bOB": Keys.Down, + "\x1bOC": Keys.Right, + "\x1bOD": Keys.Left, + "\x1bOF": Keys.End, + "\x1bOH": Keys.Home, + # Shift + arrows. + "\x1b[1;2A": Keys.ShiftUp, + "\x1b[1;2B": Keys.ShiftDown, + "\x1b[1;2C": Keys.ShiftRight, + "\x1b[1;2D": Keys.ShiftLeft, + "\x1b[1;2F": Keys.ShiftEnd, + "\x1b[1;2H": Keys.ShiftHome, + # Meta + arrow keys. Several terminals handle this differently. + # The following sequences are for xterm and gnome-terminal. + # (Iterm sends ESC followed by the normal arrow_up/down/left/right + # sequences, and the OSX Terminal sends ESCb and ESCf for "alt + # arrow_left" and "alt arrow_right." We don't handle these + # explicitly, in here, because would could not distinguish between + # pressing ESC (to go to Vi navigation mode), followed by just the + # 'b' or 'f' key. These combinations are handled in + # the input processor.) + "\x1b[1;3A": (Keys.Escape, Keys.Up), + "\x1b[1;3B": (Keys.Escape, Keys.Down), + "\x1b[1;3C": (Keys.Escape, Keys.Right), + "\x1b[1;3D": (Keys.Escape, Keys.Left), + "\x1b[1;3F": (Keys.Escape, Keys.End), + "\x1b[1;3H": (Keys.Escape, Keys.Home), + # Alt+shift+number. + "\x1b[1;4A": (Keys.Escape, Keys.ShiftDown), + "\x1b[1;4B": (Keys.Escape, Keys.ShiftUp), + "\x1b[1;4C": (Keys.Escape, Keys.ShiftRight), + "\x1b[1;4D": (Keys.Escape, Keys.ShiftLeft), + "\x1b[1;4F": (Keys.Escape, Keys.ShiftEnd), + "\x1b[1;4H": (Keys.Escape, Keys.ShiftHome), + # Control + arrows. + "\x1b[1;5A": Keys.ControlUp, # Cursor Mode + "\x1b[1;5B": Keys.ControlDown, # Cursor Mode + "\x1b[1;5C": Keys.ControlRight, # Cursor Mode + "\x1b[1;5D": Keys.ControlLeft, # Cursor Mode + "\x1b[1;5F": Keys.ControlEnd, + "\x1b[1;5H": Keys.ControlHome, + # Tmux sends following keystrokes when control+arrow is pressed, but for + # Emacs ansi-term sends the same sequences for normal arrow keys. Consider + # it a normal arrow press, because that's more important. + "\x1b[5A": Keys.ControlUp, + "\x1b[5B": Keys.ControlDown, + "\x1b[5C": Keys.ControlRight, + "\x1b[5D": Keys.ControlLeft, + "\x1bOc": Keys.ControlRight, # rxvt + "\x1bOd": Keys.ControlLeft, # rxvt + # Control + shift + arrows. + "\x1b[1;6A": Keys.ControlShiftDown, + "\x1b[1;6B": Keys.ControlShiftUp, + "\x1b[1;6C": Keys.ControlShiftRight, + "\x1b[1;6D": Keys.ControlShiftLeft, + "\x1b[1;6F": Keys.ControlShiftEnd, + "\x1b[1;6H": Keys.ControlShiftHome, + # Control + Meta + arrows. + "\x1b[1;7A": (Keys.Escape, Keys.ControlDown), + "\x1b[1;7B": (Keys.Escape, Keys.ControlUp), + "\x1b[1;7C": (Keys.Escape, Keys.ControlRight), + "\x1b[1;7D": (Keys.Escape, Keys.ControlLeft), + "\x1b[1;7F": (Keys.Escape, Keys.ControlEnd), + "\x1b[1;7H": (Keys.Escape, Keys.ControlHome), + # Meta + Shift + arrows. + "\x1b[1;8A": (Keys.Escape, Keys.ControlShiftDown), + "\x1b[1;8B": (Keys.Escape, Keys.ControlShiftUp), + "\x1b[1;8C": (Keys.Escape, Keys.ControlShiftRight), + "\x1b[1;8D": (Keys.Escape, Keys.ControlShiftLeft), + "\x1b[1;8F": (Keys.Escape, Keys.ControlShiftEnd), + "\x1b[1;8H": (Keys.Escape, Keys.ControlShiftHome), + # Meta + arrow on (some?) Macs when using iTerm defaults (see issue #483). + "\x1b[1;9A": (Keys.Escape, Keys.Up), + "\x1b[1;9B": (Keys.Escape, Keys.Down), + "\x1b[1;9C": (Keys.Escape, Keys.Right), + "\x1b[1;9D": (Keys.Escape, Keys.Left), + # -- + # Control/shift/meta + number in mintty. + # (c-2 will actually send c-@ and c-6 will send c-^.) + "\x1b[1;5p": Keys.Control0, + "\x1b[1;5q": Keys.Control1, + "\x1b[1;5r": Keys.Control2, + "\x1b[1;5s": Keys.Control3, + "\x1b[1;5t": Keys.Control4, + "\x1b[1;5u": Keys.Control5, + "\x1b[1;5v": Keys.Control6, + "\x1b[1;5w": Keys.Control7, + "\x1b[1;5x": Keys.Control8, + "\x1b[1;5y": Keys.Control9, + "\x1b[1;6p": Keys.ControlShift0, + "\x1b[1;6q": Keys.ControlShift1, + "\x1b[1;6r": Keys.ControlShift2, + "\x1b[1;6s": Keys.ControlShift3, + "\x1b[1;6t": Keys.ControlShift4, + "\x1b[1;6u": Keys.ControlShift5, + "\x1b[1;6v": Keys.ControlShift6, + "\x1b[1;6w": Keys.ControlShift7, + "\x1b[1;6x": Keys.ControlShift8, + "\x1b[1;6y": Keys.ControlShift9, + "\x1b[1;7p": (Keys.Escape, Keys.Control0), + "\x1b[1;7q": (Keys.Escape, Keys.Control1), + "\x1b[1;7r": (Keys.Escape, Keys.Control2), + "\x1b[1;7s": (Keys.Escape, Keys.Control3), + "\x1b[1;7t": (Keys.Escape, Keys.Control4), + "\x1b[1;7u": (Keys.Escape, Keys.Control5), + "\x1b[1;7v": (Keys.Escape, Keys.Control6), + "\x1b[1;7w": (Keys.Escape, Keys.Control7), + "\x1b[1;7x": (Keys.Escape, Keys.Control8), + "\x1b[1;7y": (Keys.Escape, Keys.Control9), + "\x1b[1;8p": (Keys.Escape, Keys.ControlShift0), + "\x1b[1;8q": (Keys.Escape, Keys.ControlShift1), + "\x1b[1;8r": (Keys.Escape, Keys.ControlShift2), + "\x1b[1;8s": (Keys.Escape, Keys.ControlShift3), + "\x1b[1;8t": (Keys.Escape, Keys.ControlShift4), + "\x1b[1;8u": (Keys.Escape, Keys.ControlShift5), + "\x1b[1;8v": (Keys.Escape, Keys.ControlShift6), + "\x1b[1;8w": (Keys.Escape, Keys.ControlShift7), + "\x1b[1;8x": (Keys.Escape, Keys.ControlShift8), + "\x1b[1;8y": (Keys.Escape, Keys.ControlShift9), +} + + +def _get_reverse_ansi_sequences() -> Dict[Keys, str]: + """ + Create a dictionary that maps prompt_toolkit keys back to the VT100 escape + sequences. + """ + result: Dict[Keys, str] = {} + + for sequence, key in ANSI_SEQUENCES.items(): + if not isinstance(key, tuple): + if key not in result: + result[key] = sequence + + return result + + +REVERSE_ANSI_SEQUENCES = _get_reverse_ansi_sequences() diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py index 9885a37bc2c..e72f03a1e46 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/base.py @@ -1,131 +1,131 @@ -""" -Abstraction of CLI Input. -""" -from abc import ABCMeta, abstractmethod, abstractproperty -from contextlib import contextmanager -from typing import Callable, ContextManager, Generator, List - -from prompt_toolkit.key_binding import KeyPress - -__all__ = [ - "Input", - "DummyInput", -] - - -class Input(metaclass=ABCMeta): - """ - Abstraction for any input. - - An instance of this class can be given to the constructor of a - :class:`~prompt_toolkit.application.Application` and will also be - passed to the :class:`~prompt_toolkit.eventloop.base.EventLoop`. - """ - - @abstractmethod - def fileno(self) -> int: - """ - Fileno for putting this in an event loop. - """ - - @abstractmethod - def typeahead_hash(self) -> str: - """ - Identifier for storing type ahead key presses. - """ - - @abstractmethod - def read_keys(self) -> List[KeyPress]: - """ - Return a list of Key objects which are read/parsed from the input. - """ - - def flush_keys(self) -> List[KeyPress]: - """ - Flush the underlying parser. and return the pending keys. - (Used for vt100 input.) - """ - return [] - - def flush(self) -> None: - "The event loop can call this when the input has to be flushed." - pass - - @abstractproperty - def closed(self) -> bool: - "Should be true when the input stream is closed." - return False - - @abstractmethod - def raw_mode(self) -> ContextManager[None]: - """ - Context manager that turns the input into raw mode. - """ - - @abstractmethod - def cooked_mode(self) -> ContextManager[None]: - """ - Context manager that turns the input into cooked mode. - """ - - @abstractmethod - def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: - """ - Return a context manager that makes this input active in the current - event loop. - """ - - @abstractmethod - def detach(self) -> ContextManager[None]: - """ - Return a context manager that makes sure that this input is not active - in the current event loop. - """ - - def close(self) -> None: - "Close input." - pass - - -class PipeInput(Input): - """ - Abstraction for pipe input. - """ - - @abstractmethod - def send_bytes(self, data: bytes) -> None: - """Feed byte string into the pipe""" - - @abstractmethod - def send_text(self, data: str) -> None: - """Feed a text string into the pipe""" - - -class DummyInput(Input): - """ - Input for use in a `DummyApplication` - """ - - def fileno(self) -> int: - raise NotImplementedError - - def typeahead_hash(self) -> str: - return "dummy-%s" % id(self) - - def read_keys(self) -> List[KeyPress]: - return [] - - @property - def closed(self) -> bool: - return True - - def raw_mode(self) -> ContextManager[None]: - return _dummy_context_manager() - - def cooked_mode(self) -> ContextManager[None]: - return _dummy_context_manager() - - def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: +""" +Abstraction of CLI Input. +""" +from abc import ABCMeta, abstractmethod, abstractproperty +from contextlib import contextmanager +from typing import Callable, ContextManager, Generator, List + +from prompt_toolkit.key_binding import KeyPress + +__all__ = [ + "Input", + "DummyInput", +] + + +class Input(metaclass=ABCMeta): + """ + Abstraction for any input. + + An instance of this class can be given to the constructor of a + :class:`~prompt_toolkit.application.Application` and will also be + passed to the :class:`~prompt_toolkit.eventloop.base.EventLoop`. + """ + + @abstractmethod + def fileno(self) -> int: + """ + Fileno for putting this in an event loop. + """ + + @abstractmethod + def typeahead_hash(self) -> str: + """ + Identifier for storing type ahead key presses. + """ + + @abstractmethod + def read_keys(self) -> List[KeyPress]: + """ + Return a list of Key objects which are read/parsed from the input. + """ + + def flush_keys(self) -> List[KeyPress]: + """ + Flush the underlying parser. and return the pending keys. + (Used for vt100 input.) + """ + return [] + + def flush(self) -> None: + "The event loop can call this when the input has to be flushed." + pass + + @abstractproperty + def closed(self) -> bool: + "Should be true when the input stream is closed." + return False + + @abstractmethod + def raw_mode(self) -> ContextManager[None]: + """ + Context manager that turns the input into raw mode. + """ + + @abstractmethod + def cooked_mode(self) -> ContextManager[None]: + """ + Context manager that turns the input into cooked mode. + """ + + @abstractmethod + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + + @abstractmethod + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + + def close(self) -> None: + "Close input." + pass + + +class PipeInput(Input): + """ + Abstraction for pipe input. + """ + + @abstractmethod + def send_bytes(self, data: bytes) -> None: + """Feed byte string into the pipe""" + + @abstractmethod + def send_text(self, data: str) -> None: + """Feed a text string into the pipe""" + + +class DummyInput(Input): + """ + Input for use in a `DummyApplication` + """ + + def fileno(self) -> int: + raise NotImplementedError + + def typeahead_hash(self) -> str: + return "dummy-%s" % id(self) + + def read_keys(self) -> List[KeyPress]: + return [] + + @property + def closed(self) -> bool: + return True + + def raw_mode(self) -> ContextManager[None]: + return _dummy_context_manager() + + def cooked_mode(self) -> ContextManager[None]: + return _dummy_context_manager() + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: # Call the callback immediately once after attaching. # This tells the callback to call `read_keys` and check the # `input.closed` flag, after which it won't receive any keys, but knows @@ -133,12 +133,12 @@ class DummyInput(Input): # `application.py`. input_ready_callback() - return _dummy_context_manager() - - def detach(self) -> ContextManager[None]: - return _dummy_context_manager() - - -@contextmanager -def _dummy_context_manager() -> Generator[None, None, None]: - yield + return _dummy_context_manager() + + def detach(self) -> ContextManager[None]: + return _dummy_context_manager() + + +@contextmanager +def _dummy_context_manager() -> Generator[None, None, None]: + yield diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py index 347f8c6ad34..f771d4721ca 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/defaults.py @@ -1,63 +1,63 @@ -import sys -from typing import Optional, TextIO - -from prompt_toolkit.utils import is_windows - +import sys +from typing import Optional, TextIO + +from prompt_toolkit.utils import is_windows + from .base import DummyInput, Input, PipeInput - -__all__ = [ - "create_input", - "create_pipe_input", -] - - -def create_input( - stdin: Optional[TextIO] = None, always_prefer_tty: bool = False -) -> Input: - """ - Create the appropriate `Input` object for the current os/environment. - - :param always_prefer_tty: When set, if `sys.stdin` is connected to a Unix - `pipe`, check whether `sys.stdout` or `sys.stderr` are connected to a - pseudo terminal. If so, open the tty for reading instead of reading for - `sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how - a `$PAGER` works.) - """ - if is_windows(): - from .win32 import Win32Input - + +__all__ = [ + "create_input", + "create_pipe_input", +] + + +def create_input( + stdin: Optional[TextIO] = None, always_prefer_tty: bool = False +) -> Input: + """ + Create the appropriate `Input` object for the current os/environment. + + :param always_prefer_tty: When set, if `sys.stdin` is connected to a Unix + `pipe`, check whether `sys.stdout` or `sys.stderr` are connected to a + pseudo terminal. If so, open the tty for reading instead of reading for + `sys.stdin`. (We can open `stdout` or `stderr` for reading, this is how + a `$PAGER` works.) + """ + if is_windows(): + from .win32 import Win32Input + # If `stdin` was assigned `None` (which happens with pythonw.exe), use # a `DummyInput`. This triggers `EOFError` in the application code. if stdin is None and sys.stdin is None: return DummyInput() - return Win32Input(stdin or sys.stdin) - else: - from .vt100 import Vt100Input - - # If no input TextIO is given, use stdin/stdout. - if stdin is None: - stdin = sys.stdin - - if always_prefer_tty: - for io in [sys.stdin, sys.stdout, sys.stderr]: - if io.isatty(): - stdin = io - break - - return Vt100Input(stdin) - - -def create_pipe_input() -> PipeInput: - """ - Create an input pipe. - This is mostly useful for unit testing. - """ - if is_windows(): - from .win32_pipe import Win32PipeInput - - return Win32PipeInput() - else: - from .posix_pipe import PosixPipeInput - - return PosixPipeInput() + return Win32Input(stdin or sys.stdin) + else: + from .vt100 import Vt100Input + + # If no input TextIO is given, use stdin/stdout. + if stdin is None: + stdin = sys.stdin + + if always_prefer_tty: + for io in [sys.stdin, sys.stdout, sys.stderr]: + if io.isatty(): + stdin = io + break + + return Vt100Input(stdin) + + +def create_pipe_input() -> PipeInput: + """ + Create an input pipe. + This is mostly useful for unit testing. + """ + if is_windows(): + from .win32_pipe import Win32PipeInput + + return Win32PipeInput() + else: + from .posix_pipe import PosixPipeInput + + return PosixPipeInput() diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py index de47c649330..61a798e869b 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_pipe.py @@ -1,72 +1,72 @@ -import os -from typing import ContextManager, TextIO, cast - -from ..utils import DummyContext -from .base import PipeInput -from .vt100 import Vt100Input - -__all__ = [ - "PosixPipeInput", -] - - -class PosixPipeInput(Vt100Input, PipeInput): - """ - Input that is send through a pipe. - This is useful if we want to send the input programmatically into the - application. Mostly useful for unit testing. - - Usage:: - - input = PosixPipeInput() - input.send_text('inputdata') - """ - - _id = 0 - - def __init__(self, text: str = "") -> None: - self._r, self._w = os.pipe() - - class Stdin: - encoding = "utf-8" - - def isatty(stdin) -> bool: - return True - - def fileno(stdin) -> int: - return self._r - - super().__init__(cast(TextIO, Stdin())) - self.send_text(text) - - # Identifier for every PipeInput for the hash. - self.__class__._id += 1 - self._id = self.__class__._id - - def send_bytes(self, data: bytes) -> None: - os.write(self._w, data) - - def send_text(self, data: str) -> None: - "Send text to the input." - os.write(self._w, data.encode("utf-8")) - - def raw_mode(self) -> ContextManager[None]: - return DummyContext() - - def cooked_mode(self) -> ContextManager[None]: - return DummyContext() - - def close(self) -> None: - "Close pipe fds." - os.close(self._r) - os.close(self._w) - - # We should assign `None` to 'self._r` and 'self._w', - # The event loop still needs to know the the fileno for this input in order - # to properly remove it from the selectors. - - def typeahead_hash(self) -> str: - """ - This needs to be unique for every `PipeInput`. - """ - return "pipe-input-%s" % (self._id,) +import os +from typing import ContextManager, TextIO, cast + +from ..utils import DummyContext +from .base import PipeInput +from .vt100 import Vt100Input + +__all__ = [ + "PosixPipeInput", +] + + +class PosixPipeInput(Vt100Input, PipeInput): + """ + Input that is send through a pipe. + This is useful if we want to send the input programmatically into the + application. Mostly useful for unit testing. + + Usage:: + + input = PosixPipeInput() + input.send_text('inputdata') + """ + + _id = 0 + + def __init__(self, text: str = "") -> None: + self._r, self._w = os.pipe() + + class Stdin: + encoding = "utf-8" + + def isatty(stdin) -> bool: + return True + + def fileno(stdin) -> int: + return self._r + + super().__init__(cast(TextIO, Stdin())) + self.send_text(text) + + # Identifier for every PipeInput for the hash. + self.__class__._id += 1 + self._id = self.__class__._id + + def send_bytes(self, data: bytes) -> None: + os.write(self._w, data) + + def send_text(self, data: str) -> None: + "Send text to the input." + os.write(self._w, data.encode("utf-8")) + + def raw_mode(self) -> ContextManager[None]: + return DummyContext() + + def cooked_mode(self) -> ContextManager[None]: + return DummyContext() + + def close(self) -> None: + "Close pipe fds." + os.close(self._r) + os.close(self._w) + + # We should assign `None` to 'self._r` and 'self._w', + # The event loop still needs to know the the fileno for this input in order + # to properly remove it from the selectors. + + def typeahead_hash(self) -> str: + """ + This needs to be unique for every `PipeInput`. + """ + return "pipe-input-%s" % (self._id,) diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py index f32f683f735..c0893239a3e 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/posix_utils.py @@ -1,95 +1,95 @@ -import os -import select -from codecs import getincrementaldecoder - -__all__ = [ - "PosixStdinReader", -] - - -class PosixStdinReader: - """ - Wrapper around stdin which reads (nonblocking) the next available 1024 - bytes and decodes it. - - Note that you can't be sure that the input file is closed if the ``read`` - function returns an empty string. When ``errors=ignore`` is passed, - ``read`` can return an empty string if all malformed input was replaced by - an empty string. (We can't block here and wait for more input.) So, because - of that, check the ``closed`` attribute, to be sure that the file has been - closed. - - :param stdin_fd: File descriptor from which we read. - :param errors: Can be 'ignore', 'strict' or 'replace'. - On Python3, this can be 'surrogateescape', which is the default. - - 'surrogateescape' is preferred, because this allows us to transfer - unrecognised bytes to the key bindings. Some terminals, like lxterminal - and Guake, use the 'Mxx' notation to send mouse events, where each 'x' - can be any possible byte. - """ - - # By default, we want to 'ignore' errors here. The input stream can be full - # of junk. One occurrence of this that I had was when using iTerm2 on OS X, - # with "Option as Meta" checked (You should choose "Option as +Esc".) - - def __init__( - self, stdin_fd: int, errors: str = "surrogateescape", encoding: str = "utf-8" - ) -> None: - self.stdin_fd = stdin_fd - self.errors = errors - - # Create incremental decoder for decoding stdin. - # We can not just do `os.read(stdin.fileno(), 1024).decode('utf-8')`, because - # it could be that we are in the middle of a utf-8 byte sequence. - self._stdin_decoder_cls = getincrementaldecoder(encoding) - self._stdin_decoder = self._stdin_decoder_cls(errors=errors) - - #: True when there is nothing anymore to read. - self.closed = False - - def read(self, count: int = 1024) -> str: - # By default we choose a rather small chunk size, because reading - # big amounts of input at once, causes the event loop to process - # all these key bindings also at once without going back to the - # loop. This will make the application feel unresponsive. - """ - Read the input and return it as a string. - - Return the text. Note that this can return an empty string, even when - the input stream was not yet closed. This means that something went - wrong during the decoding. - """ - if self.closed: - return "" - - # Check whether there is some input to read. `os.read` would block - # otherwise. - # (Actually, the event loop is responsible to make sure that this - # function is only called when there is something to read, but for some - # reason this happens in certain situations.) - try: - if not select.select([self.stdin_fd], [], [], 0)[0]: - return "" - except IOError: - # Happens for instance when the file descriptor was closed. - # (We had this in ptterm, where the FD became ready, a callback was - # scheduled, but in the meantime another callback closed it already.) - self.closed = True - - # Note: the following works better than wrapping `self.stdin` like - # `codecs.getreader('utf-8')(stdin)` and doing `read(1)`. - # Somehow that causes some latency when the escape - # character is pressed. (Especially on combination with the `select`.) - try: - data = os.read(self.stdin_fd, count) - - # Nothing more to read, stream is closed. - if data == b"": - self.closed = True - return "" - except OSError: - # In case of SIGWINCH - data = b"" - - return self._stdin_decoder.decode(data) +import os +import select +from codecs import getincrementaldecoder + +__all__ = [ + "PosixStdinReader", +] + + +class PosixStdinReader: + """ + Wrapper around stdin which reads (nonblocking) the next available 1024 + bytes and decodes it. + + Note that you can't be sure that the input file is closed if the ``read`` + function returns an empty string. When ``errors=ignore`` is passed, + ``read`` can return an empty string if all malformed input was replaced by + an empty string. (We can't block here and wait for more input.) So, because + of that, check the ``closed`` attribute, to be sure that the file has been + closed. + + :param stdin_fd: File descriptor from which we read. + :param errors: Can be 'ignore', 'strict' or 'replace'. + On Python3, this can be 'surrogateescape', which is the default. + + 'surrogateescape' is preferred, because this allows us to transfer + unrecognised bytes to the key bindings. Some terminals, like lxterminal + and Guake, use the 'Mxx' notation to send mouse events, where each 'x' + can be any possible byte. + """ + + # By default, we want to 'ignore' errors here. The input stream can be full + # of junk. One occurrence of this that I had was when using iTerm2 on OS X, + # with "Option as Meta" checked (You should choose "Option as +Esc".) + + def __init__( + self, stdin_fd: int, errors: str = "surrogateescape", encoding: str = "utf-8" + ) -> None: + self.stdin_fd = stdin_fd + self.errors = errors + + # Create incremental decoder for decoding stdin. + # We can not just do `os.read(stdin.fileno(), 1024).decode('utf-8')`, because + # it could be that we are in the middle of a utf-8 byte sequence. + self._stdin_decoder_cls = getincrementaldecoder(encoding) + self._stdin_decoder = self._stdin_decoder_cls(errors=errors) + + #: True when there is nothing anymore to read. + self.closed = False + + def read(self, count: int = 1024) -> str: + # By default we choose a rather small chunk size, because reading + # big amounts of input at once, causes the event loop to process + # all these key bindings also at once without going back to the + # loop. This will make the application feel unresponsive. + """ + Read the input and return it as a string. + + Return the text. Note that this can return an empty string, even when + the input stream was not yet closed. This means that something went + wrong during the decoding. + """ + if self.closed: + return "" + + # Check whether there is some input to read. `os.read` would block + # otherwise. + # (Actually, the event loop is responsible to make sure that this + # function is only called when there is something to read, but for some + # reason this happens in certain situations.) + try: + if not select.select([self.stdin_fd], [], [], 0)[0]: + return "" + except IOError: + # Happens for instance when the file descriptor was closed. + # (We had this in ptterm, where the FD became ready, a callback was + # scheduled, but in the meantime another callback closed it already.) + self.closed = True + + # Note: the following works better than wrapping `self.stdin` like + # `codecs.getreader('utf-8')(stdin)` and doing `read(1)`. + # Somehow that causes some latency when the escape + # character is pressed. (Especially on combination with the `select`.) + try: + data = os.read(self.stdin_fd, count) + + # Nothing more to read, stream is closed. + if data == b"": + self.closed = True + return "" + except OSError: + # In case of SIGWINCH + data = b"" + + return self._stdin_decoder.decode(data) diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py index a3d78664493..df53afc3741 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/typeahead.py @@ -1,76 +1,76 @@ -r""" -Store input key strokes if we did read more than was required. - -The input classes `Vt100Input` and `Win32Input` read the input text in chunks -of a few kilobytes. This means that if we read input from stdin, it could be -that we read a couple of lines (with newlines in between) at once. - -This creates a problem: potentially, we read too much from stdin. Sometimes -people paste several lines at once because they paste input in a REPL and -expect each input() call to process one line. Or they rely on type ahead -because the application can't keep up with the processing. - -However, we need to read input in bigger chunks. We need this mostly to support -pasting of larger chunks of text. We don't want everything to become -unresponsive because we: - - read one character; - - parse one character; - - call the key binding, which does a string operation with one character; - - and render the user interface. -Doing text operations on single characters is very inefficient in Python, so we -prefer to work on bigger chunks of text. This is why we have to read the input -in bigger chunks. - -Further, line buffering is also not an option, because it doesn't work well in -the architecture. We use lower level Posix APIs, that work better with the -event loop and so on. In fact, there is also nothing that defines that only \n -can accept the input, you could create a key binding for any key to accept the -input. - -To support type ahead, this module will store all the key strokes that were -read too early, so that they can be feed into to the next `prompt()` call or to -the next prompt_toolkit `Application`. -""" -from collections import defaultdict -from typing import Dict, List - -from ..key_binding import KeyPress -from .base import Input - -__all__ = [ - "store_typeahead", - "get_typeahead", - "clear_typeahead", -] - -_buffer: Dict[str, List[KeyPress]] = defaultdict(list) - - -def store_typeahead(input_obj: Input, key_presses: List[KeyPress]) -> None: - """ - Insert typeahead key presses for the given input. - """ - global _buffer - key = input_obj.typeahead_hash() - _buffer[key].extend(key_presses) - - -def get_typeahead(input_obj: Input) -> List[KeyPress]: - """ - Retrieve typeahead and reset the buffer for this input. - """ - global _buffer - - key = input_obj.typeahead_hash() - result = _buffer[key] - _buffer[key] = [] - return result - - -def clear_typeahead(input_obj: Input) -> None: - """ - Clear typeahead buffer. - """ - global _buffer - key = input_obj.typeahead_hash() - _buffer[key] = [] +r""" +Store input key strokes if we did read more than was required. + +The input classes `Vt100Input` and `Win32Input` read the input text in chunks +of a few kilobytes. This means that if we read input from stdin, it could be +that we read a couple of lines (with newlines in between) at once. + +This creates a problem: potentially, we read too much from stdin. Sometimes +people paste several lines at once because they paste input in a REPL and +expect each input() call to process one line. Or they rely on type ahead +because the application can't keep up with the processing. + +However, we need to read input in bigger chunks. We need this mostly to support +pasting of larger chunks of text. We don't want everything to become +unresponsive because we: + - read one character; + - parse one character; + - call the key binding, which does a string operation with one character; + - and render the user interface. +Doing text operations on single characters is very inefficient in Python, so we +prefer to work on bigger chunks of text. This is why we have to read the input +in bigger chunks. + +Further, line buffering is also not an option, because it doesn't work well in +the architecture. We use lower level Posix APIs, that work better with the +event loop and so on. In fact, there is also nothing that defines that only \n +can accept the input, you could create a key binding for any key to accept the +input. + +To support type ahead, this module will store all the key strokes that were +read too early, so that they can be feed into to the next `prompt()` call or to +the next prompt_toolkit `Application`. +""" +from collections import defaultdict +from typing import Dict, List + +from ..key_binding import KeyPress +from .base import Input + +__all__ = [ + "store_typeahead", + "get_typeahead", + "clear_typeahead", +] + +_buffer: Dict[str, List[KeyPress]] = defaultdict(list) + + +def store_typeahead(input_obj: Input, key_presses: List[KeyPress]) -> None: + """ + Insert typeahead key presses for the given input. + """ + global _buffer + key = input_obj.typeahead_hash() + _buffer[key].extend(key_presses) + + +def get_typeahead(input_obj: Input) -> List[KeyPress]: + """ + Retrieve typeahead and reset the buffer for this input. + """ + global _buffer + + key = input_obj.typeahead_hash() + result = _buffer[key] + _buffer[key] = [] + return result + + +def clear_typeahead(input_obj: Input) -> None: + """ + Clear typeahead buffer. + """ + global _buffer + key = input_obj.typeahead_hash() + _buffer[key] = [] diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py index 455cf8efd1f..1f32a7d58e4 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100.py @@ -1,174 +1,174 @@ -import contextlib -import io -import sys -import termios -import tty -from asyncio import AbstractEventLoop -from typing import ( - Callable, - ContextManager, - Dict, - Generator, - List, - Optional, - Set, - TextIO, - Tuple, - Union, -) - -from prompt_toolkit.eventloop import get_event_loop - -from ..key_binding import KeyPress -from .base import Input -from .posix_utils import PosixStdinReader -from .vt100_parser import Vt100Parser - -__all__ = [ - "Vt100Input", - "raw_mode", - "cooked_mode", -] - - -class Vt100Input(Input): - """ - Vt100 input for Posix systems. - (This uses a posix file descriptor that can be registered in the event loop.) - """ - - # For the error messages. Only display "Input is not a terminal" once per - # file descriptor. - _fds_not_a_terminal: Set[int] = set() - - def __init__(self, stdin: TextIO) -> None: - # Test whether the given input object has a file descriptor. - # (Idle reports stdin to be a TTY, but fileno() is not implemented.) - try: - # This should not raise, but can return 0. - stdin.fileno() - except io.UnsupportedOperation as e: - if "idlelib.run" in sys.modules: - raise io.UnsupportedOperation( - "Stdin is not a terminal. Running from Idle is not supported." - ) from e - else: - raise io.UnsupportedOperation("Stdin is not a terminal.") from e - - # Even when we have a file descriptor, it doesn't mean it's a TTY. - # Normally, this requires a real TTY device, but people instantiate - # this class often during unit tests as well. They use for instance - # pexpect to pipe data into an application. For convenience, we print - # an error message and go on. - isatty = stdin.isatty() - fd = stdin.fileno() - - if not isatty and fd not in Vt100Input._fds_not_a_terminal: - msg = "Warning: Input is not a terminal (fd=%r).\n" - sys.stderr.write(msg % fd) - sys.stderr.flush() - Vt100Input._fds_not_a_terminal.add(fd) - - # - self.stdin = stdin - - # Create a backup of the fileno(). We want this to work even if the - # underlying file is closed, so that `typeahead_hash()` keeps working. - self._fileno = stdin.fileno() - - self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects. - self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding) - self.vt100_parser = Vt100Parser( - lambda key_press: self._buffer.append(key_press) - ) - - def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: - """ - Return a context manager that makes this input active in the current - event loop. - """ - return _attached_input(self, input_ready_callback) - - def detach(self) -> ContextManager[None]: - """ - Return a context manager that makes sure that this input is not active - in the current event loop. - """ - return _detached_input(self) - - def read_keys(self) -> List[KeyPress]: - "Read list of KeyPress." - # Read text from stdin. - data = self.stdin_reader.read() - - # Pass it through our vt100 parser. - self.vt100_parser.feed(data) - - # Return result. - result = self._buffer - self._buffer = [] - return result - - def flush_keys(self) -> List[KeyPress]: - """ - Flush pending keys and return them. - (Used for flushing the 'escape' key.) - """ - # Flush all pending keys. (This is most important to flush the vt100 - # 'Escape' key early when nothing else follows.) - self.vt100_parser.flush() - - # Return result. - result = self._buffer - self._buffer = [] - return result - - @property - def closed(self) -> bool: - return self.stdin_reader.closed - - def raw_mode(self) -> ContextManager[None]: - return raw_mode(self.stdin.fileno()) - - def cooked_mode(self) -> ContextManager[None]: - return cooked_mode(self.stdin.fileno()) - - def fileno(self) -> int: - return self.stdin.fileno() - - def typeahead_hash(self) -> str: - return "fd-%s" % (self._fileno,) - - -_current_callbacks: Dict[ - Tuple[AbstractEventLoop, int], Optional[Callable[[], None]] -] = {} # (loop, fd) -> current callback - - -@contextlib.contextmanager -def _attached_input( - input: Vt100Input, callback: Callable[[], None] -) -> Generator[None, None, None]: - """ - Context manager that makes this input active in the current event loop. - - :param input: :class:`~prompt_toolkit.input.Input` object. - :param callback: Called when the input is ready to read. - """ - loop = get_event_loop() - fd = input.fileno() - previous = _current_callbacks.get((loop, fd)) - - def callback_wrapper() -> None: - """Wrapper around the callback that already removes the reader when - the input is closed. Otherwise, we keep continuously calling this - callback, until we leave the context manager (which can happen a bit - later). This fixes issues when piping /dev/null into a prompt_toolkit - application.""" - if input.closed: - loop.remove_reader(fd) - callback() - +import contextlib +import io +import sys +import termios +import tty +from asyncio import AbstractEventLoop +from typing import ( + Callable, + ContextManager, + Dict, + Generator, + List, + Optional, + Set, + TextIO, + Tuple, + Union, +) + +from prompt_toolkit.eventloop import get_event_loop + +from ..key_binding import KeyPress +from .base import Input +from .posix_utils import PosixStdinReader +from .vt100_parser import Vt100Parser + +__all__ = [ + "Vt100Input", + "raw_mode", + "cooked_mode", +] + + +class Vt100Input(Input): + """ + Vt100 input for Posix systems. + (This uses a posix file descriptor that can be registered in the event loop.) + """ + + # For the error messages. Only display "Input is not a terminal" once per + # file descriptor. + _fds_not_a_terminal: Set[int] = set() + + def __init__(self, stdin: TextIO) -> None: + # Test whether the given input object has a file descriptor. + # (Idle reports stdin to be a TTY, but fileno() is not implemented.) + try: + # This should not raise, but can return 0. + stdin.fileno() + except io.UnsupportedOperation as e: + if "idlelib.run" in sys.modules: + raise io.UnsupportedOperation( + "Stdin is not a terminal. Running from Idle is not supported." + ) from e + else: + raise io.UnsupportedOperation("Stdin is not a terminal.") from e + + # Even when we have a file descriptor, it doesn't mean it's a TTY. + # Normally, this requires a real TTY device, but people instantiate + # this class often during unit tests as well. They use for instance + # pexpect to pipe data into an application. For convenience, we print + # an error message and go on. + isatty = stdin.isatty() + fd = stdin.fileno() + + if not isatty and fd not in Vt100Input._fds_not_a_terminal: + msg = "Warning: Input is not a terminal (fd=%r).\n" + sys.stderr.write(msg % fd) + sys.stderr.flush() + Vt100Input._fds_not_a_terminal.add(fd) + + # + self.stdin = stdin + + # Create a backup of the fileno(). We want this to work even if the + # underlying file is closed, so that `typeahead_hash()` keeps working. + self._fileno = stdin.fileno() + + self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects. + self.stdin_reader = PosixStdinReader(self._fileno, encoding=stdin.encoding) + self.vt100_parser = Vt100Parser( + lambda key_press: self._buffer.append(key_press) + ) + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return _attached_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return _detached_input(self) + + def read_keys(self) -> List[KeyPress]: + "Read list of KeyPress." + # Read text from stdin. + data = self.stdin_reader.read() + + # Pass it through our vt100 parser. + self.vt100_parser.feed(data) + + # Return result. + result = self._buffer + self._buffer = [] + return result + + def flush_keys(self) -> List[KeyPress]: + """ + Flush pending keys and return them. + (Used for flushing the 'escape' key.) + """ + # Flush all pending keys. (This is most important to flush the vt100 + # 'Escape' key early when nothing else follows.) + self.vt100_parser.flush() + + # Return result. + result = self._buffer + self._buffer = [] + return result + + @property + def closed(self) -> bool: + return self.stdin_reader.closed + + def raw_mode(self) -> ContextManager[None]: + return raw_mode(self.stdin.fileno()) + + def cooked_mode(self) -> ContextManager[None]: + return cooked_mode(self.stdin.fileno()) + + def fileno(self) -> int: + return self.stdin.fileno() + + def typeahead_hash(self) -> str: + return "fd-%s" % (self._fileno,) + + +_current_callbacks: Dict[ + Tuple[AbstractEventLoop, int], Optional[Callable[[], None]] +] = {} # (loop, fd) -> current callback + + +@contextlib.contextmanager +def _attached_input( + input: Vt100Input, callback: Callable[[], None] +) -> Generator[None, None, None]: + """ + Context manager that makes this input active in the current event loop. + + :param input: :class:`~prompt_toolkit.input.Input` object. + :param callback: Called when the input is ready to read. + """ + loop = get_event_loop() + fd = input.fileno() + previous = _current_callbacks.get((loop, fd)) + + def callback_wrapper() -> None: + """Wrapper around the callback that already removes the reader when + the input is closed. Otherwise, we keep continuously calling this + callback, until we leave the context manager (which can happen a bit + later). This fixes issues when piping /dev/null into a prompt_toolkit + application.""" + if input.closed: + loop.remove_reader(fd) + callback() + try: loop.add_reader(fd, callback_wrapper) except PermissionError: @@ -181,137 +181,137 @@ def _attached_input( # To reproduce, do: `ptpython 0< /dev/null 1< /dev/null` raise EOFError - _current_callbacks[loop, fd] = callback - - try: - yield - finally: - loop.remove_reader(fd) - - if previous: - loop.add_reader(fd, previous) - _current_callbacks[loop, fd] = previous - else: - del _current_callbacks[loop, fd] - - -@contextlib.contextmanager -def _detached_input(input: Vt100Input) -> Generator[None, None, None]: - loop = get_event_loop() - fd = input.fileno() - previous = _current_callbacks.get((loop, fd)) - - if previous: - loop.remove_reader(fd) - _current_callbacks[loop, fd] = None - - try: - yield - finally: - if previous: - loop.add_reader(fd, previous) - _current_callbacks[loop, fd] = previous - - -class raw_mode: - """ - :: - - with raw_mode(stdin): - ''' the pseudo-terminal stdin is now used in raw mode ''' - - We ignore errors when executing `tcgetattr` fails. - """ - - # There are several reasons for ignoring errors: - # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would - # execute this code (In a Python REPL, for instance): - # - # import os; f = open(os.devnull); os.dup2(f.fileno(), 0) - # - # The result is that the eventloop will stop correctly, because it has - # to logic to quit when stdin is closed. However, we should not fail at - # this point. See: - # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393 - # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392 - - # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated. - # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165 - def __init__(self, fileno: int) -> None: - self.fileno = fileno - self.attrs_before: Optional[List[Union[int, List[Union[bytes, int]]]]] - try: - self.attrs_before = termios.tcgetattr(fileno) - except termios.error: - # Ignore attribute errors. - self.attrs_before = None - - def __enter__(self) -> None: - # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this: - try: - newattr = termios.tcgetattr(self.fileno) - except termios.error: - pass - else: - newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG]) - newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG]) - - # VMIN defines the number of characters read at a time in - # non-canonical mode. It seems to default to 1 on Linux, but on - # Solaris and derived operating systems it defaults to 4. (This is - # because the VMIN slot is the same as the VEOF slot, which - # defaults to ASCII EOT = Ctrl-D = 4.) - newattr[tty.CC][termios.VMIN] = 1 - - termios.tcsetattr(self.fileno, termios.TCSANOW, newattr) - - @classmethod - def _patch_lflag(cls, attrs: int) -> int: - return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) - - @classmethod - def _patch_iflag(cls, attrs: int) -> int: - return attrs & ~( - # Disable XON/XOFF flow control on output and input. - # (Don't capture Ctrl-S and Ctrl-Q.) - # Like executing: "stty -ixon." - termios.IXON - | termios.IXOFF - | - # Don't translate carriage return into newline on input. - termios.ICRNL - | termios.INLCR - | termios.IGNCR - ) - - def __exit__(self, *a: object) -> None: - if self.attrs_before is not None: - try: - termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) - except termios.error: - pass - - # # Put the terminal in application mode. - # self._stdout.write('\x1b[?1h') - - -class cooked_mode(raw_mode): - """ - The opposite of ``raw_mode``, used when we need cooked mode inside a - `raw_mode` block. Used in `Application.run_in_terminal`.:: - - with cooked_mode(stdin): - ''' the pseudo-terminal stdin is now used in cooked mode. ''' - """ - - @classmethod - def _patch_lflag(cls, attrs: int) -> int: - return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) - - @classmethod - def _patch_iflag(cls, attrs: int) -> int: - # Turn the ICRNL flag back on. (Without this, calling `input()` in - # run_in_terminal doesn't work and displays ^M instead. Ptpython - # evaluates commands using `run_in_terminal`, so it's important that - # they translate ^M back into ^J.) - return attrs | termios.ICRNL + _current_callbacks[loop, fd] = callback + + try: + yield + finally: + loop.remove_reader(fd) + + if previous: + loop.add_reader(fd, previous) + _current_callbacks[loop, fd] = previous + else: + del _current_callbacks[loop, fd] + + +@contextlib.contextmanager +def _detached_input(input: Vt100Input) -> Generator[None, None, None]: + loop = get_event_loop() + fd = input.fileno() + previous = _current_callbacks.get((loop, fd)) + + if previous: + loop.remove_reader(fd) + _current_callbacks[loop, fd] = None + + try: + yield + finally: + if previous: + loop.add_reader(fd, previous) + _current_callbacks[loop, fd] = previous + + +class raw_mode: + """ + :: + + with raw_mode(stdin): + ''' the pseudo-terminal stdin is now used in raw mode ''' + + We ignore errors when executing `tcgetattr` fails. + """ + + # There are several reasons for ignoring errors: + # 1. To avoid the "Inappropriate ioctl for device" crash if somebody would + # execute this code (In a Python REPL, for instance): + # + # import os; f = open(os.devnull); os.dup2(f.fileno(), 0) + # + # The result is that the eventloop will stop correctly, because it has + # to logic to quit when stdin is closed. However, we should not fail at + # this point. See: + # https://github.com/jonathanslenders/python-prompt-toolkit/pull/393 + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/392 + + # 2. Related, when stdin is an SSH pipe, and no full terminal was allocated. + # See: https://github.com/jonathanslenders/python-prompt-toolkit/pull/165 + def __init__(self, fileno: int) -> None: + self.fileno = fileno + self.attrs_before: Optional[List[Union[int, List[Union[bytes, int]]]]] + try: + self.attrs_before = termios.tcgetattr(fileno) + except termios.error: + # Ignore attribute errors. + self.attrs_before = None + + def __enter__(self) -> None: + # NOTE: On os X systems, using pty.setraw() fails. Therefor we are using this: + try: + newattr = termios.tcgetattr(self.fileno) + except termios.error: + pass + else: + newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG]) + newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG]) + + # VMIN defines the number of characters read at a time in + # non-canonical mode. It seems to default to 1 on Linux, but on + # Solaris and derived operating systems it defaults to 4. (This is + # because the VMIN slot is the same as the VEOF slot, which + # defaults to ASCII EOT = Ctrl-D = 4.) + newattr[tty.CC][termios.VMIN] = 1 + + termios.tcsetattr(self.fileno, termios.TCSANOW, newattr) + + @classmethod + def _patch_lflag(cls, attrs: int) -> int: + return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) + + @classmethod + def _patch_iflag(cls, attrs: int) -> int: + return attrs & ~( + # Disable XON/XOFF flow control on output and input. + # (Don't capture Ctrl-S and Ctrl-Q.) + # Like executing: "stty -ixon." + termios.IXON + | termios.IXOFF + | + # Don't translate carriage return into newline on input. + termios.ICRNL + | termios.INLCR + | termios.IGNCR + ) + + def __exit__(self, *a: object) -> None: + if self.attrs_before is not None: + try: + termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before) + except termios.error: + pass + + # # Put the terminal in application mode. + # self._stdout.write('\x1b[?1h') + + +class cooked_mode(raw_mode): + """ + The opposite of ``raw_mode``, used when we need cooked mode inside a + `raw_mode` block. Used in `Application.run_in_terminal`.:: + + with cooked_mode(stdin): + ''' the pseudo-terminal stdin is now used in cooked mode. ''' + """ + + @classmethod + def _patch_lflag(cls, attrs: int) -> int: + return attrs | (termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) + + @classmethod + def _patch_iflag(cls, attrs: int) -> int: + # Turn the ICRNL flag back on. (Without this, calling `input()` in + # run_in_terminal doesn't work and displays ^M instead. Ptpython + # evaluates commands using `run_in_terminal`, so it's important that + # they translate ^M back into ^J.) + return attrs | termios.ICRNL diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py index 3ee1e14fdda..09e4cc91bf9 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py @@ -1,247 +1,247 @@ -""" -Parser for VT100 input stream. -""" -import re -from typing import Callable, Dict, Generator, Tuple, Union - -from ..key_binding.key_processor import KeyPress -from ..keys import Keys -from .ansi_escape_sequences import ANSI_SEQUENCES - -__all__ = [ - "Vt100Parser", -] - - -# Regex matching any CPR response -# (Note that we use '\Z' instead of '$', because '$' could include a trailing -# newline.) -_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") - -# Mouse events: -# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" -_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") - -# Regex matching any valid prefix of a CPR response. -# (Note that it doesn't contain the last character, the 'R'. The prefix has to -# be shorter.) -_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") - -_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") - - -class _Flush: - """Helper object to indicate flush operation to the parser.""" - - pass - - -class _IsPrefixOfLongerMatchCache(Dict[str, bool]): - """ - Dictionary that maps input sequences to a boolean indicating whether there is - any key that start with this characters. - """ - - def __missing__(self, prefix: str) -> bool: - # (hard coded) If this could be a prefix of a CPR response, return - # True. - if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( - prefix - ): - result = True - else: - # If this could be a prefix of anything else, also return True. - result = any( - v - for k, v in ANSI_SEQUENCES.items() - if k.startswith(prefix) and k != prefix - ) - - self[prefix] = result - return result - - -_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() - - -class Vt100Parser: - """ - Parser for VT100 input stream. - Data can be fed through the `feed` method and the given callback will be - called with KeyPress objects. - - :: - - def callback(key): - pass - i = Vt100Parser(callback) - i.feed('data\x01...') - - :attr feed_key_callback: Function that will be called when a key is parsed. - """ - - # Lookup table of ANSI escape sequences for a VT100 terminal - # Hint: in order to know what sequences your terminal writes to stdin, run - # "od -c" and start typing. - def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: - self.feed_key_callback = feed_key_callback - self.reset() - - def reset(self, request: bool = False) -> None: - self._in_bracketed_paste = False - self._start_parser() - - def _start_parser(self) -> None: - """ - Start the parser coroutine. - """ - self._input_parser = self._input_parser_generator() - self._input_parser.send(None) # type: ignore - - def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]: - """ - Return the key (or keys) that maps to this prefix. - """ - # (hard coded) If we match a CPR response, return Keys.CPRResponse. - # (This one doesn't fit in the ANSI_SEQUENCES, because it contains - # integer variables.) - if _cpr_response_re.match(prefix): - return Keys.CPRResponse - - elif _mouse_event_re.match(prefix): - return Keys.Vt100MouseEvent - - # Otherwise, use the mappings. - try: - return ANSI_SEQUENCES[prefix] - except KeyError: - return None - - def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]: - """ - Coroutine (state machine) for the input parser. - """ - prefix = "" - retry = False - flush = False - - while True: - flush = False - - if retry: - retry = False - else: - # Get next character. - c = yield - - if isinstance(c, _Flush): - flush = True - else: - prefix += c - - # If we have some data, check for matches. - if prefix: - is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] - match = self._get_match(prefix) - - # Exact matches found, call handlers.. - if (flush or not is_prefix_of_longer_match) and match: - self._call_handler(match, prefix) - prefix = "" - - # No exact match found. - elif (flush or not is_prefix_of_longer_match) and not match: - found = False - retry = True - - # Loop over the input, try the longest match first and - # shift. - for i in range(len(prefix), 0, -1): - match = self._get_match(prefix[:i]) - if match: - self._call_handler(match, prefix[:i]) - prefix = prefix[i:] - found = True - - if not found: - self._call_handler(prefix[0], prefix[0]) - prefix = prefix[1:] - - def _call_handler( - self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str - ) -> None: - """ - Callback to handler. - """ - if isinstance(key, tuple): - # Received ANSI sequence that corresponds with multiple keys - # (probably alt+something). Handle keys individually, but only pass - # data payload to first KeyPress (so that we won't insert it - # multiple times). - for i, k in enumerate(key): - self._call_handler(k, insert_text if i == 0 else "") - else: - if key == Keys.BracketedPaste: - self._in_bracketed_paste = True - self._paste_buffer = "" - else: - self.feed_key_callback(KeyPress(key, insert_text)) - - def feed(self, data: str) -> None: - """ - Feed the input stream. - - :param data: Input string (unicode). - """ - # Handle bracketed paste. (We bypass the parser that matches all other - # key presses and keep reading input until we see the end mark.) - # This is much faster then parsing character by character. - if self._in_bracketed_paste: - self._paste_buffer += data - end_mark = "\x1b[201~" - - if end_mark in self._paste_buffer: - end_index = self._paste_buffer.index(end_mark) - - # Feed content to key bindings. - paste_content = self._paste_buffer[:end_index] - self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) - - # Quit bracketed paste mode and handle remaining input. - self._in_bracketed_paste = False - remaining = self._paste_buffer[end_index + len(end_mark) :] - self._paste_buffer = "" - - self.feed(remaining) - - # Handle normal input character by character. - else: - for i, c in enumerate(data): - if self._in_bracketed_paste: - # Quit loop and process from this position when the parser - # entered bracketed paste. - self.feed(data[i:]) - break - else: - self._input_parser.send(c) - - def flush(self) -> None: - """ - Flush the buffer of the input stream. - - This will allow us to handle the escape key (or maybe meta) sooner. - The input received by the escape key is actually the same as the first - characters of e.g. Arrow-Up, so without knowing what follows the escape - sequence, we don't know whether escape has been pressed, or whether - it's something else. This flush function should be called after a - timeout, and processes everything that's still in the buffer as-is, so - without assuming any characters will follow. - """ - self._input_parser.send(_Flush()) - - def feed_and_flush(self, data: str) -> None: - """ - Wrapper around ``feed`` and ``flush``. - """ - self.feed(data) - self.flush() +""" +Parser for VT100 input stream. +""" +import re +from typing import Callable, Dict, Generator, Tuple, Union + +from ..key_binding.key_processor import KeyPress +from ..keys import Keys +from .ansi_escape_sequences import ANSI_SEQUENCES + +__all__ = [ + "Vt100Parser", +] + + +# Regex matching any CPR response +# (Note that we use '\Z' instead of '$', because '$' could include a trailing +# newline.) +_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") + +# Mouse events: +# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" +_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") + +# Regex matching any valid prefix of a CPR response. +# (Note that it doesn't contain the last character, the 'R'. The prefix has to +# be shorter.) +_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") + +_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") + + +class _Flush: + """Helper object to indicate flush operation to the parser.""" + + pass + + +class _IsPrefixOfLongerMatchCache(Dict[str, bool]): + """ + Dictionary that maps input sequences to a boolean indicating whether there is + any key that start with this characters. + """ + + def __missing__(self, prefix: str) -> bool: + # (hard coded) If this could be a prefix of a CPR response, return + # True. + if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( + prefix + ): + result = True + else: + # If this could be a prefix of anything else, also return True. + result = any( + v + for k, v in ANSI_SEQUENCES.items() + if k.startswith(prefix) and k != prefix + ) + + self[prefix] = result + return result + + +_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() + + +class Vt100Parser: + """ + Parser for VT100 input stream. + Data can be fed through the `feed` method and the given callback will be + called with KeyPress objects. + + :: + + def callback(key): + pass + i = Vt100Parser(callback) + i.feed('data\x01...') + + :attr feed_key_callback: Function that will be called when a key is parsed. + """ + + # Lookup table of ANSI escape sequences for a VT100 terminal + # Hint: in order to know what sequences your terminal writes to stdin, run + # "od -c" and start typing. + def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: + self.feed_key_callback = feed_key_callback + self.reset() + + def reset(self, request: bool = False) -> None: + self._in_bracketed_paste = False + self._start_parser() + + def _start_parser(self) -> None: + """ + Start the parser coroutine. + """ + self._input_parser = self._input_parser_generator() + self._input_parser.send(None) # type: ignore + + def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]: + """ + Return the key (or keys) that maps to this prefix. + """ + # (hard coded) If we match a CPR response, return Keys.CPRResponse. + # (This one doesn't fit in the ANSI_SEQUENCES, because it contains + # integer variables.) + if _cpr_response_re.match(prefix): + return Keys.CPRResponse + + elif _mouse_event_re.match(prefix): + return Keys.Vt100MouseEvent + + # Otherwise, use the mappings. + try: + return ANSI_SEQUENCES[prefix] + except KeyError: + return None + + def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]: + """ + Coroutine (state machine) for the input parser. + """ + prefix = "" + retry = False + flush = False + + while True: + flush = False + + if retry: + retry = False + else: + # Get next character. + c = yield + + if isinstance(c, _Flush): + flush = True + else: + prefix += c + + # If we have some data, check for matches. + if prefix: + is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] + match = self._get_match(prefix) + + # Exact matches found, call handlers.. + if (flush or not is_prefix_of_longer_match) and match: + self._call_handler(match, prefix) + prefix = "" + + # No exact match found. + elif (flush or not is_prefix_of_longer_match) and not match: + found = False + retry = True + + # Loop over the input, try the longest match first and + # shift. + for i in range(len(prefix), 0, -1): + match = self._get_match(prefix[:i]) + if match: + self._call_handler(match, prefix[:i]) + prefix = prefix[i:] + found = True + + if not found: + self._call_handler(prefix[0], prefix[0]) + prefix = prefix[1:] + + def _call_handler( + self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str + ) -> None: + """ + Callback to handler. + """ + if isinstance(key, tuple): + # Received ANSI sequence that corresponds with multiple keys + # (probably alt+something). Handle keys individually, but only pass + # data payload to first KeyPress (so that we won't insert it + # multiple times). + for i, k in enumerate(key): + self._call_handler(k, insert_text if i == 0 else "") + else: + if key == Keys.BracketedPaste: + self._in_bracketed_paste = True + self._paste_buffer = "" + else: + self.feed_key_callback(KeyPress(key, insert_text)) + + def feed(self, data: str) -> None: + """ + Feed the input stream. + + :param data: Input string (unicode). + """ + # Handle bracketed paste. (We bypass the parser that matches all other + # key presses and keep reading input until we see the end mark.) + # This is much faster then parsing character by character. + if self._in_bracketed_paste: + self._paste_buffer += data + end_mark = "\x1b[201~" + + if end_mark in self._paste_buffer: + end_index = self._paste_buffer.index(end_mark) + + # Feed content to key bindings. + paste_content = self._paste_buffer[:end_index] + self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) + + # Quit bracketed paste mode and handle remaining input. + self._in_bracketed_paste = False + remaining = self._paste_buffer[end_index + len(end_mark) :] + self._paste_buffer = "" + + self.feed(remaining) + + # Handle normal input character by character. + else: + for i, c in enumerate(data): + if self._in_bracketed_paste: + # Quit loop and process from this position when the parser + # entered bracketed paste. + self.feed(data[i:]) + break + else: + self._input_parser.send(c) + + def flush(self) -> None: + """ + Flush the buffer of the input stream. + + This will allow us to handle the escape key (or maybe meta) sooner. + The input received by the escape key is actually the same as the first + characters of e.g. Arrow-Up, so without knowing what follows the escape + sequence, we don't know whether escape has been pressed, or whether + it's something else. This flush function should be called after a + timeout, and processes everything that's still in the buffer as-is, so + without assuming any characters will follow. + """ + self._input_parser.send(_Flush()) + + def feed_and_flush(self, data: str) -> None: + """ + Wrapper around ``feed`` and ``flush``. + """ + self.feed(data) + self.flush() diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py index 97699e19b24..437affaa5dd 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py @@ -1,758 +1,758 @@ -import os -import sys -from abc import abstractmethod -from contextlib import contextmanager - -from prompt_toolkit.eventloop import get_event_loop - -from ..utils import SPHINX_AUTODOC_RUNNING - -# Do not import win32-specific stuff when generating documentation. -# Otherwise RTD would be unable to generate docs for this module. -if not SPHINX_AUTODOC_RUNNING: - import msvcrt - from ctypes import windll - -from ctypes import Array, pointer -from ctypes.wintypes import DWORD, HANDLE -from typing import ( - Callable, - ContextManager, - Dict, - Iterable, - Iterator, - List, - Optional, - TextIO, -) - -from prompt_toolkit.eventloop import run_in_executor_with_context -from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles -from prompt_toolkit.key_binding.key_processor import KeyPress -from prompt_toolkit.keys import Keys -from prompt_toolkit.mouse_events import MouseButton, MouseEventType -from prompt_toolkit.win32_types import ( - INPUT_RECORD, - KEY_EVENT_RECORD, - MOUSE_EVENT_RECORD, - STD_INPUT_HANDLE, - EventTypes, -) - -from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES -from .base import Input - -__all__ = [ - "Win32Input", - "ConsoleInputReader", - "raw_mode", - "cooked_mode", - "attach_win32_input", - "detach_win32_input", -] - -# Win32 Constants for MOUSE_EVENT_RECORD. -# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str -FROM_LEFT_1ST_BUTTON_PRESSED = 0x1 -RIGHTMOST_BUTTON_PRESSED = 0x2 -MOUSE_MOVED = 0x0001 -MOUSE_WHEELED = 0x0004 - - -class _Win32InputBase(Input): - """ - Base class for `Win32Input` and `Win32PipeInput`. - """ - - def __init__(self) -> None: - self.win32_handles = _Win32Handles() - - @property - @abstractmethod - def handle(self) -> HANDLE: - pass - - -class Win32Input(_Win32InputBase): - """ - `Input` class that reads from the Windows console. - """ - - def __init__(self, stdin: Optional[TextIO] = None) -> None: - super().__init__() - self.console_input_reader = ConsoleInputReader() - - def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: - """ - Return a context manager that makes this input active in the current - event loop. - """ - return attach_win32_input(self, input_ready_callback) - - def detach(self) -> ContextManager[None]: - """ - Return a context manager that makes sure that this input is not active - in the current event loop. - """ - return detach_win32_input(self) - - def read_keys(self) -> List[KeyPress]: - return list(self.console_input_reader.read()) - - def flush(self) -> None: - pass - - @property - def closed(self) -> bool: - return False - - def raw_mode(self) -> ContextManager[None]: - return raw_mode() - - def cooked_mode(self) -> ContextManager[None]: - return cooked_mode() - - def fileno(self) -> int: - # The windows console doesn't depend on the file handle, so - # this is not used for the event loop (which uses the - # handle instead). But it's used in `Application.run_system_command` - # which opens a subprocess with a given stdin/stdout. - return sys.stdin.fileno() - - def typeahead_hash(self) -> str: - return "win32-input" - - def close(self) -> None: - self.console_input_reader.close() - - @property - def handle(self) -> HANDLE: - return self.console_input_reader.handle - - -class ConsoleInputReader: - """ - :param recognize_paste: When True, try to discover paste actions and turn - the event into a BracketedPaste. - """ - - # Keys with character data. - mappings = { - b"\x1b": Keys.Escape, - b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@) - b"\x01": Keys.ControlA, # Control-A (home) - b"\x02": Keys.ControlB, # Control-B (emacs cursor left) - b"\x03": Keys.ControlC, # Control-C (interrupt) - b"\x04": Keys.ControlD, # Control-D (exit) - b"\x05": Keys.ControlE, # Control-E (end) - b"\x06": Keys.ControlF, # Control-F (cursor forward) - b"\x07": Keys.ControlG, # Control-G - b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') - b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') - b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') - b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) - b"\x0c": Keys.ControlL, # Control-L (clear; form feed) - b"\x0d": Keys.ControlM, # Control-M (enter) - b"\x0e": Keys.ControlN, # Control-N (14) (history forward) - b"\x0f": Keys.ControlO, # Control-O (15) - b"\x10": Keys.ControlP, # Control-P (16) (history back) - b"\x11": Keys.ControlQ, # Control-Q - b"\x12": Keys.ControlR, # Control-R (18) (reverse search) - b"\x13": Keys.ControlS, # Control-S (19) (forward search) - b"\x14": Keys.ControlT, # Control-T - b"\x15": Keys.ControlU, # Control-U - b"\x16": Keys.ControlV, # Control-V - b"\x17": Keys.ControlW, # Control-W - b"\x18": Keys.ControlX, # Control-X - b"\x19": Keys.ControlY, # Control-Y (25) - b"\x1a": Keys.ControlZ, # Control-Z - b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-| - b"\x1d": Keys.ControlSquareClose, # Control-] - b"\x1e": Keys.ControlCircumflex, # Control-^ - b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) - b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.) - } - - # Keys that don't carry character data. - keycodes = { - # Home/End - 33: Keys.PageUp, - 34: Keys.PageDown, - 35: Keys.End, - 36: Keys.Home, - # Arrows - 37: Keys.Left, - 38: Keys.Up, - 39: Keys.Right, - 40: Keys.Down, - 45: Keys.Insert, - 46: Keys.Delete, - # F-keys. - 112: Keys.F1, - 113: Keys.F2, - 114: Keys.F3, - 115: Keys.F4, - 116: Keys.F5, - 117: Keys.F6, - 118: Keys.F7, - 119: Keys.F8, - 120: Keys.F9, - 121: Keys.F10, - 122: Keys.F11, - 123: Keys.F12, - } - - LEFT_ALT_PRESSED = 0x0002 - RIGHT_ALT_PRESSED = 0x0001 - SHIFT_PRESSED = 0x0010 - LEFT_CTRL_PRESSED = 0x0008 - RIGHT_CTRL_PRESSED = 0x0004 - - def __init__(self, recognize_paste: bool = True) -> None: - self._fdcon = None - self.recognize_paste = recognize_paste - - # When stdin is a tty, use that handle, otherwise, create a handle from - # CONIN$. - self.handle: HANDLE - if sys.stdin.isatty(): - self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) - else: - self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) - self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon)) - - def close(self) -> None: - "Close fdcon." - if self._fdcon is not None: - os.close(self._fdcon) - - def read(self) -> Iterable[KeyPress]: - """ - Return a list of `KeyPress` instances. It won't return anything when - there was nothing to read. (This function doesn't block.) - - http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx - """ - max_count = 2048 # Max events to read at the same time. - - read = DWORD(0) - arrtype = INPUT_RECORD * max_count - input_records = arrtype() - - # Check whether there is some input to read. `ReadConsoleInputW` would - # block otherwise. - # (Actually, the event loop is responsible to make sure that this - # function is only called when there is something to read, but for some - # reason this happened in the asyncio_win32 loop, and it's better to be - # safe anyway.) - if not wait_for_handles([self.handle], timeout=0): - return - - # Get next batch of input event. - windll.kernel32.ReadConsoleInputW( - self.handle, pointer(input_records), max_count, pointer(read) - ) - - # First, get all the keys from the input buffer, in order to determine - # whether we should consider this a paste event or not. - all_keys = list(self._get_keys(read, input_records)) - - # Fill in 'data' for key presses. - all_keys = [self._insert_key_data(key) for key in all_keys] - - # Correct non-bmp characters that are passed as separate surrogate codes - all_keys = list(self._merge_paired_surrogates(all_keys)) - - if self.recognize_paste and self._is_paste(all_keys): - gen = iter(all_keys) - k: Optional[KeyPress] - - for k in gen: - # Pasting: if the current key consists of text or \n, turn it - # into a BracketedPaste. - data = [] +import os +import sys +from abc import abstractmethod +from contextlib import contextmanager + +from prompt_toolkit.eventloop import get_event_loop + +from ..utils import SPHINX_AUTODOC_RUNNING + +# Do not import win32-specific stuff when generating documentation. +# Otherwise RTD would be unable to generate docs for this module. +if not SPHINX_AUTODOC_RUNNING: + import msvcrt + from ctypes import windll + +from ctypes import Array, pointer +from ctypes.wintypes import DWORD, HANDLE +from typing import ( + Callable, + ContextManager, + Dict, + Iterable, + Iterator, + List, + Optional, + TextIO, +) + +from prompt_toolkit.eventloop import run_in_executor_with_context +from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles +from prompt_toolkit.key_binding.key_processor import KeyPress +from prompt_toolkit.keys import Keys +from prompt_toolkit.mouse_events import MouseButton, MouseEventType +from prompt_toolkit.win32_types import ( + INPUT_RECORD, + KEY_EVENT_RECORD, + MOUSE_EVENT_RECORD, + STD_INPUT_HANDLE, + EventTypes, +) + +from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES +from .base import Input + +__all__ = [ + "Win32Input", + "ConsoleInputReader", + "raw_mode", + "cooked_mode", + "attach_win32_input", + "detach_win32_input", +] + +# Win32 Constants for MOUSE_EVENT_RECORD. +# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str +FROM_LEFT_1ST_BUTTON_PRESSED = 0x1 +RIGHTMOST_BUTTON_PRESSED = 0x2 +MOUSE_MOVED = 0x0001 +MOUSE_WHEELED = 0x0004 + + +class _Win32InputBase(Input): + """ + Base class for `Win32Input` and `Win32PipeInput`. + """ + + def __init__(self) -> None: + self.win32_handles = _Win32Handles() + + @property + @abstractmethod + def handle(self) -> HANDLE: + pass + + +class Win32Input(_Win32InputBase): + """ + `Input` class that reads from the Windows console. + """ + + def __init__(self, stdin: Optional[TextIO] = None) -> None: + super().__init__() + self.console_input_reader = ConsoleInputReader() + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return attach_win32_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return detach_win32_input(self) + + def read_keys(self) -> List[KeyPress]: + return list(self.console_input_reader.read()) + + def flush(self) -> None: + pass + + @property + def closed(self) -> bool: + return False + + def raw_mode(self) -> ContextManager[None]: + return raw_mode() + + def cooked_mode(self) -> ContextManager[None]: + return cooked_mode() + + def fileno(self) -> int: + # The windows console doesn't depend on the file handle, so + # this is not used for the event loop (which uses the + # handle instead). But it's used in `Application.run_system_command` + # which opens a subprocess with a given stdin/stdout. + return sys.stdin.fileno() + + def typeahead_hash(self) -> str: + return "win32-input" + + def close(self) -> None: + self.console_input_reader.close() + + @property + def handle(self) -> HANDLE: + return self.console_input_reader.handle + + +class ConsoleInputReader: + """ + :param recognize_paste: When True, try to discover paste actions and turn + the event into a BracketedPaste. + """ + + # Keys with character data. + mappings = { + b"\x1b": Keys.Escape, + b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@) + b"\x01": Keys.ControlA, # Control-A (home) + b"\x02": Keys.ControlB, # Control-B (emacs cursor left) + b"\x03": Keys.ControlC, # Control-C (interrupt) + b"\x04": Keys.ControlD, # Control-D (exit) + b"\x05": Keys.ControlE, # Control-E (end) + b"\x06": Keys.ControlF, # Control-F (cursor forward) + b"\x07": Keys.ControlG, # Control-G + b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') + b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') + b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') + b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) + b"\x0c": Keys.ControlL, # Control-L (clear; form feed) + b"\x0d": Keys.ControlM, # Control-M (enter) + b"\x0e": Keys.ControlN, # Control-N (14) (history forward) + b"\x0f": Keys.ControlO, # Control-O (15) + b"\x10": Keys.ControlP, # Control-P (16) (history back) + b"\x11": Keys.ControlQ, # Control-Q + b"\x12": Keys.ControlR, # Control-R (18) (reverse search) + b"\x13": Keys.ControlS, # Control-S (19) (forward search) + b"\x14": Keys.ControlT, # Control-T + b"\x15": Keys.ControlU, # Control-U + b"\x16": Keys.ControlV, # Control-V + b"\x17": Keys.ControlW, # Control-W + b"\x18": Keys.ControlX, # Control-X + b"\x19": Keys.ControlY, # Control-Y (25) + b"\x1a": Keys.ControlZ, # Control-Z + b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-| + b"\x1d": Keys.ControlSquareClose, # Control-] + b"\x1e": Keys.ControlCircumflex, # Control-^ + b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) + b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.) + } + + # Keys that don't carry character data. + keycodes = { + # Home/End + 33: Keys.PageUp, + 34: Keys.PageDown, + 35: Keys.End, + 36: Keys.Home, + # Arrows + 37: Keys.Left, + 38: Keys.Up, + 39: Keys.Right, + 40: Keys.Down, + 45: Keys.Insert, + 46: Keys.Delete, + # F-keys. + 112: Keys.F1, + 113: Keys.F2, + 114: Keys.F3, + 115: Keys.F4, + 116: Keys.F5, + 117: Keys.F6, + 118: Keys.F7, + 119: Keys.F8, + 120: Keys.F9, + 121: Keys.F10, + 122: Keys.F11, + 123: Keys.F12, + } + + LEFT_ALT_PRESSED = 0x0002 + RIGHT_ALT_PRESSED = 0x0001 + SHIFT_PRESSED = 0x0010 + LEFT_CTRL_PRESSED = 0x0008 + RIGHT_CTRL_PRESSED = 0x0004 + + def __init__(self, recognize_paste: bool = True) -> None: + self._fdcon = None + self.recognize_paste = recognize_paste + + # When stdin is a tty, use that handle, otherwise, create a handle from + # CONIN$. + self.handle: HANDLE + if sys.stdin.isatty(): + self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + else: + self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) + self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon)) + + def close(self) -> None: + "Close fdcon." + if self._fdcon is not None: + os.close(self._fdcon) + + def read(self) -> Iterable[KeyPress]: + """ + Return a list of `KeyPress` instances. It won't return anything when + there was nothing to read. (This function doesn't block.) + + http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx + """ + max_count = 2048 # Max events to read at the same time. + + read = DWORD(0) + arrtype = INPUT_RECORD * max_count + input_records = arrtype() + + # Check whether there is some input to read. `ReadConsoleInputW` would + # block otherwise. + # (Actually, the event loop is responsible to make sure that this + # function is only called when there is something to read, but for some + # reason this happened in the asyncio_win32 loop, and it's better to be + # safe anyway.) + if not wait_for_handles([self.handle], timeout=0): + return + + # Get next batch of input event. + windll.kernel32.ReadConsoleInputW( + self.handle, pointer(input_records), max_count, pointer(read) + ) + + # First, get all the keys from the input buffer, in order to determine + # whether we should consider this a paste event or not. + all_keys = list(self._get_keys(read, input_records)) + + # Fill in 'data' for key presses. + all_keys = [self._insert_key_data(key) for key in all_keys] + + # Correct non-bmp characters that are passed as separate surrogate codes + all_keys = list(self._merge_paired_surrogates(all_keys)) + + if self.recognize_paste and self._is_paste(all_keys): + gen = iter(all_keys) + k: Optional[KeyPress] + + for k in gen: + # Pasting: if the current key consists of text or \n, turn it + # into a BracketedPaste. + data = [] while k and ( not isinstance(k.key, Keys) or k.key in {Keys.ControlJ, Keys.ControlM} ): - data.append(k.data) - try: - k = next(gen) - except StopIteration: - k = None - - if data: - yield KeyPress(Keys.BracketedPaste, "".join(data)) - if k is not None: - yield k - else: - for k2 in all_keys: - yield k2 - - def _insert_key_data(self, key_press: KeyPress) -> KeyPress: - """ - Insert KeyPress data, for vt100 compatibility. - """ - if key_press.data: - return key_press - - if isinstance(key_press.key, Keys): - data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "") - else: - data = "" - - return KeyPress(key_press.key, data) - - def _get_keys( - self, read: DWORD, input_records: "Array[INPUT_RECORD]" - ) -> Iterator[KeyPress]: - """ - Generator that yields `KeyPress` objects from the input records. - """ - for i in range(read.value): - ir = input_records[i] - - # Get the right EventType from the EVENT_RECORD. - # (For some reason the Windows console application 'cmder' - # [http://gooseberrycreative.com/cmder/] can return '0' for - # ir.EventType. -- Just ignore that.) - if ir.EventType in EventTypes: - ev = getattr(ir.Event, EventTypes[ir.EventType]) - - # Process if this is a key event. (We also have mouse, menu and - # focus events.) - if type(ev) == KEY_EVENT_RECORD and ev.KeyDown: - for key_press in self._event_to_key_presses(ev): - yield key_press - - elif type(ev) == MOUSE_EVENT_RECORD: - for key_press in self._handle_mouse(ev): - yield key_press - - @staticmethod - def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]: - """ - Combines consecutive KeyPresses with high and low surrogates into - single characters - """ - buffered_high_surrogate = None - for key in key_presses: - is_text = not isinstance(key.key, Keys) - is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF" - is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF" - - if buffered_high_surrogate: - if is_low_surrogate: - # convert high surrogate + low surrogate to single character - fullchar = ( - (buffered_high_surrogate.key + key.key) - .encode("utf-16-le", "surrogatepass") - .decode("utf-16-le") - ) - key = KeyPress(fullchar, fullchar) - else: - yield buffered_high_surrogate - buffered_high_surrogate = None - - if is_high_surrogate: - buffered_high_surrogate = key - else: - yield key - - if buffered_high_surrogate: - yield buffered_high_surrogate - - @staticmethod - def _is_paste(keys: List[KeyPress]) -> bool: - """ - Return `True` when we should consider this list of keys as a paste - event. Pasted text on windows will be turned into a - `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably - the best possible way to detect pasting of text and handle that - correctly.) - """ - # Consider paste when it contains at least one newline and at least one - # other character. - text_count = 0 - newline_count = 0 - - for k in keys: - if not isinstance(k.key, Keys): - text_count += 1 - if k.key == Keys.ControlM: - newline_count += 1 - + data.append(k.data) + try: + k = next(gen) + except StopIteration: + k = None + + if data: + yield KeyPress(Keys.BracketedPaste, "".join(data)) + if k is not None: + yield k + else: + for k2 in all_keys: + yield k2 + + def _insert_key_data(self, key_press: KeyPress) -> KeyPress: + """ + Insert KeyPress data, for vt100 compatibility. + """ + if key_press.data: + return key_press + + if isinstance(key_press.key, Keys): + data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "") + else: + data = "" + + return KeyPress(key_press.key, data) + + def _get_keys( + self, read: DWORD, input_records: "Array[INPUT_RECORD]" + ) -> Iterator[KeyPress]: + """ + Generator that yields `KeyPress` objects from the input records. + """ + for i in range(read.value): + ir = input_records[i] + + # Get the right EventType from the EVENT_RECORD. + # (For some reason the Windows console application 'cmder' + # [http://gooseberrycreative.com/cmder/] can return '0' for + # ir.EventType. -- Just ignore that.) + if ir.EventType in EventTypes: + ev = getattr(ir.Event, EventTypes[ir.EventType]) + + # Process if this is a key event. (We also have mouse, menu and + # focus events.) + if type(ev) == KEY_EVENT_RECORD and ev.KeyDown: + for key_press in self._event_to_key_presses(ev): + yield key_press + + elif type(ev) == MOUSE_EVENT_RECORD: + for key_press in self._handle_mouse(ev): + yield key_press + + @staticmethod + def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]: + """ + Combines consecutive KeyPresses with high and low surrogates into + single characters + """ + buffered_high_surrogate = None + for key in key_presses: + is_text = not isinstance(key.key, Keys) + is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF" + is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF" + + if buffered_high_surrogate: + if is_low_surrogate: + # convert high surrogate + low surrogate to single character + fullchar = ( + (buffered_high_surrogate.key + key.key) + .encode("utf-16-le", "surrogatepass") + .decode("utf-16-le") + ) + key = KeyPress(fullchar, fullchar) + else: + yield buffered_high_surrogate + buffered_high_surrogate = None + + if is_high_surrogate: + buffered_high_surrogate = key + else: + yield key + + if buffered_high_surrogate: + yield buffered_high_surrogate + + @staticmethod + def _is_paste(keys: List[KeyPress]) -> bool: + """ + Return `True` when we should consider this list of keys as a paste + event. Pasted text on windows will be turned into a + `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably + the best possible way to detect pasting of text and handle that + correctly.) + """ + # Consider paste when it contains at least one newline and at least one + # other character. + text_count = 0 + newline_count = 0 + + for k in keys: + if not isinstance(k.key, Keys): + text_count += 1 + if k.key == Keys.ControlM: + newline_count += 1 + return newline_count >= 1 and text_count >= 1 - - def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]: - """ - For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances. - """ - assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown - - result: Optional[KeyPress] = None - - control_key_state = ev.ControlKeyState - u_char = ev.uChar.UnicodeChar - # Use surrogatepass because u_char may be an unmatched surrogate - ascii_char = u_char.encode("utf-8", "surrogatepass") - - # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the - # unicode code point truncated to 1 byte. See also: - # https://github.com/ipython/ipython/issues/10004 - # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389 - - if u_char == "\x00": - if ev.VirtualKeyCode in self.keycodes: - result = KeyPress(self.keycodes[ev.VirtualKeyCode], "") - else: - if ascii_char in self.mappings: - if self.mappings[ascii_char] == Keys.ControlJ: - u_char = ( - "\n" # Windows sends \n, turn into \r for unix compatibility. - ) - result = KeyPress(self.mappings[ascii_char], u_char) - else: - result = KeyPress(u_char, u_char) - - # First we handle Shift-Control-Arrow/Home/End (need to do this first) - if ( - ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) - and control_key_state & self.SHIFT_PRESSED - and result - ): - mapping: Dict[str, str] = { - Keys.Left: Keys.ControlShiftLeft, - Keys.Right: Keys.ControlShiftRight, - Keys.Up: Keys.ControlShiftUp, - Keys.Down: Keys.ControlShiftDown, - Keys.Home: Keys.ControlShiftHome, - Keys.End: Keys.ControlShiftEnd, - Keys.Insert: Keys.ControlShiftInsert, - Keys.PageUp: Keys.ControlShiftPageUp, - Keys.PageDown: Keys.ControlShiftPageDown, - } - result.key = mapping.get(result.key, result.key) - - # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys. - if ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) and result: - mapping = { - Keys.Left: Keys.ControlLeft, - Keys.Right: Keys.ControlRight, - Keys.Up: Keys.ControlUp, - Keys.Down: Keys.ControlDown, - Keys.Home: Keys.ControlHome, - Keys.End: Keys.ControlEnd, - Keys.Insert: Keys.ControlInsert, - Keys.Delete: Keys.ControlDelete, - Keys.PageUp: Keys.ControlPageUp, - Keys.PageDown: Keys.ControlPageDown, - } - result.key = mapping.get(result.key, result.key) - - # Turn 'Tab' into 'BackTab' when shift was pressed. - # Also handle other shift-key combination - if control_key_state & self.SHIFT_PRESSED and result: - mapping = { - Keys.Tab: Keys.BackTab, - Keys.Left: Keys.ShiftLeft, - Keys.Right: Keys.ShiftRight, - Keys.Up: Keys.ShiftUp, - Keys.Down: Keys.ShiftDown, - Keys.Home: Keys.ShiftHome, - Keys.End: Keys.ShiftEnd, - Keys.Insert: Keys.ShiftInsert, - Keys.Delete: Keys.ShiftDelete, - Keys.PageUp: Keys.ShiftPageUp, - Keys.PageDown: Keys.ShiftPageDown, - } - result.key = mapping.get(result.key, result.key) - - # Turn 'Space' into 'ControlSpace' when control was pressed. - if ( - ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) - and result - and result.data == " " - ): - result = KeyPress(Keys.ControlSpace, " ") - - # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot - # detect this combination. But it's really practical on Windows.) - if ( - ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) - and result - and result.key == Keys.ControlJ - ): - return [KeyPress(Keys.Escape, ""), result] - - # Return result. If alt was pressed, prefix the result with an - # 'Escape' key, just like unix VT100 terminals do. - - # NOTE: Only replace the left alt with escape. The right alt key often - # acts as altgr and is used in many non US keyboard layouts for - # typing some special characters, like a backslash. We don't want - # all backslashes to be prefixed with escape. (Esc-\ has a - # meaning in E-macs, for instance.) - if result: - meta_pressed = control_key_state & self.LEFT_ALT_PRESSED - - if meta_pressed: - return [KeyPress(Keys.Escape, ""), result] - else: - return [result] - - else: - return [] - - def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]: - """ - Handle mouse events. Return a list of KeyPress instances. - """ - event_flags = ev.EventFlags - button_state = ev.ButtonState - - event_type: Optional[MouseEventType] = None - button: MouseButton = MouseButton.NONE - - # Scroll events. - if event_flags & MOUSE_WHEELED: - if button_state > 0: - event_type = MouseEventType.SCROLL_UP - else: - event_type = MouseEventType.SCROLL_DOWN - else: - # Handle button state for non-scroll events. - if button_state == FROM_LEFT_1ST_BUTTON_PRESSED: - button = MouseButton.LEFT - - elif button_state == RIGHTMOST_BUTTON_PRESSED: - button = MouseButton.RIGHT - - # Move events. - if event_flags & MOUSE_MOVED: - event_type = MouseEventType.MOUSE_MOVE - - # No key pressed anymore: mouse up. - if event_type is None: - if button_state > 0: - # Some button pressed. - event_type = MouseEventType.MOUSE_DOWN - else: - # No button pressed. - event_type = MouseEventType.MOUSE_UP - - data = ";".join( - [ - button.value, - event_type.value, - str(ev.MousePosition.X), - str(ev.MousePosition.Y), - ] - ) - return [KeyPress(Keys.WindowsMouseEvent, data)] - - -class _Win32Handles: - """ - Utility to keep track of which handles are connectod to which callbacks. - - `add_win32_handle` starts a tiny event loop in another thread which waits - for the Win32 handle to become ready. When this happens, the callback will - be called in the current asyncio event loop using `call_soon_threadsafe`. - - `remove_win32_handle` will stop this tiny event loop. - - NOTE: We use this technique, so that we don't have to use the - `ProactorEventLoop` on Windows and we can wait for things like stdin - in a `SelectorEventLoop`. This is important, because our inputhook - mechanism (used by IPython), only works with the `SelectorEventLoop`. - """ - - def __init__(self) -> None: - self._handle_callbacks: Dict[int, Callable[[], None]] = {} - - # Windows Events that are triggered when we have to stop watching this - # handle. - self._remove_events: Dict[int, HANDLE] = {} - - def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None: - """ - Add a Win32 handle to the event loop. - """ - handle_value = handle.value - - if handle_value is None: - raise ValueError("Invalid handle.") - - # Make sure to remove a previous registered handler first. - self.remove_win32_handle(handle) - - loop = get_event_loop() - self._handle_callbacks[handle_value] = callback - - # Create remove event. - remove_event = create_win32_event() - self._remove_events[handle_value] = remove_event - - # Add reader. - def ready() -> None: - # Tell the callback that input's ready. - try: - callback() - finally: - run_in_executor_with_context(wait, loop=loop) - - # Wait for the input to become ready. - # (Use an executor for this, the Windows asyncio event loop doesn't - # allow us to wait for handles like stdin.) - def wait() -> None: - # Wait until either the handle becomes ready, or the remove event - # has been set. - result = wait_for_handles([remove_event, handle]) - - if result is remove_event: - windll.kernel32.CloseHandle(remove_event) - return - else: - loop.call_soon_threadsafe(ready) - - run_in_executor_with_context(wait, loop=loop) - - def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]: - """ - Remove a Win32 handle from the event loop. - Return either the registered handler or `None`. - """ - if handle.value is None: - return None # Ignore. - - # Trigger remove events, so that the reader knows to stop. - try: - event = self._remove_events.pop(handle.value) - except KeyError: - pass - else: - windll.kernel32.SetEvent(event) - - try: - return self._handle_callbacks.pop(handle.value) - except KeyError: - return None - - -@contextmanager -def attach_win32_input( - input: _Win32InputBase, callback: Callable[[], None] -) -> Iterator[None]: - """ - Context manager that makes this input active in the current event loop. - - :param input: :class:`~prompt_toolkit.input.Input` object. - :param input_ready_callback: Called when the input is ready to read. - """ - win32_handles = input.win32_handles - handle = input.handle - - if handle.value is None: - raise ValueError("Invalid handle.") - - # Add reader. - previous_callback = win32_handles.remove_win32_handle(handle) - win32_handles.add_win32_handle(handle, callback) - - try: - yield - finally: - win32_handles.remove_win32_handle(handle) - - if previous_callback: - win32_handles.add_win32_handle(handle, previous_callback) - - -@contextmanager -def detach_win32_input(input: _Win32InputBase) -> Iterator[None]: - win32_handles = input.win32_handles - handle = input.handle - - if handle.value is None: - raise ValueError("Invalid handle.") - - previous_callback = win32_handles.remove_win32_handle(handle) - - try: - yield - finally: - if previous_callback: - win32_handles.add_win32_handle(handle, previous_callback) - - -class raw_mode: - """ - :: - - with raw_mode(stdin): - ''' the windows terminal is now in 'raw' mode. ''' - - The ``fileno`` attribute is ignored. This is to be compatible with the - `raw_input` method of `.vt100_input`. - """ - - def __init__(self, fileno: Optional[int] = None) -> None: - self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) - - def __enter__(self) -> None: - # Remember original mode. - original_mode = DWORD() - windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode)) - self.original_mode = original_mode - - self._patch() - - def _patch(self) -> None: - # Set raw - ENABLE_ECHO_INPUT = 0x0004 - ENABLE_LINE_INPUT = 0x0002 - ENABLE_PROCESSED_INPUT = 0x0001 - - windll.kernel32.SetConsoleMode( - self.handle, - self.original_mode.value - & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), - ) - - def __exit__(self, *a: object) -> None: - # Restore original mode - windll.kernel32.SetConsoleMode(self.handle, self.original_mode) - - -class cooked_mode(raw_mode): - """ - :: - - with cooked_mode(stdin): - ''' The pseudo-terminal stdin is now used in cooked mode. ''' - """ - - def _patch(self) -> None: - # Set cooked. - ENABLE_ECHO_INPUT = 0x0004 - ENABLE_LINE_INPUT = 0x0002 - ENABLE_PROCESSED_INPUT = 0x0001 - - windll.kernel32.SetConsoleMode( - self.handle, - self.original_mode.value - | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), - ) + + def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]: + """ + For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances. + """ + assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown + + result: Optional[KeyPress] = None + + control_key_state = ev.ControlKeyState + u_char = ev.uChar.UnicodeChar + # Use surrogatepass because u_char may be an unmatched surrogate + ascii_char = u_char.encode("utf-8", "surrogatepass") + + # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the + # unicode code point truncated to 1 byte. See also: + # https://github.com/ipython/ipython/issues/10004 + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389 + + if u_char == "\x00": + if ev.VirtualKeyCode in self.keycodes: + result = KeyPress(self.keycodes[ev.VirtualKeyCode], "") + else: + if ascii_char in self.mappings: + if self.mappings[ascii_char] == Keys.ControlJ: + u_char = ( + "\n" # Windows sends \n, turn into \r for unix compatibility. + ) + result = KeyPress(self.mappings[ascii_char], u_char) + else: + result = KeyPress(u_char, u_char) + + # First we handle Shift-Control-Arrow/Home/End (need to do this first) + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and control_key_state & self.SHIFT_PRESSED + and result + ): + mapping: Dict[str, str] = { + Keys.Left: Keys.ControlShiftLeft, + Keys.Right: Keys.ControlShiftRight, + Keys.Up: Keys.ControlShiftUp, + Keys.Down: Keys.ControlShiftDown, + Keys.Home: Keys.ControlShiftHome, + Keys.End: Keys.ControlShiftEnd, + Keys.Insert: Keys.ControlShiftInsert, + Keys.PageUp: Keys.ControlShiftPageUp, + Keys.PageDown: Keys.ControlShiftPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys. + if ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) and result: + mapping = { + Keys.Left: Keys.ControlLeft, + Keys.Right: Keys.ControlRight, + Keys.Up: Keys.ControlUp, + Keys.Down: Keys.ControlDown, + Keys.Home: Keys.ControlHome, + Keys.End: Keys.ControlEnd, + Keys.Insert: Keys.ControlInsert, + Keys.Delete: Keys.ControlDelete, + Keys.PageUp: Keys.ControlPageUp, + Keys.PageDown: Keys.ControlPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Turn 'Tab' into 'BackTab' when shift was pressed. + # Also handle other shift-key combination + if control_key_state & self.SHIFT_PRESSED and result: + mapping = { + Keys.Tab: Keys.BackTab, + Keys.Left: Keys.ShiftLeft, + Keys.Right: Keys.ShiftRight, + Keys.Up: Keys.ShiftUp, + Keys.Down: Keys.ShiftDown, + Keys.Home: Keys.ShiftHome, + Keys.End: Keys.ShiftEnd, + Keys.Insert: Keys.ShiftInsert, + Keys.Delete: Keys.ShiftDelete, + Keys.PageUp: Keys.ShiftPageUp, + Keys.PageDown: Keys.ShiftPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Turn 'Space' into 'ControlSpace' when control was pressed. + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and result + and result.data == " " + ): + result = KeyPress(Keys.ControlSpace, " ") + + # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot + # detect this combination. But it's really practical on Windows.) + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and result + and result.key == Keys.ControlJ + ): + return [KeyPress(Keys.Escape, ""), result] + + # Return result. If alt was pressed, prefix the result with an + # 'Escape' key, just like unix VT100 terminals do. + + # NOTE: Only replace the left alt with escape. The right alt key often + # acts as altgr and is used in many non US keyboard layouts for + # typing some special characters, like a backslash. We don't want + # all backslashes to be prefixed with escape. (Esc-\ has a + # meaning in E-macs, for instance.) + if result: + meta_pressed = control_key_state & self.LEFT_ALT_PRESSED + + if meta_pressed: + return [KeyPress(Keys.Escape, ""), result] + else: + return [result] + + else: + return [] + + def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]: + """ + Handle mouse events. Return a list of KeyPress instances. + """ + event_flags = ev.EventFlags + button_state = ev.ButtonState + + event_type: Optional[MouseEventType] = None + button: MouseButton = MouseButton.NONE + + # Scroll events. + if event_flags & MOUSE_WHEELED: + if button_state > 0: + event_type = MouseEventType.SCROLL_UP + else: + event_type = MouseEventType.SCROLL_DOWN + else: + # Handle button state for non-scroll events. + if button_state == FROM_LEFT_1ST_BUTTON_PRESSED: + button = MouseButton.LEFT + + elif button_state == RIGHTMOST_BUTTON_PRESSED: + button = MouseButton.RIGHT + + # Move events. + if event_flags & MOUSE_MOVED: + event_type = MouseEventType.MOUSE_MOVE + + # No key pressed anymore: mouse up. + if event_type is None: + if button_state > 0: + # Some button pressed. + event_type = MouseEventType.MOUSE_DOWN + else: + # No button pressed. + event_type = MouseEventType.MOUSE_UP + + data = ";".join( + [ + button.value, + event_type.value, + str(ev.MousePosition.X), + str(ev.MousePosition.Y), + ] + ) + return [KeyPress(Keys.WindowsMouseEvent, data)] + + +class _Win32Handles: + """ + Utility to keep track of which handles are connectod to which callbacks. + + `add_win32_handle` starts a tiny event loop in another thread which waits + for the Win32 handle to become ready. When this happens, the callback will + be called in the current asyncio event loop using `call_soon_threadsafe`. + + `remove_win32_handle` will stop this tiny event loop. + + NOTE: We use this technique, so that we don't have to use the + `ProactorEventLoop` on Windows and we can wait for things like stdin + in a `SelectorEventLoop`. This is important, because our inputhook + mechanism (used by IPython), only works with the `SelectorEventLoop`. + """ + + def __init__(self) -> None: + self._handle_callbacks: Dict[int, Callable[[], None]] = {} + + # Windows Events that are triggered when we have to stop watching this + # handle. + self._remove_events: Dict[int, HANDLE] = {} + + def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None: + """ + Add a Win32 handle to the event loop. + """ + handle_value = handle.value + + if handle_value is None: + raise ValueError("Invalid handle.") + + # Make sure to remove a previous registered handler first. + self.remove_win32_handle(handle) + + loop = get_event_loop() + self._handle_callbacks[handle_value] = callback + + # Create remove event. + remove_event = create_win32_event() + self._remove_events[handle_value] = remove_event + + # Add reader. + def ready() -> None: + # Tell the callback that input's ready. + try: + callback() + finally: + run_in_executor_with_context(wait, loop=loop) + + # Wait for the input to become ready. + # (Use an executor for this, the Windows asyncio event loop doesn't + # allow us to wait for handles like stdin.) + def wait() -> None: + # Wait until either the handle becomes ready, or the remove event + # has been set. + result = wait_for_handles([remove_event, handle]) + + if result is remove_event: + windll.kernel32.CloseHandle(remove_event) + return + else: + loop.call_soon_threadsafe(ready) + + run_in_executor_with_context(wait, loop=loop) + + def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]: + """ + Remove a Win32 handle from the event loop. + Return either the registered handler or `None`. + """ + if handle.value is None: + return None # Ignore. + + # Trigger remove events, so that the reader knows to stop. + try: + event = self._remove_events.pop(handle.value) + except KeyError: + pass + else: + windll.kernel32.SetEvent(event) + + try: + return self._handle_callbacks.pop(handle.value) + except KeyError: + return None + + +@contextmanager +def attach_win32_input( + input: _Win32InputBase, callback: Callable[[], None] +) -> Iterator[None]: + """ + Context manager that makes this input active in the current event loop. + + :param input: :class:`~prompt_toolkit.input.Input` object. + :param input_ready_callback: Called when the input is ready to read. + """ + win32_handles = input.win32_handles + handle = input.handle + + if handle.value is None: + raise ValueError("Invalid handle.") + + # Add reader. + previous_callback = win32_handles.remove_win32_handle(handle) + win32_handles.add_win32_handle(handle, callback) + + try: + yield + finally: + win32_handles.remove_win32_handle(handle) + + if previous_callback: + win32_handles.add_win32_handle(handle, previous_callback) + + +@contextmanager +def detach_win32_input(input: _Win32InputBase) -> Iterator[None]: + win32_handles = input.win32_handles + handle = input.handle + + if handle.value is None: + raise ValueError("Invalid handle.") + + previous_callback = win32_handles.remove_win32_handle(handle) + + try: + yield + finally: + if previous_callback: + win32_handles.add_win32_handle(handle, previous_callback) + + +class raw_mode: + """ + :: + + with raw_mode(stdin): + ''' the windows terminal is now in 'raw' mode. ''' + + The ``fileno`` attribute is ignored. This is to be compatible with the + `raw_input` method of `.vt100_input`. + """ + + def __init__(self, fileno: Optional[int] = None) -> None: + self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + + def __enter__(self) -> None: + # Remember original mode. + original_mode = DWORD() + windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode)) + self.original_mode = original_mode + + self._patch() + + def _patch(self) -> None: + # Set raw + ENABLE_ECHO_INPUT = 0x0004 + ENABLE_LINE_INPUT = 0x0002 + ENABLE_PROCESSED_INPUT = 0x0001 + + windll.kernel32.SetConsoleMode( + self.handle, + self.original_mode.value + & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), + ) + + def __exit__(self, *a: object) -> None: + # Restore original mode + windll.kernel32.SetConsoleMode(self.handle, self.original_mode) + + +class cooked_mode(raw_mode): + """ + :: + + with cooked_mode(stdin): + ''' The pseudo-terminal stdin is now used in cooked mode. ''' + """ + + def _patch(self) -> None: + # Set cooked. + ENABLE_ECHO_INPUT = 0x0004 + ENABLE_LINE_INPUT = 0x0002 + ENABLE_PROCESSED_INPUT = 0x0001 + + windll.kernel32.SetConsoleMode( + self.handle, + self.original_mode.value + | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), + ) diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py index cdcf084de11..67cf6f0ee3e 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32_pipe.py @@ -1,135 +1,135 @@ -from ctypes import windll -from ctypes.wintypes import HANDLE -from typing import Callable, ContextManager, List - -from prompt_toolkit.eventloop.win32 import create_win32_event - -from ..key_binding import KeyPress -from ..utils import DummyContext -from .base import PipeInput -from .vt100_parser import Vt100Parser -from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input - -__all__ = ["Win32PipeInput"] - - -class Win32PipeInput(_Win32InputBase, PipeInput): - """ - This is an input pipe that works on Windows. - Text or bytes can be feed into the pipe, and key strokes can be read from - the pipe. This is useful if we want to send the input programmatically into - the application. Mostly useful for unit testing. - - Notice that even though it's Windows, we use vt100 escape sequences over - the pipe. - - Usage:: - - input = Win32PipeInput() - input.send_text('inputdata') - """ - - _id = 0 - - def __init__(self) -> None: - super().__init__() - # Event (handle) for registering this input in the event loop. - # This event is set when there is data available to read from the pipe. - # Note: We use this approach instead of using a regular pipe, like - # returned from `os.pipe()`, because making such a regular pipe - # non-blocking is tricky and this works really well. - self._event = create_win32_event() - - self._closed = False - - # Parser for incoming keys. - self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects. - self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key)) - - # Identifier for every PipeInput for the hash. - self.__class__._id += 1 - self._id = self.__class__._id - - @property - def closed(self) -> bool: - return self._closed - - def fileno(self) -> int: - """ - The windows pipe doesn't depend on the file handle. - """ - raise NotImplementedError - - @property - def handle(self) -> HANDLE: - "The handle used for registering this pipe in the event loop." - return self._event - - def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: - """ - Return a context manager that makes this input active in the current - event loop. - """ - return attach_win32_input(self, input_ready_callback) - - def detach(self) -> ContextManager[None]: - """ - Return a context manager that makes sure that this input is not active - in the current event loop. - """ - return detach_win32_input(self) - - def read_keys(self) -> List[KeyPress]: - "Read list of KeyPress." - - # Return result. - result = self._buffer - self._buffer = [] - - # Reset event. - windll.kernel32.ResetEvent(self._event) - - return result - - def flush_keys(self) -> List[KeyPress]: - """ - Flush pending keys and return them. - (Used for flushing the 'escape' key.) - """ - # Flush all pending keys. (This is most important to flush the vt100 - # 'Escape' key early when nothing else follows.) - self.vt100_parser.flush() - - # Return result. - result = self._buffer - self._buffer = [] - return result - - def send_bytes(self, data: bytes) -> None: - "Send bytes to the input." - self.send_text(data.decode("utf-8", "ignore")) - - def send_text(self, text: str) -> None: - "Send text to the input." - # Pass it through our vt100 parser. - self.vt100_parser.feed(text) - - # Set event. - windll.kernel32.SetEvent(self._event) - - def raw_mode(self) -> ContextManager[None]: - return DummyContext() - - def cooked_mode(self) -> ContextManager[None]: - return DummyContext() - - def close(self) -> None: - "Close pipe handles." - windll.kernel32.CloseHandle(self._event) - self._closed = True - - def typeahead_hash(self) -> str: - """ - This needs to be unique for every `PipeInput`. - """ - return "pipe-input-%s" % (self._id,) +from ctypes import windll +from ctypes.wintypes import HANDLE +from typing import Callable, ContextManager, List + +from prompt_toolkit.eventloop.win32 import create_win32_event + +from ..key_binding import KeyPress +from ..utils import DummyContext +from .base import PipeInput +from .vt100_parser import Vt100Parser +from .win32 import _Win32InputBase, attach_win32_input, detach_win32_input + +__all__ = ["Win32PipeInput"] + + +class Win32PipeInput(_Win32InputBase, PipeInput): + """ + This is an input pipe that works on Windows. + Text or bytes can be feed into the pipe, and key strokes can be read from + the pipe. This is useful if we want to send the input programmatically into + the application. Mostly useful for unit testing. + + Notice that even though it's Windows, we use vt100 escape sequences over + the pipe. + + Usage:: + + input = Win32PipeInput() + input.send_text('inputdata') + """ + + _id = 0 + + def __init__(self) -> None: + super().__init__() + # Event (handle) for registering this input in the event loop. + # This event is set when there is data available to read from the pipe. + # Note: We use this approach instead of using a regular pipe, like + # returned from `os.pipe()`, because making such a regular pipe + # non-blocking is tricky and this works really well. + self._event = create_win32_event() + + self._closed = False + + # Parser for incoming keys. + self._buffer: List[KeyPress] = [] # Buffer to collect the Key objects. + self.vt100_parser = Vt100Parser(lambda key: self._buffer.append(key)) + + # Identifier for every PipeInput for the hash. + self.__class__._id += 1 + self._id = self.__class__._id + + @property + def closed(self) -> bool: + return self._closed + + def fileno(self) -> int: + """ + The windows pipe doesn't depend on the file handle. + """ + raise NotImplementedError + + @property + def handle(self) -> HANDLE: + "The handle used for registering this pipe in the event loop." + return self._event + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return attach_win32_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return detach_win32_input(self) + + def read_keys(self) -> List[KeyPress]: + "Read list of KeyPress." + + # Return result. + result = self._buffer + self._buffer = [] + + # Reset event. + windll.kernel32.ResetEvent(self._event) + + return result + + def flush_keys(self) -> List[KeyPress]: + """ + Flush pending keys and return them. + (Used for flushing the 'escape' key.) + """ + # Flush all pending keys. (This is most important to flush the vt100 + # 'Escape' key early when nothing else follows.) + self.vt100_parser.flush() + + # Return result. + result = self._buffer + self._buffer = [] + return result + + def send_bytes(self, data: bytes) -> None: + "Send bytes to the input." + self.send_text(data.decode("utf-8", "ignore")) + + def send_text(self, text: str) -> None: + "Send text to the input." + # Pass it through our vt100 parser. + self.vt100_parser.feed(text) + + # Set event. + windll.kernel32.SetEvent(self._event) + + def raw_mode(self) -> ContextManager[None]: + return DummyContext() + + def cooked_mode(self) -> ContextManager[None]: + return DummyContext() + + def close(self) -> None: + "Close pipe handles." + windll.kernel32.CloseHandle(self._event) + self._closed = True + + def typeahead_hash(self) -> str: + """ + This needs to be unique for every `PipeInput`. + """ + return "pipe-input-%s" % (self._id,) |