aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-02-08 15:26:58 +0300
committerarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-02-08 15:26:58 +0300
commit2efaaefec2cb2a55d55c01753d1eed2a3296adb5 (patch)
tree27ec5258b325565c3963b91b36d64e84084fdb04 /contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py
parent23793c5d0827a08b5ff9242d2123eff92b681912 (diff)
downloadydb-2efaaefec2cb2a55d55c01753d1eed2a3296adb5.tar.gz
intermediate changes
ref:0bfa27bb6c38df8c510497e54e4ebf0b4fb84503
Diffstat (limited to 'contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py')
-rw-r--r--contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py113
1 files changed, 33 insertions, 80 deletions
diff --git a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py
index 686303fa7b..0586267286 100644
--- a/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py
+++ b/contrib/python/prompt-toolkit/py3/prompt_toolkit/output/vt100.py
@@ -6,34 +6,30 @@ A lot of thanks, regarding outputting of colors, goes to the Pygments project:
everything has been highly optimized.)
http://pygments.org/
"""
-import array
-import errno
import io
import os
import sys
-from contextlib import contextmanager
from typing import (
- IO,
Callable,
Dict,
Hashable,
Iterable,
- Iterator,
List,
Optional,
Sequence,
Set,
TextIO,
Tuple,
- cast,
)
+from prompt_toolkit.cursor_shapes import CursorShape
from prompt_toolkit.data_structures import Size
from prompt_toolkit.output import Output
from prompt_toolkit.styles import ANSI_COLOR_NAMES, Attrs
from prompt_toolkit.utils import is_dumb_terminal
from .color_depth import ColorDepth
+from .flush_stdout import flush_stdout
__all__ = [
"Vt100_Output",
@@ -443,6 +439,11 @@ class Vt100_Output(Output):
ColorDepth.DEPTH_24_BIT: _EscapeCodeCache(ColorDepth.DEPTH_24_BIT),
}
+ # Keep track of whether the cursor shape was ever changed.
+ # (We don't restore the cursor shape if it was never changed - by
+ # default, we don't change them.)
+ self._cursor_shape_changed = False
+
@classmethod
def from_pty(
cls,
@@ -663,6 +664,31 @@ class Vt100_Output(Output):
def show_cursor(self) -> None:
self.write_raw("\x1b[?12l\x1b[?25h") # Stop blinking cursor and show.
+ def set_cursor_shape(self, cursor_shape: CursorShape) -> None:
+ if cursor_shape == CursorShape._NEVER_CHANGE:
+ return
+
+ self._cursor_shape_changed = True
+ self.write_raw(
+ {
+ CursorShape.BLOCK: "\x1b[2 q",
+ CursorShape.BEAM: "\x1b[6 q",
+ CursorShape.UNDERLINE: "\x1b[4 q",
+ CursorShape.BLINKING_BLOCK: "\x1b[1 q",
+ CursorShape.BLINKING_BEAM: "\x1b[5 q",
+ CursorShape.BLINKING_UNDERLINE: "\x1b[3 q",
+ }.get(cursor_shape, "")
+ )
+
+ def reset_cursor_shape(self) -> None:
+ "Reset cursor shape."
+ # (Only reset cursor shape, if we ever changed it.)
+ if self._cursor_shape_changed:
+ self._cursor_shape_changed = False
+
+ # Reset cursor shape.
+ self.write_raw("\x1b[0 q")
+
def flush(self) -> None:
"""
Write to output stream and flush.
@@ -673,46 +699,7 @@ class Vt100_Output(Output):
data = "".join(self._buffer)
self._buffer = []
- try:
- # Ensure that `self.stdout` is made blocking when writing into it.
- # Otherwise, when uvloop is activated (which makes stdout
- # non-blocking), and we write big amounts of text, then we get a
- # `BlockingIOError` here.
- with blocking_io(self.stdout):
- # (We try to encode ourself, because that way we can replace
- # characters that don't exist in the character set, avoiding
- # UnicodeEncodeError crashes. E.g. u'\xb7' does not appear in 'ascii'.)
- # My Arch Linux installation of july 2015 reported 'ANSI_X3.4-1968'
- # for sys.stdout.encoding in xterm.
- out: IO[bytes]
- if self.write_binary:
- if hasattr(self.stdout, "buffer"):
- out = self.stdout.buffer
- else:
- # IO[bytes] was given to begin with.
- # (Used in the unit tests, for instance.)
- out = cast(IO[bytes], self.stdout)
- out.write(data.encode(self.stdout.encoding or "utf-8", "replace"))
- else:
- self.stdout.write(data)
-
- self.stdout.flush()
- except IOError as e:
- if e.args and e.args[0] == errno.EINTR:
- # Interrupted system call. Can happen in case of a window
- # resize signal. (Just ignore. The resize handler will render
- # again anyway.)
- pass
- elif e.args and e.args[0] == 0:
- # This can happen when there is a lot of output and the user
- # sends a KeyboardInterrupt by pressing Control-C. E.g. in
- # a Python REPL when we execute "while True: print('test')".
- # (The `ptpython` REPL uses this `Output` class instead of
- # `stdout` directly -- in order to be network transparent.)
- # So, just ignore.
- pass
- else:
- raise
+ flush_stdout(self.stdout, data, write_binary=self.write_binary)
def ask_for_cpr(self) -> None:
"""
@@ -764,37 +751,3 @@ class Vt100_Output(Output):
return ColorDepth.DEPTH_4_BIT
return ColorDepth.DEFAULT
-
-
-@contextmanager
-def blocking_io(io: IO[str]) -> Iterator[None]:
- """
- Ensure that the FD for `io` is set to blocking in here.
- """
- if sys.platform == "win32":
- # On Windows, the `os` module doesn't have a `get/set_blocking`
- # function.
- yield
- return
-
- try:
- fd = io.fileno()
- blocking = os.get_blocking(fd)
- except: # noqa
- # Failed somewhere.
- # `get_blocking` can raise `OSError`.
- # The io object can raise `AttributeError` when no `fileno()` method is
- # present if we're not a real file object.
- blocking = True # Assume we're good, and don't do anything.
-
- try:
- # Make blocking if we weren't blocking yet.
- if not blocking:
- os.set_blocking(fd, True)
-
- yield
-
- finally:
- # Restore original blocking mode.
- if not blocking:
- os.set_blocking(fd, blocking)