|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- # -*- test-case-name: twisted.conch.test.test_helper -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
-
- """
- Partial in-memory terminal emulator
-
- @author: Jp Calderone
- """
-
-
- import re
- import string
-
- from zope.interface import implementer
-
- from incremental import Version
-
- from twisted.conch.insults import insults
- from twisted.internet import defer, protocol, reactor
- from twisted.logger import Logger
- from twisted.python import _textattributes
- from twisted.python.compat import iterbytes
- from twisted.python.deprecate import deprecated, deprecatedModuleAttribute
-
# Base values added to a color index to form the numeric graphic-rendition
# code for a foreground or background color respectively.
FOREGROUND, BACKGROUND = 30, 40

# Color indices 0 through 7; N_COLORS is the number of colors (8).
(
    BLACK,
    RED,
    GREEN,
    YELLOW,
    BLUE,
    MAGENTA,
    CYAN,
    WHITE,
    N_COLORS,
) = range(9)
-
-
class _FormattingState(_textattributes._FormattingStateMixin):
    """
    The formatting state (display attributes) of a single character.

    A character's attributes are made up of its character set, intensity
    (bold), underlining, blinking, video reversal, and its foreground and
    background colors.

    @ivar _subtracting: When true, L{toVT102} starts its sequence with the
        code C{0} before listing the attributes that are set.
    """

    # Names of the attributes that define this state, listed for the mixin.
    compareAttributes = (
        "charset",
        "bold",
        "underline",
        "blink",
        "reverseVideo",
        "foreground",
        "background",
        "_subtracting",
    )

    def __init__(
        self,
        charset=insults.G0,
        bold=False,
        underline=False,
        blink=False,
        reverseVideo=False,
        foreground=WHITE,
        background=BLACK,
        _subtracting=False,
    ):
        """
        Store each of the given attribute values on the instance under the
        attribute of the same name.
        """
        self.charset, self.bold = charset, bold
        self.underline, self.blink = underline, blink
        self.reverseVideo = reverseVideo
        self.foreground, self.background = foreground, background
        self._subtracting = _subtracting

    @deprecated(Version("Twisted", 13, 1, 0))
    def wantOne(self, **kw):
        """
        Add a character attribute to a copy of this formatting state.

        @param kw: An optional attribute name and value can be provided with
            a keyword argument.

        @return: A formatting state instance with the new attribute.

        @see: L{DefaultFormattingState._withAttribute}.
        """
        name, value = kw.popitem()
        return self._withAttribute(name, value)

    def toVT102(self):
        """
        Build a vt102 control sequence which sets up every attribute held by
        this state, except for the character set.

        @return: The escape sequence as a string, or the empty string when
            there is nothing to emit.
        """
        # (condition, code) pairs, in the order the codes are emitted.
        candidates = (
            (self._subtracting, 0),
            (self.bold, insults.BOLD),
            (self.underline, insults.UNDERLINE),
            (self.blink, insults.BLINK),
            (self.reverseVideo, insults.REVERSE_VIDEO),
            (self.foreground != WHITE, FOREGROUND + self.foreground),
            (self.background != BLACK, BACKGROUND + self.background),
        )
        codes = [str(code) for wanted, code in candidates if wanted]
        if not codes:
            return ""
        return "\x1b[" + ";".join(codes) + "m"
-
-
# Older public name for the formatting state class.  It remains importable
# but is registered as deprecated immediately below.
CharacterAttribute = _FormattingState

