aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/conch/insults/helper.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/conch/insults/helper.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/conch/insults/helper.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/conch/insults/helper.py517
1 files changed, 517 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/conch/insults/helper.py b/contrib/python/Twisted/py2/twisted/conch/insults/helper.py
new file mode 100644
index 0000000000..0485bfdbe6
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/conch/insults/helper.py
@@ -0,0 +1,517 @@
+# -*- test-case-name: twisted.conch.test.test_helper -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Partial in-memory terminal emulator
+
+@author: Jp Calderone
+"""
+
+from __future__ import print_function
+
+import re, string
+
+from zope.interface import implementer
+
+from incremental import Version
+
+from twisted.internet import defer, protocol, reactor
+from twisted.python import log, _textattributes
+from twisted.python.compat import iterbytes
+from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
+from twisted.conch.insults import insults
+
+FOREGROUND = 30
+BACKGROUND = 40
+BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE, N_COLORS = range(9)
+
+
+
+class _FormattingState(_textattributes._FormattingStateMixin):
+ """
+ Represents the formatting state/attributes of a single character.
+
+ Character set, intensity, underlinedness, blinkitude, video
+ reversal, as well as foreground and background colors made up a
+ character's attributes.
+ """
+ compareAttributes = (
+ 'charset', 'bold', 'underline', 'blink', 'reverseVideo', 'foreground',
+ 'background', '_subtracting')
+
+
+ def __init__(self, charset=insults.G0, bold=False, underline=False,
+ blink=False, reverseVideo=False, foreground=WHITE,
+ background=BLACK, _subtracting=False):
+ self.charset = charset
+ self.bold = bold
+ self.underline = underline
+ self.blink = blink
+ self.reverseVideo = reverseVideo
+ self.foreground = foreground
+ self.background = background
+ self._subtracting = _subtracting
+
+
+ @deprecated(Version('Twisted', 13, 1, 0))
+ def wantOne(self, **kw):
+ """
+ Add a character attribute to a copy of this formatting state.
+
+ @param **kw: An optional attribute name and value can be provided with
+ a keyword argument.
+
+ @return: A formatting state instance with the new attribute.
+
+ @see: L{DefaultFormattingState._withAttribute}.
+ """
+ k, v = kw.popitem()
+ return self._withAttribute(k, v)
+
+
+ def toVT102(self):
+ # Spit out a vt102 control sequence that will set up
+ # all the attributes set here. Except charset.
+ attrs = []
+ if self._subtracting:
+ attrs.append(0)
+ if self.bold:
+ attrs.append(insults.BOLD)
+ if self.underline:
+ attrs.append(insults.UNDERLINE)
+ if self.blink:
+ attrs.append(insults.BLINK)
+ if self.reverseVideo:
+ attrs.append(insults.REVERSE_VIDEO)
+ if self.foreground != WHITE:
+ attrs.append(FOREGROUND + self.foreground)
+ if self.background != BLACK:
+ attrs.append(BACKGROUND + self.background)
+ if attrs:
+ return '\x1b[' + ';'.join(map(str, attrs)) + 'm'
+ return ''
+
+CharacterAttribute = _FormattingState
+
+deprecatedModuleAttribute(
+ Version('Twisted', 13, 1, 0),
+ 'Use twisted.conch.insults.text.assembleFormattedText instead.',
+ 'twisted.conch.insults.helper',
+ 'CharacterAttribute')
+
+
+
+# XXX - need to support scroll regions and scroll history
+@implementer(insults.ITerminalTransport)
+class TerminalBuffer(protocol.Protocol):
+ """
+ An in-memory terminal emulator.
+ """
+ for keyID in (b'UP_ARROW', b'DOWN_ARROW', b'RIGHT_ARROW', b'LEFT_ARROW',
+ b'HOME', b'INSERT', b'DELETE', b'END', b'PGUP', b'PGDN',
+ b'F1', b'F2', b'F3', b'F4', b'F5', b'F6', b'F7', b'F8', b'F9',
+ b'F10', b'F11', b'F12'):
+ execBytes = keyID + b" = object()"
+ execStr = execBytes.decode("ascii")
+ exec(execStr)
+
+ TAB = b'\t'
+ BACKSPACE = b'\x7f'
+
+ width = 80
+ height = 24
+
+ fill = b' '
+ void = object()
+
+ def getCharacter(self, x, y):
+ return self.lines[y][x]
+
+
+ def connectionMade(self):
+ self.reset()
+
+
+ def write(self, data):
+ """
+ Add the given printable bytes to the terminal.
+
+ Line feeds in L{bytes} will be replaced with carriage return / line
+ feed pairs.
+ """
+ for b in iterbytes(data.replace(b'\n', b'\r\n')):
+ self.insertAtCursor(b)
+
+
+ def _currentFormattingState(self):
+ return _FormattingState(self.activeCharset, **self.graphicRendition)
+
+
+ def insertAtCursor(self, b):
+ """
+ Add one byte to the terminal at the cursor and make consequent state
+ updates.
+
+ If b is a carriage return, move the cursor to the beginning of the
+ current row.
+
+ If b is a line feed, move the cursor to the next row or scroll down if
+ the cursor is already in the last row.
+
+ Otherwise, if b is printable, put it at the cursor position (inserting
+ or overwriting as dictated by the current mode) and move the cursor.
+ """
+ if b == b'\r':
+ self.x = 0
+ elif b == b'\n':
+ self._scrollDown()
+ elif b in string.printable.encode("ascii"):
+ if self.x >= self.width:
+ self.nextLine()
+ ch = (b, self._currentFormattingState())
+ if self.modes.get(insults.modes.IRM):
+ self.lines[self.y][self.x:self.x] = [ch]
+ self.lines[self.y].pop()
+ else:
+ self.lines[self.y][self.x] = ch
+ self.x += 1
+
+
+ def _emptyLine(self, width):
+ return [(self.void, self._currentFormattingState())
+ for i in range(width)]
+
+
+ def _scrollDown(self):
+ self.y += 1
+ if self.y >= self.height:
+ self.y -= 1
+ del self.lines[0]
+ self.lines.append(self._emptyLine(self.width))
+
+
+ def _scrollUp(self):
+ self.y -= 1
+ if self.y < 0:
+ self.y = 0
+ del self.lines[-1]
+ self.lines.insert(0, self._emptyLine(self.width))
+
+
+ def cursorUp(self, n=1):
+ self.y = max(0, self.y - n)
+
+
+ def cursorDown(self, n=1):
+ self.y = min(self.height - 1, self.y + n)
+
+
+ def cursorBackward(self, n=1):
+ self.x = max(0, self.x - n)
+
+
+ def cursorForward(self, n=1):
+ self.x = min(self.width, self.x + n)
+
+
+ def cursorPosition(self, column, line):
+ self.x = column
+ self.y = line
+
+
+ def cursorHome(self):
+ self.x = self.home.x
+ self.y = self.home.y
+
+
+ def index(self):
+ self._scrollDown()
+
+
+ def reverseIndex(self):
+ self._scrollUp()
+
+
+ def nextLine(self):
+ """
+ Update the cursor position attributes and scroll down if appropriate.
+ """
+ self.x = 0
+ self._scrollDown()
+
+
+ def saveCursor(self):
+ self._savedCursor = (self.x, self.y)
+
+
+ def restoreCursor(self):
+ self.x, self.y = self._savedCursor
+ del self._savedCursor
+
+
+ def setModes(self, modes):
+ for m in modes:
+ self.modes[m] = True
+
+
+ def resetModes(self, modes):
+ for m in modes:
+ try:
+ del self.modes[m]
+ except KeyError:
+ pass
+
+
+ def setPrivateModes(self, modes):
+ """
+ Enable the given modes.
+
+ Track which modes have been enabled so that the implementations of
+ other L{insults.ITerminalTransport} methods can be properly implemented
+ to respect these settings.
+
+ @see: L{resetPrivateModes}
+ @see: L{insults.ITerminalTransport.setPrivateModes}
+ """
+ for m in modes:
+ self.privateModes[m] = True
+
+
+ def resetPrivateModes(self, modes):
+ """
+ Disable the given modes.
+
+ @see: L{setPrivateModes}
+ @see: L{insults.ITerminalTransport.resetPrivateModes}
+ """
+ for m in modes:
+ try:
+ del self.privateModes[m]
+ except KeyError:
+ pass
+
+
+ def applicationKeypadMode(self):
+ self.keypadMode = 'app'
+
+
+ def numericKeypadMode(self):
+ self.keypadMode = 'num'
+
+
+ def selectCharacterSet(self, charSet, which):
+ self.charsets[which] = charSet
+
+
+ def shiftIn(self):
+ self.activeCharset = insults.G0
+
+
+ def shiftOut(self):
+ self.activeCharset = insults.G1
+
+
+ def singleShift2(self):
+ oldActiveCharset = self.activeCharset
+ self.activeCharset = insults.G2
+ f = self.insertAtCursor
+ def insertAtCursor(b):
+ f(b)
+ del self.insertAtCursor
+ self.activeCharset = oldActiveCharset
+ self.insertAtCursor = insertAtCursor
+
+
+ def singleShift3(self):
+ oldActiveCharset = self.activeCharset
+ self.activeCharset = insults.G3
+ f = self.insertAtCursor
+ def insertAtCursor(b):
+ f(b)
+ del self.insertAtCursor
+ self.activeCharset = oldActiveCharset
+ self.insertAtCursor = insertAtCursor
+
+
+ def selectGraphicRendition(self, *attributes):
+ for a in attributes:
+ if a == insults.NORMAL:
+ self.graphicRendition = {
+ 'bold': False,
+ 'underline': False,
+ 'blink': False,
+ 'reverseVideo': False,
+ 'foreground': WHITE,
+ 'background': BLACK}
+ elif a == insults.BOLD:
+ self.graphicRendition['bold'] = True
+ elif a == insults.UNDERLINE:
+ self.graphicRendition['underline'] = True
+ elif a == insults.BLINK:
+ self.graphicRendition['blink'] = True
+ elif a == insults.REVERSE_VIDEO:
+ self.graphicRendition['reverseVideo'] = True
+ else:
+ try:
+ v = int(a)
+ except ValueError:
+ log.msg("Unknown graphic rendition attribute: " + repr(a))
+ else:
+ if FOREGROUND <= v <= FOREGROUND + N_COLORS:
+ self.graphicRendition['foreground'] = v - FOREGROUND
+ elif BACKGROUND <= v <= BACKGROUND + N_COLORS:
+ self.graphicRendition['background'] = v - BACKGROUND
+ else:
+ log.msg("Unknown graphic rendition attribute: " + repr(a))
+
+
+ def eraseLine(self):
+ self.lines[self.y] = self._emptyLine(self.width)
+
+
+ def eraseToLineEnd(self):
+ width = self.width - self.x
+ self.lines[self.y][self.x:] = self._emptyLine(width)
+
+
+ def eraseToLineBeginning(self):
+ self.lines[self.y][:self.x + 1] = self._emptyLine(self.x + 1)
+
+
+ def eraseDisplay(self):
+ self.lines = [self._emptyLine(self.width) for i in range(self.height)]
+
+
+ def eraseToDisplayEnd(self):
+ self.eraseToLineEnd()
+ height = self.height - self.y - 1
+ self.lines[self.y + 1:] = [self._emptyLine(self.width) for i in range(height)]
+
+
+ def eraseToDisplayBeginning(self):
+ self.eraseToLineBeginning()
+ self.lines[:self.y] = [self._emptyLine(self.width) for i in range(self.y)]
+
+
+ def deleteCharacter(self, n=1):
+ del self.lines[self.y][self.x:self.x+n]
+ self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))
+
+
+ def insertLine(self, n=1):
+ self.lines[self.y:self.y] = [self._emptyLine(self.width) for i in range(n)]
+ del self.lines[self.height:]
+
+
+ def deleteLine(self, n=1):
+ del self.lines[self.y:self.y+n]
+ self.lines.extend([self._emptyLine(self.width) for i in range(n)])
+
+
+ def reportCursorPosition(self):
+ return (self.x, self.y)
+
+
+ def reset(self):
+ self.home = insults.Vector(0, 0)
+ self.x = self.y = 0
+ self.modes = {}
+ self.privateModes = {}
+ self.setPrivateModes([insults.privateModes.AUTO_WRAP,
+ insults.privateModes.CURSOR_MODE])
+ self.numericKeypad = 'app'
+ self.activeCharset = insults.G0
+ self.graphicRendition = {
+ 'bold': False,
+ 'underline': False,
+ 'blink': False,
+ 'reverseVideo': False,
+ 'foreground': WHITE,
+ 'background': BLACK}
+ self.charsets = {
+ insults.G0: insults.CS_US,
+ insults.G1: insults.CS_US,
+ insults.G2: insults.CS_ALTERNATE,
+ insults.G3: insults.CS_ALTERNATE_SPECIAL}
+ self.eraseDisplay()
+
+
+ def unhandledControlSequence(self, buf):
+ print('Could not handle', repr(buf))
+
+
+ def __bytes__(self):
+ lines = []
+ for L in self.lines:
+ buf = []
+ length = 0
+ for (ch, attr) in L:
+ if ch is not self.void:
+ buf.append(ch)
+ length = len(buf)
+ else:
+ buf.append(self.fill)
+ lines.append(b''.join(buf[:length]))
+ return b'\n'.join(lines)
+
+
+
+class ExpectationTimeout(Exception):
+ pass
+
+
+
+class ExpectableBuffer(TerminalBuffer):
+ _mark = 0
+
+ def connectionMade(self):
+ TerminalBuffer.connectionMade(self)
+ self._expecting = []
+
+
+ def write(self, data):
+ TerminalBuffer.write(self, data)
+ self._checkExpected()
+
+
+ def cursorHome(self):
+ TerminalBuffer.cursorHome(self)
+ self._mark = 0
+
+
+ def _timeoutExpected(self, d):
+ d.errback(ExpectationTimeout())
+ self._checkExpected()
+
+
+ def _checkExpected(self):
+ s = self.__bytes__()[self._mark:]
+ while self._expecting:
+ expr, timer, deferred = self._expecting[0]
+ if timer and not timer.active():
+ del self._expecting[0]
+ continue
+ for match in expr.finditer(s):
+ if timer:
+ timer.cancel()
+ del self._expecting[0]
+ self._mark += match.end()
+ s = s[match.end():]
+ deferred.callback(match)
+ break
+ else:
+ return
+
+
+ def expect(self, expression, timeout=None, scheduler=reactor):
+ d = defer.Deferred()
+ timer = None
+ if timeout:
+ timer = scheduler.callLater(timeout, self._timeoutExpected, d)
+ self._expecting.append((re.compile(expression), timer, d))
+ self._checkExpected()
+ return d
+
+__all__ = [
+ 'CharacterAttribute', 'TerminalBuffer', 'ExpectableBuffer']