File Explorer

/var/runtime/node_modules/@aws-sdk/node_modules/mqtt-packet

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

3 dirs
13 files
writeToStream.js30.2 KB · 1118 lines
const protocol = require('./constants')const empty = Buffer.allocUnsafe(0)const zeroBuf = Buffer.from([0])const numbers = require('./numbers')const nextTick = require('process-nextick-args').nextTickconst debug = require('debug')('mqtt-packet:writeToStream') const numCache = numbers.cacheconst generateNumber = numbers.generateNumberconst generateCache = numbers.generateCacheconst genBufVariableByteInt = numbers.genBufVariableByteIntconst generate4ByteBuffer = numbers.generate4ByteBufferlet writeNumber = writeNumberCachedlet toGenerate = true function generate (packet, stream, opts) {  debug('generate called')  if (stream.cork) {    stream.cork()    nextTick(uncork, stream)  }   if (toGenerate) {    toGenerate = false    generateCache()  }  debug('generate: packet.cmd: %s', packet.cmd)  switch (packet.cmd) {    case 'connect':      return connect(packet, stream, opts)    case 'connack':      return connack(packet, stream, opts)    case 'publish':      return publish(packet, stream, opts)    case 'puback':    case 'pubrec':    case 'pubrel':    case 'pubcomp':      return confirmation(packet, stream, opts)    case 'subscribe':      return subscribe(packet, stream, opts)    case 'suback':      return suback(packet, stream, opts)    case 'unsubscribe':      return unsubscribe(packet, stream, opts)    case 'unsuback':      return unsuback(packet, stream, opts)    case 'pingreq':    case 'pingresp':      return emptyPacket(packet, stream, opts)    case 'disconnect':      return disconnect(packet, stream, opts)    case 'auth':      return auth(packet, stream, opts)    default:      stream.emit('error', new Error('Unknown command'))      return false  }}/** * Controls numbers cache. * Set to "false" to allocate buffers on-the-flight instead of pre-generated cache */Object.defineProperty(generate, 'cacheNumbers', {  get () {    return writeNumber === writeNumberCached  },  set (value) {    if (value) {      if (!numCache || Object.keys(numCache).length === 0) toGenerate = true      writeNumber = writeNumberCached    } else {      toGenerate = false      writeNumber = writeNumberGenerated    }  }}) function uncork (stream) {  stream.uncork()} function connect (packet, stream, opts) {  const settings = packet || {}  const protocolId = settings.protocolId || 'MQTT'  let protocolVersion = settings.protocolVersion || 4  const will = settings.will  let clean = settings.clean  const keepalive = settings.keepalive || 0  const clientId = settings.clientId || ''  const username = settings.username  const password = settings.password  /* mqtt5 new oprions */  const properties = settings.properties   if (clean === undefined) clean = true   let length = 0   // Must be a string and non-falsy  if (!protocolId ||     (typeof protocolId !== 'string' && !Buffer.isBuffer(protocolId))) {    stream.emit('error', new Error('Invalid protocolId'))    return false  } else length += protocolId.length + 2   // Must be 3 or 4 or 5  if (protocolVersion !== 3 && protocolVersion !== 4 && protocolVersion !== 5) {    stream.emit('error', new Error('Invalid protocol version'))    return false  } else length += 1   // ClientId might be omitted in 3.1.1 and 5, but only if cleanSession is set to 1  if ((typeof clientId === 'string' || Buffer.isBuffer(clientId)) &&     (clientId || protocolVersion >= 4) && (clientId || clean)) {    length += Buffer.byteLength(clientId) + 2  } else {    if (protocolVersion < 4) {      stream.emit('error', new Error('clientId must be supplied before 3.1.1'))      return false    }    if ((clean * 1) === 0) {      stream.emit('error', new Error('clientId must be given if cleanSession set to 0'))      return false    }  }   // Must be a two byte number  if (typeof keepalive !== 'number' ||      keepalive < 0 ||      keepalive > 65535 ||      keepalive % 1 !== 0) {    stream.emit('error', new Error('Invalid keepalive'))    return false  } else length += 2   // Connect flags  length += 1   // Properties  if (protocolVersion === 5) {    var propertiesData = getProperties(stream, properties)    if (!propertiesData) { return false }    length += propertiesData.length  }   // If will exists...  if (will) {    // It must be an object    if (typeof will !== 'object') {      stream.emit('error', new Error('Invalid will'))      return false    }    // It must have topic typeof string    if (!will.topic || typeof will.topic !== 'string') {      stream.emit('error', new Error('Invalid will topic'))      return false    } else {      length += Buffer.byteLength(will.topic) + 2    }     // Payload    length += 2 // payload length    if (will.payload) {      if (will.payload.length >= 0) {        if (typeof will.payload === 'string') {          length += Buffer.byteLength(will.payload)        } else {          length += will.payload.length        }      } else {        stream.emit('error', new Error('Invalid will payload'))        return false      }    }    // will properties    var willProperties = {}    if (protocolVersion === 5) {      willProperties = getProperties(stream, will.properties)      if (!willProperties) { return false }      length += willProperties.length    }  }   // Username  let providedUsername = false  if (username != null) {    if (isStringOrBuffer(username)) {      providedUsername = true      length += Buffer.byteLength(username) + 2    } else {      stream.emit('error', new Error('Invalid username'))      return false    }  }   // Password  if (password != null) {    if (!providedUsername) {      stream.emit('error', new Error('Username is required to use password'))      return false    }     if (isStringOrBuffer(password)) {      length += byteLength(password) + 2    } else {      stream.emit('error', new Error('Invalid password'))      return false    }  }   // Generate header  stream.write(protocol.CONNECT_HEADER)   // Generate length  writeVarByteInt(stream, length)   // Generate protocol ID  writeStringOrBuffer(stream, protocolId)   if (settings.bridgeMode) {    protocolVersion += 128  }   stream.write(    protocolVersion === 131      ? protocol.VERSION131      : protocolVersion === 132        ? protocol.VERSION132        : protocolVersion === 4          ? protocol.VERSION4          : protocolVersion === 5            ? protocol.VERSION5            : protocol.VERSION3  )   // Connect flags  let flags = 0  flags |= (username != null) ? protocol.USERNAME_MASK : 0  flags |= (password != null) ? protocol.PASSWORD_MASK : 0  flags |= (will && will.retain) ? protocol.WILL_RETAIN_MASK : 0  flags |= (will && will.qos) ? will.qos << protocol.WILL_QOS_SHIFT : 0  flags |= will ? protocol.WILL_FLAG_MASK : 0  flags |= clean ? protocol.CLEAN_SESSION_MASK : 0   stream.write(Buffer.from([flags]))   // Keepalive  writeNumber(stream, keepalive)   // Properties  if (protocolVersion === 5) {    propertiesData.write()  }   // Client ID  writeStringOrBuffer(stream, clientId)   // Will  if (will) {    if (protocolVersion === 5) {      willProperties.write()    }    writeString(stream, will.topic)    writeStringOrBuffer(stream, will.payload)  }   // Username and password  if (username != null) {    writeStringOrBuffer(stream, username)  }  if (password != null) {    writeStringOrBuffer(stream, password)  }  // This is a small packet that happens only once on a stream  // We assume the stream is always free to receive more data after this  return true} function connack (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const rc = version === 5 ? settings.reasonCode : settings.returnCode  const properties = settings.properties  let length = 2 // length of rc and sessionHeader   // Check return code  if (typeof rc !== 'number') {    stream.emit('error', new Error('Invalid return code'))    return false  }  // mqtt5 properties  let propertiesData = null  if (version === 5) {    propertiesData = getProperties(stream, properties)    if (!propertiesData) { return false }    length += propertiesData.length  }   stream.write(protocol.CONNACK_HEADER)  // length  writeVarByteInt(stream, length)  stream.write(settings.sessionPresent ? protocol.SESSIONPRESENT_HEADER : zeroBuf)   stream.write(Buffer.from([rc]))  if (propertiesData != null) {    propertiesData.write()  }  return true} function publish (packet, stream, opts) {  debug('publish: packet: %o', packet)  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const qos = settings.qos || 0  const retain = settings.retain ? protocol.RETAIN_MASK : 0  const topic = settings.topic  const payload = settings.payload || empty  const id = settings.messageId  const properties = settings.properties   let length = 0   // Topic must be a non-empty string or Buffer  if (typeof topic === 'string') length += Buffer.byteLength(topic) + 2  else if (Buffer.isBuffer(topic)) length += topic.length + 2  else {    stream.emit('error', new Error('Invalid topic'))    return false  }   // Get the payload length  if (!Buffer.isBuffer(payload)) length += Buffer.byteLength(payload)  else length += payload.length   // Message ID must a number if qos > 0  if (qos && typeof id !== 'number') {    stream.emit('error', new Error('Invalid messageId'))    return false  } else if (qos) length += 2   // mqtt5 properties  let propertiesData = null  if (version === 5) {    propertiesData = getProperties(stream, properties)    if (!propertiesData) { return false }    length += propertiesData.length  }   // Header  stream.write(protocol.PUBLISH_HEADER[qos][settings.dup ? 1 : 0][retain ? 1 : 0])   // Remaining length  writeVarByteInt(stream, length)   // Topic  writeNumber(stream, byteLength(topic))  stream.write(topic)   // Message ID  if (qos > 0) writeNumber(stream, id)   // Properties  if (propertiesData != null) {    propertiesData.write()  }   // Payload  debug('publish: payload: %o', payload)  return stream.write(payload)} /* Puback, pubrec, pubrel and pubcomp */function confirmation (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const type = settings.cmd || 'puback'  const id = settings.messageId  const dup = (settings.dup && type === 'pubrel') ? protocol.DUP_MASK : 0  let qos = 0  const reasonCode = settings.reasonCode  const properties = settings.properties  let length = version === 5 ? 3 : 2   if (type === 'pubrel') qos = 1   // Check message ID  if (typeof id !== 'number') {    stream.emit('error', new Error('Invalid messageId'))    return false  }   // properies mqtt 5  let propertiesData = null  if (version === 5) {    // Confirm should not add empty property length with no properties (rfc 3.4.2.2.1)    if (typeof properties === 'object') {      propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)      if (!propertiesData) { return false }      length += propertiesData.length    }  }   // Header  stream.write(protocol.ACKS[type][qos][dup][0])   // Length  writeVarByteInt(stream, length)   // Message ID  writeNumber(stream, id)   // reason code in header  if (version === 5) {    stream.write(Buffer.from([reasonCode]))  }   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }  return true} function subscribe (packet, stream, opts) {  debug('subscribe: packet: ')  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const dup = settings.dup ? protocol.DUP_MASK : 0  const id = settings.messageId  const subs = settings.subscriptions  const properties = settings.properties   let length = 0   // Check message ID  if (typeof id !== 'number') {    stream.emit('error', new Error('Invalid messageId'))    return false  } else length += 2   // properies mqtt 5  let propertiesData = null  if (version === 5) {    propertiesData = getProperties(stream, properties)    if (!propertiesData) { return false }    length += propertiesData.length  }   // Check subscriptions  if (typeof subs === 'object' && subs.length) {    for (let i = 0; i < subs.length; i += 1) {      const itopic = subs[i].topic      const iqos = subs[i].qos       if (typeof itopic !== 'string') {        stream.emit('error', new Error('Invalid subscriptions - invalid topic'))        return false      }      if (typeof iqos !== 'number') {        stream.emit('error', new Error('Invalid subscriptions - invalid qos'))        return false      }       if (version === 5) {        const nl = subs[i].nl || false        if (typeof nl !== 'boolean') {          stream.emit('error', new Error('Invalid subscriptions - invalid No Local'))          return false        }        const rap = subs[i].rap || false        if (typeof rap !== 'boolean') {          stream.emit('error', new Error('Invalid subscriptions - invalid Retain as Published'))          return false        }        const rh = subs[i].rh || 0        if (typeof rh !== 'number' || rh > 2) {          stream.emit('error', new Error('Invalid subscriptions - invalid Retain Handling'))          return false        }      }       length += Buffer.byteLength(itopic) + 2 + 1    }  } else {    stream.emit('error', new Error('Invalid subscriptions'))    return false  }   // Generate header  debug('subscribe: writing to stream: %o', protocol.SUBSCRIBE_HEADER)  stream.write(protocol.SUBSCRIBE_HEADER[1][dup ? 1 : 0][0])   // Generate length  writeVarByteInt(stream, length)   // Generate message ID  writeNumber(stream, id)   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }   let result = true   // Generate subs  for (const sub of subs) {    const jtopic = sub.topic    const jqos = sub.qos    const jnl = +sub.nl    const jrap = +sub.rap    const jrh = sub.rh    let joptions     // Write topic string    writeString(stream, jtopic)     // options process    joptions = protocol.SUBSCRIBE_OPTIONS_QOS[jqos]    if (version === 5) {      joptions |= jnl ? protocol.SUBSCRIBE_OPTIONS_NL : 0      joptions |= jrap ? protocol.SUBSCRIBE_OPTIONS_RAP : 0      joptions |= jrh ? protocol.SUBSCRIBE_OPTIONS_RH[jrh] : 0    }    // Write options    result = stream.write(Buffer.from([joptions]))  }   return result} function suback (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const id = settings.messageId  const granted = settings.granted  const properties = settings.properties  let length = 0   // Check message ID  if (typeof id !== 'number') {    stream.emit('error', new Error('Invalid messageId'))    return false  } else length += 2   // Check granted qos vector  if (typeof granted === 'object' && granted.length) {    for (let i = 0; i < granted.length; i += 1) {      if (typeof granted[i] !== 'number') {        stream.emit('error', new Error('Invalid qos vector'))        return false      }      length += 1    }  } else {    stream.emit('error', new Error('Invalid qos vector'))    return false  }   // properies mqtt 5  let propertiesData = null  if (version === 5) {    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)    if (!propertiesData) { return false }    length += propertiesData.length  }   // header  stream.write(protocol.SUBACK_HEADER)   // Length  writeVarByteInt(stream, length)   // Message ID  writeNumber(stream, id)   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }   return stream.write(Buffer.from(granted))} function unsubscribe (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const id = settings.messageId  const dup = settings.dup ? protocol.DUP_MASK : 0  const unsubs = settings.unsubscriptions  const properties = settings.properties   let length = 0   // Check message ID  if (typeof id !== 'number') {    stream.emit('error', new Error('Invalid messageId'))    return false  } else {    length += 2  }  // Check unsubs  if (typeof unsubs === 'object' && unsubs.length) {    for (let i = 0; i < unsubs.length; i += 1) {      if (typeof unsubs[i] !== 'string') {        stream.emit('error', new Error('Invalid unsubscriptions'))        return false      }      length += Buffer.byteLength(unsubs[i]) + 2    }  } else {    stream.emit('error', new Error('Invalid unsubscriptions'))    return false  }  // properies mqtt 5  let propertiesData = null  if (version === 5) {    propertiesData = getProperties(stream, properties)    if (!propertiesData) { return false }    length += propertiesData.length  }   // Header  stream.write(protocol.UNSUBSCRIBE_HEADER[1][dup ? 1 : 0][0])   // Length  writeVarByteInt(stream, length)   // Message ID  writeNumber(stream, id)   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }   // Unsubs  let result = true  for (let j = 0; j < unsubs.length; j++) {    result = writeString(stream, unsubs[j])  }   return result} function unsuback (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const id = settings.messageId  const dup = settings.dup ? protocol.DUP_MASK : 0  const granted = settings.granted  const properties = settings.properties  const type = settings.cmd  const qos = 0   let length = 2   // Check message ID  if (typeof id !== 'number') {    stream.emit('error', new Error('Invalid messageId'))    return false  }   // Check granted  if (version === 5) {    if (typeof granted === 'object' && granted.length) {      for (let i = 0; i < granted.length; i += 1) {        if (typeof granted[i] !== 'number') {          stream.emit('error', new Error('Invalid qos vector'))          return false        }        length += 1      }    } else {      stream.emit('error', new Error('Invalid qos vector'))      return false    }  }   // properies mqtt 5  let propertiesData = null  if (version === 5) {    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)    if (!propertiesData) { return false }    length += propertiesData.length  }   // Header  stream.write(protocol.ACKS[type][qos][dup][0])   // Length  writeVarByteInt(stream, length)   // Message ID  writeNumber(stream, id)   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }   // payload  if (version === 5) {    stream.write(Buffer.from(granted))  }  return true} function emptyPacket (packet, stream, opts) {  return stream.write(protocol.EMPTY[packet.cmd])} function disconnect (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const reasonCode = settings.reasonCode  const properties = settings.properties  let length = version === 5 ? 1 : 0   // properies mqtt 5  let propertiesData = null  if (version === 5) {    propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)    if (!propertiesData) { return false }    length += propertiesData.length  }   // Header  stream.write(Buffer.from([protocol.codes.disconnect << 4]))   // Length  writeVarByteInt(stream, length)   // reason code in header  if (version === 5) {    stream.write(Buffer.from([reasonCode]))  }   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }   return true} function auth (packet, stream, opts) {  const version = opts ? opts.protocolVersion : 4  const settings = packet || {}  const reasonCode = settings.reasonCode  const properties = settings.properties  let length = version === 5 ? 1 : 0   if (version !== 5) stream.emit('error', new Error('Invalid mqtt version for auth packet'))   // properies mqtt 5  const propertiesData = getPropertiesByMaximumPacketSize(stream, properties, opts, length)  if (!propertiesData) { return false }  length += propertiesData.length   // Header  stream.write(Buffer.from([protocol.codes.auth << 4]))   // Length  writeVarByteInt(stream, length)   // reason code in header  stream.write(Buffer.from([reasonCode]))   // properies mqtt 5  if (propertiesData !== null) {    propertiesData.write()  }  return true} /** * writeVarByteInt - write an MQTT style variable byte integer to the buffer * * @param <Buffer> buffer - destination * @param <Number> pos - offset * @param <Number> length - length (>0) * @returns <Number> number of bytes written * * @api private */ const varByteIntCache = {}function writeVarByteInt (stream, num) {  if (num > protocol.VARBYTEINT_MAX) {    stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))    return false  }   let buffer = varByteIntCache[num]   if (!buffer) {    buffer = genBufVariableByteInt(num)    if (num < 16384) varByteIntCache[num] = buffer  }  debug('writeVarByteInt: writing to stream: %o', buffer)  return stream.write(buffer)} /** * writeString - write a utf8 string to the buffer * * @param <Buffer> buffer - destination * @param <Number> pos - offset * @param <String> string - string to write * @return <Number> number of bytes written * * @api private */ function writeString (stream, string) {  const strlen = Buffer.byteLength(string)  writeNumber(stream, strlen)   debug('writeString: %s', string)  return stream.write(string, 'utf8')} /** * writeStringPair - write a utf8 string pairs to the buffer * * @param <Buffer> buffer - destination * @param <String> name - string name to write * @param <String> value - string value to write * @return <Number> number of bytes written * * @api private */function writeStringPair (stream, name, value) {  writeString(stream, name)  writeString(stream, value)} /** * writeNumber - write a two byte number to the buffer * * @param <Buffer> buffer - destination * @param <Number> pos - offset * @param <String> number - number to write * @return <Number> number of bytes written * * @api private */function writeNumberCached (stream, number) {  debug('writeNumberCached: number: %d', number)  debug('writeNumberCached: %o', numCache[number])  return stream.write(numCache[number])}function writeNumberGenerated (stream, number) {  const generatedNumber = generateNumber(number)  debug('writeNumberGenerated: %o', generatedNumber)  return stream.write(generatedNumber)}function write4ByteNumber (stream, number) {  const generated4ByteBuffer = generate4ByteBuffer(number)  debug('write4ByteNumber: %o', generated4ByteBuffer)  return stream.write(generated4ByteBuffer)}/** * writeStringOrBuffer - write a String or Buffer with the its length prefix * * @param <Buffer> buffer - destination * @param <Number> pos - offset * @param <String> toWrite - String or Buffer * @return <Number> number of bytes written */function writeStringOrBuffer (stream, toWrite) {  if (typeof toWrite === 'string') {    writeString(stream, toWrite)  } else if (toWrite) {    writeNumber(stream, toWrite.length)    stream.write(toWrite)  } else writeNumber(stream, 0)} function getProperties (stream, properties) {  /* connect properties */  if (typeof properties !== 'object' || properties.length != null) {    return {      length: 1,      write () {        writeProperties(stream, {}, 0)      }    }  }  let propertiesLength = 0  function getLengthProperty (name, value) {    const type = protocol.propertiesTypes[name]    let length = 0    switch (type) {      case 'byte': {        if (typeof value !== 'boolean') {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + 1        break      }      case 'int8': {        if (typeof value !== 'number' || value < 0 || value > 0xff) {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + 1        break      }      case 'binary': {        if (value && value === null) {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + Buffer.byteLength(value) + 2        break      }      case 'int16': {        if (typeof value !== 'number' || value < 0 || value > 0xffff) {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + 2        break      }      case 'int32': {        if (typeof value !== 'number' || value < 0 || value > 0xffffffff) {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + 4        break      }      case 'var': {        // var byte integer is max 24 bits packed in 32 bits        if (typeof value !== 'number' || value < 0 || value > 0x0fffffff) {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + Buffer.byteLength(genBufVariableByteInt(value))        break      }      case 'string': {        if (typeof value !== 'string') {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += 1 + 2 + Buffer.byteLength(value.toString())        break      }      case 'pair': {        if (typeof value !== 'object') {          stream.emit('error', new Error(`Invalid ${name}: ${value}`))          return false        }        length += Object.getOwnPropertyNames(value).reduce((result, name) => {          const currentValue = value[name]          if (Array.isArray(currentValue)) {            result += currentValue.reduce((currentLength, value) => {              currentLength += 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(value.toString())              return currentLength            }, 0)          } else {            result += 1 + 2 + Buffer.byteLength(name.toString()) + 2 + Buffer.byteLength(value[name].toString())          }          return result        }, 0)        break      }      default: {        stream.emit('error', new Error(`Invalid property ${name}: ${value}`))        return false      }    }    return length  }  if (properties) {    for (const propName in properties) {      let propLength = 0      let propValueLength = 0      const propValue = properties[propName]      if (Array.isArray(propValue)) {        for (let valueIndex = 0; valueIndex < propValue.length; valueIndex++) {          propValueLength = getLengthProperty(propName, propValue[valueIndex])          if (!propValueLength) { return false }          propLength += propValueLength        }      } else {        propValueLength = getLengthProperty(propName, propValue)        if (!propValueLength) { return false }        propLength = propValueLength      }      if (!propLength) return false      propertiesLength += propLength    }  }  const propertiesLengthLength = Buffer.byteLength(genBufVariableByteInt(propertiesLength))   return {    length: propertiesLengthLength + propertiesLength,    write () {      writeProperties(stream, properties, propertiesLength)    }  }} function getPropertiesByMaximumPacketSize (stream, properties, opts, length) {  const mayEmptyProps = ['reasonString', 'userProperties']  const maximumPacketSize = opts && opts.properties && opts.properties.maximumPacketSize ? opts.properties.maximumPacketSize : 0   let propertiesData = getProperties(stream, properties)  if (maximumPacketSize) {    while (length + propertiesData.length > maximumPacketSize) {      const currentMayEmptyProp = mayEmptyProps.shift()      if (currentMayEmptyProp && properties[currentMayEmptyProp]) {        delete properties[currentMayEmptyProp]        propertiesData = getProperties(stream, properties)      } else {        return false      }    }  }  return propertiesData} function writeProperty (stream, propName, value) {  const type = protocol.propertiesTypes[propName]  switch (type) {    case 'byte': {      stream.write(Buffer.from([protocol.properties[propName]]))      stream.write(Buffer.from([+value]))      break    }    case 'int8': {      stream.write(Buffer.from([protocol.properties[propName]]))      stream.write(Buffer.from([value]))      break    }    case 'binary': {      stream.write(Buffer.from([protocol.properties[propName]]))      writeStringOrBuffer(stream, value)      break    }    case 'int16': {      stream.write(Buffer.from([protocol.properties[propName]]))      writeNumber(stream, value)      break    }    case 'int32': {      stream.write(Buffer.from([protocol.properties[propName]]))      write4ByteNumber(stream, value)      break    }    case 'var': {      stream.write(Buffer.from([protocol.properties[propName]]))      writeVarByteInt(stream, value)      break    }    case 'string': {      stream.write(Buffer.from([protocol.properties[propName]]))      writeString(stream, value)      break    }    case 'pair': {      Object.getOwnPropertyNames(value).forEach(name => {        const currentValue = value[name]        if (Array.isArray(currentValue)) {          currentValue.forEach(value => {            stream.write(Buffer.from([protocol.properties[propName]]))            writeStringPair(stream, name.toString(), value.toString())          })        } else {          stream.write(Buffer.from([protocol.properties[propName]]))          writeStringPair(stream, name.toString(), currentValue.toString())        }      })      break    }    default: {      stream.emit('error', new Error(`Invalid property ${propName} value: ${value}`))      return false    }  }} function writeProperties (stream, properties, propertiesLength) {  /* write properties to stream */  writeVarByteInt(stream, propertiesLength)  for (const propName in properties) {    if (Object.prototype.hasOwnProperty.call(properties, propName) && properties[propName] !== null) {      const value = properties[propName]      if (Array.isArray(value)) {        for (let valueIndex = 0; valueIndex < value.length; valueIndex++) {          writeProperty(stream, propName, value[valueIndex])        }      } else {        writeProperty(stream, propName, value)      }    }  }} function byteLength (bufOrString) {  if (!bufOrString) return 0  else if (bufOrString instanceof Buffer) return bufOrString.length  else return Buffer.byteLength(bufOrString)} function isStringOrBuffer (field) {  return typeof field === 'string' || field instanceof Buffer} module.exports = generate