File Explorer

/var/runtime/node_modules/@aws-sdk/node_modules/aws-crt/lib/native

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

mqtt5.ts · 25.9 KB · 700 lines
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

/**
 * Node.js specific MQTT5 client implementation
 *
 * [MQTT5 Client User Guide](https://www.github.com/awslabs/aws-crt-nodejs/blob/main/MQTT5-UserGuide.md)
 *
 * @packageDocumentation
 * @module mqtt5
 * @mergeTarget
 *
 */

import crt_native from './binding';
import { NativeResourceMixin } from "./native_resource";
import { BufferedEventEmitter } from '../common/event';
import * as io from "./io";
import * as http from './http';
import * as mqtt5_packet from "../common/mqtt5_packet";
import * as mqtt5 from "../common/mqtt5";
import * as mqtt_shared from "../common/mqtt_shared";
import {CrtError} from "./error";

export { HttpProxyOptions } from './http';
export * from "../common/mqtt5";
export * from '../common/mqtt5_packet';

/**
 * Websocket handshake http request transformation function signature
 */
export type WebsocketHandshakeTransform = (request: http.HttpRequest, done: (error_code?: number) => void) => void;

/**
 * Information about the client's queue of operations
 */
export interface ClientStatistics {

    /**
     * Total number of operations submitted to the client that have not yet been completed.  Unacked operations
     * are a subset of this.
     */
    incompleteOperationCount : number;

    /**
     * Total packet size of operations submitted to the client that have not yet been completed.  Unacked operations
     * are a subset of this.
     */
    incompleteOperationSize : number;

    /**
     * Total number of operations that have been sent to the server and are waiting for a corresponding ACK before
     * they can be completed.
     */
    unackedOperationCount : number;

    /**
     * Total packet size of operations that have been sent to the server and are waiting for a corresponding ACK before
     * they can be completed.
     */
    unackedOperationSize : number;
}

/**
 * Controls how disconnects affect the queued and in-progress operations tracked by the client.  Also controls
 * how operations are handled while the client is not connected.  In particular, if the client is not connected,
 * then any operation that would be failed on disconnect (according to these rules) will be rejected.
 */
export enum ClientOperationQueueBehavior {

    /** Same as FailQos0PublishOnDisconnect */
    Default = 0,

    /**
     * Re-queues QoS 1+ publishes on disconnect; un-acked publishes go to the front while unprocessed publishes stay
     * in place.  All other operations (QoS 0 publishes, subscribe, unsubscribe) are failed.
     */
    FailNonQos1PublishOnDisconnect = 1,

    /**
     * QoS 0 publishes that are not complete at the time of disconnection are failed.  Un-acked QoS 1+ publishes are
     * re-queued at the head of the line for immediate retransmission on a session resumption.  All other operations
     * are requeued in original order behind any retransmissions.
     */
    FailQos0PublishOnDisconnect = 2,

    /**
     * All operations that are not complete at the time of disconnection are failed, except operations that
     * the MQTT5 spec requires to be retransmitted (un-acked QoS1+ publishes).
     */
    FailAllOnDisconnect = 3,
}

/**
 * Additional controls for client behavior with respect to operation validation and flow control; these checks
 * go beyond the MQTT5 spec to respect limits of specific MQTT brokers.
 */
export enum ClientExtendedValidationAndFlowControl {
    /**
     * Do not do any additional validation or flow control
     */
    None = 0,

    /**
     * Apply additional client-side validation and operational flow control that respects the
     * default AWS IoT Core limits.
     *
     * Currently applies the following additional validation:
     *
     * 1. No more than 8 subscriptions per SUBSCRIBE packet
     * 1. Topics and topic filters have a maximum of 7 slashes (8 segments), not counting any AWS rules prefix
     * 1. Topics must be <= 256 bytes in length
     * 1. Client id must be <= 128 bytes in length
     *
     * Also applies the following flow control:
     *
     * 1. Outbound throughput throttled to 512KB/s
     * 1. Outbound publish TPS throttled to 100
     */
    AwsIotCoreDefaults = 1,
}

