diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py')
-rw-r--r-- | contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py | 247 |
1 files changed, 0 insertions, 247 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py deleted file mode 100644 index 3ee1e14fdda..00000000000 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/input/vt100_parser.py +++ /dev/null @@ -1,247 +0,0 @@ -""" -Parser for VT100 input stream. -""" -import re -from typing import Callable, Dict, Generator, Tuple, Union - -from ..key_binding.key_processor import KeyPress -from ..keys import Keys -from .ansi_escape_sequences import ANSI_SEQUENCES - -__all__ = [ - "Vt100Parser", -] - - -# Regex matching any CPR response -# (Note that we use '\Z' instead of '$', because '$' could include a trailing -# newline.) -_cpr_response_re = re.compile("^" + re.escape("\x1b[") + r"\d+;\d+R\Z") - -# Mouse events: -# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M" -_mouse_event_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]+[mM]|M...)\Z") - -# Regex matching any valid prefix of a CPR response. -# (Note that it doesn't contain the last character, the 'R'. The prefix has to -# be shorter.) -_cpr_response_prefix_re = re.compile("^" + re.escape("\x1b[") + r"[\d;]*\Z") - -_mouse_event_prefix_re = re.compile("^" + re.escape("\x1b[") + r"(<?[\d;]*|M.{0,2})\Z") - - -class _Flush: - """Helper object to indicate flush operation to the parser.""" - - pass - - -class _IsPrefixOfLongerMatchCache(Dict[str, bool]): - """ - Dictionary that maps input sequences to a boolean indicating whether there is - any key that start with this characters. - """ - - def __missing__(self, prefix: str) -> bool: - # (hard coded) If this could be a prefix of a CPR response, return - # True. - if _cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match( - prefix - ): - result = True - else: - # If this could be a prefix of anything else, also return True. - result = any( - v - for k, v in ANSI_SEQUENCES.items() - if k.startswith(prefix) and k != prefix - ) - - self[prefix] = result - return result - - -_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache() - - -class Vt100Parser: - """ - Parser for VT100 input stream. - Data can be fed through the `feed` method and the given callback will be - called with KeyPress objects. - - :: - - def callback(key): - pass - i = Vt100Parser(callback) - i.feed('data\x01...') - - :attr feed_key_callback: Function that will be called when a key is parsed. - """ - - # Lookup table of ANSI escape sequences for a VT100 terminal - # Hint: in order to know what sequences your terminal writes to stdin, run - # "od -c" and start typing. - def __init__(self, feed_key_callback: Callable[[KeyPress], None]) -> None: - self.feed_key_callback = feed_key_callback - self.reset() - - def reset(self, request: bool = False) -> None: - self._in_bracketed_paste = False - self._start_parser() - - def _start_parser(self) -> None: - """ - Start the parser coroutine. - """ - self._input_parser = self._input_parser_generator() - self._input_parser.send(None) # type: ignore - - def _get_match(self, prefix: str) -> Union[None, Keys, Tuple[Keys, ...]]: - """ - Return the key (or keys) that maps to this prefix. - """ - # (hard coded) If we match a CPR response, return Keys.CPRResponse. - # (This one doesn't fit in the ANSI_SEQUENCES, because it contains - # integer variables.) - if _cpr_response_re.match(prefix): - return Keys.CPRResponse - - elif _mouse_event_re.match(prefix): - return Keys.Vt100MouseEvent - - # Otherwise, use the mappings. - try: - return ANSI_SEQUENCES[prefix] - except KeyError: - return None - - def _input_parser_generator(self) -> Generator[None, Union[str, _Flush], None]: - """ - Coroutine (state machine) for the input parser. - """ - prefix = "" - retry = False - flush = False - - while True: - flush = False - - if retry: - retry = False - else: - # Get next character. - c = yield - - if isinstance(c, _Flush): - flush = True - else: - prefix += c - - # If we have some data, check for matches. - if prefix: - is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix] - match = self._get_match(prefix) - - # Exact matches found, call handlers.. - if (flush or not is_prefix_of_longer_match) and match: - self._call_handler(match, prefix) - prefix = "" - - # No exact match found. - elif (flush or not is_prefix_of_longer_match) and not match: - found = False - retry = True - - # Loop over the input, try the longest match first and - # shift. - for i in range(len(prefix), 0, -1): - match = self._get_match(prefix[:i]) - if match: - self._call_handler(match, prefix[:i]) - prefix = prefix[i:] - found = True - - if not found: - self._call_handler(prefix[0], prefix[0]) - prefix = prefix[1:] - - def _call_handler( - self, key: Union[str, Keys, Tuple[Keys, ...]], insert_text: str - ) -> None: - """ - Callback to handler. - """ - if isinstance(key, tuple): - # Received ANSI sequence that corresponds with multiple keys - # (probably alt+something). Handle keys individually, but only pass - # data payload to first KeyPress (so that we won't insert it - # multiple times). - for i, k in enumerate(key): - self._call_handler(k, insert_text if i == 0 else "") - else: - if key == Keys.BracketedPaste: - self._in_bracketed_paste = True - self._paste_buffer = "" - else: - self.feed_key_callback(KeyPress(key, insert_text)) - - def feed(self, data: str) -> None: - """ - Feed the input stream. - - :param data: Input string (unicode). - """ - # Handle bracketed paste. (We bypass the parser that matches all other - # key presses and keep reading input until we see the end mark.) - # This is much faster then parsing character by character. - if self._in_bracketed_paste: - self._paste_buffer += data - end_mark = "\x1b[201~" - - if end_mark in self._paste_buffer: - end_index = self._paste_buffer.index(end_mark) - - # Feed content to key bindings. - paste_content = self._paste_buffer[:end_index] - self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content)) - - # Quit bracketed paste mode and handle remaining input. - self._in_bracketed_paste = False - remaining = self._paste_buffer[end_index + len(end_mark) :] - self._paste_buffer = "" - - self.feed(remaining) - - # Handle normal input character by character. - else: - for i, c in enumerate(data): - if self._in_bracketed_paste: - # Quit loop and process from this position when the parser - # entered bracketed paste. - self.feed(data[i:]) - break - else: - self._input_parser.send(c) - - def flush(self) -> None: - """ - Flush the buffer of the input stream. - - This will allow us to handle the escape key (or maybe meta) sooner. - The input received by the escape key is actually the same as the first - characters of e.g. Arrow-Up, so without knowing what follows the escape - sequence, we don't know whether escape has been pressed, or whether - it's something else. This flush function should be called after a - timeout, and processes everything that's still in the buffer as-is, so - without assuming any characters will follow. - """ - self._input_parser.send(_Flush()) - - def feed_and_flush(self, data: str) -> None: - """ - Wrapper around ``feed`` and ``flush``. - """ - self.feed(data) - self.flush() |