File Explorer

/proc/self/root/proc/self/root/var/runtime/node_modules/@aws-sdk/node_modules/mqtt-packet

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

parser.js · 19.9 KB · 717 lines
const bl = require('bl')
const EventEmitter = require('events')
const Packet = require('./packet')
const constants = require('./constants')
const debug = require('debug')('mqtt-packet:parser')

/**
 * Streaming MQTT packet parser.
 *
 * Bytes are fed in through `parse(buf)`. Internally the parser cycles through
 * the four steps listed in `_states` (header, remaining length, payload,
 * emit). Every completed packet is emitted as a `'packet'` event; failures
 * are recorded on `this.error` and emitted as an `'error'` event.
 */
class Parser extends EventEmitter {
  constructor () {
    super()
    // Expose the static factory on the instance so `instance.parser(opt)`
    // initialises this very object.
    this.parser = this.constructor.parser
  }

  /**
   * Factory / initialiser.
   *
   * When invoked with a `this` that is not a Parser instance, a new Parser is
   * created and initialised. Otherwise the receiving instance is initialised.
   *
   * @param {object} [opt] settings object; `protocolVersion` and `bridgeMode`
   *   are the fields read by the parsing methods of this class.
   * @returns {Parser} the initialised parser
   */
  static parser (opt) {
    if (!(this instanceof Parser)) return (new Parser()).parser(opt)

    this.settings = opt || {}

    // Names of the methods run, in order, for every packet.
    this._states = [
      '_parseHeader',
      '_parseLength',
      '_parsePayload',
      '_newPacket'
    ]

    this._resetState()
    return this
  }

  /**
   * Drops the packet in progress, the stored error and all buffered bytes,
   * and rewinds the state machine to its first step.
   */
  _resetState () {
    debug('_resetState: resetting packet, error, _list, and _stateCounter')
    this.packet = new Packet()
    this.error = null
    this._list = bl()
    this._stateCounter = 0
  }

  /**
   * Appends `buf` to the internal buffer list and runs the state machine
   * until a step reports it cannot proceed or an error has been recorded.
   *
   * If a previous call left an error behind, the parser state is reset
   * before the new data is appended.
   *
   * @param {Buffer} buf bytes to append to the internal buffer list
   * @returns {number} number of bytes still held in the internal buffer list
   */
  parse (buf) {
    if (this.error) this._resetState()

    this._list.append(buf)
    debug('parse: current state: %s', this._states[this._stateCounter])
    while ((this.packet.length !== -1 || this._list.length > 0) &&
      this[this._states[this._stateCounter]]() &&
      !this.error) {
      this._stateCounter++
      debug('parse: state complete. _stateCounter is now: %d', this._stateCounter)
      debug('parse: packet.length: %d, buffer list length: %d', this.packet.length, this._list.length)
      // Wrap around: after the last step, start over with a new header.
      if (this._stateCounter >= this._states.length) this._stateCounter = 0
    }
    debug('parse: exited while loop. packet: %d, buffer list length: %d', this.packet.length, this._list.length)
    return this._list.length
  }

  /**
   * Decodes the first buffered byte into `cmd`, `retain`, `qos` and `dup` on
   * the current packet, then consumes that byte.
   *
   * @returns {boolean} always `true`
   */
  _parseHeader () {
    // There is at least one byte in the buffer
    const zero = this._list.readUInt8(0)
    this.packet.cmd = constants.types[zero >> constants.CMD_SHIFT]
    this.packet.retain = (zero & constants.RETAIN_MASK) !== 0
    this.packet.qos = (zero >> constants.QOS_SHIFT) & constants.QOS_MASK
    this.packet.dup = (zero & constants.DUP_MASK) !== 0
    debug('_parseHeader: packet: %o', this.packet)

    this._list.consume(1)

    return true
  }

  /**
   * Reads the variable-byte "remaining length" into `packet.length` and
   * consumes the bytes it occupied.
   *
   * @returns {boolean} `true` once the length was fully decoded, `false` if
   *   it could not be decoded from the bytes currently buffered
   */
  _parseLength () {
    // There is at least one byte in the list
    const result = this._parseVarByteNum(true)

    if (result) {
      this.packet.length = result.value
      this._list.consume(result.bytes)
    }
    debug('_parseLength %d', result.value)
    return !!result
  }