/**
 * Configuration options for mqtt5 client creation.
 */
export interface Mqtt5ClientConfig {

    /**
     * Host name of the MQTT server to connect to.
     */
    hostName: string;

    /**
     * Network port of the MQTT server to connect to.
     */
    port: number;

    /**
     * Controls how the MQTT5 client should behave with respect to MQTT sessions.
     */
    sessionBehavior? : mqtt5.ClientSessionBehavior;

    /**
     * Controls how the reconnect delay is modified in order to smooth out the distribution of reconnection attempt
     * timepoints for a large set of reconnecting clients.
     */
    retryJitterMode? : mqtt5.RetryJitterType;

    /**
     * Minimum amount of time to wait to reconnect after a disconnect.  Exponential backoff is performed with jitter
     * after each connection failure.
     */
    minReconnectDelayMs? : number;

    /**
     * Maximum amount of time to wait to reconnect after a disconnect.  Exponential backoff is performed with jitter
     * after each connection failure.
     */
    maxReconnectDelayMs? : number;

    /**
     * Amount of time that must elapse with an established connection before the reconnect delay is reset to the minimum.
     * This helps alleviate bandwidth-waste in fast reconnect cycles due to permission failures on operations.
     */
    minConnectedTimeToResetReconnectDelayMs? : number;

    /**
     * Time interval to wait after sending a CONNECT request for a CONNACK to arrive.  If one does not arrive, the
     * connection will be shut down.
     */
    connackTimeoutMs? : number;

    /**
     * All configurable options with respect to the CONNECT packet sent by the client, including the will.  These
     * connect properties will be used for every connection attempt made by the client.
     */
    connectProperties?: mqtt5_packet.ConnectPacket;

    /**
     * Controls how disconnects affect the queued and in-progress operations tracked by the client.  Also controls
     * how new operations are handled while the client is not connected.  In particular, if the client is not connected,
     * then any operation that would be failed on disconnect (according to these rules) will also be rejected.
     *
     * @group Node-only
     */
    offlineQueueBehavior? : ClientOperationQueueBehavior;

    /**
     * Time interval to wait after sending a PINGREQ for a PINGRESP to arrive.  If one does not arrive, the client will
     * close the current connection.
     *
     * @group Node-only
     */
    pingTimeoutMs? : number;

    /**
     * Time interval to wait for an ack after sending a QoS 1+ PUBLISH, SUBSCRIBE, or UNSUBSCRIBE before
     * failing the operation.
     *
     * @group Node-only
     */
    ackTimeoutSeconds? : number;

    /**
     * Additional controls for client behavior with respect to topic alias usage.
     *
     * If this setting is left undefined, then topic aliasing behavior will be disabled.
     */
    topicAliasingOptions? : mqtt5.TopicAliasingOptions;

    /**
     * Client bootstrap to use.  In almost all cases, this can be left undefined.
     *
     * @group Node-only
     */
    clientBootstrap?: io.ClientBootstrap;

    /**
     * Controls socket properties of the underlying MQTT connections made by the client.  Leave undefined to use
     * defaults (no TCP keep alive, 10 second socket timeout).
     *
     * @group Node-only
     */
    socketOptions?: io.SocketOptions;

    /**
     * TLS context for secure socket connections.
     * If undefined, then a plaintext connection will be used.
     *
     * @group Node-only
     */
    tlsCtx?: io.ClientTlsContext;

    /**
     * This callback allows a custom transformation of the HTTP request that acts as the websocket handshake.
     * Websockets will be used if this is set to a valid transformation callback.  To use websockets but not perform
     * a transformation, just set this as a trivial completion callback.  If undefined, the connection will be made
     * with direct MQTT.
     *
     * @group Node-only
     */
    websocketHandshakeTransform?: WebsocketHandshakeTransform;

    /**
     * Configures (tunneling) HTTP proxy usage when establishing MQTT connections
     *
     * @group Node-only
     */
    httpProxyOptions?: http.HttpProxyOptions;

