aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/prompt-toolkit/py3/prompt_toolkit/input
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2025-02-04 23:55:58 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2025-02-05 00:13:10 +0300
commite0149bcce6c022b2baaf22dc46dfad00080d041d (patch)
tree8d8629279ddbec4d3cbd1756cd1348c7c1d53706 /contrib/python/prompt-toolkit/py3/prompt_toolkit/input
parent850bc4677d9c730e49444ba0fba91309d9cadd0b (diff)
downloadydb-e0149bcce6c022b2baaf22dc46dfad00080d041d.tar.gz
Intermediate changes
commit_hash:fe9cb645107d4e98cea6850ff89242dd287facbb
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/input')
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py153
1 files changed, 145 insertions, 8 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py
index 322d7c0d72..1ff3234a39 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/win32.py
@@ -16,7 +16,7 @@ if not SPHINX_AUTODOC_RUNNING:
import msvcrt
from ctypes import windll
-from ctypes import Array, pointer
+from ctypes import Array, byref, pointer
from ctypes.wintypes import DWORD, HANDLE
from typing import Callable, ContextManager, Iterable, Iterator, TextIO
@@ -35,6 +35,7 @@ from prompt_toolkit.win32_types import (
from .ansi_escape_sequences import REVERSE_ANSI_SEQUENCES
from .base import Input
+from .vt100_parser import Vt100Parser
__all__ = [
"Win32Input",
@@ -52,6 +53,9 @@ RIGHTMOST_BUTTON_PRESSED = 0x2
MOUSE_MOVED = 0x0001
MOUSE_WHEELED = 0x0004
+# See: https://msdn.microsoft.com/pl-pl/library/windows/desktop/ms686033(v=vs.85).aspx
+ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200
+
class _Win32InputBase(Input):
"""
@@ -74,7 +78,14 @@ class Win32Input(_Win32InputBase):
def __init__(self, stdin: TextIO | None = None) -> None:
super().__init__()
- self.console_input_reader = ConsoleInputReader()
+ self._use_virtual_terminal_input = _is_win_vt100_input_enabled()
+
+ self.console_input_reader: Vt100ConsoleInputReader | ConsoleInputReader
+
+ if self._use_virtual_terminal_input:
+ self.console_input_reader = Vt100ConsoleInputReader()
+ else:
+ self.console_input_reader = ConsoleInputReader()
def attach(self, input_ready_callback: Callable[[], None]) -> ContextManager[None]:
"""
@@ -101,7 +112,9 @@ class Win32Input(_Win32InputBase):
return False
def raw_mode(self) -> ContextManager[None]:
- return raw_mode()
+ return raw_mode(
+ use_win10_virtual_terminal_input=self._use_virtual_terminal_input
+ )
def cooked_mode(self) -> ContextManager[None]:
return cooked_mode()
@@ -555,6 +568,102 @@ class ConsoleInputReader:
return [KeyPress(Keys.WindowsMouseEvent, data)]
+class Vt100ConsoleInputReader:
+ """
+ Similar to `ConsoleInputReader`, but for usage when
+ `ENABLE_VIRTUAL_TERMINAL_INPUT` is enabled. This assumes that Windows sends
+ us the right vt100 escape sequences and we parse those with our vt100
+ parser.
+
+ (Using this instead of `ConsoleInputReader` results in the "data" attribute
+ from the `KeyPress` instances to be more correct in edge cases, because
+ this responds to for instance the terminal being in application cursor keys
+ mode.)
+ """
+
+ def __init__(self) -> None:
+ self._fdcon = None
+
+ self._buffer: list[KeyPress] = [] # Buffer to collect the Key objects.
+ self._vt100_parser = Vt100Parser(
+ lambda key_press: self._buffer.append(key_press)
+ )
+
+ # When stdin is a tty, use that handle, otherwise, create a handle from
+ # CONIN$.
+ self.handle: HANDLE
+ if sys.stdin.isatty():
+ self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+ else:
+ self._fdcon = os.open("CONIN$", os.O_RDWR | os.O_BINARY)
+ self.handle = HANDLE(msvcrt.get_osfhandle(self._fdcon))
+
+ def close(self) -> None:
+ "Close fdcon."
+ if self._fdcon is not None:
+ os.close(self._fdcon)
+
+ def read(self) -> Iterable[KeyPress]:
+ """
+ Return a list of `KeyPress` instances. It won't return anything when
+ there was nothing to read. (This function doesn't block.)
+
+ http://msdn.microsoft.com/en-us/library/windows/desktop/ms684961(v=vs.85).aspx
+ """
+ max_count = 2048 # Max events to read at the same time.
+
+ read = DWORD(0)
+ arrtype = INPUT_RECORD * max_count
+ input_records = arrtype()
+
+ # Check whether there is some input to read. `ReadConsoleInputW` would
+ # block otherwise.
+ # (Actually, the event loop is responsible to make sure that this
+ # function is only called when there is something to read, but for some
+ # reason this happened in the asyncio_win32 loop, and it's better to be
+ # safe anyway.)
+ if not wait_for_handles([self.handle], timeout=0):
+ return []
+
+ # Get next batch of input event.
+ windll.kernel32.ReadConsoleInputW(
+ self.handle, pointer(input_records), max_count, pointer(read)
+ )
+
+ # First, get all the keys from the input buffer, in order to determine
+ # whether we should consider this a paste event or not.
+ for key_data in self._get_keys(read, input_records):
+ self._vt100_parser.feed(key_data)
+
+ # Return result.
+ result = self._buffer
+ self._buffer = []
+ return result
+
+ def _get_keys(
+ self, read: DWORD, input_records: Array[INPUT_RECORD]
+ ) -> Iterator[str]:
+ """
+ Generator that yields `KeyPress` objects from the input records.
+ """
+ for i in range(read.value):
+ ir = input_records[i]
+
+ # Get the right EventType from the EVENT_RECORD.
+ # (For some reason the Windows console application 'cmder'
+ # [http://gooseberrycreative.com/cmder/] can return '0' for
+ # ir.EventType. -- Just ignore that.)
+ if ir.EventType in EventTypes:
+ ev = getattr(ir.Event, EventTypes[ir.EventType])
+
+ # Process if this is a key event. (We also have mouse, menu and
+ # focus events.)
+ if isinstance(ev, KEY_EVENT_RECORD) and ev.KeyDown:
+ u_char = ev.uChar.UnicodeChar
+ if u_char != "\x00":
+ yield u_char
+
+
class _Win32Handles:
"""
Utility to keep track of which handles are connectod to which callbacks.
@@ -700,8 +809,11 @@ class raw_mode:
`raw_input` method of `.vt100_input`.
"""
- def __init__(self, fileno: int | None = None) -> None:
+ def __init__(
+ self, fileno: int | None = None, use_win10_virtual_terminal_input: bool = False
+ ) -> None:
self.handle = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+ self.use_win10_virtual_terminal_input = use_win10_virtual_terminal_input
def __enter__(self) -> None:
# Remember original mode.
@@ -717,12 +829,15 @@ class raw_mode:
ENABLE_LINE_INPUT = 0x0002
ENABLE_PROCESSED_INPUT = 0x0001
- windll.kernel32.SetConsoleMode(
- self.handle,
- self.original_mode.value
- & ~(ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
+ new_mode = self.original_mode.value & ~(
+ ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT
)
+ if self.use_win10_virtual_terminal_input:
+ new_mode |= ENABLE_VIRTUAL_TERMINAL_INPUT
+
+ windll.kernel32.SetConsoleMode(self.handle, new_mode)
+
def __exit__(self, *a: object) -> None:
# Restore original mode
windll.kernel32.SetConsoleMode(self.handle, self.original_mode)
@@ -747,3 +862,25 @@ class cooked_mode(raw_mode):
self.original_mode.value
| (ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT),
)
+
+
+def _is_win_vt100_input_enabled() -> bool:
+ """
+ Returns True when we're running Windows and VT100 escape sequences are
+ supported.
+ """
+ hconsole = HANDLE(windll.kernel32.GetStdHandle(STD_INPUT_HANDLE))
+
+ # Get original console mode.
+ original_mode = DWORD(0)
+ windll.kernel32.GetConsoleMode(hconsole, byref(original_mode))
+
+ try:
+ # Try to enable VT100 sequences.
+ result: int = windll.kernel32.SetConsoleMode(
+ hconsole, DWORD(ENABLE_VIRTUAL_TERMINAL_INPUT)
+ )
+
+ return result == 1
+ finally:
+ windll.kernel32.SetConsoleMode(hconsole, original_mode)