diff options
author | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:16:14 +0300 |
---|---|---|
committer | shmel1k <shmel1k@ydb.tech> | 2023-11-26 18:43:30 +0300 |
commit | b8cf9e88f4c5c64d9406af533d8948deb050d695 (patch) | |
tree | 218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/conch/insults/helper.py | |
parent | 523f645a83a0ec97a0332dbc3863bb354c92a328 (diff) | |
download | ydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz |
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/conch/insults/helper.py')
-rw-r--r-- | contrib/python/Twisted/py2/twisted/conch/insults/helper.py | 517 |
1 files changed, 517 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/conch/insults/helper.py b/contrib/python/Twisted/py2/twisted/conch/insults/helper.py new file mode 100644 index 0000000000..0485bfdbe6 --- /dev/null +++ b/contrib/python/Twisted/py2/twisted/conch/insults/helper.py @@ -0,0 +1,517 @@ +# -*- test-case-name: twisted.conch.test.test_helper -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Partial in-memory terminal emulator + +@author: Jp Calderone +""" + +from __future__ import print_function + +import re, string + +from zope.interface import implementer + +from incremental import Version + +from twisted.internet import defer, protocol, reactor +from twisted.python import log, _textattributes +from twisted.python.compat import iterbytes +from twisted.python.deprecate import deprecated, deprecatedModuleAttribute +from twisted.conch.insults import insults + +FOREGROUND = 30 +BACKGROUND = 40 +BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9) + + + +class _FormattingState(_textattributes._FormattingStateMixin): + """ + Represents the formatting state/attributes of a single character. + + Character set, intensity, underlinedness, blinkitude, video + reversal, as well as foreground and background colors made up a + character's attributes. + """ + compareAttributes = ( + 'charset', 'bold', 'underline', 'blink', 'reverseVideo', 'foreground', + 'background', '_subtracting') + + + def __init__(self, charset=insults.G0, bold=False, underline=False, + blink=False, reverseVideo=False, foreground=WHITE, + background=BLACK, _subtracting=False): + self.charset = charset + self.bold = bold + self.underline = underline + self.blink = blink + self.reverseVideo = reverseVideo + self.foreground = foreground + self.background = background + self._subtracting = _subtracting + + + @deprecated(Version('Twisted', 13, 1, 0)) + def wantOne(self, **kw): + """ + Add a character attribute to a copy of this formatting state. + + @param **kw: An optional attribute name and value can be provided with + a keyword argument. + + @return: A formatting state instance with the new attribute. + + @see: L{DefaultFormattingState._withAttribute}. + """ + k, v = kw.popitem() + return self._withAttribute(k, v) + + + def toVT102(self): + # Spit out a vt102 control sequence that will set up + # all the attributes set here. Except charset. + attrs = [] + if self._subtracting: + attrs.append(0) + if self.bold: + attrs.append(insults.BOLD) + if self.underline: + attrs.append(insults.UNDERLINE) + if self.blink: + attrs.append(insults.BLINK) + if self.reverseVideo: + attrs.append(insults.REVERSE_VIDEO) + if self.foreground != WHITE: + attrs.append(FOREGROUND + self.foreground) + if self.background != BLACK: + attrs.append(BACKGROUND + self.background) + if attrs: + return '\x1b[' + ';'.join(map(str, attrs)) + 'm' + return '' + +CharacterAttribute = _FormattingState + +deprecatedModuleAttribute( + Version('Twisted', 13, 1, 0), + 'Use twisted.conch.insults.text.assembleFormattedText instead.', + 'twisted.conch.insults.helper', + 'CharacterAttribute') + + + +# XXX - need to support scroll regions and scroll history +@implementer(insults.ITerminalTransport) +class TerminalBuffer(protocol.Protocol): + """ + An in-memory terminal emulator. + """ + for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW', + b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN', + b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9', + b'F10', b'F11', b'F12'): + execBytes = keyID + b" = object()" + execStr = execBytes.decode("ascii") + exec(execStr) + + TAB = b'\t' + BACKSPACE = b'\x7f' + + width = 80 + height = 24 + + fill = b' ' + void = object() + + def getCharacter(self, x, y): + return self.lines[y][x] + + + def connectionMade(self): + self.reset() + + + def write(self, data): + """ + Add the given printable bytes to the terminal. + + Line feeds in L{bytes} will be replaced with carriage return / line + feed pairs. + """ + for b in iterbytes(data.replace(b'\n', b'\r\n')): + self.insertAtCursor(b) + + + def _currentFormattingState(self): + return _FormattingState(self.activeCharset, **self.graphicRendition) + + + def insertAtCursor(self, b): + """ + Add one byte to the terminal at the cursor and make consequent state + updates. + + If b is a carriage return, move the cursor to the beginning of the + current row. + + If b is a line feed, move the cursor to the next row or scroll down if + the cursor is already in the last row. + + Otherwise, if b is printable, put it at the cursor position (inserting + or overwriting as dictated by the current mode) and move the cursor. + """ + if b == b'\r': + self.x = 0 + elif b == b'\n': + self._scrollDown() + elif b in string.printable.encode("ascii"): + if self.x >= self.width: + self.nextLine() + ch = (b, self._currentFormattingState()) + if self.modes.get(insults.modes.IRM): + self.lines[self.y][self.x:self.x] = [ch] + self.lines[self.y].pop() + else: + self.lines[self.y][self.x] = ch + self.x += 1 + + + def _emptyLine(self, width): + return [(self.void, self._currentFormattingState()) + for i in range(width)] + + + def _scrollDown(self): + self.y += 1 + if self.y >= self.height: + self.y -= 1 + del self.lines[0] + self.lines.append(self._emptyLine(self.width)) + + + def _scrollUp(self): + self.y -= 1 + if self.y < 0: + self.y = 0 + del self.lines[-1] + self.lines.insert(0, self._emptyLine(self.width)) + + + def cursorUp(self, n=1): + self.y = max(0, self.y - n) + + + def cursorDown(self, n=1): + self.y = min(self.height - 1, self.y + n) + + + def cursorBackward(self, n=1): + self.x = max(0, self.x - n) + + + def cursorForward(self, n=1): + self.x = min(self.width, self.x + n) + + + def cursorPosition(self, column, line): + self.x = column + self.y = line + + + def cursorHome(self): + self.x = self.home.x + self.y = self.home.y + + + def index(self): + self._scrollDown() + + + def reverseIndex(self): + self._scrollUp() + + + def nextLine(self): + """ + Update the cursor position attributes and scroll down if appropriate. + """ + self.x = 0 + self._scrollDown() + + + def saveCursor(self): + self._savedCursor = (self.x, self.y) + + + def restoreCursor(self): + self.x, self.y = self._savedCursor + del self._savedCursor + + + def setModes(self, modes): + for m in modes: + self.modes[m] = True + + + def resetModes(self, modes): + for m in modes: + try: + del self.modes[m] + except KeyError: + pass + + + def setPrivateModes(self, modes): + """ + Enable the given modes. + + Track which modes have been enabled so that the implementations of + other L{insults.ITerminalTransport} methods can be properly implemented + to respect these settings. + + @see: L{resetPrivateModes} + @see: L{insults.ITerminalTransport.setPrivateModes} + """ + for m in modes: + self.privateModes[m] = True + + + def resetPrivateModes(self, modes): + """ + Disable the given modes. + + @see: L{setPrivateModes} + @see: L{insults.ITerminalTransport.resetPrivateModes} + """ + for m in modes: + try: + del self.privateModes[m] + except KeyError: + pass + + + def applicationKeypadMode(self): + self.keypadMode = 'app' + + + def numericKeypadMode(self): + self.keypadMode = 'num' + + + def selectCharacterSet(self, charSet, which): + self.charsets[which] = charSet + + + def shiftIn(self): + self.activeCharset = insults.G0 + + + def shiftOut(self): + self.activeCharset = insults.G1 + + + def singleShift2(self): + oldActiveCharset = self.activeCharset + self.activeCharset = insults.G2 + f = self.insertAtCursor + def insertAtCursor(b): + f(b) + del self.insertAtCursor + self.activeCharset = oldActiveCharset + self.insertAtCursor = insertAtCursor + + + def singleShift3(self): + oldActiveCharset = self.activeCharset + self.activeCharset = insults.G3 + f = self.insertAtCursor + def insertAtCursor(b): + f(b) + del self.insertAtCursor + self.activeCharset = oldActiveCharset + self.insertAtCursor = insertAtCursor + + + def selectGraphicRendition(self, *attributes): + for a in attributes: + if a == insults.NORMAL: + self.graphicRendition = { + 'bold': False, + 'underline': False, + 'blink': False, + 'reverseVideo': False, + 'foreground': WHITE, + 'background': BLACK} + elif a == insults.BOLD: + self.graphicRendition['bold'] = True + elif a == insults.UNDERLINE: + self.graphicRendition['underline'] = True + elif a == insults.BLINK: + self.graphicRendition['blink'] = True + elif a == insults.REVERSE_VIDEO: + self.graphicRendition['reverseVideo'] = True + else: + try: + v = int(a) + except ValueError: + log.msg("Unknown graphic rendition attribute: " + repr(a)) + else: + if FOREGROUND <= v <= FOREGROUND + N_COLORS: + self.graphicRendition['foreground'] = v - FOREGROUND + elif BACKGROUND <= v <= BACKGROUND + N_COLORS: + self.graphicRendition['background'] = v - BACKGROUND + else: + log.msg("Unknown graphic rendition attribute: " + repr(a)) + + + def eraseLine(self): + self.lines[self.y] = self._emptyLine(self.width) + + + def eraseToLineEnd(self): + width = self.width - self.x + self.lines[self.y][self.x:] = self._emptyLine(width) + + + def eraseToLineBeginning(self): + self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1) + + + def eraseDisplay(self): + self.lines = [self._emptyLine(self.width) for i in range(self.height)] + + + def eraseToDisplayEnd(self): + self.eraseToLineEnd() + height = self.height - self.y - 1 + self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)] + + + def eraseToDisplayBeginning(self): + self.eraseToLineBeginning() + self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)] + + + def deleteCharacter(self, n=1): + del self.lines[self.y][self.x:self.x+n] + self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n))) + + + def insertLine(self, n=1): + self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)] + del self.lines[self.height:] + + + def deleteLine(self, n=1): + del self.lines[self.y:self.y+n] + self.lines.extend([self._emptyLine(self.width) for i in range(n)]) + + + def reportCursorPosition(self): + return (self.x, self.y) + + + def reset(self): + self.home = insults.Vector(0, 0) + self.x = self.y = 0 + self.modes = {} + self.privateModes = {} + self.setPrivateModes([insults.privateModes.AUTO_WRAP, + insults.privateModes.CURSOR_MODE]) + self.numericKeypad = 'app' + self.activeCharset = insults.G0 + self.graphicRendition = { + 'bold': False, + 'underline': False, + 'blink': False, + 'reverseVideo': False, + 'foreground': WHITE, + 'background': BLACK} + self.charsets = { + insults.G0: insults.CS_US, + insults.G1: insults.CS_US, + insults.G2: insults.CS_ALTERNATE, + insults.G3: insults.CS_ALTERNATE_SPECIAL} + self.eraseDisplay() + + + def unhandledControlSequence(self, buf): + print('Could not handle', repr(buf)) + + + def __bytes__(self): + lines = [] + for L in self.lines: + buf = [] + length = 0 + for (ch, attr) in L: + if ch is not self.void: + buf.append(ch) + length = len(buf) + else: + buf.append(self.fill) + lines.append(b''.join(buf[:length])) + return b'\n'.join(lines) + + + +class ExpectationTimeout(Exception): + pass + + + +class ExpectableBuffer(TerminalBuffer): + _mark = 0 + + def connectionMade(self): + TerminalBuffer.connectionMade(self) + self._expecting = [] + + + def write(self, data): + TerminalBuffer.write(self, data) + self._checkExpected() + + + def cursorHome(self): + TerminalBuffer.cursorHome(self) + self._mark = 0 + + + def _timeoutExpected(self, d): + d.errback(ExpectationTimeout()) + self._checkExpected() + + + def _checkExpected(self): + s = self.__bytes__()[self._mark:] + while self._expecting: + expr, timer, deferred = self._expecting[0] + if timer and not timer.active(): + del self._expecting[0] + continue + for match in expr.finditer(s): + if timer: + timer.cancel() + del self._expecting[0] + self._mark += match.end() + s = s[match.end():] + deferred.callback(match) + break + else: + return + + + def expect(self, expression, timeout=None, scheduler=reactor): + d = defer.Deferred() + timer = None + if timeout: + timer = scheduler.callLater(timeout, self._timeoutExpected, d) + self._expecting.append((re.compile(expression), timer, d)) + self._checkExpected() + return d + +__all__ = [ + 'CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer'] |