    /**
     * Additional controls for client behavior with respect to operation validation and flow control; these checks
     * go beyond the base MQTT5 spec to respect limits of specific MQTT brokers.
     *
     * @group Node-only
     */
    extendedValidationAndFlowControlOptions? : ClientExtendedValidationAndFlowControl;
}

/**
 * Node.js specific MQTT5 client implementation
 *
 * Not all parts of the MQTT5 spec are supported. We currently do not support:
 *
 * * AUTH packets and the authentication fields in the CONNECT packet
 * * QoS 2
 *
 * [MQTT5 Client User Guide](https://www.github.com/awslabs/aws-crt-nodejs/blob/main/MQTT5-UserGuide.md)
 *
 * This client is based on native resources.  When finished with the client, you must call close() to dispose of
 * them or they will leak.
 *
 */
export class Mqtt5Client extends NativeResourceMixin(BufferedEventEmitter) implements mqtt5.IMqtt5Client {

    /**
     * Client constructor
     *
     * Creates the native client and registers one callback per lifecycle event.  Each callback forwards to the
     * matching private static handler on this class.
     *
     * @param config The configuration for this client
     */
    constructor(config: Mqtt5ClientConfig) {
        super();

        this._super(crt_native.mqtt5_client_new(
            this,
            config,
            (client: Mqtt5Client) => { Mqtt5Client._s_on_stopped(client); },
            (client: Mqtt5Client) => { Mqtt5Client._s_on_attempting_connect(client); },
            (client: Mqtt5Client, connack : mqtt5_packet.ConnackPacket, settings: mqtt5.NegotiatedSettings) => { Mqtt5Client._s_on_connection_success(client, connack, settings); },
            (client: Mqtt5Client, errorCode: number, connack? : mqtt5_packet.ConnackPacket) => { Mqtt5Client._s_on_connection_failure(client, new CrtError(errorCode), connack); },
            (client: Mqtt5Client, errorCode: number, disconnect? : mqtt5_packet.DisconnectPacket) => { Mqtt5Client._s_on_disconnection(client, new CrtError(errorCode), disconnect); },
            (client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId?: any) => { Mqtt5Client._s_on_message_received(client, message, pubackControlId); },
            config.clientBootstrap ? config.clientBootstrap.native_handle() : null,
            config.socketOptions ? config.socketOptions.native_handle() : null,
            config.tlsCtx ? config.tlsCtx.native_handle() : null,
            config.httpProxyOptions ? config.httpProxyOptions.create_native_handle() : null
        ));
    }

    /**
     * Triggers cleanup of native resources associated with the MQTT5 client.  Once this has been invoked, callbacks
     * and events are not guaranteed to be received.
     *
     * This must be called when finished with a client; otherwise, native resources will leak.  It is not safe
     * to invoke any further operations on the client after close() has been called.
     *
     * For a running client, safe and proper shutdown can be accomplished by
     *
     * ```ts
     * const stopped = once(client, "stopped");
     * client.stop();
     * await stopped;
     * client.close();
     * ```
     *
     * This is an asynchronous operation.
     *
     * @group Node-only
     */
    close(): void {
        crt_native.mqtt5_client_close(this.native_handle());
    }

    /**
     * Notifies the MQTT5 client that you want it to maintain connectivity to the configured endpoint.
     * The client will attempt to stay connected using the properties of the reconnect-related parameters
     * in the mqtt5 client configuration.
     *
     * This is an asynchronous operation.
     */
    start(): void {
        crt_native.mqtt5_client_start(this.native_handle());
    }

    /**
     * Notifies the MQTT5 client that you want it to end connectivity to the configured endpoint, disconnecting any
     * existing connection and halting reconnection attempts.
     *
     * This is an asynchronous operation.  Once the process completes, no further events will be emitted until the client
     * has {@link start} invoked.  Invoking {@link start start()} after a {@link stop stop()} will always result in a
     * new MQTT session.
     *
     * @param disconnectPacket (optional) properties of a DISCONNECT packet to send as part of the shutdown process
     */
    stop(disconnectPacket?: mqtt5_packet.DisconnectPacket): void {
        crt_native.mqtt5_client_stop(this.native_handle(), disconnectPacket);
    }

