File Explorer

/proc/self/root/proc/1/task/1/root/lib64/python3.9

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

telnetlib.py · 22.7 KB · 678 lines
r"""TELNET client class.

Based on RFC 854: TELNET Protocol Specification, by J. Postel and
J. Reynolds

Example:

>>> from telnetlib import Telnet
>>> tn = Telnet('www.python.org', 79)   # connect to finger port
>>> tn.write(b'guido\r\n')
>>> print(tn.read_all())
Login       Name               TTY         Idle    When    Where
guido    Guido van Rossum      pts/2        <Dec  2 11:10> snag.cnri.reston..

>>>

Note that read_all() blocks until EOF.  read_some() won't read until
eof -- it just reads some data -- but it guarantees to read at least
one byte unless EOF is hit.

It is possible to pass a Telnet object to a selector in order to wait until
more data is available.  Note that in this case, read_eager() may return b''
even if there was data on the socket, because the protocol negotiation may have
eaten the data.  This is why EOFError is needed in some cases to distinguish
between "no data" and "connection closed" (since the socket also appears ready
for reading when it is closed).

To do:
- option negotiation
- timeout should be intrinsic to the connection object instead of an
  option on one of the read calls only

"""


# Imported modules
import sys
import socket
import selectors
from time import monotonic as _time

__all__ = ["Telnet"]

# Tunable parameters
DEBUGLEVEL = 0

# Telnet protocol defaults
TELNET_PORT = 23

# Telnet protocol characters (don't change)
IAC  = bytes([255]) # "Interpret As Command"
DONT = bytes([254])
DO   = bytes([253])
WONT = bytes([252])
WILL = bytes([251])
theNULL = bytes([0])

SE  = bytes([240])  # Subnegotiation End
NOP = bytes([241])  # No Operation
DM  = bytes([242])  # Data Mark
BRK = bytes([243])  # Break
IP  = bytes([244])  # Interrupt process
AO  = bytes([245])  # Abort output
AYT = bytes([246])  # Are You There
EC  = bytes([247])  # Erase Character
EL  = bytes([248])  # Erase Line
GA  = bytes([249])  # Go Ahead
SB =  bytes([250])  # Subnegotiation Begin


# Telnet protocol options code (don't change)
# These ones all come from arpa/telnet.h
BINARY = bytes([0]) # 8-bit data path
ECHO = bytes([1]) # echo
RCP = bytes([2]) # prepare to reconnect
SGA = bytes([3]) # suppress go ahead
NAMS = bytes([4]) # approximate message size
STATUS = bytes([5]) # give status
TM = bytes([6]) # timing mark
RCTE = bytes([7]) # remote controlled transmission and echo
NAOL = bytes([8]) # negotiate about output line width
NAOP = bytes([9]) # negotiate about output page size
NAOCRD = bytes([10]) # negotiate about CR disposition
NAOHTS = bytes([11]) # negotiate about horizontal tabstops
NAOHTD = bytes([12]) # negotiate about horizontal tab disposition
NAOFFD = bytes([13]) # negotiate about formfeed disposition
NAOVTS = bytes([14]) # negotiate about vertical tab stops
NAOVTD = bytes([15]) # negotiate about vertical tab disposition
NAOLFD = bytes([16]) # negotiate about output LF disposition
XASCII = bytes([17]) # extended ascii character set
LOGOUT = bytes([18]) # force logout
BM = bytes([19]) # byte macro
DET = bytes([20]) # data entry terminal
SUPDUP = bytes([21]) # supdup protocol
SUPDUPOUTPUT = bytes([22]) # supdup output
SNDLOC = bytes([23]) # send location
TTYPE = bytes([24]) # terminal type
EOR = bytes([25]) # end of record
TUID = bytes([26]) # TACACS user identification
OUTMRK = bytes([27]) # output marking
TTYLOC = bytes([28]) # terminal location number
VT3270REGIME = bytes([29]) # 3270 regime
X3PAD = bytes([30]) # X.3 PAD
NAWS = bytes([31]) # window size
TSPEED = bytes([32]) # terminal speed
LFLOW = bytes([33]) # remote flow control
LINEMODE = bytes([34]) # Linemode option
XDISPLOC = bytes([35]) # X Display Location
OLD_ENVIRON = bytes([36]) # Old - Environment variables
AUTHENTICATION = bytes([37]) # Authenticate
ENCRYPT = bytes([38]) # Encryption option
NEW_ENVIRON = bytes([39]) # New - Environment variables
# the following ones come from
# http://www.iana.org/assignments/telnet-options
# Unfortunately, that document does not assign identifiers
# to all of them, so we are making them up
TN3270E = bytes([40]) # TN3270E
XAUTH = bytes([41]) # XAUTH
CHARSET = bytes([42]) # CHARSET
RSP = bytes([43]) # Telnet Remote Serial Port
COM_PORT_OPTION = bytes([44]) # Com Port Control Option
SUPPRESS_LOCAL_ECHO = bytes([45]) # Telnet Suppress Local Echo
TLS = bytes([46]) # Telnet Start TLS
KERMIT = bytes([47]) # KERMIT
SEND_URL = bytes([48]) # SEND-URL
FORWARD_X = bytes([49]) # FORWARD_X
PRAGMA_LOGON = bytes([138]) # TELOPT PRAGMA LOGON
SSPI_LOGON = bytes([139]) # TELOPT SSPI LOGON
PRAGMA_HEARTBEAT = bytes([140]) # TELOPT PRAGMA HEARTBEAT
EXOPL = bytes([255]) # Extended-Options-List
NOOPT = bytes([0])