deprecatedModuleAttribute(
    version=Version("Twisted", 13, 1, 0),
    message="Use twisted.conch.insults.text.assembleFormattedText instead.",
    moduleName="twisted.conch.insults.helper",
    name="CharacterAttribute",
)
-
-
- # XXX - need to support scroll regions and scroll history
@implementer(insults.ITerminalTransport)
class TerminalBuffer(protocol.Protocol):
    """
    An in-memory terminal emulator.

    The screen is held in C{self.lines}: a list of C{height} rows, each a
    list of C{width} cells.  A cell is a C{(character, formattingState)}
    tuple in which the character is either a one-byte L{bytes} object or the
    L{void} marker for a cell that holds no character.

    @cvar width: Number of columns in the buffer.
    @cvar height: Number of rows in the buffer.
    @cvar fill: The byte used by L{__bytes__} to render a void cell which is
        followed by a written cell on the same row.
    @cvar void: Marker object stored in cells that hold no character.

    @ivar x: Column of the cursor.
    @ivar y: Row of the cursor.
    @ivar modes: Mapping of currently enabled modes to C{True}.
    @ivar privateModes: Mapping of currently enabled private modes to
        C{True}.
    @ivar keypadMode: C{"app"} or C{"num"}.
    @ivar activeCharset: The character set slot applied to new characters.
    @ivar graphicRendition: Keyword arguments describing the rendition
        applied to new characters.
    """

    # Define one unique marker object per special key as a class attribute
    # (UP_ARROW, DOWN_ARROW, ...).  The loop variables are left in the class
    # namespace exactly as before.
    for keyID in (
        b"UP_ARROW",
        b"DOWN_ARROW",
        b"RIGHT_ARROW",
        b"LEFT_ARROW",
        b"HOME",
        b"INSERT",
        b"DELETE",
        b"END",
        b"PGUP",
        b"PGDN",
        b"F1",
        b"F2",
        b"F3",
        b"F4",
        b"F5",
        b"F6",
        b"F7",
        b"F8",
        b"F9",
        b"F10",
        b"F11",
        b"F12",
    ):
        execBytes = keyID + b" = object()"
        execStr = execBytes.decode("ascii")
        exec(execStr)

    TAB = b"\t"
    BACKSPACE = b"\x7f"

    width = 80
    height = 24

    fill = b" "
    void = object()
    _log = Logger()

    # The bytes insertAtCursor stores on screen.  Encoded once here instead
    # of once per inserted byte.
    _printable = string.printable.encode("ascii")

    def getCharacter(self, x, y):
        """
        Return the C{(character, formattingState)} cell at column C{x} of
        row C{y}.
        """
        return self.lines[y][x]

    def connectionMade(self):
        """
        Put the terminal into its initial state.
        """
        self.reset()

    def write(self, data):
        """
        Add the given printable bytes to the terminal.

        Line feeds in L{bytes} will be replaced with carriage return / line
        feed pairs.
        """
        for b in iterbytes(data.replace(b"\n", b"\r\n")):
            self.insertAtCursor(b)

    def _currentFormattingState(self):
        """
        Return a L{_FormattingState} built from the active character set and
        the current graphic rendition.
        """
        return _FormattingState(self.activeCharset, **self.graphicRendition)

    def insertAtCursor(self, b):
        """
        Add one byte to the terminal at the cursor and make consequent state
        updates.

        If b is a carriage return, move the cursor to the beginning of the
        current row.

        If b is a line feed, move the cursor to the next row or scroll down if
        the cursor is already in the last row.

        Otherwise, if b is printable, put it at the cursor position (inserting
        or overwriting as dictated by the current mode) and move the cursor.
        """
        if b == b"\r":
            self.x = 0
        elif b == b"\n":
            self._scrollDown()
        elif b in self._printable:
            if self.x >= self.width:
                # The cursor sits past the last column: wrap first.
                self.nextLine()
            ch = (b, self._currentFormattingState())
            if self.modes.get(insults.modes.IRM):
                # Insert mode: shift the rest of the row right and drop the
                # cell pushed off the end so the row keeps its length.
                self.lines[self.y][self.x : self.x] = [ch]
                self.lines[self.y].pop()
            else:
                self.lines[self.y][self.x] = ch
            self.x += 1

    def _emptyLine(self, width):
        """
        Return a list of C{width} void cells carrying the current formatting
        state.
        """
        return [(self.void, self._currentFormattingState()) for i in range(width)]

    def _scrollDown(self):
        """
        Move the cursor down one row.  If it was already on the last row,
        leave it there, discard the top row and append a blank row.
        """
        self.y += 1
        if self.y >= self.height:
            self.y -= 1
            del self.lines[0]
            self.lines.append(self._emptyLine(self.width))

    def _scrollUp(self):
        """
        Move the cursor up one row.  If it was already on the first row,
        leave it there, discard the bottom row and insert a blank row at the
        top.
        """
        self.y -= 1
        if self.y < 0:
            self.y = 0
            del self.lines[-1]
            self.lines.insert(0, self._emptyLine(self.width))

    def cursorUp(self, n=1):
        """
        Move the cursor up C{n} rows, stopping at the first row.
        """
        self.y = max(0, self.y - n)

    def cursorDown(self, n=1):
        """
        Move the cursor down C{n} rows, stopping at the last row.
        """
        self.y = min(self.height - 1, self.y + n)

    def cursorBackward(self, n=1):
        """
        Move the cursor left C{n} columns, stopping at the first column.
        """
        self.x = max(0, self.x - n)

    def cursorForward(self, n=1):
        """
        Move the cursor right C{n} columns, stopping at C{width}, the
        position at which the next printable byte wraps to a new line.
        """
        self.x = min(self.width, self.x + n)

    def cursorPosition(self, column, line):
        """
        Set the cursor to the given column and line.  The values are stored
        without any bounds checking.
        """
        self.x = column
        self.y = line

    def cursorHome(self):
        """
        Move the cursor to the home position.
        """
        self.x = self.home.x
        self.y = self.home.y

    def index(self):
        """
        Move the cursor down one row, scrolling if it is on the last row.
        """
        self._scrollDown()

    def reverseIndex(self):
        """
        Move the cursor up one row, scrolling if it is on the first row.
        """
        self._scrollUp()

    def nextLine(self):
        """
        Update the cursor position attributes and scroll down if appropriate.
        """
        self.x = 0
        self._scrollDown()

    def saveCursor(self):
        """
        Remember the current cursor position for L{restoreCursor}.
        """
        self._savedCursor = (self.x, self.y)

    def restoreCursor(self):
        """
        Move the cursor to the position remembered by L{saveCursor} and then
        forget that position.
        """
        self.x, self.y = self._savedCursor
        del self._savedCursor

    def setModes(self, modes):
        """
        Enable each of the given modes.
        """
        for m in modes:
            self.modes[m] = True

    def resetModes(self, modes):
        """
        Disable each of the given modes; modes which are not enabled are
        ignored.
        """
        for m in modes:
            self.modes.pop(m, None)

    def setPrivateModes(self, modes):
        """
        Enable the given modes.

        Track which modes have been enabled so that the implementations of
        other L{insults.ITerminalTransport} methods can be properly implemented
        to respect these settings.

        @see: L{resetPrivateModes}
        @see: L{insults.ITerminalTransport.setPrivateModes}
        """
        for m in modes:
            self.privateModes[m] = True

    def resetPrivateModes(self, modes):
        """
        Disable the given modes.

        @see: L{setPrivateModes}
        @see: L{insults.ITerminalTransport.resetPrivateModes}
        """
        for m in modes:
            self.privateModes.pop(m, None)

    def applicationKeypadMode(self):
        """
        Record that the keypad is in application mode.
        """
        self.keypadMode = "app"

    def numericKeypadMode(self):
        """
        Record that the keypad is in numeric mode.
        """
        self.keypadMode = "num"

    def selectCharacterSet(self, charSet, which):
        """
        Assign the character set C{charSet} to the slot C{which}.
        """
        self.charsets[which] = charSet

    def shiftIn(self):
        """
        Make G0 the active character set.
        """
        self.activeCharset = insults.G0

    def shiftOut(self):
        """
        Make G1 the active character set.
        """
        self.activeCharset = insults.G1

    def _singleShift(self, charset):
        """
        Make C{charset} active for the next call to L{insertAtCursor} only.

        L{insertAtCursor} is shadowed on the instance by a wrapper which
        performs the insert, removes itself and puts back the character set
        that was active before the shift.
        """
        oldActiveCharset = self.activeCharset
        self.activeCharset = charset
        f = self.insertAtCursor

        def insertAtCursor(b):
            f(b)
            del self.insertAtCursor
            self.activeCharset = oldActiveCharset

        self.insertAtCursor = insertAtCursor

    def singleShift2(self):
        """
        Use G2 for the next inserted byte only.
        """
        self._singleShift(insults.G2)

    def singleShift3(self):
        """
        Use G3 for the next inserted byte only.
        """
        self._singleShift(insults.G3)

    def _defaultGraphicRendition(self):
        """
        Return a new dictionary describing the default graphic rendition.

        A fresh dictionary is built on every call because the result is
        mutated in place by L{selectGraphicRendition}.
        """
        return {
            "bold": False,
            "underline": False,
            "blink": False,
            "reverseVideo": False,
            "foreground": WHITE,
            "background": BLACK,
        }

    def selectGraphicRendition(self, *attributes):
        """
        Apply each of the given graphic rendition attributes in order.

        C{insults.NORMAL} restores the default rendition; the bold,
        underline, blink and reverse video attributes switch the matching
        flag on.  Anything else is converted to an integer and interpreted
        as C{FOREGROUND} or C{BACKGROUND} plus a color index below
        C{N_COLORS}.  Unrecognized attributes are logged as errors.
        """
        for a in attributes:
            if a == insults.NORMAL:
                self.graphicRendition = self._defaultGraphicRendition()
            elif a == insults.BOLD:
                self.graphicRendition["bold"] = True
            elif a == insults.UNDERLINE:
                self.graphicRendition["underline"] = True
            elif a == insults.BLINK:
                self.graphicRendition["blink"] = True
            elif a == insults.REVERSE_VIDEO:
                self.graphicRendition["reverseVideo"] = True
            else:
                try:
                    v = int(a)
                except ValueError:
                    self._log.error(
                        "Unknown graphic rendition attribute: {attr!r}", attr=a
                    )
                else:
                    # Color indices run from 0 to N_COLORS - 1, so the upper
                    # bound is exclusive; base + N_COLORS is not a color.
                    if FOREGROUND <= v < FOREGROUND + N_COLORS:
                        self.graphicRendition["foreground"] = v - FOREGROUND
                    elif BACKGROUND <= v < BACKGROUND + N_COLORS:
                        self.graphicRendition["background"] = v - BACKGROUND
                    else:
                        self._log.error(
                            "Unknown graphic rendition attribute: {attr!r}", attr=a
                        )

    def eraseLine(self):
        """
        Blank the whole row the cursor is on.
        """
        self.lines[self.y] = self._emptyLine(self.width)

    def eraseToLineEnd(self):
        """
        Blank the cursor's row from the cursor to the end of the row.
        """
        width = self.width - self.x
        self.lines[self.y][self.x :] = self._emptyLine(width)

    def eraseToLineBeginning(self):
        """
        Blank the cursor's row from its first column up to and including the
        cursor.
        """
        # The cursor may rest at x == width (one past the last column); cap
        # the count so the row never grows beyond width cells.
        count = min(self.x + 1, self.width)
        self.lines[self.y][:count] = self._emptyLine(count)

    def eraseDisplay(self):
        """
        Replace the whole screen with blank rows.
        """
        self.lines = [self._emptyLine(self.width) for i in range(self.height)]

    def eraseToDisplayEnd(self):
        """
        Blank from the cursor to the end of its row and every row below it.
        """
        self.eraseToLineEnd()
        height = self.height - self.y - 1
        self.lines[self.y + 1 :] = [self._emptyLine(self.width) for i in range(height)]

    def eraseToDisplayBeginning(self):
        """
        Blank from the start of the cursor's row through the cursor and
        every row above it.
        """
        self.eraseToLineBeginning()
        self.lines[: self.y] = [self._emptyLine(self.width) for i in range(self.y)]

    def deleteCharacter(self, n=1):
        """
        Delete C{n} cells starting at the cursor, shifting the remainder of
        the row left and padding the end of the row with blank cells.
        """
        del self.lines[self.y][self.x : self.x + n]
        self.lines[self.y].extend(self._emptyLine(min(self.width - self.x, n)))

    def insertLine(self, n=1):
        """
        Insert C{n} blank rows at the cursor's row, pushing the rows below
        down and discarding those pushed past the bottom of the screen.
        """
        self.lines[self.y : self.y] = [self._emptyLine(self.width) for i in range(n)]
        del self.lines[self.height :]

    def deleteLine(self, n=1):
        """
        Delete C{n} rows starting at the cursor's row, pulling the rows below
        up and appending blank rows at the bottom of the screen.
        """
        # Only the rows from the cursor downwards can be removed; append the
        # same number of blank rows so the screen keeps exactly height rows.
        n = min(n, self.height - self.y)
        del self.lines[self.y : self.y + n]
        self.lines.extend([self._emptyLine(self.width) for i in range(n)])

    def reportCursorPosition(self):
        """
        Return the cursor position as an C{(x, y)} tuple.
        """
        return (self.x, self.y)

    def reset(self):
        """
        Restore the initial state: cursor and home at the origin, no modes
        set, the auto-wrap and cursor private modes enabled, default keypad
        mode, character sets and graphic rendition, and a blank screen.
        """
        self.home = insults.Vector(0, 0)
        self.x = self.y = 0
        self.modes = {}
        self.privateModes = {}
        self.setPrivateModes(
            [insults.privateModes.AUTO_WRAP, insults.privateModes.CURSOR_MODE]
        )
        # keypadMode is the attribute applicationKeypadMode and
        # numericKeypadMode maintain; numericKeypad is the name this method
        # has always set and is kept for compatibility.
        self.keypadMode = self.numericKeypad = "app"
        self.activeCharset = insults.G0
        self.graphicRendition = self._defaultGraphicRendition()
        self.charsets = {
            insults.G0: insults.CS_US,
            insults.G1: insults.CS_US,
            insults.G2: insults.CS_ALTERNATE,
            insults.G3: insults.CS_ALTERNATE_SPECIAL,
        }
        self.eraseDisplay()

    def unhandledControlSequence(self, buf):
        """
        Report a control sequence that could not be handled by printing it.
        """
        print("Could not handle", repr(buf))

    def __bytes__(self):
        """
        Render the screen as bytes, one line per row joined by line feeds.

        Void cells are rendered as L{fill}; void cells after the last
        written cell of a row are omitted.
        """
        lines = []
        for row in self.lines:
            buf = []
            # Length of buf up to and including the last written cell.
            length = 0
            for (ch, attr) in row:
                if ch is not self.void:
                    buf.append(ch)
                    length = len(buf)
                else:
                    buf.append(self.fill)
            lines.append(b"".join(buf[:length]))
        return b"\n".join(lines)

    def getHost(self):
        # ITransport.getHost
        raise NotImplementedError("Unimplemented: TerminalBuffer.getHost")

    def getPeer(self):
        # ITransport.getPeer
        raise NotImplementedError("Unimplemented: TerminalBuffer.getPeer")

    def loseConnection(self):
        # ITransport.loseConnection
        raise NotImplementedError("Unimplemented: TerminalBuffer.loseConnection")

    def writeSequence(self, data):
        # ITransport.writeSequence
        raise NotImplementedError("Unimplemented: TerminalBuffer.writeSequence")

    def horizontalTabulationSet(self):
        # ITerminalTransport.horizontalTabulationSet
        raise NotImplementedError(
            "Unimplemented: TerminalBuffer.horizontalTabulationSet"
        )

    def tabulationClear(self):
        # TerminalTransport.tabulationClear
        raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClear")

    def tabulationClearAll(self):
        # TerminalTransport.tabulationClearAll
        raise NotImplementedError("Unimplemented: TerminalBuffer.tabulationClearAll")

    def doubleHeightLine(self, top=True):
        # ITerminalTransport.doubleHeightLine
        raise NotImplementedError("Unimplemented: TerminalBuffer.doubleHeightLine")

    def singleWidthLine(self):
        # ITerminalTransport.singleWidthLine
        raise NotImplementedError("Unimplemented: TerminalBuffer.singleWidthLine")

    def doubleWidthLine(self):
        # ITerminalTransport.doubleWidthLine
        raise NotImplementedError("Unimplemented: TerminalBuffer.doubleWidthLine")