    /**
     * Subscribe to one or more topic filters by queuing a SUBSCRIBE packet to be sent to the server.
     *
     * An exception thrown synchronously by the native binding is converted into a rejection of the returned promise.
     *
     * @param packet SUBSCRIBE packet to send to the server
     * @returns a promise that will be rejected with an error or resolved with the SUBACK response
     */
    async subscribe(packet: mqtt5_packet.SubscribePacket) : Promise<mqtt5_packet.SubackPacket> {
        return new Promise<mqtt5_packet.SubackPacket>((resolve, reject) => {

            // Binds this promise's resolve/reject to the shared static completion handler.
            function curriedPromiseCallback(client: Mqtt5Client, errorCode: number, suback?: mqtt5_packet.SubackPacket){
                return Mqtt5Client._s_on_suback_callback(resolve, reject, client, errorCode, suback);
            }

            try {
                crt_native.mqtt5_client_subscribe(this.native_handle(), packet, curriedPromiseCallback);
            } catch (e) {
                reject(e);
            }
        });
    }

    /**
     * Unsubscribe from one or more topic filters by queuing an UNSUBSCRIBE packet to be sent to the server.
     *
     * An exception thrown synchronously by the native binding is converted into a rejection of the returned promise.
     *
     * @param packet UNSUBSCRIBE packet to send to the server
     * @returns a promise that will be rejected with an error or resolved with the UNSUBACK response
     */
    async unsubscribe(packet: mqtt5_packet.UnsubscribePacket) : Promise<mqtt5_packet.UnsubackPacket> {
        return new Promise<mqtt5_packet.UnsubackPacket>((resolve, reject) => {

            // Binds this promise's resolve/reject to the shared static completion handler.
            function curriedPromiseCallback(client: Mqtt5Client, errorCode: number, unsuback?: mqtt5_packet.UnsubackPacket){
                return Mqtt5Client._s_on_unsuback_callback(resolve, reject, client, errorCode, unsuback);
            }

            try {
                crt_native.mqtt5_client_unsubscribe(this.native_handle(), packet, curriedPromiseCallback);
            } catch (e) {
                reject(e);
            }
        });
    }

    /**
     * Send a message to subscribing clients by queuing a PUBLISH packet to be sent to the server.
     *
     * Note: when the packet carries a truthy payload, the payload is normalized via
     * `mqtt_shared.normalize_payload` and written back onto the supplied packet object (the caller's packet
     * is modified in place).
     *
     * An exception thrown synchronously by the native binding is converted into a rejection of the returned promise.
     *
     * @param packet PUBLISH packet to send to the server
     * @returns a promise that will be rejected with an error or resolved with the PUBACK response (QoS 1) or
     * undefined (QoS 0)
     */
    async publish(packet: mqtt5_packet.PublishPacket) : Promise<mqtt5.PublishCompletionResult> {
        return new Promise<mqtt5.PublishCompletionResult>((resolve, reject) => {

            if (packet && packet.payload) {
                packet.payload = mqtt_shared.normalize_payload(packet.payload);
            }

            // Binds this promise's resolve/reject to the shared static completion handler.
            function curriedPromiseCallback(client: Mqtt5Client, errorCode: number, result: mqtt5.PublishCompletionResult){
                return Mqtt5Client._s_on_puback_callback(resolve, reject, client, errorCode, result);
            }

            try {
                crt_native.mqtt5_client_publish(this.native_handle(), packet, curriedPromiseCallback);
            } catch (e) {
                reject(e);
            }
        });
    }

    /**
     * Queries a small set of numerical statistics about the current state of the client's operation queue
     *
     * @group Node-only
     */
    getOperationalStatistics() : ClientStatistics {
        return crt_native.mqtt5_client_get_queue_statistics(this.native_handle());
    }

    /**
     * Queries a small set of numerical statistics about the current state of the client's operation queue
     * @deprecated use getOperationalStatistics instead
     *
     * @group Node-only
     */
    getQueueStatistics() : ClientStatistics {
        return this.getOperationalStatistics();
    }

