diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-02-08 15:26:58 +0300 |
---|---|---|
committer | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-02-08 15:26:58 +0300 |
commit | 2efaaefec2cb2a55d55c01753d1eed2a3296adb5 (patch) | |
tree | 27ec5258b325565c3963b91b36d64e84084fdb04 /contrib/python/prompt-toolkit/py3/prompt_toolkit/output | |
parent | 23793c5d0827a08b5ff9242d2123eff92b681912 (diff) | |
download | ydb-2efaaefec2cb2a55d55c01753d1eed2a3296adb5.tar.gz |
intermediate changes
ref:0bfa27bb6c38df8c510497e54e4ebf0b4fb84503
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/output')
6 files changed, 300 insertions, 94 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/base.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/base.py index 0d6be34842..c78677bc8b 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/base.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/base.py @@ -4,6 +4,7 @@ Interface for an output. from abc import ABCMeta, abstractmethod from typing import Optional, TextIO +from prompt_toolkit.cursor_shapes import CursorShape from prompt_toolkit.data_structures import Size from prompt_toolkit.styles import Attrs @@ -140,6 +141,14 @@ class Output(metaclass=ABCMeta): def show_cursor(self) -> None: "Show cursor." + @abstractmethod + def set_cursor_shape(self, cursor_shape: CursorShape) -> None: + "Set cursor shape to block, beam or underline." + + @abstractmethod + def reset_cursor_shape(self) -> None: + "Reset cursor shape." + def ask_for_cpr(self) -> None: """ Asks for a cursor position report (CPR). @@ -289,6 +298,12 @@ class DummyOutput(Output): def show_cursor(self) -> None: pass + def set_cursor_shape(self, cursor_shape: CursorShape) -> None: + pass + + def reset_cursor_shape(self) -> None: + pass + def ask_for_cpr(self) -> None: pass diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/defaults.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/defaults.py index fc107a4d6e..bd4bf950c4 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/defaults.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/defaults.py @@ -10,6 +10,7 @@ from prompt_toolkit.utils import ( from .base import DummyOutput, Output from .color_depth import ColorDepth +from .plain_text import PlainTextOutput __all__ = [ "create_output", @@ -17,7 +18,7 @@ __all__ = [ def create_output( - stdout: Optional[TextIO] = None, always_prefer_tty: bool = True + stdout: Optional[TextIO] = None, always_prefer_tty: bool = False ) -> Output: """ Return an :class:`~prompt_toolkit.output.Output` instance for the command @@ -25,9 +26,13 @@ def create_output( :param stdout: The stdout object :param always_prefer_tty: When set, look for `sys.stderr` if `sys.stdout` - is not a TTY. (The prompt_toolkit render output is not meant to be - consumed by something other then a terminal, so this is a reasonable - default.) + is not a TTY. Useful if `sys.stdout` is redirected to a file, but we + still want user input and output on the terminal. + + By default, this is `False`. If `sys.stdout` is not a terminal (maybe + it's redirected to a file), then a `PlainTextOutput` will be returned. + That way, tools like `print_formatted_text` will write plain text into + that file. """ # Consider TERM, PROMPT_TOOLKIT_BELL, and PROMPT_TOOLKIT_COLOR_DEPTH # environment variables. Notice that PROMPT_TOOLKIT_COLOR_DEPTH value is @@ -82,6 +87,12 @@ def create_output( else: from .vt100 import Vt100_Output + # Stdout is not a TTY? Render as plain text. + # This is mostly useful if stdout is redirected to a file, and + # `print_formatted_text` is used. + if not stdout.isatty(): + return PlainTextOutput(stdout) + return Vt100_Output.from_pty( stdout, term=term_from_env, diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/flush_stdout.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/flush_stdout.py new file mode 100644 index 0000000000..4adcbd109d --- /dev/null +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/flush_stdout.py @@ -0,0 +1,84 @@ +import errno +import os +import sys +from contextlib import contextmanager +from typing import IO, Iterator, TextIO, cast + +__all__ = ["flush_stdout"] + + +def flush_stdout(stdout: TextIO, data: str, write_binary: bool) -> None: + try: + # Ensure that `stdout` is made blocking when writing into it. + # Otherwise, when uvloop is activated (which makes stdout + # non-blocking), and we write big amounts of text, then we get a + # `BlockingIOError` here. + with _blocking_io(stdout): + # (We try to encode ourself, because that way we can replace + # characters that don't exist in the character set, avoiding + # UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.) + # My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968' + # for sys.stdout.encoding in xterm. + out: IO[bytes] + if write_binary: + if hasattr(stdout, "buffer"): + out = stdout.buffer + else: + # IO[bytes] was given to begin with. + # (Used in the unit tests, for instance.) + out = cast(IO[bytes], stdout) + out.write(data.encode(stdout.encoding or "utf-8", "replace")) + else: + stdout.write(data) + + stdout.flush() + except IOError as e: + if e.args and e.args[0] == errno.EINTR: + # Interrupted system call. Can happen in case of a window + # resize signal. (Just ignore. The resize handler will render + # again anyway.) + pass + elif e.args and e.args[0] == 0: + # This can happen when there is a lot of output and the user + # sends a KeyboardInterrupt by pressing Control-C. E.g. in + # a Python REPL when we execute "while True: print('test')". + # (The `ptpython` REPL uses this `Output` class instead of + # `stdout` directly -- in order to be network transparent.) + # So, just ignore. + pass + else: + raise + + +@contextmanager +def _blocking_io(io: IO[str]) -> Iterator[None]: + """ + Ensure that the FD for `io` is set to blocking in here. + """ + if sys.platform == "win32": + # On Windows, the `os` module doesn't have a `get/set_blocking` + # function. + yield + return + + try: + fd = io.fileno() + blocking = os.get_blocking(fd) + except: # noqa + # Failed somewhere. + # `get_blocking` can raise `OSError`. + # The io object can raise `AttributeError` when no `fileno()` method is + # present if we're not a real file object. + blocking = True # Assume we're good, and don't do anything. + + try: + # Make blocking if we weren't blocking yet. + if not blocking: + os.set_blocking(fd, True) + + yield + + finally: + # Restore original blocking mode. + if not blocking: + os.set_blocking(fd, blocking) diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/plain_text.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/plain_text.py new file mode 100644 index 0000000000..23c1e9453f --- /dev/null +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/plain_text.py @@ -0,0 +1,145 @@ +from typing import List, TextIO + +from prompt_toolkit.cursor_shapes import CursorShape +from prompt_toolkit.data_structures import Size +from prompt_toolkit.styles import Attrs + +from .base import Output +from .color_depth import ColorDepth +from .flush_stdout import flush_stdout + +__all__ = ["PlainTextOutput"] + + +class PlainTextOutput(Output): + """ + Output that won't include any ANSI escape sequences. + + Useful when stdout is not a terminal. Maybe stdout is redirected to a file. + In this case, if `print_formatted_text` is used, for instance, we don't + want to include formatting. + + (The code is mostly identical to `Vt100_Output`, but without the + formatting.) + """ + + def __init__(self, stdout: TextIO, write_binary: bool = True) -> None: + assert all(hasattr(stdout, a) for a in ("write", "flush")) + + if write_binary: + assert hasattr(stdout, "encoding") + + self.stdout: TextIO = stdout + self.write_binary = write_binary + self._buffer: List[str] = [] + + def fileno(self) -> int: + "There is no sensible default for fileno()." + return self.stdout.fileno() + + def encoding(self) -> str: + return "utf-8" + + def write(self, data: str) -> None: + self._buffer.append(data) + + def write_raw(self, data: str) -> None: + self._buffer.append(data) + + def set_title(self, title: str) -> None: + pass + + def clear_title(self) -> None: + pass + + def flush(self) -> None: + if not self._buffer: + return + + data = "".join(self._buffer) + self._buffer = [] + flush_stdout(self.stdout, data, write_binary=self.write_binary) + + def erase_screen(self) -> None: + pass + + def enter_alternate_screen(self) -> None: + pass + + def quit_alternate_screen(self) -> None: + pass + + def enable_mouse_support(self) -> None: + pass + + def disable_mouse_support(self) -> None: + pass + + def erase_end_of_line(self) -> None: + pass + + def erase_down(self) -> None: + pass + + def reset_attributes(self) -> None: + pass + + def set_attributes(self, attrs: Attrs, color_depth: ColorDepth) -> None: + pass + + def disable_autowrap(self) -> None: + pass + + def enable_autowrap(self) -> None: + pass + + def cursor_goto(self, row: int = 0, column: int = 0) -> None: + pass + + def cursor_up(self, amount: int) -> None: + pass + + def cursor_down(self, amount: int) -> None: + self._buffer.append("\n") + + def cursor_forward(self, amount: int) -> None: + self._buffer.append(" " * amount) + + def cursor_backward(self, amount: int) -> None: + pass + + def hide_cursor(self) -> None: + pass + + def show_cursor(self) -> None: + pass + + def set_cursor_shape(self, cursor_shape: CursorShape) -> None: + pass + + def reset_cursor_shape(self) -> None: + pass + + def ask_for_cpr(self) -> None: + pass + + def bell(self) -> None: + pass + + def enable_bracketed_paste(self) -> None: + pass + + def disable_bracketed_paste(self) -> None: + pass + + def scroll_buffer_to_prompt(self) -> None: + pass + + def get_size(self) -> Size: + return Size(rows=40, columns=80) + + def get_rows_below_cursor_position(self) -> int: + return 8 + + def get_default_color_depth(self) -> ColorDepth: + return ColorDepth.DEPTH_1_BIT diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py index 686303fa7b..0586267286 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py @@ -6,34 +6,30 @@ A lot of thanks, regarding outputting of colors, goes to the Pygments project: everything has been highly optimized.) http://pygments.org/ """ -import array -import errno import io import os import sys -from contextlib import contextmanager from typing import ( - IO, Callable, Dict, Hashable, Iterable, - Iterator, List, Optional, Sequence, Set, TextIO, Tuple, - cast, ) +from prompt_toolkit.cursor_shapes import CursorShape from prompt_toolkit.data_structures import Size from prompt_toolkit.output import Output from prompt_toolkit.styles import ANSI_COLOR_NAMES, Attrs from prompt_toolkit.utils import is_dumb_terminal from .color_depth import ColorDepth +from .flush_stdout import flush_stdout __all__ = [ "Vt100_Output", @@ -443,6 +439,11 @@ class Vt100_Output(Output): ColorDepth.DEPTH_24_BIT: _EscapeCodeCache(ColorDepth.DEPTH_24_BIT), } + # Keep track of whether the cursor shape was ever changed. + # (We don't restore the cursor shape if it was never changed - by + # default, we don't change them.) + self._cursor_shape_changed = False + @classmethod def from_pty( cls, @@ -663,6 +664,31 @@ class Vt100_Output(Output): def show_cursor(self) -> None: self.write_raw("\x1b[?12l\x1b[?25h") # Stop blinking cursor and show. + def set_cursor_shape(self, cursor_shape: CursorShape) -> None: + if cursor_shape == CursorShape._NEVER_CHANGE: + return + + self._cursor_shape_changed = True + self.write_raw( + { + CursorShape.BLOCK: "\x1b[2 q", + CursorShape.BEAM: "\x1b[6 q", + CursorShape.UNDERLINE: "\x1b[4 q", + CursorShape.BLINKING_BLOCK: "\x1b[1 q", + CursorShape.BLINKING_BEAM: "\x1b[5 q", + CursorShape.BLINKING_UNDERLINE: "\x1b[3 q", + }.get(cursor_shape, "") + ) + + def reset_cursor_shape(self) -> None: + "Reset cursor shape." + # (Only reset cursor shape, if we ever changed it.) + if self._cursor_shape_changed: + self._cursor_shape_changed = False + + # Reset cursor shape. + self.write_raw("\x1b[0 q") + def flush(self) -> None: """ Write to output stream and flush. @@ -673,46 +699,7 @@ class Vt100_Output(Output): data = "".join(self._buffer) self._buffer = [] - try: - # Ensure that `self.stdout` is made blocking when writing into it. - # Otherwise, when uvloop is activated (which makes stdout - # non-blocking), and we write big amounts of text, then we get a - # `BlockingIOError` here. - with blocking_io(self.stdout): - # (We try to encode ourself, because that way we can replace - # characters that don't exist in the character set, avoiding - # UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.) - # My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968' - # for sys.stdout.encoding in xterm. - out: IO[bytes] - if self.write_binary: - if hasattr(self.stdout, "buffer"): - out = self.stdout.buffer - else: - # IO[bytes] was given to begin with. - # (Used in the unit tests, for instance.) - out = cast(IO[bytes], self.stdout) - out.write(data.encode(self.stdout.encoding or "utf-8", "replace")) - else: - self.stdout.write(data) - - self.stdout.flush() - except IOError as e: - if e.args and e.args[0] == errno.EINTR: - # Interrupted system call. Can happen in case of a window - # resize signal. (Just ignore. The resize handler will render - # again anyway.) - pass - elif e.args and e.args[0] == 0: - # This can happen when there is a lot of output and the user - # sends a KeyboardInterrupt by pressing Control-C. E.g. in - # a Python REPL when we execute "while True: print('test')". - # (The `ptpython` REPL uses this `Output` class instead of - # `stdout` directly -- in order to be network transparent.) - # So, just ignore. - pass - else: - raise + flush_stdout(self.stdout, data, write_binary=self.write_binary) def ask_for_cpr(self) -> None: """ @@ -764,37 +751,3 @@ class Vt100_Output(Output): return ColorDepth.DEPTH_4_BIT return ColorDepth.DEFAULT - - -@contextmanager -def blocking_io(io: IO[str]) -> Iterator[None]: - """ - Ensure that the FD for `io` is set to blocking in here. - """ - if sys.platform == "win32": - # On Windows, the `os` module doesn't have a `get/set_blocking` - # function. - yield - return - - try: - fd = io.fileno() - blocking = os.get_blocking(fd) - except: # noqa - # Failed somewhere. - # `get_blocking` can raise `OSError`. - # The io object can raise `AttributeError` when no `fileno()` method is - # present if we're not a real file object. - blocking = True # Assume we're good, and don't do anything. - - try: - # Make blocking if we weren't blocking yet. - if not blocking: - os.set_blocking(fd, True) - - yield - - finally: - # Restore original blocking mode. - if not blocking: - os.set_blocking(fd, blocking) diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/win32.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/win32.py index f26066cb99..abfd61774b 100644 --- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/win32.py +++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/win32.py @@ -1,14 +1,5 @@ import os -from ctypes import ( - ArgumentError, - byref, - c_char, - c_long, - c_short, - c_uint, - c_ulong, - pointer, -) +from ctypes import ArgumentError, byref, c_char, c_long, c_uint, c_ulong, pointer from ..utils import SPHINX_AUTODOC_RUNNING @@ -20,6 +11,7 @@ if not SPHINX_AUTODOC_RUNNING: from ctypes.wintypes import DWORD, HANDLE from typing import Callable, Dict, List, Optional, TextIO, Tuple, Type, TypeVar, Union +from prompt_toolkit.cursor_shapes import CursorShape from prompt_toolkit.data_structures import Size from prompt_toolkit.styles import ANSI_COLOR_NAMES, Attrs from prompt_toolkit.utils import get_cwidth @@ -498,6 +490,12 @@ class Win32Output(Output): def show_cursor(self) -> None: pass + def set_cursor_shape(self, cursor_shape: CursorShape) -> None: + pass + + def reset_cursor_shape(self) -> None: + pass + @classmethod def win32_refresh_window(cls) -> None: """ |