File Explorer

/var/runtime/node_modules/@aws-sdk/node_modules/aws-crt/lib/common

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

mqtt.spec.ts11.9 KB Β· 298 lines
/* * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. * SPDX-License-Identifier: Apache-2.0. */ import { v4 as uuid } from 'uuid'; import * as test_env from "@test/test_env"import * as retry from "@test/retry"import { ClientBootstrap } from '@awscrt/io';import { MqttClient, MqttClientConnection, QoS, MqttWill, Payload } from '@awscrt/mqtt';import { AwsIotMqttConnectionConfigBuilder } from '@awscrt/aws_iot';import { fromUtf8 } from '@aws-sdk/util-utf8-browser';import {once} from "events"; jest.setTimeout(30000); async function makeConnection(will?: MqttWill, client_id: string = `mqtt-unit-test-${uuid()}`) : Promise<MqttClientConnection> {    return new Promise<MqttClientConnection>(async (resolve, reject) => {        try {            let builder = AwsIotMqttConnectionConfigBuilder.new_with_websockets()                .with_clean_session(true)                .with_client_id(client_id)                .with_endpoint(test_env.AWS_IOT_ENV.MQTT311_HOST)                .with_credentials(                    test_env.AWS_IOT_ENV.MQTT311_REGION,                    test_env.AWS_IOT_ENV.MQTT311_CRED_ACCESS_KEY,                    test_env.AWS_IOT_ENV.MQTT311_CRED_SECRET_ACCESS_KEY,                    test_env.AWS_IOT_ENV.MQTT311_CRED_SESSION_TOKEN)                .with_ping_timeout_ms(5000);             if (will !== undefined) {                builder.with_will(will);            }             const config = builder.build();             const client = new MqttClient(new ClientBootstrap());            const connection = client.new_connection(config);            resolve(connection);        } catch (err) {            reject(err);        }    });} test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_iot_cred())('MQTT Connect/Disconnect', async () => {    await retry.networkTimeoutRetryWrapper( async () => {        const connection = await makeConnection();         let onConnect = once(connection, 'connect');        let onDisconnect = once(connection, 'disconnect');         await connection.connect();         let connectResult = (await onConnect)[0];        expect(connectResult).toBeFalsy(); /* session present */         await connection.disconnect();        await onDisconnect;    })}); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_iot_cred())('MQTT Pub/Sub', async () => {    await retry.networkTimeoutRetryWrapper( async () => {        const connection = await makeConnection();         let onConnect = once(connection, 'connect');        let onDisconnect = once(connection, 'disconnect');         await connection.connect();         let connectResult = (await onConnect)[0];        expect(connectResult).toBeFalsy(); /* session present */         const test_topic = `test/me/senpai/${uuid()}`;        const test_payload = 'NOTICE ME';         var resolvePromise: (value: void | PromiseLike<void>) => void;        let messageReceivedPromise = new Promise<void>((resolve, reject) => {            resolvePromise = resolve;        });         const sub = connection.subscribe(test_topic, QoS.AtLeastOnce, async (topic, payload, dup, qos, retain) => {            expect(topic).toEqual(test_topic);            const payload_str = (new TextDecoder()).decode(new Uint8Array(payload));            expect(payload_str).toEqual(test_payload);            expect(qos).toEqual(QoS.AtLeastOnce);            expect(retain).toBeFalsy();            resolvePromise();        });        await expect(sub).resolves.toBeTruthy();         const publishResult = connection.publish(test_topic, test_payload, QoS.AtLeastOnce);        await expect(publishResult).resolves.toBeTruthy();         await messageReceivedPromise;         const unsubscribed = connection.unsubscribe(test_topic);        await expect(unsubscribed).resolves.toHaveProperty('packet_id');         await connection.disconnect();        await onDisconnect;    })}); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_iot_cred())('MQTT Will', async () => {    await retry.networkTimeoutRetryWrapper( async () => {        // To check that Will message was successfully set for a connection, the connection should be closed without        // sending a client-side DISCONNECT packet. This test forces server to close connection by opening another        // connection with the same client ID.         const willTopic = 'test/last/will/and/testament'        const willPayload = 'AVENGE ME'        const client_id = `node-mqtt-unit-test-will-${uuid()}`         // Connection with Will set.        const connectionWithWill = await makeConnection(new MqttWill(            willTopic,            QoS.AtLeastOnce,            willPayload        ), client_id);        const onConnectWithWill = once(connectionWithWill, 'connect');        const onDisconnectWithWill = once(connectionWithWill, 'disconnect');        await connectionWithWill.connect();        const connectWithWillResult = (await onConnectWithWill)[0];        expect(connectWithWillResult).toBeFalsy(); /* session present */         // The second connection that subscribes to first connection's Will topic.        const connectionWaitingForWill = await makeConnection();        const onConnectWaitingForWill = once(connectionWaitingForWill, 'connect');        const onDisconnectWaitingForWill = once(connectionWaitingForWill, 'disconnect');        await connectionWaitingForWill.connect()        const connectWaitingForWill = (await onConnectWaitingForWill)[0];        expect(connectWaitingForWill).toBeFalsy(); /* session present */         const onMessage = once(connectionWaitingForWill, 'message');        await connectionWaitingForWill.subscribe(willTopic, QoS.AtLeastOnce);         // pause for a couple of seconds to try and minimize chance for a service-side race        await new Promise(resolve => setTimeout(resolve, 2000));         // The third connection that will cause the first one to be disconnected because it has the same client ID.        const connectionDuplicate = await makeConnection(undefined, client_id);         const onDisconnectDuplicate = once(connectionDuplicate, 'disconnect');         // Rarely, IoT Core disconnects the new connection and not the existing one, so retry in that case        let continueConnecting = true;        while (continueConnecting) {            try {                const onConnectDuplicate = once(connectionDuplicate, 'connect');                await connectionDuplicate.connect();                await onConnectDuplicate;                continueConnecting = false;            } catch (err) {                await new Promise(resolve => setTimeout(resolve, 1000));            }        }         // The second connection should receive Will message after the first connection was kicked out.        const messageReceivedArgs = (await onMessage);        const messageReceivedTopic = messageReceivedArgs[0];        const messageReceivedPayload = messageReceivedArgs[1];        const messageReceivedQos = messageReceivedArgs[3];        const messageReceivedRetain = messageReceivedArgs[4];         expect(messageReceivedTopic).toEqual(willTopic);        expect(messageReceivedPayload).toBeDefined();        const payload_str = (new TextDecoder()).decode(new Uint8Array(messageReceivedPayload));        expect(payload_str).toEqual(willPayload);        expect(messageReceivedQos).toEqual(QoS.AtLeastOnce);        expect(messageReceivedRetain).toBeFalsy();         await connectionWaitingForWill.disconnect();        await onDisconnectWaitingForWill;         await connectionDuplicate.disconnect();        await onDisconnectDuplicate;         await connectionWithWill.disconnect();        await onDisconnectWithWill;    })}); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_iot_cred())('MQTT On Any Publish', async () => {    await retry.networkTimeoutRetryWrapper( async () => {        const connection = await makeConnection();         let onConnect = once(connection, 'connect');        let onDisconnect = once(connection, 'disconnect');         await connection.connect();         let connectResult = (await onConnect)[0];        expect(connectResult).toBeFalsy(); /* session present */         const test_topic = `test/me/senpai/${uuid()}`;        const test_payload = 'NOTICE ME';         let onMessage = once(connection, 'message');         await connection.subscribe(test_topic, QoS.AtLeastOnce);         await connection.publish(test_topic, test_payload, QoS.AtLeastOnce);         let messageReceivedArgs = (await onMessage);        let messageReceivedTopic = messageReceivedArgs[0];        let messageReceivedPayload = messageReceivedArgs[1];        let messageReceivedQos = messageReceivedArgs[3];        let messageReceivedRetain = messageReceivedArgs[4];         expect(messageReceivedTopic).toEqual(test_topic);        expect(messageReceivedPayload).toBeDefined();        const payload_str = (new TextDecoder()).decode(new Uint8Array(messageReceivedPayload));        expect(payload_str).toEqual(test_payload);        expect(messageReceivedQos).toEqual(QoS.AtLeastOnce);        expect(messageReceivedRetain).toBeFalsy();         await connection.disconnect();        await onDisconnect;    })}); test_env.conditional_test(test_env.AWS_IOT_ENV.mqtt311_is_valid_iot_cred())('MQTT payload types', async () => {    await retry.networkTimeoutRetryWrapper( async () => {        const connection = await makeConnection();         let onDisconnect = once(connection, 'disconnect');         await connection.connect();         const id = uuid();         const tests: { [key: string]: { send: Payload, recv: ArrayBuffer } } = {            [`test/types/${id}/string`]: {                send: 'utf-8 πŸ‘πŸ‘„πŸ‘ time',                recv: fromUtf8('utf-8 πŸ‘πŸ‘„πŸ‘ time').buffer,            },            [`test/types/${id}/dataview`]: {                send: new DataView(fromUtf8('I was a DataView').buffer),                recv: fromUtf8('I was a DataView').buffer,            },            [`test/types/${id}/uint8array`]: {                // note: sending partial view of a larger buffer                send: new Uint8Array(new Uint8Array([0, 1, 2, 3, 4, 5, 6]).buffer, 2, 3),                recv: new Uint8Array([2, 3, 4]).buffer,            },            [`test/types/${id}/arraybuffer`]: {                send: new Uint8Array([0, 255, 255, 255, 255, 255, 1]).buffer,                recv: new Uint8Array([0, 255, 255, 255, 255, 255, 1]).buffer,            },            [`test/types/${id}/json`]: {                send: { I: "was JSON" },                recv: fromUtf8('{"I": "was JSON"}').buffer,            },        };         // as messages are received, delete items.        // when this object is empty all expected messages have been received.        let expecting: { [key: string]: ArrayBuffer } = {}        for (const topic in tests) {            expecting[topic] = tests[topic].recv;        }         var resolveMessagesReceivedPromise: (value: void | PromiseLike<void>) => void;        let messagesReceivedPromise = new Promise<void>( (resolve, reject) => {            resolveMessagesReceivedPromise = resolve;        });         connection.on('message', async (topic, payload, dup, qos, retain) => {            // QoS1 message might arrive multiple times.            // so it's no big deal if we've already seen this topic            if (!(topic in expecting)) {                return;            }             expect(payload).toEqual(expecting[topic]);            delete expecting[topic];             if (Object.keys(expecting).length == 0) {                resolveMessagesReceivedPromise();            }        });         await connection.subscribe(`test/types/${id}/#`, QoS.AtLeastOnce);         for (const topic in tests) {            await connection.publish(topic, tests[topic].send, QoS.AtLeastOnce);        }         await messagesReceivedPromise;         await connection.disconnect();        await onDisconnect;    })});