    /**
     * Event emitted when the client encounters a serious error condition, such as invalid input, napi failures, and
     * other potentially unrecoverable situations.
     *
     * Listener type: {@link ErrorEventListener}
     *
     * @event
     */
    static ERROR : string = 'error';

    /**
     * Event emitted when an MQTT PUBLISH packet is received by the client.
     *
     * Listener type: {@link MessageReceivedEventListener}
     *
     * @event
     */
    static MESSAGE_RECEIVED : string = 'messageReceived';

    /**
     * Event emitted when the client begins a connection attempt.
     *
     * Listener type: {@link AttemptingConnectEventListener}
     *
     * @event
     */
    static ATTEMPTING_CONNECT : string = 'attemptingConnect';

    /**
     * Event emitted when the client successfully establishes an MQTT connection.  Only emitted after
     * an {@link ATTEMPTING_CONNECT attemptingConnect} event.
     *
     * Listener type: {@link ConnectionSuccessEventListener}
     *
     * @event
     */
    static CONNECTION_SUCCESS : string = 'connectionSuccess';

    /**
     * Event emitted when the client fails to establish an MQTT connection.  Only emitted after
     * an {@link ATTEMPTING_CONNECT attemptingConnect} event.
     *
     * Listener type: {@link ConnectionFailureEventListener}
     *
     * @event
     */
    static CONNECTION_FAILURE : string = 'connectionFailure';

    /**
     * Event emitted when the client's current connection is closed for any reason.  Only emitted after
     * a {@link CONNECTION_SUCCESS connectionSuccess} event.
     *
     * Listener type: {@link DisconnectionEventListener}
     *
     * @event
     */
    static DISCONNECTION : string = 'disconnection';

    /**
     * Event emitted when the client finishes shutdown as a result of the user invoking {@link stop}.
     *
     * Listener type: {@link StoppedEventListener}
     *
     * @event
     */
    static STOPPED : string = 'stopped';

    /**
     * Registers a listener for the client's {@link ERROR error} event.  An {@link ERROR error} event is emitted when
     * the client encounters a serious error condition, such as invalid input, napi failures, and other potentially
     * unrecoverable situations.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'error', listener: mqtt5.ErrorEventListener): this;

    /**
     * Registers a listener for the client's {@link MESSAGE_RECEIVED messageReceived} event.  A
     * {@link MESSAGE_RECEIVED messageReceived} event is emitted when an MQTT PUBLISH packet is received by the
     * client.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'messageReceived', listener: mqtt5.MessageReceivedEventListener): this;

    /**
     * Registers a listener for the client's {@link ATTEMPTING_CONNECT attemptingConnect} event.  A
     * {@link ATTEMPTING_CONNECT attemptingConnect} event is emitted every time the client begins a connection attempt.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'attemptingConnect', listener: mqtt5.AttemptingConnectEventListener): this;

    /**
     * Registers a listener for the client's {@link CONNECTION_SUCCESS connectionSuccess} event.  A
     * {@link CONNECTION_SUCCESS connectionSuccess} event is emitted every time the client successfully establishes
     * an MQTT connection.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'connectionSuccess', listener: mqtt5.ConnectionSuccessEventListener): this;

    /**
     * Registers a listener for the client's {@link CONNECTION_FAILURE connectionFailure} event.  A
     * {@link CONNECTION_FAILURE connectionFailure} event is emitted every time the client fails to establish an
     * MQTT connection.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'connectionFailure', listener: mqtt5.ConnectionFailureEventListener): this;

    /**
     * Registers a listener for the client's {@link DISCONNECTION disconnection} event.  A
     * {@link DISCONNECTION disconnection} event is emitted when the client's current MQTT connection is closed
     * for any reason.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'disconnection', listener: mqtt5.DisconnectionEventListener): this;

    /**
     * Registers a listener for the client's {@link STOPPED stopped} event.  A
     * {@link STOPPED stopped} event is emitted when the client finishes shutdown as a
     * result of the user invoking {@link stop}.
     *
     * @param event the type of event to listen to
     * @param listener the event listener to add
     */
    on(event: 'stopped', listener: mqtt5.StoppedEventListener): this;