-
-
class ExpectationTimeout(Exception):
    """
    An expectation was not satisfied before its timeout elapsed.
    """
-
-
class ExpectableBuffer(TerminalBuffer):
    """
    A L{TerminalBuffer} on which callers can wait for output that matches a
    regular expression.

    @ivar _mark: Offset into the rendered screen bytes up to which text has
        already been consumed by earlier matches.
    @ivar _expecting: Pending expectations, oldest first, as
        C{(compiledPattern, timerOrNone, deferred)} tuples.
    """

    _mark = 0

    def connectionMade(self):
        """
        Reset the terminal and start with no pending expectations.
        """
        TerminalBuffer.connectionMade(self)
        self._expecting = []

    def write(self, data):
        """
        Write C{data} to the terminal, then check the pending expectations
        against the updated screen.
        """
        TerminalBuffer.write(self, data)
        self._checkExpected()

    def cursorHome(self):
        """
        Move the cursor home and start matching from the beginning of the
        screen again.
        """
        TerminalBuffer.cursorHome(self)
        self._mark = 0

    def _timeoutExpected(self, d):
        """
        Fail C{d} with L{ExpectationTimeout}, then re-check the pending
        expectations so that later ones are no longer held up.
        """
        d.errback(ExpectationTimeout())
        self._checkExpected()

    def _checkExpected(self):
        """
        Fire, in order, every pending expectation whose pattern matches the
        unconsumed screen text, stopping at the first one that does not.

        Expectations whose timer is no longer active are dropped.
        """
        while self._expecting:
            expr, timer, deferred = self._expecting[0]
            if timer and not timer.active():
                del self._expecting[0]
                continue
            # Take a fresh view of the unconsumed text on every pass: a
            # callback fired below may call expect() or write() and so move
            # _mark or change the screen before control returns here.
            remaining = self.__bytes__()[self._mark :]
            match = expr.search(remaining)
            if match is None:
                return
            if timer:
                timer.cancel()
            del self._expecting[0]
            self._mark += match.end()
            deferred.callback(match)

    def expect(self, expression, timeout=None, scheduler=reactor):
        """
        Wait for screen output matching a regular expression.

        @param expression: The pattern passed to L{re.compile}.  It is
            searched for in the L{bytes} rendering of the screen.
        @param timeout: If true, the number of seconds after which the
            expectation fails with L{ExpectationTimeout}.
        @param scheduler: The object whose C{callLater} schedules the
            timeout.

        @return: A L{defer.Deferred} which fires with the match object.
        """
        d = defer.Deferred()
        timer = None
        if timeout:
            timer = scheduler.callLater(timeout, self._timeoutExpected, d)
        self._expecting.append((re.compile(expression), timer, d))
        self._checkExpected()
        return d
-
-
# Names exported by ``from <this module> import *``.
__all__ = [
    "CharacterAttribute",
    "TerminalBuffer",
    "ExpectableBuffer",
]
|