aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py2/twisted/conch/insults/insults.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py2/twisted/conch/insults/insults.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py2/twisted/conch/insults/insults.py')
-rw-r--r--contrib/python/Twisted/py2/twisted/conch/insults/insults.py1289
1 files changed, 1289 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py2/twisted/conch/insults/insults.py b/contrib/python/Twisted/py2/twisted/conch/insults/insults.py
new file mode 100644
index 0000000000..a583174415
--- /dev/null
+++ b/contrib/python/Twisted/py2/twisted/conch/insults/insults.py
@@ -0,0 +1,1289 @@
+# -*- test-case-name: twisted.conch.test.test_insults -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+VT102 and VT220 terminal manipulation.
+
+@author: Jp Calderone
+"""
+
+from zope.interface import implementer, Interface
+
+from twisted.internet import protocol, defer, interfaces as iinternet
+from twisted.python.compat import intToBytes, iterbytes, networkString
+
+
+
+class ITerminalProtocol(Interface):
+ def makeConnection(transport):
+ """
+ Called with an L{ITerminalTransport} when a connection is established.
+ """
+
+ def keystrokeReceived(keyID, modifier):
+ """
+ A keystroke was received.
+
+ Each keystroke corresponds to one invocation of this method.
+ keyID is a string identifier for that key. Printable characters
+ are represented by themselves. Control keys, such as arrows and
+ function keys, are represented with symbolic constants on
+ L{ServerProtocol}.
+ """
+
+ def terminalSize(width, height):
+ """
+ Called to indicate the size of the terminal.
+
+ A terminal of 80x24 should be assumed if this method is not
+ called. This method might not be called for real terminals.
+ """
+
+ def unhandledControlSequence(seq):
+ """
+ Called when an unsupported control sequence is received.
+
+ @type seq: L{str}
+ @param seq: The whole control sequence which could not be interpreted.
+ """
+
+ def connectionLost(reason):
+ """
+ Called when the connection has been lost.
+
+ reason is a Failure describing why.
+ """
+
+
+
+@implementer(ITerminalProtocol)
+class TerminalProtocol(object):
+ def makeConnection(self, terminal):
+ # assert ITerminalTransport.providedBy(transport), "TerminalProtocol.makeConnection must be passed an ITerminalTransport implementor"
+ self.terminal = terminal
+ self.connectionMade()
+
+
+ def connectionMade(self):
+ """
+ Called after a connection has been established.
+ """
+
+
+ def keystrokeReceived(self, keyID, modifier):
+ pass
+
+
+ def terminalSize(self, width, height):
+ pass
+
+
+ def unhandledControlSequence(self, seq):
+ pass
+
+
+ def connectionLost(self, reason):
+ pass
+
+
+
+class ITerminalTransport(iinternet.ITransport):
+ def cursorUp(n=1):
+ """
+ Move the cursor up n lines.
+ """
+
+
+ def cursorDown(n=1):
+ """
+ Move the cursor down n lines.
+ """
+
+
+ def cursorForward(n=1):
+ """
+ Move the cursor right n columns.
+ """
+
+
+ def cursorBackward(n=1):
+ """
+ Move the cursor left n columns.
+ """
+
+
+ def cursorPosition(column, line):
+ """
+ Move the cursor to the given line and column.
+ """
+
+
+ def cursorHome():
+ """
+ Move the cursor home.
+ """
+
+
+ def index():
+ """
+ Move the cursor down one line, performing scrolling if necessary.
+ """
+
+
+ def reverseIndex():
+ """
+ Move the cursor up one line, performing scrolling if necessary.
+ """
+
+
+ def nextLine():
+ """
+ Move the cursor to the first position on the next line, performing scrolling if necessary.
+ """
+
+
+ def saveCursor():
+ """
+ Save the cursor position, character attribute, character set, and origin mode selection.
+ """
+
+
+ def restoreCursor():
+ """
+ Restore the previously saved cursor position, character attribute, character set, and origin mode selection.
+
+ If no cursor state was previously saved, move the cursor to the home position.
+ """
+
+
+ def setModes(modes):
+ """
+ Set the given modes on the terminal.
+ """
+
+ def resetModes(mode):
+ """
+ Reset the given modes on the terminal.
+ """
+
+
+ def setPrivateModes(modes):
+ """
+ Set the given DEC private modes on the terminal.
+ """
+
+
+ def resetPrivateModes(modes):
+ """
+ Reset the given DEC private modes on the terminal.
+ """
+
+
+ def applicationKeypadMode():
+ """
+ Cause keypad to generate control functions.
+
+ Cursor key mode selects the type of characters generated by cursor keys.
+ """
+
+
+ def numericKeypadMode():
+ """
+ Cause keypad to generate normal characters.
+ """
+
+
+ def selectCharacterSet(charSet, which):
+ """
+ Select a character set.
+
+ charSet should be one of CS_US, CS_UK, CS_DRAWING, CS_ALTERNATE, or
+ CS_ALTERNATE_SPECIAL.
+
+ which should be one of G0 or G1.
+ """
+
+
+ def shiftIn():
+ """
+ Activate the G0 character set.
+ """
+
+
+ def shiftOut():
+ """
+ Activate the G1 character set.
+ """
+
+
+ def singleShift2():
+ """
+ Shift to the G2 character set for a single character.
+ """
+
+
+ def singleShift3():
+ """
+ Shift to the G3 character set for a single character.
+ """
+
+
+ def selectGraphicRendition(*attributes):
+ """
+ Enabled one or more character attributes.
+
+ Arguments should be one or more of UNDERLINE, REVERSE_VIDEO, BLINK, or BOLD.
+ NORMAL may also be specified to disable all character attributes.
+ """
+
+
+ def horizontalTabulationSet():
+ """
+ Set a tab stop at the current cursor position.
+ """
+
+
+ def tabulationClear():
+ """
+ Clear the tab stop at the current cursor position.
+ """
+
+
+ def tabulationClearAll():
+ """
+ Clear all tab stops.
+ """
+
+
+ def doubleHeightLine(top=True):
+ """
+ Make the current line the top or bottom half of a double-height, double-width line.
+
+ If top is True, the current line is the top half. Otherwise, it is the bottom half.
+ """
+
+
+ def singleWidthLine():
+ """
+ Make the current line a single-width, single-height line.
+ """
+
+
+ def doubleWidthLine():
+ """
+ Make the current line a double-width line.
+ """
+
+
+ def eraseToLineEnd():
+ """
+ Erase from the cursor to the end of line, including cursor position.
+ """
+
+
+ def eraseToLineBeginning():
+ """
+ Erase from the cursor to the beginning of the line, including the cursor position.
+ """
+
+
+ def eraseLine():
+ """
+ Erase the entire cursor line.
+ """
+
+
+ def eraseToDisplayEnd():
+ """
+ Erase from the cursor to the end of the display, including the cursor position.
+ """
+
+
+ def eraseToDisplayBeginning():
+ """
+ Erase from the cursor to the beginning of the display, including the cursor position.
+ """
+
+
+ def eraseDisplay():
+ """
+ Erase the entire display.
+ """
+
+
+ def deleteCharacter(n=1):
+ """
+ Delete n characters starting at the cursor position.
+
+ Characters to the right of deleted characters are shifted to the left.
+ """
+
+
+ def insertLine(n=1):
+ """
+ Insert n lines at the cursor position.
+
+ Lines below the cursor are shifted down. Lines moved past the bottom margin are lost.
+ This command is ignored when the cursor is outside the scroll region.
+ """
+
+
+ def deleteLine(n=1):
+ """
+ Delete n lines starting at the cursor position.
+
+ Lines below the cursor are shifted up. This command is ignored when the cursor is outside
+ the scroll region.
+ """
+
+
+ def reportCursorPosition():
+ """
+ Return a Deferred that fires with a two-tuple of (x, y) indicating the cursor position.
+ """
+
+
+ def reset():
+ """
+ Reset the terminal to its initial state.
+ """
+
+
+ def unhandledControlSequence(seq):
+ """
+ Called when an unsupported control sequence is received.
+
+ @type seq: L{str}
+ @param seq: The whole control sequence which could not be interpreted.
+ """
+
+
+CSI = b'\x1b'
+CST = {b'~': b'tilde'}
+
+class modes:
+ """
+ ECMA 48 standardized modes
+ """
+
+ # BREAKS YOPUR KEYBOARD MOFO
+ KEYBOARD_ACTION = KAM = 2
+
+ # When set, enables character insertion. New display characters
+ # move old display characters to the right. Characters moved past
+ # the right margin are lost.
+
+ # When reset, enables replacement mode (disables character
+ # insertion). New display characters replace old display
+ # characters at cursor position. The old character is erased.
+ INSERTION_REPLACEMENT = IRM = 4
+
+ # Set causes a received linefeed, form feed, or vertical tab to
+ # move cursor to first column of next line. RETURN transmits both
+ # a carriage return and linefeed. This selection is also called
+ # new line option.
+
+ # Reset causes a received linefeed, form feed, or vertical tab to
+ # move cursor to next line in current column. RETURN transmits a
+ # carriage return.
+ LINEFEED_NEWLINE = LNM = 20
+
+
+
+class privateModes:
+ """
+ ANSI-Compatible Private Modes
+ """
+ ERROR = 0
+ CURSOR_KEY = 1
+ ANSI_VT52 = 2
+ COLUMN = 3
+ SCROLL = 4
+ SCREEN = 5
+ ORIGIN = 6
+ AUTO_WRAP = 7
+ AUTO_REPEAT = 8
+ PRINTER_FORM_FEED = 18
+ PRINTER_EXTENT = 19
+
+ # Toggle cursor visibility (reset hides it)
+ CURSOR_MODE = 25
+
+
+# Character sets
+CS_US = b'CS_US'
+CS_UK = b'CS_UK'
+CS_DRAWING = b'CS_DRAWING'
+CS_ALTERNATE = b'CS_ALTERNATE'
+CS_ALTERNATE_SPECIAL = b'CS_ALTERNATE_SPECIAL'
+
+# Groupings (or something?? These are like variables that can be bound to character sets)
+G0 = b'G0'
+G1 = b'G1'
+
+# G2 and G3 cannot be changed, but they can be shifted to.
+G2 = b'G2'
+G3 = b'G3'
+
+# Character attributes
+
+NORMAL = 0
+BOLD = 1
+UNDERLINE = 4
+BLINK = 5
+REVERSE_VIDEO = 7
+
+class Vector:
+ def __init__(self, x, y):
+ self.x = x
+ self.y = y
+
+
+
+def log(s):
+ with open('log', 'a') as f:
+ f.write(str(s) + '\n')
+
+# XXX TODO - These attributes are really part of the
+# ITerminalTransport interface, I think.
+_KEY_NAMES = ('UP_ARROW', 'DOWN_ARROW', 'RIGHT_ARROW', 'LEFT_ARROW',
+ 'HOME', 'INSERT', 'DELETE', 'END', 'PGUP', 'PGDN', 'NUMPAD_MIDDLE',
+ 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9',
+ 'F10', 'F11', 'F12',
+
+ 'ALT', 'SHIFT', 'CONTROL')
+
+class _const(object):
+ """
+ @ivar name: A string naming this constant
+ """
+ def __init__(self, name):
+ self.name = name
+
+
+ def __repr__(self):
+ return '[' + self.name + ']'
+
+
+ def __bytes__(self):
+ return ('[' + self.name + ']').encode("ascii")
+
+
+FUNCTION_KEYS = [
+ _const(_name).__bytes__() for _name in _KEY_NAMES]
+
+
+
+@implementer(ITerminalTransport)
+class ServerProtocol(protocol.Protocol):
+ protocolFactory = None
+ terminalProtocol = None
+
+ TAB = b'\t'
+ BACKSPACE = b'\x7f'
+ ##
+
+ lastWrite = b''
+
+ state = b'data'
+
+ termSize = Vector(80, 24)
+ cursorPos = Vector(0, 0)
+ scrollRegion = None
+
+ # Factory who instantiated me
+ factory = None
+
+ def __init__(self, protocolFactory=None, *a, **kw):
+ """
+ @param protocolFactory: A callable which will be invoked with
+ *a, **kw and should return an ITerminalProtocol implementor.
+ This will be invoked when a connection to this ServerProtocol
+ is established.
+
+ @param a: Any positional arguments to pass to protocolFactory.
+ @param kw: Any keyword arguments to pass to protocolFactory.
+ """
+ # assert protocolFactory is None or ITerminalProtocol.implementedBy(protocolFactory), "ServerProtocol.__init__ must be passed an ITerminalProtocol implementor"
+ if protocolFactory is not None:
+ self.protocolFactory = protocolFactory
+ self.protocolArgs = a
+ self.protocolKwArgs = kw
+
+ self._cursorReports = []
+
+
+ def connectionMade(self):
+ if self.protocolFactory is not None:
+ self.terminalProtocol = self.protocolFactory(*self.protocolArgs, **self.protocolKwArgs)
+
+ try:
+ factory = self.factory
+ except AttributeError:
+ pass
+ else:
+ self.terminalProtocol.factory = factory
+
+ self.terminalProtocol.makeConnection(self)
+
+
+ def dataReceived(self, data):
+ for ch in iterbytes(data):
+ if self.state == b'data':
+ if ch == b'\x1b':
+ self.state = b'escaped'
+ else:
+ self.terminalProtocol.keystrokeReceived(ch, None)
+ elif self.state == b'escaped':
+ if ch == b'[':
+ self.state = b'bracket-escaped'
+ self.escBuf = []
+ elif ch == b'O':
+ self.state = b'low-function-escaped'
+ else:
+ self.state = b'data'
+ self._handleShortControlSequence(ch)
+ elif self.state == b'bracket-escaped':
+ if ch == b'O':
+ self.state = b'low-function-escaped'
+ elif ch.isalpha() or ch == b'~':
+ self._handleControlSequence(b''.join(self.escBuf) + ch)
+ del self.escBuf
+ self.state = b'data'
+ else:
+ self.escBuf.append(ch)
+ elif self.state == b'low-function-escaped':
+ self._handleLowFunctionControlSequence(ch)
+ self.state = b'data'
+ else:
+ raise ValueError("Illegal state")
+
+
+ def _handleShortControlSequence(self, ch):
+ self.terminalProtocol.keystrokeReceived(ch, self.ALT)
+
+
+ def _handleControlSequence(self, buf):
+ buf = b'\x1b[' + buf
+ f = getattr(self.controlSequenceParser,
+ CST.get(buf[-1:], buf[-1:]).decode("ascii"),
+ None)
+ if f is None:
+ self.unhandledControlSequence(buf)
+ else:
+ f(self, self.terminalProtocol, buf[:-1])
+
+
+ def unhandledControlSequence(self, buf):
+ self.terminalProtocol.unhandledControlSequence(buf)
+
+
+ def _handleLowFunctionControlSequence(self, ch):
+ functionKeys = {b'P': self.F1, b'Q': self.F2,
+ b'R': self.F3, b'S': self.F4}
+ keyID = functionKeys.get(ch)
+ if keyID is not None:
+ self.terminalProtocol.keystrokeReceived(keyID, None)
+ else:
+ self.terminalProtocol.unhandledControlSequence(b'\x1b[O' + ch)
+
+
+ class ControlSequenceParser:
+ def A(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.UP_ARROW, None)
+ else:
+ handler.unhandledControlSequence(buf + b'A')
+
+
+ def B(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.DOWN_ARROW, None)
+ else:
+ handler.unhandledControlSequence(buf + b'B')
+
+
+ def C(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.RIGHT_ARROW, None)
+ else:
+ handler.unhandledControlSequence(buf + b'C')
+
+
+ def D(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.LEFT_ARROW, None)
+ else:
+ handler.unhandledControlSequence(buf + b'D')
+
+
+ def E(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.NUMPAD_MIDDLE, None)
+ else:
+ handler.unhandledControlSequence(buf + b'E')
+
+
+ def F(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.END, None)
+ else:
+ handler.unhandledControlSequence(buf + b'F')
+
+
+ def H(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.HOME, None)
+ else:
+ handler.unhandledControlSequence(buf + b'H')
+
+
+ def R(self, proto, handler, buf):
+ if not proto._cursorReports:
+ handler.unhandledControlSequence(buf + b'R')
+ elif buf.startswith(b'\x1b['):
+ report = buf[2:]
+ parts = report.split(b';')
+ if len(parts) != 2:
+ handler.unhandledControlSequence(buf + b'R')
+ else:
+ Pl, Pc = parts
+ try:
+ Pl, Pc = int(Pl), int(Pc)
+ except ValueError:
+ handler.unhandledControlSequence(buf + b'R')
+ else:
+ d = proto._cursorReports.pop(0)
+ d.callback((Pc - 1, Pl - 1))
+ else:
+ handler.unhandledControlSequence(buf + b'R')
+
+
+ def Z(self, proto, handler, buf):
+ if buf == b'\x1b[':
+ handler.keystrokeReceived(proto.TAB, proto.SHIFT)
+ else:
+ handler.unhandledControlSequence(buf + b'Z')
+
+
+ def tilde(self, proto, handler, buf):
+ map = {1: proto.HOME, 2: proto.INSERT, 3: proto.DELETE,
+ 4: proto.END, 5: proto.PGUP, 6: proto.PGDN,
+
+ 15: proto.F5, 17: proto.F6, 18: proto.F7,
+ 19: proto.F8, 20: proto.F9, 21: proto.F10,
+ 23: proto.F11, 24: proto.F12}
+
+ if buf.startswith(b'\x1b['):
+ ch = buf[2:]
+ try:
+ v = int(ch)
+ except ValueError:
+ handler.unhandledControlSequence(buf + b'~')
+ else:
+ symbolic = map.get(v)
+ if symbolic is not None:
+ handler.keystrokeReceived(map[v], None)
+ else:
+ handler.unhandledControlSequence(buf + b'~')
+ else:
+ handler.unhandledControlSequence(buf + b'~')
+
+ controlSequenceParser = ControlSequenceParser()
+
+
+ # ITerminalTransport
+ def cursorUp(self, n=1):
+ assert n >= 1
+ self.cursorPos.y = max(self.cursorPos.y - n, 0)
+ self.write(b'\x1b[' + intToBytes(n) + b'A')
+
+
+ def cursorDown(self, n=1):
+ assert n >= 1
+ self.cursorPos.y = min(self.cursorPos.y + n, self.termSize.y - 1)
+ self.write(b'\x1b[' + intToBytes(n) + b'B')
+
+
+ def cursorForward(self, n=1):
+ assert n >= 1
+ self.cursorPos.x = min(self.cursorPos.x + n, self.termSize.x - 1)
+ self.write(b'\x1b[' + intToBytes(n) + b'C')
+
+
+ def cursorBackward(self, n=1):
+ assert n >= 1
+ self.cursorPos.x = max(self.cursorPos.x - n, 0)
+ self.write(b'\x1b[' + intToBytes(n) + b'D')
+
+
+ def cursorPosition(self, column, line):
+ self.write(b'\x1b[' +
+ intToBytes(line + 1) +
+ b';' +
+ intToBytes(column + 1) +
+ b'H')
+
+
+ def cursorHome(self):
+ self.cursorPos.x = self.cursorPos.y = 0
+ self.write(b'\x1b[H')
+
+
+ def index(self):
+ # ECMA48 5th Edition removes this
+ self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
+ self.write(b'\x1bD')
+
+
+ def reverseIndex(self):
+ self.cursorPos.y = max(self.cursorPos.y - 1, 0)
+ self.write(b'\x1bM')
+
+
+ def nextLine(self):
+ self.cursorPos.x = 0
+ self.cursorPos.y = min(self.cursorPos.y + 1, self.termSize.y - 1)
+ self.write(b'\n')
+
+
+ def saveCursor(self):
+ self._savedCursorPos = Vector(self.cursorPos.x, self.cursorPos.y)
+ self.write(b'\x1b7')
+
+
+ def restoreCursor(self):
+ self.cursorPos = self._savedCursorPos
+ del self._savedCursorPos
+ self.write(b'\x1b8')
+
+
+ def setModes(self, modes):
+ # XXX Support ANSI-Compatible private modes
+ modesBytes = b';'.join([intToBytes(mode) for mode in modes])
+ self.write(b'\x1b[' + modesBytes + b'h')
+
+
+ def setPrivateModes(self, modes):
+ modesBytes = b';'.join([intToBytes(mode) for mode in modes])
+ self.write(b'\x1b[?' + modesBytes + b'h')
+
+
+ def resetModes(self, modes):
+ # XXX Support ANSI-Compatible private modes
+ modesBytes = b';'.join([intToBytes(mode) for mode in modes])
+ self.write(b'\x1b[' + modesBytes + b'l')
+
+
+ def resetPrivateModes(self, modes):
+ modesBytes = b';'.join([intToBytes(mode) for mode in modes])
+ self.write(b'\x1b[?' + modesBytes + b'l')
+
+
+ def applicationKeypadMode(self):
+ self.write(b'\x1b=')
+
+
+ def numericKeypadMode(self):
+ self.write(b'\x1b>')
+
+
+ def selectCharacterSet(self, charSet, which):
+ # XXX Rewrite these as dict lookups
+ if which == G0:
+ which = b'('
+ elif which == G1:
+ which = b')'
+ else:
+ raise ValueError("`which' argument to selectCharacterSet must be G0 or G1")
+ if charSet == CS_UK:
+ charSet = b'A'
+ elif charSet == CS_US:
+ charSet = b'B'
+ elif charSet == CS_DRAWING:
+ charSet = b'0'
+ elif charSet == CS_ALTERNATE:
+ charSet = b'1'
+ elif charSet == CS_ALTERNATE_SPECIAL:
+ charSet = b'2'
+ else:
+ raise ValueError("Invalid `charSet' argument to selectCharacterSet")
+ self.write(b'\x1b' + which + charSet)
+
+
+ def shiftIn(self):
+ self.write(b'\x15')
+
+
+ def shiftOut(self):
+ self.write(b'\x14')
+
+
+ def singleShift2(self):
+ self.write(b'\x1bN')
+
+
+ def singleShift3(self):
+ self.write(b'\x1bO')
+
+
+ def selectGraphicRendition(self, *attributes):
+ # each member of attributes must be a native string
+ attrs = []
+ for a in attributes:
+ attrs.append(networkString(a))
+ self.write(b'\x1b[' +
+ b';'.join(attrs) +
+ b'm')
+
+
+ def horizontalTabulationSet(self):
+ self.write(b'\x1bH')
+
+
+ def tabulationClear(self):
+ self.write(b'\x1b[q')
+
+
+ def tabulationClearAll(self):
+ self.write(b'\x1b[3q')
+
+
+ def doubleHeightLine(self, top=True):
+ if top:
+ self.write(b'\x1b#3')
+ else:
+ self.write(b'\x1b#4')
+
+
+ def singleWidthLine(self):
+ self.write(b'\x1b#5')
+
+
+ def doubleWidthLine(self):
+ self.write(b'\x1b#6')
+
+
+ def eraseToLineEnd(self):
+ self.write(b'\x1b[K')
+
+
+ def eraseToLineBeginning(self):
+ self.write(b'\x1b[1K')
+
+
+ def eraseLine(self):
+ self.write(b'\x1b[2K')
+
+
+ def eraseToDisplayEnd(self):
+ self.write(b'\x1b[J')
+
+
+ def eraseToDisplayBeginning(self):
+ self.write(b'\x1b[1J')
+
+
+ def eraseDisplay(self):
+ self.write(b'\x1b[2J')
+
+
+ def deleteCharacter(self, n=1):
+ self.write(b'\x1b[' + intToBytes(n) + b'P')
+
+
+ def insertLine(self, n=1):
+ self.write(b'\x1b[' + intToBytes(n) + b'L')
+
+
+ def deleteLine(self, n=1):
+ self.write(b'\x1b[' + intToBytes(n) + b'M')
+
+
+ def setScrollRegion(self, first=None, last=None):
+ if first is not None:
+ first = intToBytes(first)
+ else:
+ first = b''
+ if last is not None:
+ last = intToBytes(last)
+ else:
+ last = b''
+ self.write(b'\x1b[' + first + b';' + last + b'r')
+
+
+ def resetScrollRegion(self):
+ self.setScrollRegion()
+
+
+ def reportCursorPosition(self):
+ d = defer.Deferred()
+ self._cursorReports.append(d)
+ self.write(b'\x1b[6n')
+ return d
+
+
+ def reset(self):
+ self.cursorPos.x = self.cursorPos.y = 0
+ try:
+ del self._savedCursorPos
+ except AttributeError:
+ pass
+ self.write(b'\x1bc')
+
+
+ # ITransport
+ def write(self, data):
+ if data:
+ if not isinstance(data, bytes):
+ data = data.encode("utf-8")
+ self.lastWrite = data
+ self.transport.write(b'\r\n'.join(data.split(b'\n')))
+
+
+ def writeSequence(self, data):
+ self.write(b''.join(data))
+
+
+ def loseConnection(self):
+ self.reset()
+ self.transport.loseConnection()
+
+
+ def connectionLost(self, reason):
+ if self.terminalProtocol is not None:
+ try:
+ self.terminalProtocol.connectionLost(reason)
+ finally:
+ self.terminalProtocol = None
+# Add symbolic names for function keys
+for name, const in zip(_KEY_NAMES, FUNCTION_KEYS):
+ setattr(ServerProtocol, name, const)
+
+
+
+class ClientProtocol(protocol.Protocol):
+
+ terminalFactory = None
+ terminal = None
+
+ state = b'data'
+
+ _escBuf = None
+
+ _shorts = {
+ b'D': b'index',
+ b'M': b'reverseIndex',
+ b'E': b'nextLine',
+ b'7': b'saveCursor',
+ b'8': b'restoreCursor',
+ b'=': b'applicationKeypadMode',
+ b'>': b'numericKeypadMode',
+ b'N': b'singleShift2',
+ b'O': b'singleShift3',
+ b'H': b'horizontalTabulationSet',
+ b'c': b'reset'}
+
+ _longs = {
+ b'[': b'bracket-escape',
+ b'(': b'select-g0',
+ b')': b'select-g1',
+ b'#': b'select-height-width'}
+
+ _charsets = {
+ b'A': CS_UK,
+ b'B': CS_US,
+ b'0': CS_DRAWING,
+ b'1': CS_ALTERNATE,
+ b'2': CS_ALTERNATE_SPECIAL}
+
+ # Factory who instantiated me
+ factory = None
+
+ def __init__(self, terminalFactory=None, *a, **kw):
+ """
+ @param terminalFactory: A callable which will be invoked with
+ *a, **kw and should return an ITerminalTransport provider.
+ This will be invoked when this ClientProtocol establishes a
+ connection.
+
+ @param a: Any positional arguments to pass to terminalFactory.
+ @param kw: Any keyword arguments to pass to terminalFactory.
+ """
+ # assert terminalFactory is None or ITerminalTransport.implementedBy(terminalFactory), "ClientProtocol.__init__ must be passed an ITerminalTransport implementor"
+ if terminalFactory is not None:
+ self.terminalFactory = terminalFactory
+ self.terminalArgs = a
+ self.terminalKwArgs = kw
+
+
+ def connectionMade(self):
+ if self.terminalFactory is not None:
+ self.terminal = self.terminalFactory(*self.terminalArgs, **self.terminalKwArgs)
+ self.terminal.factory = self.factory
+ self.terminal.makeConnection(self)
+
+
+ def connectionLost(self, reason):
+ if self.terminal is not None:
+ try:
+ self.terminal.connectionLost(reason)
+ finally:
+ del self.terminal
+
+
+ def dataReceived(self, data):
+ """
+ Parse the given data from a terminal server, dispatching to event
+ handlers defined by C{self.terminal}.
+ """
+ toWrite = []
+ for b in iterbytes(data):
+ if self.state == b'data':
+ if b == b'\x1b':
+ if toWrite:
+ self.terminal.write(b''.join(toWrite))
+ del toWrite[:]
+ self.state = b'escaped'
+ elif b == b'\x14':
+ if toWrite:
+ self.terminal.write(b''.join(toWrite))
+ del toWrite[:]
+ self.terminal.shiftOut()
+ elif b == b'\x15':
+ if toWrite:
+ self.terminal.write(b''.join(toWrite))
+ del toWrite[:]
+ self.terminal.shiftIn()
+ elif b == b'\x08':
+ if toWrite:
+ self.terminal.write(b''.join(toWrite))
+ del toWrite[:]
+ self.terminal.cursorBackward()
+ else:
+ toWrite.append(b)
+ elif self.state == b'escaped':
+ fName = self._shorts.get(b)
+ if fName is not None:
+ self.state = b'data'
+ getattr(self.terminal, fName.decode("ascii"))()
+ else:
+ state = self._longs.get(b)
+ if state is not None:
+ self.state = state
+ else:
+ self.terminal.unhandledControlSequence(b'\x1b' + b)
+ self.state = b'data'
+ elif self.state == b'bracket-escape':
+ if self._escBuf is None:
+ self._escBuf = []
+ if b.isalpha() or b == b'~':
+ self._handleControlSequence(b''.join(self._escBuf), b)
+ del self._escBuf
+ self.state = b'data'
+ else:
+ self._escBuf.append(b)
+ elif self.state == b'select-g0':
+ self.terminal.selectCharacterSet(self._charsets.get(b, b), G0)
+ self.state = b'data'
+ elif self.state == b'select-g1':
+ self.terminal.selectCharacterSet(self._charsets.get(b, b), G1)
+ self.state = b'data'
+ elif self.state == b'select-height-width':
+ self._handleHeightWidth(b)
+ self.state = b'data'
+ else:
+ raise ValueError("Illegal state")
+ if toWrite:
+ self.terminal.write(b''.join(toWrite))
+
+
+ def _handleControlSequence(self, buf, terminal):
+ f = getattr(self.controlSequenceParser, CST.get(terminal, terminal).decode("ascii"), None)
+ if f is None:
+ self.terminal.unhandledControlSequence(b'\x1b[' + buf + terminal)
+ else:
+ f(self, self.terminal, buf)
+
+
+ class ControlSequenceParser:
+ def _makeSimple(ch, fName):
+ n = 'cursor' + fName
+ def simple(self, proto, handler, buf):
+ if not buf:
+ getattr(handler, n)(1)
+ else:
+ try:
+ m = int(buf)
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + ch)
+ else:
+ getattr(handler, n)(m)
+ return simple
+
+ for (ch, fName) in (('A', 'Up'),
+ ('B', 'Down'),
+ ('C', 'Forward'),
+ ('D', 'Backward')):
+ exec(ch + " = _makeSimple(ch, fName)")
+ del _makeSimple
+
+
+ def h(self, proto, handler, buf):
+ # XXX - Handle '?' to introduce ANSI-Compatible private modes.
+ try:
+ modes = [int(mode) for mode in buf.split(b';')]
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'h')
+ else:
+ handler.setModes(modes)
+
+
+ def l(self, proto, handler, buf):
+ # XXX - Handle '?' to introduce ANSI-Compatible private modes.
+ try:
+ modes = [int(mode) for mode in buf.split(b';')]
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + 'l')
+ else:
+ handler.resetModes(modes)
+
+
+ def r(self, proto, handler, buf):
+ parts = buf.split(b';')
+ if len(parts) == 1:
+ handler.setScrollRegion(None, None)
+ elif len(parts) == 2:
+ try:
+ if parts[0]:
+ pt = int(parts[0])
+ else:
+ pt = None
+ if parts[1]:
+ pb = int(parts[1])
+ else:
+ pb = None
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'r')
+ else:
+ handler.setScrollRegion(pt, pb)
+ else:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'r')
+
+ def K(self, proto, handler, buf):
+ if not buf:
+ handler.eraseToLineEnd()
+ elif buf == b'1':
+ handler.eraseToLineBeginning()
+ elif buf == b'2':
+ handler.eraseLine()
+ else:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'K')
+
+
+ def H(self, proto, handler, buf):
+ handler.cursorHome()
+
+
+ def J(self, proto, handler, buf):
+ if not buf:
+ handler.eraseToDisplayEnd()
+ elif buf == b'1':
+ handler.eraseToDisplayBeginning()
+ elif buf == b'2':
+ handler.eraseDisplay()
+ else:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'J')
+
+
+ def P(self, proto, handler, buf):
+ if not buf:
+ handler.deleteCharacter(1)
+ else:
+ try:
+ n = int(buf)
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'P')
+ else:
+ handler.deleteCharacter(n)
+
+ def L(self, proto, handler, buf):
+ if not buf:
+ handler.insertLine(1)
+ else:
+ try:
+ n = int(buf)
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'L')
+ else:
+ handler.insertLine(n)
+
+
+ def M(self, proto, handler, buf):
+ if not buf:
+ handler.deleteLine(1)
+ else:
+ try:
+ n = int(buf)
+ except ValueError:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'M')
+ else:
+ handler.deleteLine(n)
+
+
+ def n(self, proto, handler, buf):
+ if buf == b'6':
+ x, y = handler.reportCursorPosition()
+ proto.transport.write(b'\x1b['
+ + intToBytes(x+1)
+ + b';'
+ + intToBytes(y+1)
+ + b'R')
+ else:
+ handler.unhandledControlSequence(b'\x1b[' + buf + b'n')
+
+
+ def m(self, proto, handler, buf):
+ if not buf:
+ handler.selectGraphicRendition(NORMAL)
+ else:
+ attrs = []
+ for a in buf.split(b';'):
+ try:
+ a = int(a)
+ except ValueError:
+ pass
+ attrs.append(a)
+ handler.selectGraphicRendition(*attrs)
+
+ controlSequenceParser = ControlSequenceParser()
+
+
+ def _handleHeightWidth(self, b):
+ if b == b'3':
+ self.terminal.doubleHeightLine(True)
+ elif b == b'4':
+ self.terminal.doubleHeightLine(False)
+ elif b == b'5':
+ self.terminal.singleWidthLine()
+ elif b == b'6':
+ self.terminal.doubleWidthLine()
+ else:
+ self.terminal.unhandledControlSequence(b'\x1b#' + b)
+
+
+__all__ = [
+ # Interfaces
+ 'ITerminalProtocol', 'ITerminalTransport',
+
+ # Symbolic constants
+ 'modes', 'privateModes', 'FUNCTION_KEYS',
+
+ 'CS_US', 'CS_UK', 'CS_DRAWING', 'CS_ALTERNATE', 'CS_ALTERNATE_SPECIAL',
+ 'G0', 'G1', 'G2', 'G3',
+
+ 'UNDERLINE', 'REVERSE_VIDEO', 'BLINK', 'BOLD', 'NORMAL',
+
+ # Protocol classes
+ 'ServerProtocol', 'ClientProtocol']