    // Implementation signature shared by all overloads above: delegates to the base emitter and returns this.
    on(event: string | symbol, listener: (...args: any[]) => void): this {
        super.on(event, listener);
        return this;
    }

    /*
     * Private helper functions
     *
     * Callbacks come through static functions so that the native threadsafe function objects do not
     * capture the client object itself, simplifying the number of strong references to the client floating around.
     */

    /**
     * Emits a {@link STOPPED stopped} event (with an empty event object) on the next tick.
     *
     * @param client client to emit the event on
     */
    private static _s_on_stopped(client: Mqtt5Client): void {
        process.nextTick(() => {
            const stoppedEvent: mqtt5.StoppedEvent = {};

            client.emit(Mqtt5Client.STOPPED, stoppedEvent);
        });
    }

    /**
     * Emits an {@link ATTEMPTING_CONNECT attemptingConnect} event (with an empty event object) on the next tick.
     *
     * @param client client to emit the event on
     */
    private static _s_on_attempting_connect(client: Mqtt5Client): void {
        process.nextTick(() => {
            const attemptingConnectEvent: mqtt5.AttemptingConnectEvent = {};

            client.emit(Mqtt5Client.ATTEMPTING_CONNECT, attemptingConnectEvent);
        });
    }

    /**
     * Emits a {@link CONNECTION_SUCCESS connectionSuccess} event on the next tick.
     *
     * @param client client to emit the event on
     * @param connack CONNACK packet attached to the event
     * @param settings negotiated settings attached to the event
     */
    private static _s_on_connection_success(client: Mqtt5Client, connack: mqtt5_packet.ConnackPacket, settings: mqtt5.NegotiatedSettings): void {
        const connectionSuccessEvent: mqtt5.ConnectionSuccessEvent = {
            connack: connack,
            settings: settings
        };

        process.nextTick(() => {
            client.emit(Mqtt5Client.CONNECTION_SUCCESS, connectionSuccessEvent);
        });
    }

    /**
     * Emits a {@link CONNECTION_FAILURE connectionFailure} event on the next tick.
     *
     * @param client client to emit the event on
     * @param error error attached to the event
     * @param connack CONNACK packet; attached to the event only when it is neither null nor undefined
     */
    private static _s_on_connection_failure(client: Mqtt5Client, error: CrtError, connack?: mqtt5_packet.ConnackPacket): void {
        const connectionFailureEvent: mqtt5.ConnectionFailureEvent = {
            error: error
        };

        if (connack !== null && connack !== undefined) {
            connectionFailureEvent.connack = connack;
        }

        process.nextTick(() => {
            client.emit(Mqtt5Client.CONNECTION_FAILURE, connectionFailureEvent);
        });
    }

    /**
     * Emits a {@link DISCONNECTION disconnection} event on the next tick.
     *
     * @param client client to emit the event on
     * @param error error attached to the event
     * @param disconnect DISCONNECT packet; attached to the event only when it is neither null nor undefined
     */
    private static _s_on_disconnection(client: Mqtt5Client, error: CrtError, disconnect?: mqtt5_packet.DisconnectPacket): void {
        const disconnectionEvent: mqtt5.DisconnectionEvent = {
            error: error
        };

        if (disconnect !== null && disconnect !== undefined) {
            disconnectionEvent.disconnect = disconnect;
        }

        process.nextTick(() => {
            client.emit(Mqtt5Client.DISCONNECTION, disconnectionEvent);
        });
    }

    /**
     * Completes a subscribe promise.  Resolves with the SUBACK when the error code is zero and a SUBACK was
     * supplied; in every other case rejects with the value of `io.error_code_to_string(errorCode)`.
     *
     * @param resolve resolve function of the pending promise
     * @param reject reject function of the pending promise
     * @param client client the operation was submitted on (unused here)
     * @param errorCode completion error code; zero is treated as success
     * @param suback SUBACK packet, if any
     */
    private static _s_on_suback_callback(resolve : (value: (mqtt5_packet.SubackPacket | PromiseLike<mqtt5_packet.SubackPacket>)) => void, reject : (reason?: any) => void, client: Mqtt5Client, errorCode: number, suback?: mqtt5_packet.SubackPacket): void {
        if (errorCode === 0 && suback !== undefined) {
            resolve(suback);
        } else {
            reject(io.error_code_to_string(errorCode));
        }
    }

