diff options
author | shadchin <shadchin@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:30 +0300 |
commit | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch) | |
tree | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py | |
parent | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff) | |
download | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py')
-rw-r--r-- | contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py | 1506 |
1 files changed, 753 insertions, 753 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py index 97699e19b2..437affaa5d 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py @@ -1,758 +1,758 @@ -import os -import sys -from abc import abstractmethod -from contextlib import contextmanager - -from prompt_toolkit.eventloop import get_event_loop - -from ..utils import SPHINX_AUTODOC_RUNNING - -# Do not import win32-specific stuff when generating documentation. -# Otherwise RTD would be unable to generate docs for this module. -if not SPHINX_AUTODOC_RUNNING: - import msvcrt - from ctypes import windll - -from ctypes import Array, pointer -from ctypes.wintypes import DWORD, HANDLE -from typing import ( - Callable, - ContextManager, - Dict, - Iterable, - Iterator, - List, - Optional, - TextIO, -) - -from prompt_toolkit.eventloop import run_in_executor_with_context -from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles -from prompt_toolkit.key_binding.key_processor import KeyPress -from prompt_toolkit.keys import Keys -from prompt_toolkit.mouse_events import MouseButton, MouseEventType -from prompt_toolkit.win32_types import ( - INPUT_RECORD, - KEY_EVENT_RECORD, - MOUSE_EVENT_RECORD, - STD_INPUT_HANDLE, - EventTypes, -) - -from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES -from .base import Input - -__all__ = [ - "Win32Input", - "ConsoleInputReader", - "raw_mode", - "cooked_mode", - "attach_win32_input", - "detach_win32_input", -] - -# Win32 Constants for MOUSE_EVENT_RECORD. -# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str -FROM_LEFT_1ST_BUTTON_PRESSED = 0x1 -RIGHTMOST_BUTTON_PRESSED = 0x2 -MOUSE_MOVED = 0x0001 -MOUSE_WHEELED = 0x0004 - - -class _Win32InputBase(Input): - """ - Base class for `Win32Input` and `Win32PipeInput`. - """ - - def __init__(self) -> None: - self.win32_handles = _Win32Handles() - - @property - @abstractmethod - def handle(self) -> HANDLE: - pass - - -class Win32Input(_Win32InputBase): - """ - `Input` class that reads from the Windows console. - """ - - def __init__(self, stdin: Optional[TextIO] = None) -> None: - super().__init__() - self.console_input_reader = ConsoleInputReader() - - def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: - """ - Return a context manager that makes this input active in the current - event loop. - """ - return attach_win32_input(self, input_ready_callback) - - def detach(self) -> ContextManager[None]: - """ - Return a context manager that makes sure that this input is not active - in the current event loop. - """ - return detach_win32_input(self) - - def read_keys(self) -> List[KeyPress]: - return list(self.console_input_reader.read()) - - def flush(self) -> None: - pass - - @property - def closed(self) -> bool: - return False - - def raw_mode(self) -> ContextManager[None]: - return raw_mode() - - def cooked_mode(self) -> ContextManager[None]: - return cooked_mode() - - def fileno(self) -> int: - # The windows console doesn't depend on the file handle, so - # this is not used for the event loop (which uses the - # handle instead). But it's used in `Application.run_system_command` - # which opens a subprocess with a given stdin/stdout. - return sys.stdin.fileno() - - def typeahead_hash(self) -> str: - return "win32-input" - - def close(self) -> None: - self.console_input_reader.close() - - @property - def handle(self) -> HANDLE: - return self.console_input_reader.handle - - -class ConsoleInputReader: - """ - :param recognize_paste: When True, try to discover paste actions and turn - the event into a BracketedPaste. - """ - - # Keys with character data. - mappings = { - b"\x1b": Keys.Escape, - b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@) - b"\x01": Keys.ControlA, # Control-A (home) - b"\x02": Keys.ControlB, # Control-B (emacs cursor left) - b"\x03": Keys.ControlC, # Control-C (interrupt) - b"\x04": Keys.ControlD, # Control-D (exit) - b"\x05": Keys.ControlE, # Control-E (end) - b"\x06": Keys.ControlF, # Control-F (cursor forward) - b"\x07": Keys.ControlG, # Control-G - b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') - b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') - b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') - b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) - b"\x0c": Keys.ControlL, # Control-L (clear; form feed) - b"\x0d": Keys.ControlM, # Control-M (enter) - b"\x0e": Keys.ControlN, # Control-N (14) (history forward) - b"\x0f": Keys.ControlO, # Control-O (15) - b"\x10": Keys.ControlP, # Control-P (16) (history back) - b"\x11": Keys.ControlQ, # Control-Q - b"\x12": Keys.ControlR, # Control-R (18) (reverse search) - b"\x13": Keys.ControlS, # Control-S (19) (forward search) - b"\x14": Keys.ControlT, # Control-T - b"\x15": Keys.ControlU, # Control-U - b"\x16": Keys.ControlV, # Control-V - b"\x17": Keys.ControlW, # Control-W - b"\x18": Keys.ControlX, # Control-X - b"\x19": Keys.ControlY, # Control-Y (25) - b"\x1a": Keys.ControlZ, # Control-Z - b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-| - b"\x1d": Keys.ControlSquareClose, # Control-] - b"\x1e": Keys.ControlCircumflex, # Control-^ - b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) - b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.) - } - - # Keys that don't carry character data. - keycodes = { - # Home/End - 33: Keys.PageUp, - 34: Keys.PageDown, - 35: Keys.End, - 36: Keys.Home, - # Arrows - 37: Keys.Left, - 38: Keys.Up, - 39: Keys.Right, - 40: Keys.Down, - 45: Keys.Insert, - 46: Keys.Delete, - # F-keys. - 112: Keys.F1, - 113: Keys.F2, - 114: Keys.F3, - 115: Keys.F4, - 116: Keys.F5, - 117: Keys.F6, - 118: Keys.F7, - 119: Keys.F8, - 120: Keys.F9, - 121: Keys.F10, - 122: Keys.F11, - 123: Keys.F12, - } - - LEFT_ALT_PRESSED = 0x0002 - RIGHT_ALT_PRESSED = 0x0001 - SHIFT_PRESSED = 0x0010 - LEFT_CTRL_PRESSED = 0x0008 - RIGHT_CTRL_PRESSED = 0x0004 - - def __init__(self, recognize_paste: bool = True) -> None: - self._fdcon = None - self.recognize_paste = recognize_paste - - # When stdin is a tty, use that handle, otherwise, create a handle from - # CONIN$. - self.handle: HANDLE - if sys.stdin.isatty(): - self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) - else: - self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) - self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon)) - - def close(self) -> None: - "Close fdcon." - if self._fdcon is not None: - os.close(self._fdcon) - - def read(self) -> Iterable[KeyPress]: - """ - Return a list of `KeyPress` instances. It won't return anything when - there was nothing to read. (This function doesn't block.) - - http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx - """ - max_count = 2048 # Max events to read at the same time. - - read = DWORD(0) - arrtype = INPUT_RECORD * max_count - input_records = arrtype() - - # Check whether there is some input to read. `ReadConsoleInputW` would - # block otherwise. - # (Actually, the event loop is responsible to make sure that this - # function is only called when there is something to read, but for some - # reason this happened in the asyncio_win32 loop, and it's better to be - # safe anyway.) - if not wait_for_handles([self.handle], timeout=0): - return - - # Get next batch of input event. - windll.kernel32.ReadConsoleInputW( - self.handle, pointer(input_records), max_count, pointer(read) - ) - - # First, get all the keys from the input buffer, in order to determine - # whether we should consider this a paste event or not. - all_keys = list(self._get_keys(read, input_records)) - - # Fill in 'data' for key presses. - all_keys = [self._insert_key_data(key) for key in all_keys] - - # Correct non-bmp characters that are passed as separate surrogate codes - all_keys = list(self._merge_paired_surrogates(all_keys)) - - if self.recognize_paste and self._is_paste(all_keys): - gen = iter(all_keys) - k: Optional[KeyPress] - - for k in gen: - # Pasting: if the current key consists of text or \n, turn it - # into a BracketedPaste. - data = [] +import os +import sys +from abc import abstractmethod +from contextlib import contextmanager + +from prompt_toolkit.eventloop import get_event_loop + +from ..utils import SPHINX_AUTODOC_RUNNING + +# Do not import win32-specific stuff when generating documentation. +# Otherwise RTD would be unable to generate docs for this module. +if not SPHINX_AUTODOC_RUNNING: + import msvcrt + from ctypes import windll + +from ctypes import Array, pointer +from ctypes.wintypes import DWORD, HANDLE +from typing import ( + Callable, + ContextManager, + Dict, + Iterable, + Iterator, + List, + Optional, + TextIO, +) + +from prompt_toolkit.eventloop import run_in_executor_with_context +from prompt_toolkit.eventloop.win32 import create_win32_event, wait_for_handles +from prompt_toolkit.key_binding.key_processor import KeyPress +from prompt_toolkit.keys import Keys +from prompt_toolkit.mouse_events import MouseButton, MouseEventType +from prompt_toolkit.win32_types import ( + INPUT_RECORD, + KEY_EVENT_RECORD, + MOUSE_EVENT_RECORD, + STD_INPUT_HANDLE, + EventTypes, +) + +from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES +from .base import Input + +__all__ = [ + "Win32Input", + "ConsoleInputReader", + "raw_mode", + "cooked_mode", + "attach_win32_input", + "detach_win32_input", +] + +# Win32 Constants for MOUSE_EVENT_RECORD. +# See: https://docs.microsoft.com/en-us/windows/console/mouse-event-record-str +FROM_LEFT_1ST_BUTTON_PRESSED = 0x1 +RIGHTMOST_BUTTON_PRESSED = 0x2 +MOUSE_MOVED = 0x0001 +MOUSE_WHEELED = 0x0004 + + +class _Win32InputBase(Input): + """ + Base class for `Win32Input` and `Win32PipeInput`. + """ + + def __init__(self) -> None: + self.win32_handles = _Win32Handles() + + @property + @abstractmethod + def handle(self) -> HANDLE: + pass + + +class Win32Input(_Win32InputBase): + """ + `Input` class that reads from the Windows console. + """ + + def __init__(self, stdin: Optional[TextIO] = None) -> None: + super().__init__() + self.console_input_reader = ConsoleInputReader() + + def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]: + """ + Return a context manager that makes this input active in the current + event loop. + """ + return attach_win32_input(self, input_ready_callback) + + def detach(self) -> ContextManager[None]: + """ + Return a context manager that makes sure that this input is not active + in the current event loop. + """ + return detach_win32_input(self) + + def read_keys(self) -> List[KeyPress]: + return list(self.console_input_reader.read()) + + def flush(self) -> None: + pass + + @property + def closed(self) -> bool: + return False + + def raw_mode(self) -> ContextManager[None]: + return raw_mode() + + def cooked_mode(self) -> ContextManager[None]: + return cooked_mode() + + def fileno(self) -> int: + # The windows console doesn't depend on the file handle, so + # this is not used for the event loop (which uses the + # handle instead). But it's used in `Application.run_system_command` + # which opens a subprocess with a given stdin/stdout. + return sys.stdin.fileno() + + def typeahead_hash(self) -> str: + return "win32-input" + + def close(self) -> None: + self.console_input_reader.close() + + @property + def handle(self) -> HANDLE: + return self.console_input_reader.handle + + +class ConsoleInputReader: + """ + :param recognize_paste: When True, try to discover paste actions and turn + the event into a BracketedPaste. + """ + + # Keys with character data. + mappings = { + b"\x1b": Keys.Escape, + b"\x00": Keys.ControlSpace, # Control-Space (Also for Ctrl-@) + b"\x01": Keys.ControlA, # Control-A (home) + b"\x02": Keys.ControlB, # Control-B (emacs cursor left) + b"\x03": Keys.ControlC, # Control-C (interrupt) + b"\x04": Keys.ControlD, # Control-D (exit) + b"\x05": Keys.ControlE, # Control-E (end) + b"\x06": Keys.ControlF, # Control-F (cursor forward) + b"\x07": Keys.ControlG, # Control-G + b"\x08": Keys.ControlH, # Control-H (8) (Identical to '\b') + b"\x09": Keys.ControlI, # Control-I (9) (Identical to '\t') + b"\x0a": Keys.ControlJ, # Control-J (10) (Identical to '\n') + b"\x0b": Keys.ControlK, # Control-K (delete until end of line; vertical tab) + b"\x0c": Keys.ControlL, # Control-L (clear; form feed) + b"\x0d": Keys.ControlM, # Control-M (enter) + b"\x0e": Keys.ControlN, # Control-N (14) (history forward) + b"\x0f": Keys.ControlO, # Control-O (15) + b"\x10": Keys.ControlP, # Control-P (16) (history back) + b"\x11": Keys.ControlQ, # Control-Q + b"\x12": Keys.ControlR, # Control-R (18) (reverse search) + b"\x13": Keys.ControlS, # Control-S (19) (forward search) + b"\x14": Keys.ControlT, # Control-T + b"\x15": Keys.ControlU, # Control-U + b"\x16": Keys.ControlV, # Control-V + b"\x17": Keys.ControlW, # Control-W + b"\x18": Keys.ControlX, # Control-X + b"\x19": Keys.ControlY, # Control-Y (25) + b"\x1a": Keys.ControlZ, # Control-Z + b"\x1c": Keys.ControlBackslash, # Both Control-\ and Ctrl-| + b"\x1d": Keys.ControlSquareClose, # Control-] + b"\x1e": Keys.ControlCircumflex, # Control-^ + b"\x1f": Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hyphen.) + b"\x7f": Keys.Backspace, # (127) Backspace (ASCII Delete.) + } + + # Keys that don't carry character data. + keycodes = { + # Home/End + 33: Keys.PageUp, + 34: Keys.PageDown, + 35: Keys.End, + 36: Keys.Home, + # Arrows + 37: Keys.Left, + 38: Keys.Up, + 39: Keys.Right, + 40: Keys.Down, + 45: Keys.Insert, + 46: Keys.Delete, + # F-keys. + 112: Keys.F1, + 113: Keys.F2, + 114: Keys.F3, + 115: Keys.F4, + 116: Keys.F5, + 117: Keys.F6, + 118: Keys.F7, + 119: Keys.F8, + 120: Keys.F9, + 121: Keys.F10, + 122: Keys.F11, + 123: Keys.F12, + } + + LEFT_ALT_PRESSED = 0x0002 + RIGHT_ALT_PRESSED = 0x0001 + SHIFT_PRESSED = 0x0010 + LEFT_CTRL_PRESSED = 0x0008 + RIGHT_CTRL_PRESSED = 0x0004 + + def __init__(self, recognize_paste: bool = True) -> None: + self._fdcon = None + self.recognize_paste = recognize_paste + + # When stdin is a tty, use that handle, otherwise, create a handle from + # CONIN$. + self.handle: HANDLE + if sys.stdin.isatty(): + self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + else: + self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY) + self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon)) + + def close(self) -> None: + "Close fdcon." + if self._fdcon is not None: + os.close(self._fdcon) + + def read(self) -> Iterable[KeyPress]: + """ + Return a list of `KeyPress` instances. It won't return anything when + there was nothing to read. (This function doesn't block.) + + http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx + """ + max_count = 2048 # Max events to read at the same time. + + read = DWORD(0) + arrtype = INPUT_RECORD * max_count + input_records = arrtype() + + # Check whether there is some input to read. `ReadConsoleInputW` would + # block otherwise. + # (Actually, the event loop is responsible to make sure that this + # function is only called when there is something to read, but for some + # reason this happened in the asyncio_win32 loop, and it's better to be + # safe anyway.) + if not wait_for_handles([self.handle], timeout=0): + return + + # Get next batch of input event. + windll.kernel32.ReadConsoleInputW( + self.handle, pointer(input_records), max_count, pointer(read) + ) + + # First, get all the keys from the input buffer, in order to determine + # whether we should consider this a paste event or not. + all_keys = list(self._get_keys(read, input_records)) + + # Fill in 'data' for key presses. + all_keys = [self._insert_key_data(key) for key in all_keys] + + # Correct non-bmp characters that are passed as separate surrogate codes + all_keys = list(self._merge_paired_surrogates(all_keys)) + + if self.recognize_paste and self._is_paste(all_keys): + gen = iter(all_keys) + k: Optional[KeyPress] + + for k in gen: + # Pasting: if the current key consists of text or \n, turn it + # into a BracketedPaste. + data = [] while k and ( not isinstance(k.key, Keys) or k.key in {Keys.ControlJ, Keys.ControlM} ): - data.append(k.data) - try: - k = next(gen) - except StopIteration: - k = None - - if data: - yield KeyPress(Keys.BracketedPaste, "".join(data)) - if k is not None: - yield k - else: - for k2 in all_keys: - yield k2 - - def _insert_key_data(self, key_press: KeyPress) -> KeyPress: - """ - Insert KeyPress data, for vt100 compatibility. - """ - if key_press.data: - return key_press - - if isinstance(key_press.key, Keys): - data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "") - else: - data = "" - - return KeyPress(key_press.key, data) - - def _get_keys( - self, read: DWORD, input_records: "Array[INPUT_RECORD]" - ) -> Iterator[KeyPress]: - """ - Generator that yields `KeyPress` objects from the input records. - """ - for i in range(read.value): - ir = input_records[i] - - # Get the right EventType from the EVENT_RECORD. - # (For some reason the Windows console application 'cmder' - # [http://gooseberrycreative.com/cmder/] can return '0' for - # ir.EventType. -- Just ignore that.) - if ir.EventType in EventTypes: - ev = getattr(ir.Event, EventTypes[ir.EventType]) - - # Process if this is a key event. (We also have mouse, menu and - # focus events.) - if type(ev) == KEY_EVENT_RECORD and ev.KeyDown: - for key_press in self._event_to_key_presses(ev): - yield key_press - - elif type(ev) == MOUSE_EVENT_RECORD: - for key_press in self._handle_mouse(ev): - yield key_press - - @staticmethod - def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]: - """ - Combines consecutive KeyPresses with high and low surrogates into - single characters - """ - buffered_high_surrogate = None - for key in key_presses: - is_text = not isinstance(key.key, Keys) - is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF" - is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF" - - if buffered_high_surrogate: - if is_low_surrogate: - # convert high surrogate + low surrogate to single character - fullchar = ( - (buffered_high_surrogate.key + key.key) - .encode("utf-16-le", "surrogatepass") - .decode("utf-16-le") - ) - key = KeyPress(fullchar, fullchar) - else: - yield buffered_high_surrogate - buffered_high_surrogate = None - - if is_high_surrogate: - buffered_high_surrogate = key - else: - yield key - - if buffered_high_surrogate: - yield buffered_high_surrogate - - @staticmethod - def _is_paste(keys: List[KeyPress]) -> bool: - """ - Return `True` when we should consider this list of keys as a paste - event. Pasted text on windows will be turned into a - `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably - the best possible way to detect pasting of text and handle that - correctly.) - """ - # Consider paste when it contains at least one newline and at least one - # other character. - text_count = 0 - newline_count = 0 - - for k in keys: - if not isinstance(k.key, Keys): - text_count += 1 - if k.key == Keys.ControlM: - newline_count += 1 - + data.append(k.data) + try: + k = next(gen) + except StopIteration: + k = None + + if data: + yield KeyPress(Keys.BracketedPaste, "".join(data)) + if k is not None: + yield k + else: + for k2 in all_keys: + yield k2 + + def _insert_key_data(self, key_press: KeyPress) -> KeyPress: + """ + Insert KeyPress data, for vt100 compatibility. + """ + if key_press.data: + return key_press + + if isinstance(key_press.key, Keys): + data = REVERSE_ANSI_SEQUENCES.get(key_press.key, "") + else: + data = "" + + return KeyPress(key_press.key, data) + + def _get_keys( + self, read: DWORD, input_records: "Array[INPUT_RECORD]" + ) -> Iterator[KeyPress]: + """ + Generator that yields `KeyPress` objects from the input records. + """ + for i in range(read.value): + ir = input_records[i] + + # Get the right EventType from the EVENT_RECORD. + # (For some reason the Windows console application 'cmder' + # [http://gooseberrycreative.com/cmder/] can return '0' for + # ir.EventType. -- Just ignore that.) + if ir.EventType in EventTypes: + ev = getattr(ir.Event, EventTypes[ir.EventType]) + + # Process if this is a key event. (We also have mouse, menu and + # focus events.) + if type(ev) == KEY_EVENT_RECORD and ev.KeyDown: + for key_press in self._event_to_key_presses(ev): + yield key_press + + elif type(ev) == MOUSE_EVENT_RECORD: + for key_press in self._handle_mouse(ev): + yield key_press + + @staticmethod + def _merge_paired_surrogates(key_presses: List[KeyPress]) -> Iterator[KeyPress]: + """ + Combines consecutive KeyPresses with high and low surrogates into + single characters + """ + buffered_high_surrogate = None + for key in key_presses: + is_text = not isinstance(key.key, Keys) + is_high_surrogate = is_text and "\uD800" <= key.key <= "\uDBFF" + is_low_surrogate = is_text and "\uDC00" <= key.key <= "\uDFFF" + + if buffered_high_surrogate: + if is_low_surrogate: + # convert high surrogate + low surrogate to single character + fullchar = ( + (buffered_high_surrogate.key + key.key) + .encode("utf-16-le", "surrogatepass") + .decode("utf-16-le") + ) + key = KeyPress(fullchar, fullchar) + else: + yield buffered_high_surrogate + buffered_high_surrogate = None + + if is_high_surrogate: + buffered_high_surrogate = key + else: + yield key + + if buffered_high_surrogate: + yield buffered_high_surrogate + + @staticmethod + def _is_paste(keys: List[KeyPress]) -> bool: + """ + Return `True` when we should consider this list of keys as a paste + event. Pasted text on windows will be turned into a + `Keys.BracketedPaste` event. (It's not 100% correct, but it is probably + the best possible way to detect pasting of text and handle that + correctly.) + """ + # Consider paste when it contains at least one newline and at least one + # other character. + text_count = 0 + newline_count = 0 + + for k in keys: + if not isinstance(k.key, Keys): + text_count += 1 + if k.key == Keys.ControlM: + newline_count += 1 + return newline_count >= 1 and text_count >= 1 - - def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]: - """ - For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances. - """ - assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown - - result: Optional[KeyPress] = None - - control_key_state = ev.ControlKeyState - u_char = ev.uChar.UnicodeChar - # Use surrogatepass because u_char may be an unmatched surrogate - ascii_char = u_char.encode("utf-8", "surrogatepass") - - # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the - # unicode code point truncated to 1 byte. See also: - # https://github.com/ipython/ipython/issues/10004 - # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389 - - if u_char == "\x00": - if ev.VirtualKeyCode in self.keycodes: - result = KeyPress(self.keycodes[ev.VirtualKeyCode], "") - else: - if ascii_char in self.mappings: - if self.mappings[ascii_char] == Keys.ControlJ: - u_char = ( - "\n" # Windows sends \n, turn into \r for unix compatibility. - ) - result = KeyPress(self.mappings[ascii_char], u_char) - else: - result = KeyPress(u_char, u_char) - - # First we handle Shift-Control-Arrow/Home/End (need to do this first) - if ( - ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) - and control_key_state & self.SHIFT_PRESSED - and result - ): - mapping: Dict[str, str] = { - Keys.Left: Keys.ControlShiftLeft, - Keys.Right: Keys.ControlShiftRight, - Keys.Up: Keys.ControlShiftUp, - Keys.Down: Keys.ControlShiftDown, - Keys.Home: Keys.ControlShiftHome, - Keys.End: Keys.ControlShiftEnd, - Keys.Insert: Keys.ControlShiftInsert, - Keys.PageUp: Keys.ControlShiftPageUp, - Keys.PageDown: Keys.ControlShiftPageDown, - } - result.key = mapping.get(result.key, result.key) - - # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys. - if ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) and result: - mapping = { - Keys.Left: Keys.ControlLeft, - Keys.Right: Keys.ControlRight, - Keys.Up: Keys.ControlUp, - Keys.Down: Keys.ControlDown, - Keys.Home: Keys.ControlHome, - Keys.End: Keys.ControlEnd, - Keys.Insert: Keys.ControlInsert, - Keys.Delete: Keys.ControlDelete, - Keys.PageUp: Keys.ControlPageUp, - Keys.PageDown: Keys.ControlPageDown, - } - result.key = mapping.get(result.key, result.key) - - # Turn 'Tab' into 'BackTab' when shift was pressed. - # Also handle other shift-key combination - if control_key_state & self.SHIFT_PRESSED and result: - mapping = { - Keys.Tab: Keys.BackTab, - Keys.Left: Keys.ShiftLeft, - Keys.Right: Keys.ShiftRight, - Keys.Up: Keys.ShiftUp, - Keys.Down: Keys.ShiftDown, - Keys.Home: Keys.ShiftHome, - Keys.End: Keys.ShiftEnd, - Keys.Insert: Keys.ShiftInsert, - Keys.Delete: Keys.ShiftDelete, - Keys.PageUp: Keys.ShiftPageUp, - Keys.PageDown: Keys.ShiftPageDown, - } - result.key = mapping.get(result.key, result.key) - - # Turn 'Space' into 'ControlSpace' when control was pressed. - if ( - ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) - and result - and result.data == " " - ): - result = KeyPress(Keys.ControlSpace, " ") - - # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot - # detect this combination. But it's really practical on Windows.) - if ( - ( - control_key_state & self.LEFT_CTRL_PRESSED - or control_key_state & self.RIGHT_CTRL_PRESSED - ) - and result - and result.key == Keys.ControlJ - ): - return [KeyPress(Keys.Escape, ""), result] - - # Return result. If alt was pressed, prefix the result with an - # 'Escape' key, just like unix VT100 terminals do. - - # NOTE: Only replace the left alt with escape. The right alt key often - # acts as altgr and is used in many non US keyboard layouts for - # typing some special characters, like a backslash. We don't want - # all backslashes to be prefixed with escape. (Esc-\ has a - # meaning in E-macs, for instance.) - if result: - meta_pressed = control_key_state & self.LEFT_ALT_PRESSED - - if meta_pressed: - return [KeyPress(Keys.Escape, ""), result] - else: - return [result] - - else: - return [] - - def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]: - """ - Handle mouse events. Return a list of KeyPress instances. - """ - event_flags = ev.EventFlags - button_state = ev.ButtonState - - event_type: Optional[MouseEventType] = None - button: MouseButton = MouseButton.NONE - - # Scroll events. - if event_flags & MOUSE_WHEELED: - if button_state > 0: - event_type = MouseEventType.SCROLL_UP - else: - event_type = MouseEventType.SCROLL_DOWN - else: - # Handle button state for non-scroll events. - if button_state == FROM_LEFT_1ST_BUTTON_PRESSED: - button = MouseButton.LEFT - - elif button_state == RIGHTMOST_BUTTON_PRESSED: - button = MouseButton.RIGHT - - # Move events. - if event_flags & MOUSE_MOVED: - event_type = MouseEventType.MOUSE_MOVE - - # No key pressed anymore: mouse up. - if event_type is None: - if button_state > 0: - # Some button pressed. - event_type = MouseEventType.MOUSE_DOWN - else: - # No button pressed. - event_type = MouseEventType.MOUSE_UP - - data = ";".join( - [ - button.value, - event_type.value, - str(ev.MousePosition.X), - str(ev.MousePosition.Y), - ] - ) - return [KeyPress(Keys.WindowsMouseEvent, data)] - - -class _Win32Handles: - """ - Utility to keep track of which handles are connectod to which callbacks. - - `add_win32_handle` starts a tiny event loop in another thread which waits - for the Win32 handle to become ready. When this happens, the callback will - be called in the current asyncio event loop using `call_soon_threadsafe`. - - `remove_win32_handle` will stop this tiny event loop. - - NOTE: We use this technique, so that we don't have to use the - `ProactorEventLoop` on Windows and we can wait for things like stdin - in a `SelectorEventLoop`. This is important, because our inputhook - mechanism (used by IPython), only works with the `SelectorEventLoop`. - """ - - def __init__(self) -> None: - self._handle_callbacks: Dict[int, Callable[[], None]] = {} - - # Windows Events that are triggered when we have to stop watching this - # handle. - self._remove_events: Dict[int, HANDLE] = {} - - def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None: - """ - Add a Win32 handle to the event loop. - """ - handle_value = handle.value - - if handle_value is None: - raise ValueError("Invalid handle.") - - # Make sure to remove a previous registered handler first. - self.remove_win32_handle(handle) - - loop = get_event_loop() - self._handle_callbacks[handle_value] = callback - - # Create remove event. - remove_event = create_win32_event() - self._remove_events[handle_value] = remove_event - - # Add reader. - def ready() -> None: - # Tell the callback that input's ready. - try: - callback() - finally: - run_in_executor_with_context(wait, loop=loop) - - # Wait for the input to become ready. - # (Use an executor for this, the Windows asyncio event loop doesn't - # allow us to wait for handles like stdin.) - def wait() -> None: - # Wait until either the handle becomes ready, or the remove event - # has been set. - result = wait_for_handles([remove_event, handle]) - - if result is remove_event: - windll.kernel32.CloseHandle(remove_event) - return - else: - loop.call_soon_threadsafe(ready) - - run_in_executor_with_context(wait, loop=loop) - - def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]: - """ - Remove a Win32 handle from the event loop. - Return either the registered handler or `None`. - """ - if handle.value is None: - return None # Ignore. - - # Trigger remove events, so that the reader knows to stop. - try: - event = self._remove_events.pop(handle.value) - except KeyError: - pass - else: - windll.kernel32.SetEvent(event) - - try: - return self._handle_callbacks.pop(handle.value) - except KeyError: - return None - - -@contextmanager -def attach_win32_input( - input: _Win32InputBase, callback: Callable[[], None] -) -> Iterator[None]: - """ - Context manager that makes this input active in the current event loop. - - :param input: :class:`~prompt_toolkit.input.Input` object. - :param input_ready_callback: Called when the input is ready to read. - """ - win32_handles = input.win32_handles - handle = input.handle - - if handle.value is None: - raise ValueError("Invalid handle.") - - # Add reader. - previous_callback = win32_handles.remove_win32_handle(handle) - win32_handles.add_win32_handle(handle, callback) - - try: - yield - finally: - win32_handles.remove_win32_handle(handle) - - if previous_callback: - win32_handles.add_win32_handle(handle, previous_callback) - - -@contextmanager -def detach_win32_input(input: _Win32InputBase) -> Iterator[None]: - win32_handles = input.win32_handles - handle = input.handle - - if handle.value is None: - raise ValueError("Invalid handle.") - - previous_callback = win32_handles.remove_win32_handle(handle) - - try: - yield - finally: - if previous_callback: - win32_handles.add_win32_handle(handle, previous_callback) - - -class raw_mode: - """ - :: - - with raw_mode(stdin): - ''' the windows terminal is now in 'raw' mode. ''' - - The ``fileno`` attribute is ignored. This is to be compatible with the - `raw_input` method of `.vt100_input`. - """ - - def __init__(self, fileno: Optional[int] = None) -> None: - self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) - - def __enter__(self) -> None: - # Remember original mode. - original_mode = DWORD() - windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode)) - self.original_mode = original_mode - - self._patch() - - def _patch(self) -> None: - # Set raw - ENABLE_ECHO_INPUT = 0x0004 - ENABLE_LINE_INPUT = 0x0002 - ENABLE_PROCESSED_INPUT = 0x0001 - - windll.kernel32.SetConsoleMode( - self.handle, - self.original_mode.value - & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), - ) - - def __exit__(self, *a: object) -> None: - # Restore original mode - windll.kernel32.SetConsoleMode(self.handle, self.original_mode) - - -class cooked_mode(raw_mode): - """ - :: - - with cooked_mode(stdin): - ''' The pseudo-terminal stdin is now used in cooked mode. ''' - """ - - def _patch(self) -> None: - # Set cooked. - ENABLE_ECHO_INPUT = 0x0004 - ENABLE_LINE_INPUT = 0x0002 - ENABLE_PROCESSED_INPUT = 0x0001 - - windll.kernel32.SetConsoleMode( - self.handle, - self.original_mode.value - | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), - ) + + def _event_to_key_presses(self, ev: KEY_EVENT_RECORD) -> List[KeyPress]: + """ + For this `KEY_EVENT_RECORD`, return a list of `KeyPress` instances. + """ + assert type(ev) == KEY_EVENT_RECORD and ev.KeyDown + + result: Optional[KeyPress] = None + + control_key_state = ev.ControlKeyState + u_char = ev.uChar.UnicodeChar + # Use surrogatepass because u_char may be an unmatched surrogate + ascii_char = u_char.encode("utf-8", "surrogatepass") + + # NOTE: We don't use `ev.uChar.AsciiChar`. That appears to be the + # unicode code point truncated to 1 byte. See also: + # https://github.com/ipython/ipython/issues/10004 + # https://github.com/jonathanslenders/python-prompt-toolkit/issues/389 + + if u_char == "\x00": + if ev.VirtualKeyCode in self.keycodes: + result = KeyPress(self.keycodes[ev.VirtualKeyCode], "") + else: + if ascii_char in self.mappings: + if self.mappings[ascii_char] == Keys.ControlJ: + u_char = ( + "\n" # Windows sends \n, turn into \r for unix compatibility. + ) + result = KeyPress(self.mappings[ascii_char], u_char) + else: + result = KeyPress(u_char, u_char) + + # First we handle Shift-Control-Arrow/Home/End (need to do this first) + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and control_key_state & self.SHIFT_PRESSED + and result + ): + mapping: Dict[str, str] = { + Keys.Left: Keys.ControlShiftLeft, + Keys.Right: Keys.ControlShiftRight, + Keys.Up: Keys.ControlShiftUp, + Keys.Down: Keys.ControlShiftDown, + Keys.Home: Keys.ControlShiftHome, + Keys.End: Keys.ControlShiftEnd, + Keys.Insert: Keys.ControlShiftInsert, + Keys.PageUp: Keys.ControlShiftPageUp, + Keys.PageDown: Keys.ControlShiftPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Correctly handle Control-Arrow/Home/End and Control-Insert/Delete keys. + if ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) and result: + mapping = { + Keys.Left: Keys.ControlLeft, + Keys.Right: Keys.ControlRight, + Keys.Up: Keys.ControlUp, + Keys.Down: Keys.ControlDown, + Keys.Home: Keys.ControlHome, + Keys.End: Keys.ControlEnd, + Keys.Insert: Keys.ControlInsert, + Keys.Delete: Keys.ControlDelete, + Keys.PageUp: Keys.ControlPageUp, + Keys.PageDown: Keys.ControlPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Turn 'Tab' into 'BackTab' when shift was pressed. + # Also handle other shift-key combination + if control_key_state & self.SHIFT_PRESSED and result: + mapping = { + Keys.Tab: Keys.BackTab, + Keys.Left: Keys.ShiftLeft, + Keys.Right: Keys.ShiftRight, + Keys.Up: Keys.ShiftUp, + Keys.Down: Keys.ShiftDown, + Keys.Home: Keys.ShiftHome, + Keys.End: Keys.ShiftEnd, + Keys.Insert: Keys.ShiftInsert, + Keys.Delete: Keys.ShiftDelete, + Keys.PageUp: Keys.ShiftPageUp, + Keys.PageDown: Keys.ShiftPageDown, + } + result.key = mapping.get(result.key, result.key) + + # Turn 'Space' into 'ControlSpace' when control was pressed. + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and result + and result.data == " " + ): + result = KeyPress(Keys.ControlSpace, " ") + + # Turn Control-Enter into META-Enter. (On a vt100 terminal, we cannot + # detect this combination. But it's really practical on Windows.) + if ( + ( + control_key_state & self.LEFT_CTRL_PRESSED + or control_key_state & self.RIGHT_CTRL_PRESSED + ) + and result + and result.key == Keys.ControlJ + ): + return [KeyPress(Keys.Escape, ""), result] + + # Return result. If alt was pressed, prefix the result with an + # 'Escape' key, just like unix VT100 terminals do. + + # NOTE: Only replace the left alt with escape. The right alt key often + # acts as altgr and is used in many non US keyboard layouts for + # typing some special characters, like a backslash. We don't want + # all backslashes to be prefixed with escape. (Esc-\ has a + # meaning in E-macs, for instance.) + if result: + meta_pressed = control_key_state & self.LEFT_ALT_PRESSED + + if meta_pressed: + return [KeyPress(Keys.Escape, ""), result] + else: + return [result] + + else: + return [] + + def _handle_mouse(self, ev: MOUSE_EVENT_RECORD) -> List[KeyPress]: + """ + Handle mouse events. Return a list of KeyPress instances. + """ + event_flags = ev.EventFlags + button_state = ev.ButtonState + + event_type: Optional[MouseEventType] = None + button: MouseButton = MouseButton.NONE + + # Scroll events. + if event_flags & MOUSE_WHEELED: + if button_state > 0: + event_type = MouseEventType.SCROLL_UP + else: + event_type = MouseEventType.SCROLL_DOWN + else: + # Handle button state for non-scroll events. + if button_state == FROM_LEFT_1ST_BUTTON_PRESSED: + button = MouseButton.LEFT + + elif button_state == RIGHTMOST_BUTTON_PRESSED: + button = MouseButton.RIGHT + + # Move events. + if event_flags & MOUSE_MOVED: + event_type = MouseEventType.MOUSE_MOVE + + # No key pressed anymore: mouse up. + if event_type is None: + if button_state > 0: + # Some button pressed. + event_type = MouseEventType.MOUSE_DOWN + else: + # No button pressed. + event_type = MouseEventType.MOUSE_UP + + data = ";".join( + [ + button.value, + event_type.value, + str(ev.MousePosition.X), + str(ev.MousePosition.Y), + ] + ) + return [KeyPress(Keys.WindowsMouseEvent, data)] + + +class _Win32Handles: + """ + Utility to keep track of which handles are connectod to which callbacks. + + `add_win32_handle` starts a tiny event loop in another thread which waits + for the Win32 handle to become ready. When this happens, the callback will + be called in the current asyncio event loop using `call_soon_threadsafe`. + + `remove_win32_handle` will stop this tiny event loop. + + NOTE: We use this technique, so that we don't have to use the + `ProactorEventLoop` on Windows and we can wait for things like stdin + in a `SelectorEventLoop`. This is important, because our inputhook + mechanism (used by IPython), only works with the `SelectorEventLoop`. + """ + + def __init__(self) -> None: + self._handle_callbacks: Dict[int, Callable[[], None]] = {} + + # Windows Events that are triggered when we have to stop watching this + # handle. + self._remove_events: Dict[int, HANDLE] = {} + + def add_win32_handle(self, handle: HANDLE, callback: Callable[[], None]) -> None: + """ + Add a Win32 handle to the event loop. + """ + handle_value = handle.value + + if handle_value is None: + raise ValueError("Invalid handle.") + + # Make sure to remove a previous registered handler first. + self.remove_win32_handle(handle) + + loop = get_event_loop() + self._handle_callbacks[handle_value] = callback + + # Create remove event. + remove_event = create_win32_event() + self._remove_events[handle_value] = remove_event + + # Add reader. + def ready() -> None: + # Tell the callback that input's ready. + try: + callback() + finally: + run_in_executor_with_context(wait, loop=loop) + + # Wait for the input to become ready. + # (Use an executor for this, the Windows asyncio event loop doesn't + # allow us to wait for handles like stdin.) + def wait() -> None: + # Wait until either the handle becomes ready, or the remove event + # has been set. + result = wait_for_handles([remove_event, handle]) + + if result is remove_event: + windll.kernel32.CloseHandle(remove_event) + return + else: + loop.call_soon_threadsafe(ready) + + run_in_executor_with_context(wait, loop=loop) + + def remove_win32_handle(self, handle: HANDLE) -> Optional[Callable[[], None]]: + """ + Remove a Win32 handle from the event loop. + Return either the registered handler or `None`. + """ + if handle.value is None: + return None # Ignore. + + # Trigger remove events, so that the reader knows to stop. + try: + event = self._remove_events.pop(handle.value) + except KeyError: + pass + else: + windll.kernel32.SetEvent(event) + + try: + return self._handle_callbacks.pop(handle.value) + except KeyError: + return None + + +@contextmanager +def attach_win32_input( + input: _Win32InputBase, callback: Callable[[], None] +) -> Iterator[None]: + """ + Context manager that makes this input active in the current event loop. + + :param input: :class:`~prompt_toolkit.input.Input` object. + :param input_ready_callback: Called when the input is ready to read. + """ + win32_handles = input.win32_handles + handle = input.handle + + if handle.value is None: + raise ValueError("Invalid handle.") + + # Add reader. + previous_callback = win32_handles.remove_win32_handle(handle) + win32_handles.add_win32_handle(handle, callback) + + try: + yield + finally: + win32_handles.remove_win32_handle(handle) + + if previous_callback: + win32_handles.add_win32_handle(handle, previous_callback) + + +@contextmanager +def detach_win32_input(input: _Win32InputBase) -> Iterator[None]: + win32_handles = input.win32_handles + handle = input.handle + + if handle.value is None: + raise ValueError("Invalid handle.") + + previous_callback = win32_handles.remove_win32_handle(handle) + + try: + yield + finally: + if previous_callback: + win32_handles.add_win32_handle(handle, previous_callback) + + +class raw_mode: + """ + :: + + with raw_mode(stdin): + ''' the windows terminal is now in 'raw' mode. ''' + + The ``fileno`` attribute is ignored. This is to be compatible with the + `raw_input` method of `.vt100_input`. + """ + + def __init__(self, fileno: Optional[int] = None) -> None: + self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE)) + + def __enter__(self) -> None: + # Remember original mode. + original_mode = DWORD() + windll.kernel32.GetConsoleMode(self.handle, pointer(original_mode)) + self.original_mode = original_mode + + self._patch() + + def _patch(self) -> None: + # Set raw + ENABLE_ECHO_INPUT = 0x0004 + ENABLE_LINE_INPUT = 0x0002 + ENABLE_PROCESSED_INPUT = 0x0001 + + windll.kernel32.SetConsoleMode( + self.handle, + self.original_mode.value + & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), + ) + + def __exit__(self, *a: object) -> None: + # Restore original mode + windll.kernel32.SetConsoleMode(self.handle, self.original_mode) + + +class cooked_mode(raw_mode): + """ + :: + + with cooked_mode(stdin): + ''' The pseudo-terminal stdin is now used in cooked mode. ''' + """ + + def _patch(self) -> None: + # Set cooked. + ENABLE_ECHO_INPUT = 0x0004 + ENABLE_LINE_INPUT = 0x0002 + ENABLE_PROCESSED_INPUT = 0x0001 + + windll.kernel32.SetConsoleMode( + self.handle, + self.original_mode.value + | (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT), + ) |