  /**
   * Dispatches to the command-specific parser once at least `packet.length`
   * bytes are buffered (or the packet has no payload at all).
   *
   * @returns {boolean} `true` when a command parser was run (even if it
   *   recorded an error), `false` when more data is required
   */
  _parsePayload () {
    debug('_parsePayload: payload %O', this._list)
    let result = false

    // Do we have a payload? Do we have enough data to complete the payload?
    // PINGs have no payload
    if (this.packet.length === 0 || this._list.length >= this.packet.length) {
      this._pos = 0

      switch (this.packet.cmd) {
        case 'connect':
          this._parseConnect()
          break
        case 'connack':
          this._parseConnack()
          break
        case 'publish':
          this._parsePublish()
          break
        case 'puback':
        case 'pubrec':
        case 'pubrel':
        case 'pubcomp':
          this._parseConfirmation()
          break
        case 'subscribe':
          this._parseSubscribe()
          break
        case 'suback':
          this._parseSuback()
          break
        case 'unsubscribe':
          this._parseUnsubscribe()
          break
        case 'unsuback':
          this._parseUnsuback()
          break
        case 'pingreq':
        case 'pingresp':
          // These are empty, nothing to do
          break
        case 'disconnect':
          this._parseDisconnect()
          break
        case 'auth':
          this._parseAuth()
          break
        default:
          this._emitError(new Error('Not supported'))
      }

      result = true
    }
    debug('_parsePayload complete result: %s', result)
    return result
  }

  /**
   * Parses a CONNECT payload into the current packet: protocol id and
   * version, connect flags, keepalive, optional v5 properties, client id and
   * the optional will / username / password fields.
   *
   * On success the parsed packet replaces `this.settings`, so that later
   * packets are parsed with the negotiated `protocolVersion`.
   *
   * @returns {object|undefined} the packet on success; `undefined` after an
   *   error has been emitted
   */
  _parseConnect () {
    debug('_parseConnect')
    let topic // Will topic
    let payload // Will payload
    let password // Password
    let username // Username
    const flags = {}
    const packet = this.packet

    // Parse protocolId
    const protocolId = this._parseString()

    if (protocolId === null) return this._emitError(new Error('Cannot parse protocolId'))
    if (protocolId !== 'MQTT' && protocolId !== 'MQIsdp') {
      return this._emitError(new Error('Invalid protocolId'))
    }

    packet.protocolId = protocolId

    // Parse constants version number
    if (this._pos >= this._list.length) return this._emitError(new Error('Packet too short'))

    packet.protocolVersion = this._list.readUInt8(this._pos)

    // A set high bit marks bridge mode; the low 7 bits carry the version.
    if (packet.protocolVersion >= 128) {
      packet.bridgeMode = true
      packet.protocolVersion = packet.protocolVersion - 128
    }

    if (packet.protocolVersion !== 3 && packet.protocolVersion !== 4 && packet.protocolVersion !== 5) {
      return this._emitError(new Error('Invalid protocol version'))
    }

    this._pos++

    if (this._pos >= this._list.length) {
      return this._emitError(new Error('Packet too short'))
    }

    // Parse connect flags
    flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
    flags.password = (this._list.readUInt8(this._pos) & constants.PASSWORD_MASK)
    flags.will = (this._list.readUInt8(this._pos) & constants.WILL_FLAG_MASK)

    if (flags.will) {
      packet.will = {}
      packet.will.retain = (this._list.readUInt8(this._pos) & constants.WILL_RETAIN_MASK) !== 0
      packet.will.qos = (this._list.readUInt8(this._pos) &
        constants.WILL_QOS_MASK) >> constants.WILL_QOS_SHIFT
    }

    packet.clean = (this._list.readUInt8(this._pos) & constants.CLEAN_SESSION_MASK) !== 0
    this._pos++

    // Parse keepalive
    packet.keepalive = this._parseNum()
    if (packet.keepalive === -1) return this._emitError(new Error('Packet too short'))

    // parse properties
    if (packet.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }
    // Parse clientId
    const clientId = this._parseString()
    if (clientId === null) return this._emitError(new Error('Packet too short'))
    packet.clientId = clientId
    debug('_parseConnect: packet.clientId: %s', packet.clientId)

    if (flags.will) {
      if (packet.protocolVersion === 5) {
        const willProperties = this._parseProperties()
        if (Object.getOwnPropertyNames(willProperties).length) {
          packet.will.properties = willProperties
        }
      }
      // Parse will topic
      topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse will topic'))
      packet.will.topic = topic
      debug('_parseConnect: packet.will.topic: %s', packet.will.topic)

      // Parse will payload
      payload = this._parseBuffer()
      if (payload === null) return this._emitError(new Error('Cannot parse will payload'))
      packet.will.payload = payload
      debug('_parseConnect: packet.will.paylaod: %s', packet.will.payload)
    }

    // Parse username
    if (flags.username) {
      username = this._parseString()
      if (username === null) return this._emitError(new Error('Cannot parse username'))
      packet.username = username
      debug('_parseConnect: packet.username: %s', packet.username)
    }

    // Parse password
    if (flags.password) {
      password = this._parseBuffer()
      if (password === null) return this._emitError(new Error('Cannot parse password'))
      packet.password = password
    }
    // need for right parse auth packet and self set up
    this.settings = packet
    debug('_parseConnect: complete')
    return packet
  }