# poll/select have the advantage of not requiring any extra file descriptor,
# contrarily to epoll/kqueue (also, they require a single syscall).
if hasattr(selectors, 'PollSelector'):
    _TelnetSelector = selectors.PollSelector
else:
    _TelnetSelector = selectors.SelectSelector


class Telnet:

    """Telnet interface class.

    An instance of this class represents a connection to a telnet
    server.  The instance is initially not connected; the open()
    method must be used to establish a connection.  Alternatively, the
    host name and optional port number can be passed to the
    constructor, too.

    Don't try to reopen an already connected instance.

    This class has many read_*() methods.  Note that some of them
    raise EOFError when the end of the connection is read, because
    they can return an empty string for other reasons.  See the
    individual doc strings.

    read_until(expected, [timeout])
        Read until the expected string has been seen, or a timeout is
        hit (default is no timeout); may block.

    read_all()
        Read all data until EOF; may block.

    read_some()
        Read at least one byte or EOF; may block.

    read_very_eager()
        Read all data available already queued or on the socket,
        without blocking.

    read_eager()
        Read either data already queued or some data available on the
        socket, without blocking.

    read_lazy()
        Read all data in the raw queue (processing it first), without
        doing any socket I/O.

    read_very_lazy()
        Reads all data in the cooked queue, without doing any socket
        I/O.

    read_sb_data()
        Reads available data between SB ... SE sequence. Don't block.

    set_option_negotiation_callback(callback)
        Each time a telnet option is read on the input flow, this callback
        (if set) is called with the following parameters :
        callback(telnet socket, command, option)
            option will be NOOPT (bytes([0])) when there is no option.
        No other action is done afterwards by telnetlib.

    """

    def __init__(self, host=None, port=0,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        """Constructor.

        When called without arguments, create an unconnected instance.
        With a hostname argument, it connects the instance; port number
        and timeout are optional.
        """
        self.debuglevel = DEBUGLEVEL
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sock = None
        self.rawq = b''     # bytes received but not yet parsed
        self.irawq = 0      # read index into rawq
        self.cookedq = b''  # parsed data ready for the read_*() methods
        self.eof = 0
        self.iacseq = b'' # Buffer for IAC sequence.
        self.sb = 0 # flag for SB and SE sequence.
        self.sbdataq = b''
        self.option_callback = None
        if host is not None:
            self.open(host, port, timeout)

    def open(self, host, port=0, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
        """Connect to a host.

        The optional second argument is the port number, which
        defaults to the standard telnet port (23).  The optional third
        argument is the timeout handed to socket.create_connection().

        Don't try to reopen an already connected instance.
        """
        self.eof = 0
        if not port:
            port = TELNET_PORT
        self.host = host
        self.port = port
        self.timeout = timeout
        sys.audit("telnetlib.Telnet.open", self, host, port)
        self.sock = socket.create_connection((host, port), timeout)

    def __del__(self):
        """Destructor -- close the connection."""
        self.close()

    def msg(self, msg, *args):
        """Print a debug message, when the debug level is > 0.

        If extra arguments are present, they are substituted in the
        message using the standard string formatting operator.

        """
        if self.debuglevel > 0:
            print('Telnet(%s,%s):' % (self.host, self.port), end=' ')
            if args:
                print(msg % args)
            else:
                print(msg)

    def set_debuglevel(self, debuglevel):
        """Set the debug level.

        The higher it is, the more debug output you get (on sys.stdout).

        """
        self.debuglevel = debuglevel

    def close(self):
        """Close the connection."""
        # Detach the socket first so the instance is already marked as
        # closed by the time sock.close() runs.
        sock = self.sock
        self.sock = None
        self.eof = True
        self.iacseq = b''
        self.sb = 0
        if sock:
            sock.close()

    def get_socket(self):
        """Return the socket object used internally."""
        return self.sock

    def fileno(self):
        """Return the fileno() of the socket object used internally."""
        return self.sock.fileno()

    def write(self, buffer):
        """Write a byte string to the socket, doubling any IAC characters.

        Can block if the connection is blocked.  May raise
        OSError if the connection is closed.

        """
        if IAC in buffer:
            buffer = buffer.replace(IAC, IAC+IAC)
        sys.audit("telnetlib.Telnet.write", self, buffer)
        self.msg("send %r", buffer)
        self.sock.sendall(buffer)

    def read_until(self, match, timeout=None):
        """Read until a given byte string is encountered or until timeout.

        When no match is found, return whatever is available instead,
        possibly the empty string.  Raise EOFError if the connection
        is closed and no cooked data is available.

        """
        n = len(match)
        self.process_rawq()
        i = self.cookedq.find(match)
        if i >= 0:
            i = i+n
            buf = self.cookedq[:i]
            self.cookedq = self.cookedq[i:]
            return buf
        if timeout is not None:
            deadline = _time() + timeout
        with _TelnetSelector() as selector:
            selector.register(self, selectors.EVENT_READ)
            while not self.eof:
                if selector.select(timeout):
                    # Resume the search just before the old end of
                    # cookedq, so a match that straddles the old and
                    # new data is still found.
                    i = max(0, len(self.cookedq)-n)
                    self.fill_rawq()
                    self.process_rawq()
                    i = self.cookedq.find(match, i)
                    if i >= 0:
                        i = i+n
                        buf = self.cookedq[:i]
                        self.cookedq = self.cookedq[i:]
                        return buf
                if timeout is not None:
                    # Shrink the timeout to whatever is left before the
                    # deadline.
                    timeout = deadline - _time()
                    if timeout < 0:
                        break
        return self.read_very_lazy()

    def read_all(self):
        """Read all data until EOF; block until connection closed."""
        self.process_rawq()
        while not self.eof:
            self.fill_rawq()
            self.process_rawq()
        buf = self.cookedq
        self.cookedq = b''
        return buf

    def read_some(self):
        """Read at least one byte of cooked data unless EOF is hit.

        Return b'' if EOF is hit.  Block if no data is immediately
        available.

        """
        self.process_rawq()
        while not self.cookedq and not self.eof:
            self.fill_rawq()
            self.process_rawq()
        buf = self.cookedq
        self.cookedq = b''
        return buf

    def read_very_eager(self):
        """Read everything that's possible without blocking in I/O (eager).

        Raise EOFError if connection closed and no cooked data
        available.  Return b'' if no cooked data available otherwise.
        Don't block unless in the midst of an IAC sequence.

        """
        self.process_rawq()
        while not self.eof and self.sock_avail():
            self.fill_rawq()
            self.process_rawq()
        return self.read_very_lazy()

    def read_eager(self):
        """Read readily available data.

        Raise EOFError if connection closed and no cooked data
        available.  Return b'' if no cooked data available otherwise.
        Don't block unless in the midst of an IAC sequence.

        """
        self.process_rawq()
        while not self.cookedq and not self.eof and self.sock_avail():
            self.fill_rawq()
            self.process_rawq()
        return self.read_very_lazy()

    def read_lazy(self):
        """Process and return data that's already in the queues (lazy).

        Raise EOFError if connection closed and no data available.
        Return b'' if no cooked data available otherwise.  Don't block
        unless in the midst of an IAC sequence.

        """
        self.process_rawq()
        return self.read_very_lazy()

    def read_very_lazy(self):
        """Return any data available in the cooked queue (very lazy).

        Raise EOFError if connection closed and no data available.
        Return b'' if no cooked data available otherwise.  Don't block.

        """
        buf = self.cookedq
        self.cookedq = b''
        if not buf and self.eof and not self.rawq:
            raise EOFError('telnet connection closed')
        return buf

    def read_sb_data(self):
        """Return any data available in the SB ... SE queue.

        Return b'' if no SB ... SE available. Should only be called
        after seeing a SB or SE command. When a new SB command is
        found, old unread SB data will be discarded. Don't block.

        """
        buf = self.sbdataq
        self.sbdataq = b''
        return buf

    def set_option_negotiation_callback(self, callback):
        """Provide a callback function called after each receipt of a telnet option."""
        self.option_callback = callback

    def process_rawq(self):
        """Transfer from raw queue to cooked queue.

        Set self.eof when connection is closed.  Don't block unless in
        the midst of an IAC sequence.

        """
        # buf[0] collects ordinary data and buf[1] collects subnegotiation
        # data; self.sb (0 or 1) selects which one is being filled.
        buf = [b'', b'']
        try:
            while self.rawq:
                c = self.rawq_getchar()
                if not self.iacseq:
                    if c == theNULL:
                        continue
                    if c == b"\021":
                        continue
                    if c != IAC:
                        buf[self.sb] = buf[self.sb] + c
                        continue
                    else:
                        self.iacseq += c
                elif len(self.iacseq) == 1:
                    # 'IAC: IAC CMD [OPTION only for WILL/WONT/DO/DONT]'
                    if c in (DO, DONT, WILL, WONT):
                        self.iacseq += c
                        continue

                    self.iacseq = b''
                    if c == IAC:
                        # IAC IAC is an escaped literal 255 byte.
                        buf[self.sb] = buf[self.sb] + c
                    else:
                        if c == SB: # SB ... SE start.
                            self.sb = 1
                            self.sbdataq = b''
                        elif c == SE:
                            self.sb = 0
                            self.sbdataq = self.sbdataq + buf[1]
                            buf[1] = b''
                        if self.option_callback:
                            # Callback is supposed to look into
                            # the sbdataq
                            self.option_callback(self.sock, c, NOOPT)
                        else:
                            # We can't offer automatic processing of
                            # suboptions. Alas, we should not get any
                            # unless we did a WILL/DO before.
                            self.msg('IAC %d not recognized', ord(c))
                elif len(self.iacseq) == 2:
                    cmd = self.iacseq[1:2]
                    self.iacseq = b''
                    opt = c
                    if cmd in (DO, DONT):
                        self.msg('IAC %s %d',
                                 'DO' if cmd == DO else 'DONT', ord(opt))
                        if self.option_callback:
                            self.option_callback(self.sock, cmd, opt)
                        else:
                            # No callback installed: refuse the option.
                            self.sock.sendall(IAC + WONT + opt)
                    elif cmd in (WILL, WONT):
                        self.msg('IAC %s %d',
                                 'WILL' if cmd == WILL else 'WONT', ord(opt))
                        if self.option_callback:
                            self.option_callback(self.sock, cmd, opt)
                        else:
                            # No callback installed: refuse the option.
                            self.sock.sendall(IAC + DONT + opt)
        except EOFError: # raised by self.rawq_getchar()
            self.iacseq = b'' # Reset on EOF
            self.sb = 0
        self.cookedq = self.cookedq + buf[0]
        self.sbdataq = self.sbdataq + buf[1]

    def rawq_getchar(self):
        """Get next char from raw queue.

        Block if no data is immediately available.  Raise EOFError
        when connection is closed.

        """
        if not self.rawq:
            self.fill_rawq()
            if self.eof:
                raise EOFError
        c = self.rawq[self.irawq:self.irawq+1]
        self.irawq = self.irawq + 1
        if self.irawq >= len(self.rawq):
            # Queue fully consumed: reset it.
            self.rawq = b''
            self.irawq = 0
        return c

    def fill_rawq(self):
        """Fill raw queue from exactly one recv() system call.

        Block if no data is immediately available.  Set self.eof when
        connection is closed.

        """
        if self.irawq >= len(self.rawq):
            self.rawq = b''
            self.irawq = 0
        # The buffer size should be fairly small so as to avoid quadratic
        # behavior in process_rawq() above
        buf = self.sock.recv(50)
        self.msg("recv %r", buf)
        self.eof = (not buf)
        self.rawq = self.rawq + buf

    def sock_avail(self):
        """Test whether data is available on the socket."""
        with _TelnetSelector() as selector:
            selector.register(self, selectors.EVENT_READ)
            return bool(selector.select(0))

    def interact(self):
        """Interaction function, emulates a very dumb telnet client.

        NOTE(review): data is decoded from / encoded to ASCII, so any
        non-ASCII byte or character raises a UnicodeError.
        """
        if sys.platform == "win32":
            self.mt_interact()
            return
        with _TelnetSelector() as selector:
            selector.register(self, selectors.EVENT_READ)
            selector.register(sys.stdin, selectors.EVENT_READ)

            while True:
                for key, events in selector.select():
                    if key.fileobj is self:
                        try:
                            text = self.read_eager()
                        except EOFError:
                            print('*** Connection closed by remote host ***')
                            return
                        if text:
                            sys.stdout.write(text.decode('ascii'))
                            sys.stdout.flush()
                    elif key.fileobj is sys.stdin:
                        line = sys.stdin.readline().encode('ascii')
                        if not line:
                            return
                        self.write(line)

    def mt_interact(self):
        """Multithreaded version of interact()."""
        import _thread
        _thread.start_new_thread(self.listener, ())
        while 1:
            line = sys.stdin.readline()
            if not line:
                break
            self.write(line.encode('ascii'))

    def listener(self):
        """Helper for mt_interact() -- this executes in the other thread."""
        # NOTE(review): read_eager() does not wait for data, so this loop
        # polls continuously while the connection is idle.
        while 1:
            try:
                data = self.read_eager()
            except EOFError:
                print('*** Connection closed by remote host ***')
                return
            if data:
                sys.stdout.write(data.decode('ascii'))
            else:
                sys.stdout.flush()

    def expect(self, list, timeout=None):
        """Read until one from a list of a regular expressions matches.

        The first argument is a sequence of regular expressions, either
        compiled (re.Pattern instances) or uncompiled (bytes patterns,
        since they are searched in the received bytes).  Any iterable
        is accepted and the argument itself is never modified.
        The optional second argument is a timeout, in seconds; default
        is no timeout.

        Return a tuple of three items: the index in the list of the
        first regular expression that matches; the re.Match object
        returned; and the text read up till and including the match.

        If EOF is read and no text was read, raise EOFError.
        Otherwise, when nothing matches, return (-1, None, text) where
        text is the text received so far (may be the empty string if a
        timeout happened).

        If a regular expression ends with a greedy match (e.g. '.*')
        or if more than one expression can match the same input, the
        results are non-deterministic, and may depend on the I/O timing.

        """
        # Build a private list of compiled patterns rather than assigning
        # into a copy of the argument, so tuples and other iterables work.
        patterns = []
        for pattern in list:
            if not hasattr(pattern, "search"):
                # Deferred import: re is only needed for uncompiled patterns.
                import re
                pattern = re.compile(pattern)
            patterns.append(pattern)
        if timeout is not None:
            deadline = _time() + timeout
        with _TelnetSelector() as selector:
            selector.register(self, selectors.EVENT_READ)
            while not self.eof:
                self.process_rawq()
                for i, pattern in enumerate(patterns):
                    m = pattern.search(self.cookedq)
                    if m:
                        e = m.end()
                        text = self.cookedq[:e]
                        self.cookedq = self.cookedq[e:]
                        return (i, m, text)
                if timeout is not None:
                    ready = selector.select(timeout)
                    timeout = deadline - _time()
                    if not ready:
                        if timeout < 0:
                            break
                        else:
                            continue
                self.fill_rawq()
        text = self.read_very_lazy()
        if not text and self.eof:
            raise EOFError
        return (-1, None, text)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()


def test():
    """Test program for telnetlib.

    Usage: python telnetlib.py [-d] ... [host [port]]

    Default host is localhost; default port is 23.

    """
    debuglevel = 0
    while sys.argv[1:] and sys.argv[1] == '-d':
        debuglevel = debuglevel+1
        del sys.argv[1]
    host = 'localhost'
    if sys.argv[1:]:
        host = sys.argv[1]
    port = 0
    if sys.argv[2:]:
        portstr = sys.argv[2]
        try:
            port = int(portstr)
        except ValueError:
            # Not a number: treat it as a service name.
            port = socket.getservbyname(portstr, 'tcp')
    with Telnet() as tn:
        tn.set_debuglevel(debuglevel)
        tn.open(host, port, timeout=0.5)
        tn.interact()

if __name__ == '__main__':
    test()