File Explorer

/var/runtime/node_modules/@aws-sdk/node_modules/mqtt/lib/connect

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

ws.js6.4 KB · 258 lines
'use strict' const { Buffer } = require('buffer')const WS = require('ws')const debug = require('debug')('mqttjs:ws')const duplexify = require('duplexify')const Transform = require('readable-stream').Transform const WSS_OPTIONS = [  'rejectUnauthorized',  'ca',  'cert',  'key',  'pfx',  'passphrase']// eslint-disable-next-line camelcaseconst IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'function buildUrl (opts, client) {  let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path  if (typeof (opts.transformWsUrl) === 'function') {    url = opts.transformWsUrl(url, opts, client)  }  return url} function setDefaultOpts (opts) {  const options = opts  if (!opts.hostname) {    options.hostname = 'localhost'  }  if (!opts.port) {    if (opts.protocol === 'wss') {      options.port = 443    } else {      options.port = 80    }  }  if (!opts.path) {    options.path = '/'  }   if (!opts.wsOptions) {    options.wsOptions = {}  }  if (!IS_BROWSER && opts.protocol === 'wss') {    // Add cert/key/ca etc options    WSS_OPTIONS.forEach(function (prop) {      if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) {        options.wsOptions[prop] = opts[prop]      }    })  }   return options} function setDefaultBrowserOpts (opts) {  const options = setDefaultOpts(opts)   if (!options.hostname) {    options.hostname = options.host  }   if (!options.hostname) {    // Throwing an error in a Web Worker if no `hostname` is given, because we    // can not determine the `hostname` automatically.  If connecting to    // localhost, please supply the `hostname` as an argument.    if (typeof (document) === 'undefined') {      throw new Error('Could not determine host. Specify host manually.')    }    const parsed = new URL(document.URL)    options.hostname = parsed.hostname     if (!options.port) {      options.port = parsed.port    }  }   // objectMode should be defined for logic  if (options.objectMode === undefined) {    options.objectMode = !(options.binary === true || options.binary === undefined)  }   return options} function createWebSocket (client, url, opts) {  debug('createWebSocket')  debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)  const websocketSubProtocol =    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)      ? 'mqttv3.1'      : 'mqtt'   debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)  const socket = new WS(url, [websocketSubProtocol], opts.wsOptions)  return socket} function createBrowserWebSocket (client, opts) {  const websocketSubProtocol =  (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)    ? 'mqttv3.1'    : 'mqtt'   const url = buildUrl(opts, client)  /* global WebSocket */  const socket = new WebSocket(url, [websocketSubProtocol])  socket.binaryType = 'arraybuffer'  return socket} function streamBuilder (client, opts) {  debug('streamBuilder')  const options = setDefaultOpts(opts)  const url = buildUrl(options, client)  const socket = createWebSocket(client, url, options)  const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)  webSocketStream.url = url  socket.on('close', () => { webSocketStream.destroy() })  return webSocketStream} function browserStreamBuilder (client, opts) {  debug('browserStreamBuilder')  let stream  const options = setDefaultBrowserOpts(opts)  // sets the maximum socket buffer size before throttling  const bufferSize = options.browserBufferSize || 1024 * 512   const bufferTimeout = opts.browserBufferTimeout || 1000   const coerceToBuffer = !opts.objectMode   const socket = createBrowserWebSocket(client, opts)   const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)   if (!opts.objectMode) {    proxy._writev = writev  }  proxy.on('close', () => { socket.close() })   const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')   // was already open when passed in  if (socket.readyState === socket.OPEN) {    stream = proxy  } else {    stream = stream = duplexify(undefined, undefined, opts)    if (!opts.objectMode) {      stream._writev = writev    }     if (eventListenerSupport) {      socket.addEventListener('open', onopen)    } else {      socket.onopen = onopen    }  }   stream.socket = socket   if (eventListenerSupport) {    socket.addEventListener('close', onclose)    socket.addEventListener('error', onerror)    socket.addEventListener('message', onmessage)  } else {    socket.onclose = onclose    socket.onerror = onerror    socket.onmessage = onmessage  }   // methods for browserStreamBuilder   function buildProxy (options, socketWrite, socketEnd) {    const proxy = new Transform({      objectModeMode: options.objectMode    })     proxy._write = socketWrite    proxy._flush = socketEnd     return proxy  }   function onopen () {    stream.setReadable(proxy)    stream.setWritable(proxy)    stream.emit('connect')  }   function onclose () {    stream.end()    stream.destroy()  }   function onerror (err) {    stream.destroy(err)  }   function onmessage (event) {    let data = event.data    if (data instanceof ArrayBuffer) data = Buffer.from(data)    else data = Buffer.from(data, 'utf8')    proxy.push(data)  }   // this is to be enabled only if objectMode is false  function writev (chunks, cb) {    const buffers = new Array(chunks.length)    for (let i = 0; i < chunks.length; i++) {      if (typeof chunks[i].chunk === 'string') {        buffers[i] = Buffer.from(chunks[i], 'utf8')      } else {        buffers[i] = chunks[i].chunk      }    }     this._write(Buffer.concat(buffers), 'binary', cb)  }   function socketWriteBrowser (chunk, enc, next) {    if (socket.bufferedAmount > bufferSize) {      // throttle data until buffered amount is reduced.      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)    }     if (coerceToBuffer && typeof chunk === 'string') {      chunk = Buffer.from(chunk, 'utf8')    }     try {      socket.send(chunk)    } catch (err) {      return next(err)    }     next()  }   function socketEndBrowser (done) {    socket.close()    done()  }   // end methods for browserStreamBuilder   return stream} if (IS_BROWSER) {  module.exports = browserStreamBuilder} else {  module.exports = streamBuilder}