  /**
   * Parses a CONNACK payload: session-present flag, then `reasonCode`
   * (protocol version 5, defaulting to 0 when absent) or `returnCode`
   * (earlier versions), then optional v5 properties.
   *
   * @returns {null|undefined} `null` when the buffer is too short to read the
   *   required bytes (no error is emitted in that case); otherwise `undefined`
   */
  _parseConnack () {
    debug('_parseConnack')
    const packet = this.packet

    if (this._list.length < 1) return null
    packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)

    if (this.settings.protocolVersion === 5) {
      if (this._list.length >= 2) {
        packet.reasonCode = this._list.readUInt8(this._pos++)
      } else {
        packet.reasonCode = 0
      }
    } else {
      if (this._list.length < 2) return null
      packet.returnCode = this._list.readUInt8(this._pos++)
    }

    // NOTE(review): both codes come from readUInt8 (or the literal 0), so this
    // guard looks unreachable - confirm before relying on it.
    if (packet.returnCode === -1 || packet.reasonCode === -1) return this._emitError(new Error('Cannot parse return code'))
    // mqtt 5 properties
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }
    debug('_parseConnack: complete')
  }

  /**
   * Parses a PUBLISH payload: topic, message id (only when qos > 0), optional
   * v5 properties, and finally the application payload, which is everything
   * from the current position up to `packet.length`.
   */
  _parsePublish () {
    debug('_parsePublish')
    const packet = this.packet
    packet.topic = this._parseString()

    if (packet.topic === null) return this._emitError(new Error('Cannot parse topic'))

    // Parse messageId
    if (packet.qos > 0) if (!this._parseMessageId()) { return }

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    packet.payload = this._list.slice(this._pos, packet.length)
    debug('_parsePublish: payload from buffer list: %o', packet.payload)
  }

  /**
   * Parses a SUBSCRIBE payload into `packet.subscriptions`. Each entry holds
   * `topic` and `qos`; for protocol version 5 the `nl`, `rap` and `rh`
   * options decoded from the options byte are added, and in bridge mode
   * fixed values are used for them instead.
   */
  _parseSubscribe () {
    debug('_parseSubscribe')
    const packet = this.packet
    let topic
    let options
    let qos
    let rh
    let rap
    let nl
    let subscription

    if (packet.qos !== 1) {
      return this._emitError(new Error('Wrong subscribe header'))
    }

    packet.subscriptions = []

    if (!this._parseMessageId()) { return }

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    while (this._pos < packet.length) {
      // Parse topic
      topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse topic'))
      // A topic must be followed by its options byte.
      if (this._pos >= packet.length) return this._emitError(new Error('Malformed Subscribe Payload'))

      options = this._parseByte()
      qos = options & constants.SUBSCRIBE_OPTIONS_QOS_MASK
      nl = ((options >> constants.SUBSCRIBE_OPTIONS_NL_SHIFT) & constants.SUBSCRIBE_OPTIONS_NL_MASK) !== 0
      rap = ((options >> constants.SUBSCRIBE_OPTIONS_RAP_SHIFT) & constants.SUBSCRIBE_OPTIONS_RAP_MASK) !== 0
      rh = (options >> constants.SUBSCRIBE_OPTIONS_RH_SHIFT) & constants.SUBSCRIBE_OPTIONS_RH_MASK

      subscription = { topic, qos }

      // mqtt 5 options
      if (this.settings.protocolVersion === 5) {
        subscription.nl = nl
        subscription.rap = rap
        subscription.rh = rh
      } else if (this.settings.bridgeMode) {
        subscription.rh = 0
        subscription.rap = true
        subscription.nl = true
      }

      // Push pair to subscriptions
      debug('_parseSubscribe: push subscription `%s` to subscription', subscription)
      packet.subscriptions.push(subscription)
    }
  }

  /**
   * Parses a SUBACK payload: message id, optional v5 properties, then one
   * byte per entry pushed onto `packet.granted`.
   */
  _parseSuback () {
    debug('_parseSuback')
    const packet = this.packet
    this.packet.granted = []

    if (!this._parseMessageId()) { return }

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    // Parse granted QoSes
    while (this._pos < this.packet.length) {
      this.packet.granted.push(this._list.readUInt8(this._pos++))
    }
  }

  /**
   * Parses an UNSUBSCRIBE payload: message id, optional v5 properties, then
   * a list of topic strings pushed onto `packet.unsubscriptions`.
   */
  _parseUnsubscribe () {
    debug('_parseUnsubscribe')
    const packet = this.packet

    packet.unsubscriptions = []

    // Parse messageId
    if (!this._parseMessageId()) { return }

    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    while (this._pos < packet.length) {
      // Parse topic
      const topic = this._parseString()
      if (topic === null) return this._emitError(new Error('Cannot parse topic'))

      // Push topic to unsubscriptions
      debug('_parseUnsubscribe: push topic `%s` to unsubscriptions', topic)
      packet.unsubscriptions.push(topic)
    }
  }

  /**
   * Parses an UNSUBACK payload: message id and, for protocol version 5,
   * properties followed by one byte per entry pushed onto `packet.granted`.
   */
  _parseUnsuback () {
    debug('_parseUnsuback')
    const packet = this.packet
    // _parseMessageId emits 'Cannot parse messageId' itself on failure, so
    // only bail out here; emitting again would report the error twice.
    if (!this._parseMessageId()) { return }
    // Properties mqtt 5
    if (this.settings.protocolVersion === 5) {
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
      // Parse granted QoSes
      packet.granted = []
      while (this._pos < this.packet.length) {
        this.packet.granted.push(this._list.readUInt8(this._pos++))
      }
    }
  }

  /**
   * Parses packets like puback, pubrec, pubrel, pubcomp: message id and, for
   * protocol version 5, an optional reason code (default 0) and optional
   * properties.
   *
   * @returns {boolean} `true` on success, `false` when the message id could
   *   not be read (an error has been emitted in that case)
   */
  _parseConfirmation () {
    debug('_parseConfirmation: packet.cmd: `%s`', this.packet.cmd)
    const packet = this.packet

    // Stop here when the message id is missing; the error is already emitted.
    if (!this._parseMessageId()) { return false }

    if (this.settings.protocolVersion === 5) {
      if (packet.length > 2) {
        // response code
        packet.reasonCode = this._parseByte()
        debug('_parseConfirmation: packet.reasonCode `%d`', packet.reasonCode)
      } else {
        packet.reasonCode = 0
      }

      if (packet.length > 3) {
        // properties mqtt 5
        const properties = this._parseProperties()
        if (Object.getOwnPropertyNames(properties).length) {
          packet.properties = properties
        }
      }
    }

    return true
  }

  /**
   * Parses a DISCONNECT payload. Only protocol version 5 carries data here:
   * a reason code (0 when no bytes are buffered) and properties.
   *
   * @returns {boolean} always `true`
   */
  _parseDisconnect () {
    const packet = this.packet
    debug('_parseDisconnect')

    if (this.settings.protocolVersion === 5) {
      // response code
      if (this._list.length > 0) {
        packet.reasonCode = this._parseByte()
      } else {
        packet.reasonCode = 0
      }
      // properties mqtt 5
      const properties = this._parseProperties()
      if (Object.getOwnPropertyNames(properties).length) {
        packet.properties = properties
      }
    }

    debug('_parseDisconnect result: true')
    return true
  }

  /**
   * Parses an AUTH payload (reason code plus properties). Emits an error
   * unless `settings.protocolVersion` is 5.
   *
   * @returns {boolean|undefined} `true` on success, `undefined` after the
   *   version error has been emitted
   */
  _parseAuth () {
    debug('_parseAuth')
    const packet = this.packet

    if (this.settings.protocolVersion !== 5) {
      return this._emitError(new Error('Not supported auth packet for this version MQTT'))
    }

    // response code
    packet.reasonCode = this._parseByte()
    // properties mqtt 5
    const properties = this._parseProperties()
    if (Object.getOwnPropertyNames(properties).length) {
      packet.properties = properties
    }

    debug('_parseAuth: result: true')
    return true
  }

  /**
   * Reads the 16-bit message id into `packet.messageId`.
   *
   * @returns {boolean} `true` on success; `false` after emitting
   *   'Cannot parse messageId' when fewer than two bytes were available
   */
  _parseMessageId () {
    const packet = this.packet

    packet.messageId = this._parseNum()

    // _parseNum signals "not enough bytes" with -1 (never null).
    if (packet.messageId === -1) {
      this._emitError(new Error('Cannot parse messageId'))
      return false
    }

    debug('_parseMessageId: packet.messageId %d', packet.messageId)
    return true
  }

  /**
   * Reads a 16-bit length followed by that many bytes decoded as UTF-8.
   *
   * @param {*} [maybeBuffer] unused; kept for interface compatibility
   * @returns {string|null} the decoded string, or `null` when the length
   *   prefix is missing or the string would run past the buffered data or
   *   past `packet.length`
   */
  _parseString (maybeBuffer) {
    const length = this._parseNum()
    const end = length + this._pos

    if (length === -1 || end > this._list.length || end > this.packet.length) return null

    const result = this._list.toString('utf8', this._pos, end)
    this._pos += length
    debug('_parseString: result: %s', result)
    return result
  }

  /**
   * Reads two consecutive length-prefixed strings.
   *
   * @returns {{name: (string|null), value: (string|null)}} the pair; either
   *   member is `null` when its string could not be read
   */
  _parseStringPair () {
    debug('_parseStringPair')
    return {
      name: this._parseString(),
      value: this._parseString()
    }
  }

  /**
   * Reads a 16-bit length followed by that many raw bytes.
   *
   * @returns {Buffer|null} the slice, or `null` under the same conditions as
   *   `_parseString`
   */
  _parseBuffer () {
    const length = this._parseNum()
    const end = length + this._pos

    if (length === -1 || end > this._list.length || end > this.packet.length) return null

    const result = this._list.slice(this._pos, end)

    this._pos += length
    debug('_parseBuffer: result: %o', result)
    return result
  }

  /**
   * Reads a big-endian unsigned 16-bit integer at the current position and
   * advances past it.
   *
   * @returns {number} the value, or `-1` when fewer than two bytes remain
   */
  _parseNum () {
    if (this._list.length - this._pos < 2) return -1

    const result = this._list.readUInt16BE(this._pos)
    this._pos += 2
    debug('_parseNum: result: %s', result)
    return result
  }

  /**
   * Reads a big-endian unsigned 32-bit integer at the current position and
   * advances past it.
   *
   * @returns {number} the value, or `-1` when fewer than four bytes remain
   */
  _parse4ByteNum () {
    if (this._list.length - this._pos < 4) return -1

    const result = this._list.readUInt32BE(this._pos)
    this._pos += 4
    debug('_parse4ByteNum: result: %s', result)
    return result
  }

  /**
   * Decodes a variable byte integer of at most four bytes, starting at
   * `this._pos` (or at offset 0 when `_pos` is unset or 0).
   *
   * `this._pos` is advanced only when it was non-zero on entry; otherwise the
   * caller is expected to consume the bytes itself (see `_parseLength`).
   * Emits 'Invalid variable byte integer' when four bytes were read without
   * finding a terminating byte.
   *
   * @param {boolean} [fullInfoFlag] when truthy, return `{ bytes, value }`
   *   instead of the bare value
   * @returns {number|{bytes: number, value: number}|false} the decoded value
   *   (or value with byte count), or `false` when decoding did not complete
   */
  _parseVarByteNum (fullInfoFlag) {
    debug('_parseVarByteNum')
    const maxBytes = 4
    let bytes = 0
    let mul = 1
    let value = 0
    let result = false
    let current
    const padding = this._pos ? this._pos : 0

    while (bytes < maxBytes && (padding + bytes) < this._list.length) {
      current = this._list.readUInt8(padding + bytes++)
      value += mul * (current & constants.VARBYTEINT_MASK)
      mul *= 0x80

      // A byte with the FIN mask bit(s) clear terminates the number.
      if ((current & constants.VARBYTEINT_FIN_MASK) === 0) {
        result = true
        break
      }
      if (this._list.length <= bytes) {
        break
      }
    }

    if (!result && bytes === maxBytes && this._list.length >= bytes) {
      this._emitError(new Error('Invalid variable byte integer'))
    }

    if (padding) {
      this._pos += bytes
    }

    result = result
      ? fullInfoFlag ? {
        bytes,
        value
      } : value
      : false

    debug('_parseVarByteNum: result: %o', result)
    return result
  }

  /**
   * Reads one unsigned byte at the current position and advances past it.
   *
   * @returns {number|undefined} the byte, or `undefined` when the position is
   *   at or beyond the end of the buffered data
   */
  _parseByte () {
    let result
    if (this._pos < this._list.length) {
      result = this._list.readUInt8(this._pos)
      this._pos++
    }
    debug('_parseByte: result: %o', result)
    return result
  }

  /**
   * Reads one value of the given property type by delegating to the matching
   * primitive reader.
   *
   * @param {string} type one of 'byte', 'int8', 'int16', 'int32', 'var',
   *   'string', 'pair', 'binary'
   * @returns {*} the value produced by the delegated reader ('byte' yields a
   *   boolean); `undefined` for an unrecognised type
   */
  _parseByType (type) {
    debug('_parseByType: type: %s', type)
    switch (type) {
      case 'byte': {
        return this._parseByte() !== 0
      }
      case 'int8': {
        return this._parseByte()
      }
      case 'int16': {
        return this._parseNum()
      }
      case 'int32': {
        return this._parse4ByteNum()
      }
      case 'var': {
        return this._parseVarByteNum()
      }
      case 'string': {
        return this._parseString()
      }
      case 'pair': {
        return this._parseStringPair()
      }
      case 'binary': {
        return this._parseBuffer()
      }
    }
  }

  /**
   * Parses a properties section: a variable-byte length followed by
   * (code, value) entries until that many bytes have been read.
   *
   * Repeated properties are collected into arrays. `userProperties` is built
   * as a prototype-less map from name to value (or array of values).
   *
   * @returns {object|false} the parsed properties, or `false` after an error
   *   was emitted for a missing or unknown property code
   */
  _parseProperties () {
    debug('_parseProperties')
    const length = this._parseVarByteNum()
    const start = this._pos
    const end = start + length
    const result = {}
    while (this._pos < end) {
      const type = this._parseByte()
      if (!type) {
        this._emitError(new Error('Cannot parse property code type'))
        return false
      }
      const name = constants.propertiesCodes[type]
      if (!name) {
        this._emitError(new Error('Unknown property'))
        return false
      }
      // user properties process
      if (name === 'userProperties') {
        if (!result[name]) {
          result[name] = Object.create(null)
        }
        const currentUserProperty = this._parseByType(constants.propertiesTypes[name])
        if (result[name][currentUserProperty.name]) {
          if (Array.isArray(result[name][currentUserProperty.name])) {
            result[name][currentUserProperty.name].push(currentUserProperty.value)
          } else {
            // Second occurrence of this key: promote the value to an array.
            const currentValue = result[name][currentUserProperty.name]
            result[name][currentUserProperty.name] = [currentValue]
            result[name][currentUserProperty.name].push(currentUserProperty.value)
          }
        } else {
          result[name][currentUserProperty.name] = currentUserProperty.value
        }
        continue
      }
      if (result[name]) {
        if (Array.isArray(result[name])) {
          result[name].push(this._parseByType(constants.propertiesTypes[name]))
        } else {
          result[name] = [result[name]]
          result[name].push(this._parseByType(constants.propertiesTypes[name]))
        }
      } else {
        result[name] = this._parseByType(constants.propertiesTypes[name])
      }
    }
    return result
  }

  /**
   * Final step of the state machine: consumes the payload bytes of the
   * finished packet, emits it as a `'packet'` event and prepares a fresh
   * Packet for the next round.
   *
   * @returns {boolean} always `true`
   */
  _newPacket () {
    debug('_newPacket')
    if (this.packet) {
      this._list.consume(this.packet.length)
      debug('_newPacket: parser emit packet: packet.cmd: %s, packet.payload: %s, packet.length: %d', this.packet.cmd, this.packet.payload, this.packet.length)
      this.emit('packet', this.packet)
    }
    debug('_newPacket: new packet')
    this.packet = new Packet()

    this._pos = 0

    return true
  }

  /**
   * Records `err` on `this.error` (which stops the loop in `parse`) and
   * emits it as an `'error'` event.
   *
   * @param {Error} err the error to record and emit
   */
  _emitError (err) {
    debug('_emitError')
    this.error = err
    this.emit('error', err)
  }
}

module.exports = Parser