    /**
     * Completes an unsubscribe promise.  Resolves with the UNSUBACK when the error code is zero and an UNSUBACK
     * was supplied; in every other case rejects with the value of `io.error_code_to_string(errorCode)`.
     *
     * @param resolve resolve function of the pending promise
     * @param reject reject function of the pending promise
     * @param client client the operation was submitted on (unused here)
     * @param errorCode completion error code; zero is treated as success
     * @param unsuback UNSUBACK packet, if any
     */
    private static _s_on_unsuback_callback(resolve : (value: (mqtt5_packet.UnsubackPacket | PromiseLike<mqtt5_packet.UnsubackPacket>)) => void, reject : (reason?: any) => void, client: Mqtt5Client, errorCode: number, unsuback?: mqtt5_packet.UnsubackPacket): void {
        if (errorCode === 0 && unsuback !== undefined) {
            resolve(unsuback);
        } else {
            reject(io.error_code_to_string(errorCode));
        }
    }

    /**
     * Completes a publish promise.  Resolves with the supplied result when the error code is zero; otherwise
     * rejects with the value of `io.error_code_to_string(errorCode)`.
     *
     * @param resolve resolve function of the pending promise
     * @param reject reject function of the pending promise
     * @param client client the operation was submitted on (unused here)
     * @param errorCode completion error code; zero is treated as success
     * @param result publish completion result passed through to the promise
     */
    private static _s_on_puback_callback(resolve : (value: (mqtt5.PublishCompletionResult | PromiseLike<mqtt5.PublishCompletionResult>)) => void, reject : (reason?: any) => void, client: Mqtt5Client, errorCode: number, result:mqtt5.PublishCompletionResult): void {
        if (errorCode === 0) {
            resolve(result);
        } else {
            reject(io.error_code_to_string(errorCode));
        }
    }

    /**
     * Emits a {@link MESSAGE_RECEIVED messageReceived} event on the next tick.
     *
     * When a puback control id is supplied, the event carries an acknowledgement control wrapping a handle that
     * invokes the native publish acknowledgement for that id.  After the event has been emitted, the handle is
     * acquired and invoked here if `acquireHandle()` still returns one.
     *
     * @param client client to emit the event on
     * @param message received PUBLISH packet
     * @param pubackControlId opaque id supplied by the native layer; undefined means no acknowledgement control
     */
    private static _s_on_message_received(client: Mqtt5Client, message : mqtt5_packet.PublishPacket, pubackControlId? : any): void {
        let acknowledgementControl : mqtt_shared.PublishAcknowledgementHandleWrapper | undefined = undefined;

        if (pubackControlId !== undefined) {
            const controlId = pubackControlId;
            acknowledgementControl = new mqtt_shared.PublishAcknowledgementHandleWrapper(
                new mqtt_shared.PublishAcknowledgementHandle(() => {
                    crt_native.mqtt5_client_invoke_publish_acknowledgement(client.native_handle(), controlId);
                })
            );
        }

        const messageReceivedEvent: mqtt5.MessageReceivedEvent = {
            message: message
        };

        if (acknowledgementControl) {
            messageReceivedEvent.acknowledgementControl = acknowledgementControl;
        }

        process.nextTick(() => {
            client.emitWithCallback(Mqtt5Client.MESSAGE_RECEIVED, () => {
                /*
                 * Even if corked, all listeners have had a chance to react to the event
                 * and acquire the acknowledgement handle if they wanted to.  If no one did so, then we do it ourselves.
                 */
                if (acknowledgementControl) {
                    const handle = acknowledgementControl.acquireHandle();
                    if (handle) {
                        handle.invokeAcknowledgement();
                    }
                }
            }, messageReceivedEvent);
        });
    }
}