File Explorer

/var/runtime/node_modules/@aws-sdk/node_modules/aws-crt/source

This explorer reads the filesystem of the server it runs on, so /workspace/user isn't present here. Browsing and the terminal still work against this server's own disk from /.

1 dir
34 files
event_stream.c — 93.0 KB · 2426 lines
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include "event_stream.h"

#include <aws/event-stream/event_stream_rpc_client.h>
#include <aws/io/socket.h>
#include <aws/io/tls_channel_handler.h>

static const uint32_t AWS_EVENT_STREAM_CONNECT_TIMEOUT_DEFAULT_MS = 10000;

/* Property names read from / written to the JS-side objects handled in this file. */
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_HOST = "hostName";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_PORT = "port";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_NAME = "name";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_TYPE = "type";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_VALUE = "value";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_HEADERS = "headers";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_PAYLOAD = "payload";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_FLAGS = "flags";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_MESSAGE = "message";
static const char *AWS_EVENT_STREAM_PROPERTY_NAME_OPERATION = "operation";

/*
 * The fact that a zeroed array list crashes in aws_array_list_length drives me crazy.  Since CBMC proofs are
 * entangled with the implementation, I don't want to change it.
 *
 * Returns 0 for a NULL or invalid (e.g. zeroed) list, otherwise the list's length.
 */
static size_t s_aws_array_list_length(struct aws_array_list *array_list) {
    if (array_list == NULL) {
        return 0;
    }

    if (!aws_array_list_is_valid(array_list)) {
        return 0;
    }

    return aws_array_list_length(array_list);
}

/*
 * Binding object that outlives the associated napi wrapper object.  When that object finalizes, then it's a signal
 * to this object to destroy the connection (and itself, afterwards).
 *
 * WARNING
 * Data Access Rules:
 *  (1) If in the libuv thread (called from JS or in the invocation of a thread-safe function), you may access anything
 *      in the binding
 *  (2) Otherwise, you may only access thread-safe functions or the binding's ref count APIs.  In particular,
 *      'connection' and 'is_closed' are off-limits unless you're in the libuv thread.
 */
struct aws_event_stream_client_connection_binding {
    struct aws_allocator *allocator;

    /*
     * We ref count the binding itself because there are two primary time intervals that together create a union
     * that we must honor.
     *
     * Interval #1: The binding must live from new() to close()
     * Interval #2: The binding must live from connect() to {connection failure || connection shutdown} as processed
     *    by the libuv thread.  It is incorrect to react to those events in the event loop callback; we must bundle
     *    and ship them across to the libuv thread.  When the libuv thread is processing a connection failure or
     *    a connection shutdown, we know that no other events can possibly be pending (they would have already been
     *    processed in the libuv thread).
     *
     * The union of those two intervals is "probably" enough, but its correctness would rest on an internal property
     * of the node implementation itself: "are calls to napi_call_threadsafe_function() well-ordered with respect to
     * a single producer (we only call it from the libuv thread itself)?"  This is almost certainly true, but I don't
     * see it guaranteed within the n-api documentation.  For that reason, we also add the intervals of all
     * completable connection events: incoming protocol messages and outbound message flushes
     */
    struct aws_ref_count ref_count;

    /*
     * May only be accessed from within the libuv thread.  This includes connection APIs like acquire and release.
     */
    struct aws_event_stream_rpc_client_connection *connection;
    bool is_closed;

    /*
     * Cached config since connect is separate
     *
     * Const post-creation.
     */
    struct aws_string *host;
    uint32_t port;
    struct aws_socket_options socket_options;
    struct aws_tls_connection_options tls_connection_options;
    bool using_tls;

    /*
     * Single count ref to the JS connection object.
     */
    napi_ref node_event_stream_client_connection_ref;

    /*
     * Single count ref to the node external managed by the binding.
     */
    napi_ref node_event_stream_client_connection_external_ref;

    napi_threadsafe_function on_connection_setup;
    napi_threadsafe_function on_connection_shutdown;
    napi_threadsafe_function on_protocol_message;
};

/*
 * Zero-ref callback for the binding: destroys the cached host string and TLS options, cleans the three
 * threadsafe functions and finally frees the binding itself.  A NULL context is ignored.
 */
static void s_aws_event_stream_client_connection_binding_on_zero(void *context) {
    if (context == NULL) {
        return;
    }

    struct aws_event_stream_client_connection_binding *binding = context;

    aws_string_destroy(binding->host);
    aws_tls_connection_options_clean_up(&binding->tls_connection_options);

    AWS_CLEAN_THREADSAFE_FUNCTION(binding, on_connection_setup);
    AWS_CLEAN_THREADSAFE_FUNCTION(binding, on_connection_shutdown);
    AWS_CLEAN_THREADSAFE_FUNCTION(binding, on_protocol_message);

    aws_mem_release(binding->allocator, binding);
}

/*
 * Adds a reference to the binding.  NULL-tolerant; returns the pointer it was given so the call can be used
 * inline in an assignment.
 */
static struct aws_event_stream_client_connection_binding *s_aws_event_stream_client_connection_binding_acquire(
    struct aws_event_stream_client_connection_binding *binding) {
    if (binding == NULL) {
        return NULL;
    }

    aws_ref_count_acquire(&binding->ref_count);
    return binding;
}

/*
 * Drops a reference to the binding.  NULL-tolerant; always returns NULL so callers can clear their pointer in
 * the same statement.
 */
static struct aws_event_stream_client_connection_binding *s_aws_event_stream_client_connection_binding_release(
    struct aws_event_stream_client_connection_binding *binding) {
    if (binding != NULL) {
        aws_ref_count_release(&binding->ref_count);
    }

    return NULL;
}

/*
 * Marks the binding as closed and deletes both napi references it holds.  The reference fields are cleared
 * before the references are deleted.
 */
static void s_close_connection_binding(napi_env env, struct aws_event_stream_client_connection_binding *binding) {
    AWS_FATAL_ASSERT(env != NULL);

    binding->is_closed = true;

    napi_ref node_event_stream_client_connection_external_ref =
        binding->node_event_stream_client_connection_external_ref;
    binding->node_event_stream_client_connection_external_ref = NULL;

    napi_ref node_event_stream_client_connection_ref = binding->node_event_stream_client_connection_ref;
    binding->node_event_stream_client_connection_ref = NULL;

    if (node_event_stream_client_connection_external_ref != NULL) {
        napi_delete_reference(env, node_event_stream_client_connection_external_ref);
    }

    if (node_event_stream_client_connection_ref != NULL) {
        napi_delete_reference(env, node_event_stream_client_connection_ref);
    }
}

/*
 * Holds relevant information about a connection setup or shutdown callback from the event loop.  This is shipped
 * over to a threadsafe function that runs on the libuv thread.
 */
struct aws_event_stream_connection_event_data {
    struct aws_allocator *allocator;

    struct aws_event_stream_client_connection_binding *binding;
    int error_code;
    struct aws_event_stream_rpc_client_connection *connection;
};

/*
 * Threadsafe-function handler for connection shutdown.  If the binding is still open and the JS wrapper is still
 * resolvable, invokes the JS callback with (connection, error_code).  In every case it then releases the native
 * connection, drops the binding reference taken for the connection's lifetime and frees the event data.
 */
static void s_napi_event_stream_connection_on_connection_shutdown(
    napi_env env,
    napi_value function,
    void *context,
    void *user_data) {

    (void)context;

    struct aws_event_stream_connection_event_data *shutdown_data = user_data;
    struct aws_event_stream_client_connection_binding *binding = shutdown_data->binding;

    AWS_FATAL_ASSERT(binding->connection != NULL);

    AWS_LOGF_INFO(
        AWS_LS_NODEJS_CRT_GENERAL,
        "s_napi_event_stream_connection_on_connection_shutdown - event stream connection has completed shutdown");

    if (env && !binding->is_closed) {
        napi_value params[2];
        const size_t num_params = AWS_ARRAY_SIZE(params);

        /*
         * If we can't resolve the weak ref to the event stream connection, then it's been garbage collected and we
         * should not do anything.
         */
        params[0] = NULL;
        if (napi_get_reference_value(env, binding->node_event_stream_client_connection_ref, &params[0]) != napi_ok ||
            params[0] == NULL) {
            AWS_LOGF_INFO(
                AWS_LS_NODEJS_CRT_GENERAL,
                "s_napi_event_stream_connection_on_connection_shutdown - event_stream_client_connection node wrapper "
                "no longer resolvable");
            goto done;
        }

        AWS_NAPI_CALL(env, napi_create_uint32(env, shutdown_data->error_code, &params[1]), { goto done; });

        AWS_NAPI_ENSURE(
            env,
            aws_napi_dispatch_threadsafe_function(
                env, binding->on_connection_shutdown, NULL, function, num_params, params));
    }

done:

    /*
     * Release our reference, which in this case, allows the connection to finally delete itself.
     */
    aws_event_stream_rpc_client_connection_release(binding->connection);
    binding->connection = NULL;

    /*
     * Our invariant is that for the time interval between attempting to connect and either
     *
     *  (1) connection establishment failed, or
     *  (2) connection establishment succeeded and some arbitrary time later, gets shutdown
     *
     * we maintain a ref on the binding itself, ie native event stream can safely invoke callbacks that are
     * guaranteed to reach a valid binding.
     *
     * It's trickier than normal because, while we acquire in a single spot (the connect() call), we release in
     * two very different spots:
     *
     *  (1) connection establishment failed: in s_napi_on_event_stream_client_connection_setup
     *  (2) connection establishment succeeded: here
     *
     * Additionally, we can only release when we're in the libuv thread.
     */
    s_aws_event_stream_client_connection_binding_release(binding);

    aws_mem_release(shutdown_data->allocator, shutdown_data);
}

/*
 * Owned copy of an event stream message: header list, optional heap-allocated payload buffer, type and flags.
 */
struct aws_event_stream_message_storage {
    struct aws_allocator *allocator;
    struct aws_array_list headers;
    struct aws_byte_buf *payload;
    enum aws_event_stream_rpc_message_type message_type;
    uint32_t message_flags;
};

/*
 * Releases the headers and, if present, the payload buffer owned by the storage.
 *
 * The payload pointer is reset afterwards so that cleaning up the same storage twice is harmless: the error path
 * of s_aws_event_stream_message_storage_init_from_native cleans up, and its caller then destroys the enclosing
 * event, which cleans up again.
 */
static void s_aws_event_stream_message_storage_clean_up(struct aws_event_stream_message_storage *storage) {
    aws_event_stream_headers_list_cleanup(&storage->headers);

    if (storage->payload != NULL) {
        aws_byte_buf_clean_up(storage->payload);
        aws_mem_release(storage->allocator, storage->payload);
        storage->payload = NULL;
    }
}

/*
 * Fills 'storage' with a deep copy of a native message (headers, payload if any, type and flags).
 *
 * Returns AWS_OP_SUCCESS, or AWS_OP_ERR after cleaning up whatever had been copied so far.
 */
static int s_aws_event_stream_message_storage_init_from_native(
    struct aws_event_stream_message_storage *storage,
    struct aws_allocator *allocator,
    const struct aws_event_stream_rpc_message_args *message) {

    storage->allocator = allocator;

    /* we don't use the event stream headers init api so that we can allocate the proper amount */
    if (aws_array_list_init_dynamic(
            &storage->headers, allocator, message->headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
        return AWS_OP_ERR;
    }

    for (size_t i = 0; i < message->headers_count; ++i) {
        struct aws_event_stream_header_value_pair *source_header = &message->headers[i];

        if (aws_event_stream_add_header(&storage->headers, source_header)) {
            goto error;
        }
    }

    if (message->payload != NULL) {
        storage->payload = aws_mem_calloc(allocator, 1, sizeof(struct aws_byte_buf));
        if (aws_byte_buf_init_copy_from_cursor(
                storage->payload, allocator, aws_byte_cursor_from_buf(message->payload))) {
            goto error;
        }
    }

    storage->message_type = message->message_type;
    storage->message_flags = message->message_flags;

    return AWS_OP_SUCCESS;

error:

    /* assumes AWS_ZERO_STRUCT was called first */
    s_aws_event_stream_message_storage_clean_up(storage);

    return AWS_OP_ERR;
}

/*
 * Reads the header's 'value' property with the given integer extraction function and appends it to 'headers'.
 * Expects 'env', 'napi_header', 'headers', 'name_buffer', 'log_context' and a 'done' label in the expansion scope;
 * declares a local named 'value', so it must be expanded inside its own braces.
 */
#define ADD_INTEGRAL_HEADER(type_name, napi_extraction_fn_name, add_header_fn_name)                                    \
    type_name value = 0;                                                                                               \
    if (napi_extraction_fn_name(env, napi_header, AWS_EVENT_STREAM_PROPERTY_NAME_VALUE, &value) !=                     \
        AWS_NGNPR_VALID_VALUE) {                                                                                       \
        AWS_LOGF_ERROR(                                                                                                \
            AWS_LS_NODEJS_CRT_GENERAL,                                                                                 \
            "id=%p s_add_event_stream_header_from_js - invalid integer property value",                                \
            log_context);                                                                                              \
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);                                                                   \
        goto done;                                                                                                     \
    }                                                                                                                  \
                                                                                                                       \
    if (add_header_fn_name(headers, aws_byte_cursor_from_buf(&name_buffer), value)) {                                  \
        AWS_LOGF_ERROR(                                                                                                \
            AWS_LS_NODEJS_CRT_GENERAL,                                                                                 \
            "id=%p s_add_event_stream_header_from_js - failed to add integer-valued header to header list",            \
            log_context);                                                                                              \
        goto done;                                                                                                     \
    }

/*
 * Reads the header's 'value' property as a byte sequence into 'value_buffer' and appends it to 'headers' with the
 * given add function.  Expects the same surrounding names as ADD_INTEGRAL_HEADER plus 'value_buffer'.
 */
#define ADD_BUFFERED_HEADER(napi_query_type, add_header_fn_name)                                                       \
    if (aws_napi_get_named_property_as_bytebuf(                                                                        \
            env, napi_header, AWS_EVENT_STREAM_PROPERTY_NAME_VALUE, napi_query_type, &value_buffer) !=                 \
        AWS_NGNPR_VALID_VALUE) {                                                                                       \
        AWS_LOGF_ERROR(                                                                                                \
            AWS_LS_NODEJS_CRT_GENERAL,                                                                                 \
            "id=%p s_add_event_stream_header_from_js - failed to parse 'value' property as a byte sequence",           \
            log_context);                                                                                              \
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);                                                                   \
        goto done;                                                                                                     \
    }                                                                                                                  \
    if (add_header_fn_name(                                                                                            \
            headers, aws_byte_cursor_from_buf(&name_buffer), aws_byte_cursor_from_buf(&value_buffer))) {               \
        AWS_LOGF_ERROR(                                                                                                \
            AWS_LS_NODEJS_CRT_GENERAL,                                                                                 \
            "id=%p s_add_event_stream_header_from_js - failed to add byte sequence valued header to header list",      \
            log_context);                                                                                              \
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);                                                                   \
        goto done;                                                                                                     \
    }

/*
 * Adds an int64 header whose value arrives as an 8-byte sequence, least-significant byte first.
 *
 * The bytes come from a JS-supplied buffer, so a wrong length is reported as AWS_ERROR_INVALID_ARGUMENT instead
 * of aborting the process.
 */
static int s_aws_event_stream_add_int64_header_by_cursor(
    struct aws_array_list *headers,
    struct aws_byte_cursor name,
    struct aws_byte_cursor value) {

    if (value.len != 8) {
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    /*
     * We pass int64s encoded as a two's-complement byte sequence.  This lets us just build the 64-bit value
     * directly and then cast it to an int64.
     */
    uint64_t uint64_value = 0;
    for (size_t i = 0; i < value.len; ++i) {
        uint64_t byte_value = value.ptr[i];
        uint64_value |= (byte_value << (i * 8));
    }

    int64_t int64_value = (int64_t)uint64_value;

    return aws_event_stream_add_int64_header_by_cursor(headers, name, int64_value);
}

/*
 * Converts one JS header object ({name, type, value}) into a native header appended to 'headers'.
 *
 * 'name' and 'type' are required; 'type' must lie in [0, AWS_EVENT_STREAM_HEADER_UUID].  How 'value' is read
 * depends on the type.  Returns AWS_OP_SUCCESS or AWS_OP_ERR; temporary buffers are released on every path.
 */
static int s_add_event_stream_header_from_js(
    struct aws_array_list *headers,
    napi_env env,
    napi_value napi_header,
    void *log_context) {

    int result = AWS_OP_ERR;

    struct aws_byte_buf name_buffer;
    AWS_ZERO_STRUCT(name_buffer);

    struct aws_byte_buf value_buffer;
    AWS_ZERO_STRUCT(value_buffer);

    if (aws_napi_get_named_property_as_bytebuf(
            env, napi_header, AWS_EVENT_STREAM_PROPERTY_NAME_NAME, napi_string, &name_buffer) !=
        AWS_NGNPR_VALID_VALUE) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_add_event_stream_header_from_js - failed to parse required 'name' property",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto done;
    }

    uint32_t value_type_u32 = 0;
    enum aws_event_stream_header_value_type value_type = 0;
    if (aws_napi_get_named_property_as_uint32(env, napi_header, AWS_EVENT_STREAM_PROPERTY_NAME_TYPE, &value_type_u32) !=
        AWS_NGNPR_VALID_VALUE) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_add_event_stream_header_from_js - failed to parse required 'type' property",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto done;
    }

    value_type = (enum aws_event_stream_header_value_type)value_type_u32;
    if (value_type < 0 || value_type > AWS_EVENT_STREAM_HEADER_UUID) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_add_event_stream_header_from_js - 'type' property has invalid value",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto done;
    }

    switch (value_type) {
        case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
        case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
            /* boolean headers carry their value in the type itself; no 'value' property is read */
            if (aws_event_stream_add_bool_header_by_cursor(
                    headers, aws_byte_cursor_from_buf(&name_buffer), value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE)) {
                AWS_LOGF_ERROR(
                    AWS_LS_NODEJS_CRT_GENERAL,
                    "id=%p s_add_event_stream_header_from_js - failed to add boolean-valued header",
                    log_context);
                goto done;
            }
            break;

        case AWS_EVENT_STREAM_HEADER_BYTE: {
            ADD_INTEGRAL_HEADER(int8_t, aws_napi_get_named_property_as_int8, aws_event_stream_add_byte_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_INT16: {
            ADD_INTEGRAL_HEADER(
                int16_t, aws_napi_get_named_property_as_int16, aws_event_stream_add_int16_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_INT32: {
            ADD_INTEGRAL_HEADER(
                int32_t, aws_napi_get_named_property_as_int32, aws_event_stream_add_int32_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_INT64: {
            ADD_BUFFERED_HEADER(napi_undefined, s_aws_event_stream_add_int64_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_BYTE_BUF: {
            ADD_BUFFERED_HEADER(napi_undefined, aws_event_stream_add_byte_buf_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_STRING: {
            ADD_BUFFERED_HEADER(napi_string, aws_event_stream_add_string_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_TIMESTAMP: {
            ADD_INTEGRAL_HEADER(
                int64_t, aws_napi_get_named_property_as_int64, aws_event_stream_add_timestamp_header_by_cursor)
            break;
        }

        case AWS_EVENT_STREAM_HEADER_UUID: {
            ADD_BUFFERED_HEADER(napi_undefined, aws_event_stream_add_uuid_header_by_cursor)
            break;
        }

        default:
            goto done;
    }

    result = AWS_OP_SUCCESS;

done:

    aws_byte_buf_clean_up(&name_buffer);
    aws_byte_buf_clean_up(&value_buffer);

    return result;
}

/*
 * Fills 'storage' from a JS message object.
 *
 * 'headers' and 'payload' are optional but must be well-formed when present; 'type' is required; 'flags' is
 * optional but must be valid when present.  Returns AWS_OP_SUCCESS, or AWS_OP_ERR after cleaning up the storage.
 */
static int s_aws_event_stream_message_storage_init_from_js(
    struct aws_event_stream_message_storage *storage,
    struct aws_allocator *allocator,
    napi_env env,
    napi_value message,
    void *log_context) {

    int result = AWS_OP_ERR;

    storage->allocator = allocator;

    napi_value napi_headers = NULL;
    enum aws_napi_get_named_property_result get_headers_result =
        aws_napi_get_named_property(env, message, AWS_EVENT_STREAM_PROPERTY_NAME_HEADERS, napi_object, &napi_headers);
    if (get_headers_result == AWS_NGNPR_INVALID_VALUE) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_aws_event_stream_message_storage_init_from_js - invalid headers property",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto error;
    }

    if (get_headers_result == AWS_NGNPR_VALID_VALUE) {
        size_t header_array_length = 0;
        if (aws_napi_get_property_array_size(
                env, message, AWS_EVENT_STREAM_PROPERTY_NAME_HEADERS, &header_array_length)) {
            AWS_LOGF_ERROR(
                AWS_LS_NODEJS_CRT_GENERAL,
                "id=%p s_aws_event_stream_message_storage_init_from_js - headers property is not an array",
                log_context);
            aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
            goto error;
        }

        /* do not append into a list that failed to initialize */
        if (aws_array_list_init_dynamic(
                &storage->headers,
                allocator,
                header_array_length,
                sizeof(struct aws_event_stream_header_value_pair))) {
            AWS_LOGF_ERROR(
                AWS_LS_NODEJS_CRT_GENERAL,
                "id=%p s_aws_event_stream_message_storage_init_from_js - failed to initialize header list",
                log_context);
            goto error;
        }

        for (size_t i = 0; i < header_array_length; ++i) {

            napi_value napi_header = NULL;
            AWS_NAPI_CALL(env, napi_get_element(env, napi_headers, (uint32_t)i, &napi_header), { goto error; });

            if (s_add_event_stream_header_from_js(&storage->headers, env, napi_header, log_context)) {
                AWS_LOGF_ERROR(
                    AWS_LS_NODEJS_CRT_GENERAL,
                    "id=%p s_aws_event_stream_message_storage_init_from_js - could not extract eventstream header",
                    log_context);
                goto error;
            }
        }
    }

    struct aws_byte_buf payload_buffer;
    AWS_ZERO_STRUCT(payload_buffer);

    enum aws_napi_get_named_property_result get_payload_result = aws_napi_get_named_property_as_bytebuf(
        env, message, AWS_EVENT_STREAM_PROPERTY_NAME_PAYLOAD, napi_undefined, &payload_buffer);
    if (get_payload_result == AWS_NGNPR_INVALID_VALUE) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_aws_event_stream_message_storage_init_from_js - invalid payload property",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto error;
    } else if (get_payload_result == AWS_NGNPR_VALID_VALUE) {
        /* the storage takes over the buffer read from JS */
        storage->payload = aws_mem_calloc(allocator, 1, sizeof(struct aws_byte_buf));
        *storage->payload = payload_buffer;
    }

    uint32_t message_type_uint32 = 0;
    if (aws_napi_get_named_property_as_uint32(
            env, message, AWS_EVENT_STREAM_PROPERTY_NAME_TYPE, &message_type_uint32) != AWS_NGNPR_VALID_VALUE) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_aws_event_stream_message_storage_init_from_js - invalid message type property",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto error;
    }

    storage->message_type = (enum aws_event_stream_rpc_message_type)message_type_uint32;

    if (aws_napi_get_named_property_as_uint32(
            env, message, AWS_EVENT_STREAM_PROPERTY_NAME_FLAGS, &storage->message_flags) == AWS_NGNPR_INVALID_VALUE) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_aws_event_stream_message_storage_init_from_js - invalid message flags property",
            log_context);
        aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
        goto error;
    }

    result = AWS_OP_SUCCESS;
    goto done;

error:

    s_aws_event_stream_message_storage_clean_up(storage);

done:

    return result;
}

/*
 * Copies the header's buffer value onto the heap and attaches it to 'napi_header' as the 'value' property via a
 * finalizable external.  On any failure the heap copy is released and the enclosing function returns AWS_OP_ERR.
 * Expects 'env', 'allocator', 'header' and 'napi_header' in the expansion scope; declares locals, so it must be
 * expanded inside its own braces.
 */
#define AWS_ATTACH_BUFFER_VALUE_TO_HEADER(get_buffer_fn)                                                               \
    struct aws_byte_buf non_heap_buffer = get_buffer_fn(header);                                                       \
    struct aws_byte_buf *heap_buffer = aws_mem_calloc(allocator, 1, sizeof(struct aws_byte_buf));                      \
    if (aws_byte_buf_init_copy_from_cursor(heap_buffer, allocator, aws_byte_cursor_from_buf(&non_heap_buffer))) {      \
        aws_mem_release(allocator, heap_buffer);                                                                       \
        return AWS_OP_ERR;                                                                                             \
    }                                                                                                                  \
    if (aws_napi_attach_object_property_binary_as_finalizable_external(                                                \
            napi_header, env, AWS_EVENT_STREAM_PROPERTY_NAME_VALUE, heap_buffer)) {                                    \
        aws_byte_buf_clean_up(heap_buffer);                                                                            \
        aws_mem_release(allocator, heap_buffer);                                                                       \
        return AWS_OP_ERR;                                                                                             \
    }

/*
 * Builds the JS representation ({name, type, value}) of a native header and stores it in *napi_header_out.
 *
 * Boolean headers get a boolean 'value'; byte/int16/int32 an i32; int64 an 8-byte binary (least-significant byte
 * first); byte-buf and uuid a binary; string a string; timestamp a u64.  Returns AWS_OP_SUCCESS or an error.
 */
static int s_aws_create_napi_header_value(
    napi_env env,
    struct aws_event_stream_header_value_pair *header,
    napi_value *napi_header_out) {

    napi_value napi_header = NULL;
    struct aws_allocator *allocator = aws_napi_get_allocator();

    AWS_NAPI_CALL(
        env, napi_create_object(env, &napi_header), { return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE); });

    struct aws_byte_cursor name_cursor = aws_byte_cursor_from_array(header->header_name, header->header_name_len);
    if (aws_napi_attach_object_property_string(napi_header, env, AWS_EVENT_STREAM_PROPERTY_NAME_NAME, name_cursor)) {
        return AWS_OP_ERR;
    }

    if (aws_napi_attach_object_property_u32(
            napi_header, env, AWS_EVENT_STREAM_PROPERTY_NAME_TYPE, (uint32_t)header->header_value_type)) {
        return AWS_OP_ERR;
    }

    switch (header->header_value_type) {
        case AWS_EVENT_STREAM_HEADER_BOOL_TRUE:
        case AWS_EVENT_STREAM_HEADER_BOOL_FALSE:
            if (aws_napi_attach_object_property_boolean(
                    napi_header,
                    env,
                    AWS_EVENT_STREAM_PROPERTY_NAME_VALUE,
                    header->header_value_type == AWS_EVENT_STREAM_HEADER_BOOL_TRUE)) {
                return AWS_OP_ERR;
            }
            break;

        case AWS_EVENT_STREAM_HEADER_BYTE:
            if (aws_napi_attach_object_property_i32(
                    napi_header,
                    env,
                    AWS_EVENT_STREAM_PROPERTY_NAME_VALUE,
                    aws_event_stream_header_value_as_byte(header))) {
                return AWS_OP_ERR;
            }
            break;

        case AWS_EVENT_STREAM_HEADER_INT16:
            if (aws_napi_attach_object_property_i32(
                    napi_header,
                    env,
                    AWS_EVENT_STREAM_PROPERTY_NAME_VALUE,
                    aws_event_stream_header_value_as_int16(header))) {
                return AWS_OP_ERR;
            }
            break;

        case AWS_EVENT_STREAM_HEADER_INT32:
            if (aws_napi_attach_object_property_i32(
                    napi_header,
                    env,
                    AWS_EVENT_STREAM_PROPERTY_NAME_VALUE,
                    aws_event_stream_header_value_as_int32(header))) {
                return AWS_OP_ERR;
            }
            break;

        case AWS_EVENT_STREAM_HEADER_INT64: {
            /* shift an unsigned copy: right-shifting a negative signed value is implementation-defined */
            uint64_t value = (uint64_t)aws_event_stream_header_value_as_int64(header);
            uint8_t buffer[8];

            /* We can copy the bytes from low to high directly since we use a two's complement representation */
            for (size_t i = 0; i < 8; ++i) {
                buffer[i] = (uint8_t)(value & 0xFF);
                value >>= 8;
            }

            struct aws_byte_buf *heap_buffer = aws_mem_calloc(allocator, 1, sizeof(struct aws_byte_buf));
            if (aws_byte_buf_init_copy_from_cursor(
                    heap_buffer, allocator, aws_byte_cursor_from_array(buffer, AWS_ARRAY_SIZE(buffer)))) {
                aws_mem_release(allocator, heap_buffer);
                return AWS_OP_ERR;
            }

            if (aws_napi_attach_object_property_binary_as_finalizable_external(
                    napi_header, env, AWS_EVENT_STREAM_PROPERTY_NAME_VALUE, heap_buffer)) {
                aws_byte_buf_clean_up(heap_buffer);
                aws_mem_release(allocator, heap_buffer);
                return AWS_OP_ERR;
            }
            break;
        }

        case AWS_EVENT_STREAM_HEADER_BYTE_BUF: {
            AWS_ATTACH_BUFFER_VALUE_TO_HEADER(aws_event_stream_header_value_as_bytebuf);
            break;
        }

        case AWS_EVENT_STREAM_HEADER_UUID: {
            AWS_ATTACH_BUFFER_VALUE_TO_HEADER(aws_event_stream_header_value_as_uuid);
            break;
        }

        case AWS_EVENT_STREAM_HEADER_STRING: {
            struct aws_byte_buf value_buffer = aws_event_stream_header_value_as_string(header);
            if (aws_napi_attach_object_property_string(
                    napi_header, env, AWS_EVENT_STREAM_PROPERTY_NAME_VALUE, aws_byte_cursor_from_buf(&value_buffer))) {
                return AWS_OP_ERR;
            }
            break;
        }

        case AWS_EVENT_STREAM_HEADER_TIMESTAMP:
            if (aws_napi_attach_object_property_u64(
                    napi_header,
                    env,
                    AWS_EVENT_STREAM_PROPERTY_NAME_VALUE,
                    (uint64_t)aws_event_stream_header_value_as_timestamp(header))) {
                return AWS_OP_ERR;
            }
            break;

        default:
            return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    *napi_header_out = napi_header;

    return AWS_OP_SUCCESS;
}

/*
 * Builds the JS representation of a stored message ({flags, type, payload?, headers?}) in *napi_message_out.
 *
 * A non-empty payload is handed to the JS external, whose finalizer then owns the buffer; the storage's payload
 * pointer is cleared in that case.  A message without a payload (payload == NULL) simply gets no 'payload'
 * property.  Returns AWS_OP_SUCCESS or an error.
 */
static int s_aws_create_napi_value_from_event_stream_message_storage(
    napi_env env,
    struct aws_event_stream_message_storage *message,
    napi_value *napi_message_out) {

    if (env == NULL) {
        return aws_raise_error(AWS_CRT_NODEJS_ERROR_THREADSAFE_FUNCTION_NULL_NAPI_ENV);
    }

    napi_value napi_message = NULL;
    AWS_NAPI_CALL(
        env, napi_create_object(env, &napi_message), { return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE); });

    if (aws_napi_attach_object_property_u32(
            napi_message, env, AWS_EVENT_STREAM_PROPERTY_NAME_FLAGS, (uint32_t)message->message_flags)) {
        return AWS_OP_ERR;
    }

    if (aws_napi_attach_object_property_u32(
            napi_message, env, AWS_EVENT_STREAM_PROPERTY_NAME_TYPE, (uint32_t)message->message_type)) {
        return AWS_OP_ERR;
    }

    /* payload is only allocated when the source message carried one, so it may legitimately be NULL here */
    if (message->payload != NULL && message->payload->len > 0) {
        if (aws_napi_attach_object_property_binary_as_finalizable_external(
                napi_message, env, AWS_EVENT_STREAM_PROPERTY_NAME_PAYLOAD, message->payload)) {
            return AWS_OP_ERR;
        }

        /* the extern's finalizer is now responsible for cleaning up the buffer */
        message->payload = NULL;
    }

    size_t header_count = s_aws_array_list_length(&message->headers);
    if (header_count > 0) {
        napi_value headers_array = NULL;
        AWS_NAPI_CALL(env, napi_create_array_with_length(env, header_count, &headers_array), {
            return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE);
        });

        for (size_t i = 0; i < header_count; ++i) {
            napi_value napi_header = NULL;

            struct aws_event_stream_header_value_pair *header = NULL;
            aws_array_list_get_at_ptr(&message->headers, (void **)&header, i);

            if (s_aws_create_napi_header_value(env, header, &napi_header)) {
                return AWS_OP_ERR;
            }

            AWS_NAPI_CALL(env, napi_set_element(env, headers_array, (uint32_t)i, napi_header), {
                return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE);
            });
        }

        AWS_NAPI_CALL(
            env, napi_set_named_property(env, napi_message, AWS_EVENT_STREAM_PROPERTY_NAME_HEADERS, headers_array), {
                return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE);
            });
    }

    *napi_message_out = napi_message;

    return AWS_OP_SUCCESS;
}

/*
 * An incoming protocol message queued for delivery to JS: a copy of the message plus a counted reference to the
 * binding it arrived on.
 */
struct aws_event_stream_protocol_message_event {
    struct aws_allocator *allocator;
    struct aws_event_stream_message_storage storage;
    struct aws_event_stream_client_connection_binding *binding;
};

/*
 * Cleans up the stored message, drops the event's binding reference and frees the event.  NULL-tolerant.
 */
static void s_aws_event_stream_protocol_message_event_destroy(struct aws_event_stream_protocol_message_event *event) {
    if (event == NULL) {
        return;
    }

    s_aws_event_stream_message_storage_clean_up(&event->storage);
    s_aws_event_stream_client_connection_binding_release(event->binding);

    aws_mem_release(event->allocator, event);
}

/*
 * Threadsafe-function handler for an incoming protocol message.  If the binding is still open and the JS wrapper
 * is still resolvable, invokes the JS callback with (connection, message).  The event is always destroyed before
 * returning.
 */
static void s_napi_event_stream_connection_on_protocol_message(
    napi_env env,
    napi_value function,
    void *context,
    void *user_data) {

    (void)context;

    struct aws_event_stream_protocol_message_event *message_event = user_data;
    struct aws_event_stream_client_connection_binding *binding = message_event->binding;

    if (env && !binding->is_closed) {
        napi_value params[2];
        const size_t num_params = AWS_ARRAY_SIZE(params);

        /*
         * If we can't resolve the weak ref to the event stream connection, then it's been garbage collected and we
         * should not do anything.
         */
        params[0] = NULL;
        if (napi_get_reference_value(env, binding->node_event_stream_client_connection_ref, &params[0]) != napi_ok ||
            params[0] == NULL) {
            AWS_LOGF_INFO(
                AWS_LS_NODEJS_CRT_GENERAL,
                "s_napi_event_stream_connection_on_protocol_message - event_stream_client_connection node wrapper no "
                "longer resolvable");
            goto done;
        }

        if (s_aws_create_napi_value_from_event_stream_message_storage(env, &message_event->storage, &params[1])) {
            AWS_LOGF_ERROR(
                AWS_LS_NODEJS_CRT_GENERAL,
                "s_napi_event_stream_connection_on_protocol_message - failed to create JS representation of incoming "
                "message");
            goto done;
        }

        AWS_NAPI_ENSURE(
            env,
            aws_napi_dispatch_threadsafe_function(
                env, binding->on_protocol_message, NULL, function, num_params, params));
    }

done:

    s_aws_event_stream_protocol_message_event_destroy(message_event);
}

/*
 * Native callback for an incoming protocol message.  Copies the message into a heap-allocated event that holds
 * its own reference to the binding (passed as user_data) and queues it on the binding's on_protocol_message
 * threadsafe function.  If the copy fails the event is destroyed and the message is dropped.
 */
static void s_aws_event_stream_rpc_client_connection_protocol_message_fn(
    struct aws_event_stream_rpc_client_connection *connection,
    const struct aws_event_stream_rpc_message_args *message_args,
    void *user_data) {
    (void)connection;

    struct aws_allocator *allocator = aws_napi_get_allocator();
    struct aws_event_stream_protocol_message_event *event =
        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_protocol_message_event));

    event->allocator = allocator;
    event->binding = s_aws_event_stream_client_connection_binding_acquire(
        (struct aws_event_stream_client_connection_binding *)user_data);

    if (s_aws_event_stream_message_storage_init_from_native(&event->storage, allocator, message_args)) {
        AWS_LOGF_ERROR(
            AWS_LS_NODEJS_CRT_GENERAL,
            "id=%p s_aws_event_stream_rpc_client_connection_protocol_message_fn - unable to initialize message storage",
            (void *)event->binding);
        s_aws_event_stream_protocol_message_event_destroy(event);
        return;
    }

    /* queue a callback in node's libuv thread */
    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(event->binding->on_protocol_message, event));
}

/*
 * Reads the required 'hostName' (string) and 'port' (uint32) properties from the JS connection options into the
 * binding.  Returns AWS_OP_SUCCESS, or raises AWS_ERROR_INVALID_ARGUMENT if either is missing or invalid.
 */
static int s_init_event_stream_connection_configuration_from_js_connection_configuration(
    napi_env env,
    napi_value node_connection_options,
    struct aws_event_stream_client_connection_binding *binding) {

    napi_value host_name_property;
    if (aws_napi_get_named_property(
            env, node_connection_options, AWS_EVENT_STREAM_PROPERTY_NAME_HOST, napi_string, &host_name_property) !=
        AWS_NGNPR_VALID_VALUE) {
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    binding->host = aws_string_new_from_napi(env, host_name_property);
    if (binding->host == NULL) {
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    if (aws_napi_get_named_property_as_uint32(
            env, node_connection_options, AWS_EVENT_STREAM_PROPERTY_NAME_PORT, &binding->port) !=
        AWS_NGNPR_VALID_VALUE) {
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    return AWS_OP_SUCCESS;
}

napi_value aws_napi_event_stream_client_connection_new(napi_env env, napi_callback_info info) {
    napi_value node_args[6];
    size_t num_args = AWS_ARRAY_SIZE(node_args);
    napi_value *arg = &node_args[0];
    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {
        napi_throw_error(env, NULL, "event_stream_client_connection_new - Failed to retrieve arguments");
        return NULL;
    });

    if (num_args != 
AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "event_stream_client_connection_new - needs exactly 6 arguments");        return NULL;    }     napi_value node_connection_ref = NULL;    napi_value node_external = NULL;    struct aws_allocator *allocator = aws_napi_get_allocator();     struct aws_event_stream_client_connection_binding *binding =        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_client_connection_binding));    binding->allocator = allocator;    aws_ref_count_init(&binding->ref_count, binding, s_aws_event_stream_client_connection_binding_on_zero);     AWS_NAPI_CALL(env, napi_create_external(env, binding, NULL, NULL, &node_external), {        aws_mem_release(allocator, binding);        napi_throw_error(env, NULL, "event_stream_client_connection_new - Failed to create n-api external");        s_aws_event_stream_client_connection_binding_release(binding);        goto done;    });     /* Arg #1: the js event stream connection */    napi_value node_connection = *arg++;    if (aws_napi_is_null_or_undefined(env, node_connection)) {        napi_throw_error(env, NULL, "event_stream_client_connection_new - Required connection parameter is null");        goto error;    }     AWS_NAPI_CALL(        env, napi_create_reference(env, node_connection, 1, &binding->node_event_stream_client_connection_ref), {            napi_throw_error(                env,                NULL,                "event_stream_client_connection_new - Failed to create reference to node event stream connection");            goto error;        });     /* Arg #2: the event stream connection options object */    napi_value node_connection_options = *arg++;    if (aws_napi_is_null_or_undefined(env, node_connection_options)) {        napi_throw_error(env, NULL, "event_stream_client_connection_new - Required options parameter is null");        goto error;    }     if (s_init_event_stream_connection_configuration_from_js_connection_configuration(            env, 
node_connection_options, binding)) {        napi_throw_error(            env,            NULL,            "event_stream_client_connection_new - failed to initialize native connection configuration from js "            "connection configuration");        goto error;    }     /* Arg #3: on disconnect event handler */    napi_value on_connection_shutdown_event_handler = *arg++;    if (aws_napi_is_null_or_undefined(env, on_connection_shutdown_event_handler)) {        napi_throw_error(            env, NULL, "event_stream_client_connection_new - required on_connection_shutdown event handler is null");        goto error;    }     AWS_NAPI_CALL(        env,        aws_napi_create_threadsafe_function(            env,            on_connection_shutdown_event_handler,            "aws_event_stream_client_connection_on_connection_shutdown",            s_napi_event_stream_connection_on_connection_shutdown,            NULL,            &binding->on_connection_shutdown),        {            napi_throw_error(                env,                NULL,                "event_stream_client_connection_new - failed to initialize on_connection_shutdown event handler");            goto error;        });     /* Arg #4: on protocol message event handler */    napi_value on_protocol_message_event_handler = *arg++;    if (aws_napi_is_null_or_undefined(env, on_protocol_message_event_handler)) {        napi_throw_error(            env, NULL, "event_stream_client_connection_new - required on_protocol_message event handler is null");        goto error;    }     AWS_NAPI_CALL(        env,        aws_napi_create_threadsafe_function(            env,            on_protocol_message_event_handler,            "aws_event_stream_client_connection_on_protocol_message",            s_napi_event_stream_connection_on_protocol_message,            NULL,            &binding->on_protocol_message),        {            napi_throw_error(                env,                NULL,                
"event_stream_client_connection_new - failed to initialize on_protocol_message event handler");            goto error;        });     /* Arg #5: socket options */    napi_value node_socket_options = *arg++;    if (!aws_napi_is_null_or_undefined(env, node_socket_options)) {        struct aws_socket_options *socket_options_ptr = NULL;        AWS_NAPI_CALL(env, napi_get_value_external(env, node_socket_options, (void **)&socket_options_ptr), {            napi_throw_error(                env, NULL, "event_stream_client_connection_new - Unable to extract socket_options from external");            goto error;        });         if (socket_options_ptr == NULL) {            napi_throw_error(env, NULL, "event_stream_client_connection_new - Null socket options");            goto error;        }         binding->socket_options = *socket_options_ptr;    } else {        /* Default is stream, ipv4, and a basic timeout */        binding->socket_options.connect_timeout_ms = AWS_EVENT_STREAM_CONNECT_TIMEOUT_DEFAULT_MS;    }     /* Arg #6: tls options */    napi_value node_tls = *arg++;    if (!aws_napi_is_null_or_undefined(env, node_tls)) {        struct aws_tls_ctx *tls_ctx;        AWS_NAPI_CALL(env, napi_get_value_external(env, node_tls, (void **)&tls_ctx), {            napi_throw_error(env, NULL, "event_stream_client_connection_new - Failed to extract tls_ctx from external");            goto error;        });         aws_tls_connection_options_init_from_ctx(&binding->tls_connection_options, tls_ctx);        binding->using_tls = true;    }     AWS_NAPI_CALL(        env, napi_create_reference(env, node_external, 1, &binding->node_event_stream_client_connection_external_ref), {            napi_throw_error(                env,                NULL,                "event_stream_client_connection_new - Failed to create one count reference to napi external");            goto error;        });     node_connection_ref = node_external;    goto done; error:     
s_close_connection_binding(env, binding); done:     return node_connection_ref;} napi_value aws_napi_event_stream_client_connection_close(napi_env env, napi_callback_info info) {    napi_value node_args[1];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_close - Failed to retrieve arguments");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_close - needs exactly 1 argument");        return NULL;    }     struct aws_event_stream_client_connection_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_close - Failed to extract connection binding from first argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_close - binding was null");        return NULL;    }     /* This severs the ability to call back into JS and makes the binding's extern available for garbage collection */    s_close_connection_binding(env, binding);     if (binding->connection != NULL) {        aws_event_stream_rpc_client_connection_close(binding->connection, AWS_CRT_NODEJS_ERROR_EVENT_STREAM_USER_CLOSE);    }     /*     * Release the allocation-ref on the binding.  If there is a connection in progress (or being shutdown) there     * is a second ref outstanding which is removed on connection shutdown or failed setup.     *     * This is safe to do here because the internal state of the JS connection object blocks all future native     * invocations.     
*/    s_aws_event_stream_client_connection_binding_release(binding);     return NULL;} /* An internal helper function that lets us fake socket closes (at least from the binding's perspective) */napi_value aws_napi_event_stream_client_connection_close_internal(napi_env env, napi_callback_info info) {    napi_value node_args[1];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_connection_close_internal - Failed to retrieve arguments");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_connection_close_internal - needs exactly 1 argument");        return NULL;    }     struct aws_event_stream_client_connection_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_close_internal - Failed to extract connection binding from first "            "argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_close_internal - binding was null");        return NULL;    }     if (binding->connection != NULL) {        aws_event_stream_rpc_client_connection_close(binding->connection, AWS_IO_SOCKET_CLOSED);    }     return NULL;} static void s_aws_event_stream_rpc_client_on_connection_shutdown_fn(    struct aws_event_stream_rpc_client_connection *connection,    int error_code,    void *user_data) {     struct aws_allocator *allocator = aws_napi_get_allocator();    struct aws_event_stream_client_connection_binding *binding = user_data;     struct aws_event_stream_connection_event_data *shutdown_data =      
  aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_connection_event_data));    shutdown_data->allocator = allocator;    shutdown_data->error_code = error_code;    shutdown_data->binding = binding;       /* we already have a ref from the original connect call */    shutdown_data->connection = connection; /* not really necessary with shutdown, but doesn't hurt */     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(binding->on_connection_shutdown, shutdown_data));} static void s_napi_on_event_stream_client_connection_setup(    napi_env env,    napi_value function,    void *context,    void *user_data) {     (void)context;     struct aws_event_stream_connection_event_data *setup_data = user_data;    struct aws_event_stream_client_connection_binding *binding = setup_data->binding;     /*     * We took a reference to the connection when we initialized setup_data.  That is our reference; no need to take     * one here.     */    binding->connection = setup_data->connection;     if (env && !binding->is_closed) {        napi_value params[2];        const size_t num_params = AWS_ARRAY_SIZE(params);         /*         * If we can't resolve the weak ref to the event stream connection, then it's been garbage collected and we         * should not do anything.         
*/        params[0] = NULL;        if (napi_get_reference_value(env, binding->node_event_stream_client_connection_ref, &params[0]) != napi_ok ||            params[0] == NULL) {            AWS_LOGF_INFO(                AWS_LS_NODEJS_CRT_GENERAL,                "s_napi_on_event_stream_client_connection_setup - event_stream_client_connection node wrapper no "                "longer resolvable");            goto close;        }         AWS_NAPI_CALL(env, napi_create_uint32(env, setup_data->error_code, &params[1]), { goto close; });         AWS_NAPI_ENSURE(            env,            aws_napi_dispatch_threadsafe_function(                env, binding->on_connection_setup, NULL, function, num_params, params));         /* Successful callback, skip ahead */        goto done;    } close:     /*     * We hit here only if the JS object has been closed or there was a terminal failure in trying to invoke     * the setup callback.  In all cases, log it, and shutdown the connection.     */    AWS_LOGF_INFO(        AWS_LS_NODEJS_CRT_GENERAL,        "s_napi_on_event_stream_client_connection_setup - node wrapper has been closed or hit a terminal failure, "        "halting connection setup");     /*     * Close the connection, starting the shutdown process     */    if (binding->connection != NULL) {        aws_event_stream_rpc_client_connection_close(binding->connection, AWS_CRT_NODEJS_ERROR_EVENT_STREAM_USER_CLOSE);    } done:     /*     * Our invariant is that for the time interval between attempting to connect and either     *     *  (1) connection establishment failed, or     *  (2) connection establishment succeeded and some arbitrary time later, gets shutdown     *     * we maintain a ref on the binding itself, ie native event stream can safely invoke callbacks that are     * guaranteed to reach a valid binding.     
*     * It's trickier than normal because, while we acquire in a single spot (the connect() call), we release in     * two very different spots:     *     *  (1) connection establishment failed: here     *  (2) connection establishment succeeded: in s_napi_on_event_stream_client_connection_shutdown     *     * Important: in the case that we successfully connected but close had already been called, we don't release     * the binding yet and instead let shutdown release it.     */    if (!setup_data->connection) {        /*         * Only release the binding if this was a failure to connect.         */        s_aws_event_stream_client_connection_binding_release(binding);    }     aws_mem_release(setup_data->allocator, setup_data);} static void s_aws_event_stream_rpc_client_on_connection_setup_fn(    struct aws_event_stream_rpc_client_connection *connection,    int error_code,    void *user_data) {     struct aws_allocator *allocator = aws_napi_get_allocator();    struct aws_event_stream_client_connection_binding *binding = user_data;     struct aws_event_stream_connection_event_data *setup_data =        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_connection_event_data));    setup_data->allocator = allocator;    setup_data->error_code = error_code;    setup_data->binding = binding; /* we already have a ref from the original connect call */    setup_data->connection = connection;     if (connection != NULL) {        /*         * We don't own the initial ref (the channel does, sigh).  While we are in setup data atm, this acquire         * represents the binding's reference.         
*/        aws_event_stream_rpc_client_connection_acquire(setup_data->connection);    }     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(binding->on_connection_setup, setup_data));} napi_value aws_napi_event_stream_client_connection_connect(napi_env env, napi_callback_info info) {    struct aws_allocator *allocator = aws_napi_get_allocator();     napi_value node_args[2];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_connection_connect - Failed to extract parameter array");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_connect - needs exactly 2 arguments");        return NULL;    }     struct aws_event_stream_client_connection_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_connect - Failed to extract connection binding from first "            "argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_connect - binding was null");        return NULL;    }     if (binding->is_closed) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_connect - connection already closed");        return NULL;    }     AWS_FATAL_ASSERT(binding->connection == NULL);     napi_value connection_setup_callback = *arg++;    AWS_NAPI_CALL(        env,        aws_napi_create_threadsafe_function(            env,            connection_setup_callback,            "aws_event_stream_client_connection_on_connection_setup",  
          s_napi_on_event_stream_client_connection_setup,            binding,            &binding->on_connection_setup),        {            napi_throw_error(                env,                NULL,                "aws_napi_event_stream_client_connection_connect - failed to create threadsafe callback function");            return NULL;        });     struct aws_tls_connection_options *tls_options = NULL;    if (binding->using_tls) {        tls_options = &binding->tls_connection_options;    }     struct aws_event_stream_rpc_client_connection_options connect_options = {        .host_name = aws_string_c_str(binding->host),        .port = binding->port,        .socket_options = &binding->socket_options,        .tls_options = tls_options,        .bootstrap = aws_napi_get_default_client_bootstrap(),        .on_connection_setup = s_aws_event_stream_rpc_client_on_connection_setup_fn,        .on_connection_protocol_message = s_aws_event_stream_rpc_client_connection_protocol_message_fn,        .on_connection_shutdown = s_aws_event_stream_rpc_client_on_connection_shutdown_fn,        .user_data = binding,    };     s_aws_event_stream_client_connection_binding_acquire(binding);     if (aws_event_stream_rpc_client_connection_connect(allocator, &connect_options)) {        /* Undo the acquire just above */        s_aws_event_stream_client_connection_binding_release(binding);        aws_napi_throw_last_error_with_context(            env,            "aws_napi_event_stream_client_connection_connect - synchronous failure invoking "            "aws_event_stream_rpc_client_connection_connect");        return NULL;    }     return NULL;} struct aws_event_stream_protocol_message_flushed_callback {    struct aws_allocator *allocator;    struct aws_event_stream_client_connection_binding *binding;    napi_threadsafe_function on_message_flushed;    int error_code;}; static void s_aws_event_stream_protocol_message_flushed_callback_destroy(    struct 
aws_event_stream_protocol_message_flushed_callback *callback_data) {    if (callback_data == NULL) {        return;    }     AWS_CLEAN_THREADSAFE_FUNCTION(callback_data, on_message_flushed);    s_aws_event_stream_client_connection_binding_release(callback_data->binding);     aws_mem_release(callback_data->allocator, callback_data);} static void s_napi_on_event_stream_client_connection_message_flushed(    napi_env env,    napi_value function,    void *context,    void *user_data) {     (void)context;     struct aws_event_stream_protocol_message_flushed_callback *callback_data = user_data;    struct aws_event_stream_client_connection_binding *binding = callback_data->binding;     if (env && !binding->is_closed) {        napi_value params[1];        const size_t num_params = AWS_ARRAY_SIZE(params);         AWS_NAPI_CALL(env, napi_create_uint32(env, callback_data->error_code, &params[0]), { goto done; });         AWS_NAPI_ENSURE(            env,            aws_napi_dispatch_threadsafe_function(                env, callback_data->on_message_flushed, NULL, function, num_params, params));    } done:     s_aws_event_stream_protocol_message_flushed_callback_destroy(callback_data);} static void s_aws_event_stream_on_protocol_message_flushed(int error_code, void *user_data) {    struct aws_event_stream_protocol_message_flushed_callback *callback_data = user_data;     callback_data->error_code = error_code;     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(callback_data->on_message_flushed, callback_data));} napi_value aws_napi_event_stream_client_connection_send_protocol_message(napi_env env, napi_callback_info info) {    struct aws_allocator *allocator = aws_napi_get_allocator();     struct aws_event_stream_message_storage message_storage;    AWS_ZERO_STRUCT(message_storage);     napi_value node_args[3];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, 
napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_send_protocol_message - Failed to extract parameter array");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_connection_send_protocol_message - needs exactly 3 arguments");        return NULL;    }     struct aws_event_stream_client_connection_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_send_protocol_message - Failed to extract connection binding from "            "first "            "argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_connection_send_protocol_message - binding was null");        return NULL;    }     if (binding->is_closed) {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_connection_send_protocol_message - connection already closed");        return NULL;    }     AWS_FATAL_ASSERT(binding->connection != NULL);     napi_value napi_message_options = *arg++;    napi_value napi_message = NULL;    if (aws_napi_get_named_property(            env, napi_message_options, AWS_EVENT_STREAM_PROPERTY_NAME_MESSAGE, napi_object, &napi_message) !=        AWS_NGNPR_VALID_VALUE) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_send_protocol_message - message options with invalid message "            "parameter");        return NULL;    }     struct aws_event_stream_protocol_message_flushed_callback *callback_data =        aws_mem_calloc(allocator, 1, sizeof(struct 
aws_event_stream_protocol_message_flushed_callback));    callback_data->allocator = allocator;    callback_data->binding = s_aws_event_stream_client_connection_binding_acquire(binding);     if (s_aws_event_stream_message_storage_init_from_js(&message_storage, allocator, env, napi_message, binding)) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_send_protocol_message - failed to read message properties from JS "            "object");        goto error;    }     napi_value message_flushed_callback = *arg++;    AWS_NAPI_CALL(        env,        aws_napi_create_threadsafe_function(            env,            message_flushed_callback,            "aws_event_stream_client_connection_on_message_flushed",            s_napi_on_event_stream_client_connection_message_flushed,            callback_data,            &callback_data->on_message_flushed),        {            napi_throw_error(                env,                NULL,                "aws_napi_event_stream_client_connection_send_protocol_message - failed to create threadsafe callback "                "function");            goto error;        });     struct aws_event_stream_rpc_message_args message_args = {        .headers = (struct aws_event_stream_header_value_pair *)message_storage.headers.data,        .headers_count = s_aws_array_list_length(&message_storage.headers),        .payload = message_storage.payload,        .message_type = message_storage.message_type,        .message_flags = message_storage.message_flags,    };     if (aws_event_stream_rpc_client_connection_send_protocol_message(            binding->connection, &message_args, s_aws_event_stream_on_protocol_message_flushed, callback_data)) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_connection_send_protocol_message - synchronous error invoking native "            "send_protocol_message");        goto error;    }     goto done; error:    
 s_aws_event_stream_protocol_message_flushed_callback_destroy(callback_data); done:     s_aws_event_stream_message_storage_clean_up(&message_storage);     return NULL;} /*********************************************************************************************************************/ /* * Binding object that outlives the associated napi wrapper object.  When that object finalizes, then it's a signal * to this object to destroy the stream (and itself, afterwards). * * WARNING * Data Access Rules: *  (1) If in the libuv thread (called from JS or in the invocation of a thread-safe function), you may access anything *      in the binding *  (2) Otherwise, you may only access thread-safe functions or the binding's ref count APIs.  In particular, *      'stream' and 'is_closed' are off-limits unless you're in the libuv thread. */struct aws_event_stream_client_stream_binding {    struct aws_allocator *allocator;     /*     * We ref count the binding itself because there are two primary time intervals that together create a union     * that we must honor.     *     * Interval #1: The binding must live from new() to close()     * Interval #2: The binding must live from activate() to {stream failure || stream shutdown} as processed     *    by the libuv thread.  It is incorrect to react to those events in the event loop callback; we must bundle     *    and ship them across to the libuv thread.  When the libuv thread is processing a stream failure or     *    shutdown, we know that no other events can possibly be pending (they would have already been     *    processed in the libuv thread).     *     * The union of those two intervals is "probably" enough, but its correctness would rest on an internal property     * of the node implementation itself: "are calls to napi_call_threadsafe_function() well-ordered with respect to     * a single producer (we only call it from the libuv thread itself)?"  
This is almost certainly true, but I don't     * see it guaranteed within the n-api documentation.  For that reason, we also add the intervals of all     * completable stream events: incoming stream messages and outbound message flushes     */    struct aws_ref_count ref_count;     /*     * May only be accessed from within the libuv thread.  This includes stream APIs like acquire and release.     */    struct aws_event_stream_rpc_client_continuation_token *stream;    bool is_closed;     /*     * Single count ref to the JS stream object.     */    napi_ref node_event_stream_client_stream_ref;     /*     * Single count ref to the node external managed by the binding.     */    napi_ref node_event_stream_client_stream_external_ref;     napi_threadsafe_function on_stream_activated;    napi_threadsafe_function on_stream_ended;    napi_threadsafe_function on_stream_message;}; static void s_aws_event_stream_client_stream_binding_on_zero(void *context) {    if (context == NULL) {        return;    }     struct aws_event_stream_client_stream_binding *binding = context;     AWS_CLEAN_THREADSAFE_FUNCTION(binding, on_stream_activated);    AWS_CLEAN_THREADSAFE_FUNCTION(binding, on_stream_ended);    AWS_CLEAN_THREADSAFE_FUNCTION(binding, on_stream_message);     aws_mem_release(binding->allocator, binding);} static struct aws_event_stream_client_stream_binding *s_aws_event_stream_client_stream_binding_acquire(    struct aws_event_stream_client_stream_binding *binding) {    if (binding == NULL) {        return NULL;    }     aws_ref_count_acquire(&binding->ref_count);    return binding;} static struct aws_event_stream_client_stream_binding *s_aws_event_stream_client_stream_binding_release(    struct aws_event_stream_client_stream_binding *binding) {    if (binding != NULL) {        aws_ref_count_release(&binding->ref_count);    }     return NULL;} static void s_close_stream_binding(napi_env env, struct aws_event_stream_client_stream_binding *binding) {    AWS_FATAL_ASSERT(env != 
NULL);     binding->is_closed = true;     napi_ref node_event_stream_client_stream_external_ref = binding->node_event_stream_client_stream_external_ref;    binding->node_event_stream_client_stream_external_ref = NULL;     napi_ref node_event_stream_client_stream_ref = binding->node_event_stream_client_stream_ref;    binding->node_event_stream_client_stream_ref = NULL;     if (node_event_stream_client_stream_external_ref != NULL) {        napi_delete_reference(env, node_event_stream_client_stream_external_ref);    }     if (node_event_stream_client_stream_ref != NULL) {        napi_delete_reference(env, node_event_stream_client_stream_ref);    }} static void s_napi_event_stream_on_stream_ended(napi_env env, napi_value function, void *context, void *user_data) {    (void)context;     struct aws_event_stream_client_stream_binding *binding = user_data;     if (env && !binding->is_closed) {        napi_value params[1];        const size_t num_params = AWS_ARRAY_SIZE(params);         /*         * If we can't resolve the weak ref to the event stream, then it's been garbage collected and we         * should not do anything.         
*/        params[0] = NULL;        if (napi_get_reference_value(env, binding->node_event_stream_client_stream_ref, &params[0]) != napi_ok ||            params[0] == NULL) {            AWS_LOGF_INFO(                AWS_LS_NODEJS_CRT_GENERAL,                "s_napi_event_stream_on_stream_ended - event_stream_client_stream node wrapper no "                "longer resolvable");            goto done;        }         AWS_NAPI_ENSURE(            env,            aws_napi_dispatch_threadsafe_function(env, binding->on_stream_ended, NULL, function, num_params, params));    } done:     if (binding->stream != NULL) {        aws_event_stream_rpc_client_continuation_release(binding->stream);        binding->stream = NULL;    }     /* release the binding ref acquired in the call to activate() */    s_aws_event_stream_client_stream_binding_release(binding);} static void s_event_stream_on_stream_ended(    struct aws_event_stream_rpc_client_continuation_token *stream,    void *user_data) {     (void)stream;     struct aws_event_stream_client_stream_binding *binding = user_data;     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(binding->on_stream_ended, binding));} struct aws_event_stream_stream_message_event {    struct aws_allocator *allocator;    struct aws_event_stream_message_storage storage;    struct aws_event_stream_client_stream_binding *binding;}; static void s_aws_event_stream_stream_message_event_destroy(struct aws_event_stream_stream_message_event *event) {    if (event == NULL) {        return;    }     s_aws_event_stream_message_storage_clean_up(&event->storage);    s_aws_event_stream_client_stream_binding_release(event->binding);     aws_mem_release(event->allocator, event);} static void s_napi_event_stream_on_stream_message(napi_env env, napi_value function, void *context, void *user_data) {     (void)context;     struct aws_event_stream_stream_message_event *message_event = user_data;    struct 
aws_event_stream_client_stream_binding *binding = message_event->binding;     if (env && !binding->is_closed) {        napi_value params[2];        const size_t num_params = AWS_ARRAY_SIZE(params);         /*         * If we can't resolve the weak ref to the event stream, then it's been garbage collected and we         * should not do anything.         */        params[0] = NULL;        if (napi_get_reference_value(env, binding->node_event_stream_client_stream_ref, &params[0]) != napi_ok ||            params[0] == NULL) {            AWS_LOGF_INFO(                AWS_LS_NODEJS_CRT_GENERAL,                "s_napi_event_stream_on_stream_message - event_stream_client_stream node wrapper no "                "longer resolvable");            goto done;        }         if (s_aws_create_napi_value_from_event_stream_message_storage(env, &message_event->storage, &params[1])) {            AWS_LOGF_ERROR(                AWS_LS_NODEJS_CRT_GENERAL,                "s_napi_event_stream_on_stream_message - failed to create JS representation of incoming "                "message");            goto done;        }         AWS_NAPI_ENSURE(            env,            aws_napi_dispatch_threadsafe_function(env, binding->on_stream_message, NULL, function, num_params, params));    } done:     s_aws_event_stream_stream_message_event_destroy(message_event);} static void s_event_stream_on_stream_message(    struct aws_event_stream_rpc_client_continuation_token *stream,    const struct aws_event_stream_rpc_message_args *message_args,    void *user_data) {     (void)stream;     struct aws_allocator *allocator = aws_napi_get_allocator();    struct aws_event_stream_stream_message_event *event =        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_stream_message_event));     event->allocator = allocator;    event->binding =        s_aws_event_stream_client_stream_binding_acquire((struct aws_event_stream_client_stream_binding *)user_data);     if 
(s_aws_event_stream_message_storage_init_from_native(&event->storage, allocator, message_args)) {        AWS_LOGF_ERROR(            AWS_LS_NODEJS_CRT_GENERAL,            "id=%p s_event_stream_on_stream_message - unable to initialize message storage",            (void *)event->binding);        s_aws_event_stream_stream_message_event_destroy(event);        return;    }     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(event->binding->on_stream_message, event));} napi_value aws_napi_event_stream_client_stream_new(napi_env env, napi_callback_info info) {    napi_value node_args[4];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - Failed to retrieve arguments");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - needs exactly 4 arguments");        return NULL;    }     napi_value node_stream_ref = NULL;    napi_value node_external = NULL;    struct aws_allocator *allocator = aws_napi_get_allocator();     struct aws_event_stream_client_stream_binding *binding =        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_client_stream_binding));    binding->allocator = allocator;    aws_ref_count_init(&binding->ref_count, binding, s_aws_event_stream_client_stream_binding_on_zero);     AWS_NAPI_CALL(env, napi_create_external(env, binding, NULL, NULL, &node_external), {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - Failed to create n-api external");        s_aws_event_stream_client_stream_binding_release(binding);        goto done;    });     /*     * From here on out, a failure will lead the external to getting finalized by node, which in turn will lead the     * binding to 
getting cleaned up.     */     /* Arg #1: the js stream */    napi_value node_stream = *arg++;    if (aws_napi_is_null_or_undefined(env, node_stream)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - Required stream parameter is null");        goto error;    }     AWS_NAPI_CALL(env, napi_create_reference(env, node_stream, 1, &binding->node_event_stream_client_stream_ref), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_new - Failed to create reference to node event stream client stream");        goto error;    });     /* Arg #2: the event stream connection to create a stream on */    struct aws_event_stream_client_connection_binding *connection_binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&connection_binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_new - Failed to extract connection binding from "            "first "            "argument");        goto error;    });     if (connection_binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - binding was null");        goto error;    }     if (connection_binding->is_closed) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - connection already closed");        goto error;    }     if (connection_binding->connection == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - connection is null");        goto error;    }     /* Arg #3: on stream ended event handler */    napi_value on_stream_ended_event_handler = *arg++;    if (aws_napi_is_null_or_undefined(env, on_stream_ended_event_handler)) {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_stream_new - required on_stream_ended event handler is null");        goto error;    }     AWS_NAPI_CALL(        env, 
       aws_napi_create_threadsafe_function(            env,            on_stream_ended_event_handler,            "aws_event_stream_client_connection_on_stream_ended",            s_napi_event_stream_on_stream_ended,            NULL,            &binding->on_stream_ended),        {            napi_throw_error(                env,                NULL,                "aws_napi_event_stream_client_stream_new - failed to initialize on_stream_ended threadsafe function");            goto error;        });     /* Arg #4: on stream message event handler */    napi_value on_stream_message_event_handler = *arg++;    if (aws_napi_is_null_or_undefined(env, on_stream_message_event_handler)) {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_stream_new - required on_stream_message event handler is null");        goto error;    }     AWS_NAPI_CALL(        env,        aws_napi_create_threadsafe_function(            env,            on_stream_message_event_handler,            "aws_event_stream_on_stream_message",            s_napi_event_stream_on_stream_message,            NULL,            &binding->on_stream_message),        {            napi_throw_error(                env,                NULL,                "aws_napi_event_stream_client_stream_new - failed to initialize on_stream_message threadsafe function");            goto error;        });     struct aws_event_stream_rpc_client_stream_continuation_options stream_options = {        .on_continuation = s_event_stream_on_stream_message,        .on_continuation_closed = s_event_stream_on_stream_ended,        .user_data = binding,    };     binding->stream =        aws_event_stream_rpc_client_connection_new_stream(connection_binding->connection, &stream_options);    if (binding->stream == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_new - Failed to create native stream");        goto error;    }     AWS_NAPI_CALL(        env, napi_create_reference(env, node_external, 1, 
&binding->node_event_stream_client_stream_external_ref), {            napi_throw_error(                env,                NULL,                "aws_napi_event_stream_client_stream_new - Failed to create one count reference to napi external");            goto error;        });     node_stream_ref = node_external;    goto done; error:     s_close_stream_binding(env, binding); done:     return node_stream_ref;} napi_value aws_napi_event_stream_client_stream_close(napi_env env, napi_callback_info info) {    napi_value node_args[1];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_close - Failed to retrieve arguments");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_close - needs exactly 1 argument");        return NULL;    }     struct aws_event_stream_client_stream_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_close - Failed to extract stream binding from first argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_close - binding was null");        return NULL;    }     /* This severs the ability to call back into JS and makes the binding's extern available for garbage collection */    s_close_stream_binding(env, binding);     struct aws_event_stream_rpc_client_continuation_token *stream = binding->stream;    if (stream != NULL) {        binding->stream = NULL;        aws_event_stream_rpc_client_continuation_release(stream);    }     /*     * Release the allocation-ref on the binding.  
If there is a stream activation in progress (or being shutdown) there     * is a second ref outstanding which is removed on stream shutdown or failed activation.     *     * This is safe to do here because the internal state of the JS stream object blocks all future native     * invocations.     */    s_aws_event_stream_client_stream_binding_release(binding);     return NULL;} struct aws_event_stream_activation_event_data {    struct aws_allocator *allocator;    int error_code;    struct aws_event_stream_client_stream_binding *binding;}; static void s_napi_on_event_stream_client_stream_activation(    napi_env env,    napi_value function,    void *context,    void *user_data) {    (void)context;     struct aws_event_stream_activation_event_data *activation_data = user_data;    struct aws_event_stream_client_stream_binding *binding = activation_data->binding;     if (env && !binding->is_closed) {        napi_value params[2];        const size_t num_params = AWS_ARRAY_SIZE(params);         /*         * If we can't resolve the weak ref to the event stream, then it's been garbage collected and we         * should not do anything.         
*/        params[0] = NULL;        if (napi_get_reference_value(env, binding->node_event_stream_client_stream_ref, &params[0]) != napi_ok ||            params[0] == NULL) {            AWS_LOGF_INFO(                AWS_LS_NODEJS_CRT_GENERAL,                "s_napi_on_event_stream_client_stream_activation - event_stream_client_stream node wrapper no "                "longer resolvable");            goto done;        }         AWS_NAPI_CALL(env, napi_create_uint32(env, activation_data->error_code, &params[1]), { goto done; });         AWS_NAPI_ENSURE(            env,            aws_napi_dispatch_threadsafe_function(                env, binding->on_stream_activated, NULL, function, num_params, params));    } done:     /* A failed activation must release the binding ref acquired in the call to activate() */    if (activation_data->error_code != 0) {        s_aws_event_stream_client_stream_binding_release(binding);    }     aws_mem_release(activation_data->allocator, activation_data);} static void s_aws_event_stream_on_stream_activation_flush(int error_code, void *user_data) {    struct aws_allocator *allocator = aws_napi_get_allocator();    struct aws_event_stream_client_stream_binding *binding = user_data;     struct aws_event_stream_activation_event_data *activation_data =        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_activation_event_data));    activation_data->allocator = allocator;    activation_data->error_code = error_code;    activation_data->binding = binding; /* we already have a ref from the original activate call */     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(binding->on_stream_activated, activation_data));} static int s_aws_extract_activation_options_from_js(    napi_env env,    napi_value napi_activation_options,    struct aws_event_stream_client_stream_binding *binding,    struct aws_byte_buf *operation_name_buffer,    struct aws_event_stream_message_storage 
*activation_message) {     if (aws_napi_get_named_property_as_bytebuf(            env,            napi_activation_options,            AWS_EVENT_STREAM_PROPERTY_NAME_OPERATION,            napi_string,            operation_name_buffer) != AWS_NGNPR_VALID_VALUE) {        AWS_LOGF_ERROR(            AWS_LS_NODEJS_CRT_GENERAL,            "id=%p s_aws_extract_activation_options_from_js - failed to get required `operation` property from "            "activation options",            (void *)binding);        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);    }     napi_value napi_activation_message;    if (aws_napi_get_named_property(            env,            napi_activation_options,            AWS_EVENT_STREAM_PROPERTY_NAME_MESSAGE,            napi_object,            &napi_activation_message)) {        AWS_LOGF_ERROR(            AWS_LS_NODEJS_CRT_GENERAL,            "id=%p s_aws_extract_activation_options_from_js - failed to get required `message` property from "            "activation options",            (void *)binding);        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);    }     struct aws_allocator *allocator = aws_napi_get_allocator();    if (s_aws_event_stream_message_storage_init_from_js(            activation_message, allocator, env, napi_activation_message, binding)) {        AWS_LOGF_ERROR(            AWS_LS_NODEJS_CRT_GENERAL,            "id=%p s_aws_extract_activation_options_from_js - failed to unpack activation message from JS",            (void *)binding);        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);    }     return AWS_OP_SUCCESS;} napi_value aws_napi_event_stream_client_stream_activate(napi_env env, napi_callback_info info) {     struct aws_byte_buf operation_name_buffer;    AWS_ZERO_STRUCT(operation_name_buffer);    struct aws_event_stream_message_storage activation_message;    AWS_ZERO_STRUCT(activation_message);     napi_value node_args[3];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    
AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_activate - Failed to extract parameter array");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_activate - needs exactly 3 arguments");        return NULL;    }     struct aws_event_stream_client_stream_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_activate - Failed to extract stream binding from first "            "argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_activate - binding was null");        return NULL;    }     if (binding->is_closed || binding->stream == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_activate - stream already closed");        return NULL;    }     napi_value napi_activation_options = *arg++;    if (s_aws_extract_activation_options_from_js(            env, napi_activation_options, binding, &operation_name_buffer, &activation_message)) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_activate - unable to unpack activation options from JS object");        goto done;    }     napi_value stream_activation_callback = *arg++;    AWS_NAPI_CALL(        env,        aws_napi_create_threadsafe_function(            env,            stream_activation_callback,            "aws_event_stream_client_stream_on_activation",            s_napi_on_event_stream_client_stream_activation,            binding,            &binding->on_stream_activated),        {            napi_throw_error(                env,                NULL,  
              "aws_napi_event_stream_client_stream_activate - failed to create threadsafe callback function");            goto done;        });     s_aws_event_stream_client_stream_binding_acquire(binding);     struct aws_event_stream_rpc_message_args message_args = {        .headers = (struct aws_event_stream_header_value_pair *)activation_message.headers.data,        .headers_count = s_aws_array_list_length(&activation_message.headers),        .payload = activation_message.payload,        .message_type = activation_message.message_type,        .message_flags = activation_message.message_flags,    };     if (aws_event_stream_rpc_client_continuation_activate(            binding->stream,            aws_byte_cursor_from_buf(&operation_name_buffer),            &message_args,            s_aws_event_stream_on_stream_activation_flush,            binding)) {        /* Undo the acquire just above */        s_aws_event_stream_client_stream_binding_release(binding);        aws_napi_throw_last_error_with_context(            env,            "aws_napi_event_stream_client_stream_activate - synchronous failure invoking "            "aws_event_stream_rpc_client_continuation_activate");        goto done;    } done:     aws_byte_buf_clean_up(&operation_name_buffer);    s_aws_event_stream_message_storage_clean_up(&activation_message);     return NULL;} struct aws_event_stream_stream_message_flushed_callback {    struct aws_allocator *allocator;    struct aws_event_stream_client_stream_binding *binding;    napi_threadsafe_function on_message_flushed;    int error_code;}; static void s_aws_event_stream_stream_message_flushed_callback_destroy(    struct aws_event_stream_stream_message_flushed_callback *callback_data) {    if (callback_data == NULL) {        return;    }     AWS_CLEAN_THREADSAFE_FUNCTION(callback_data, on_message_flushed);    s_aws_event_stream_client_stream_binding_release(callback_data->binding);     aws_mem_release(callback_data->allocator, callback_data);} static 
void s_napi_on_event_stream_client_stream_message_flushed(    napi_env env,    napi_value function,    void *context,    void *user_data) {     (void)context;     struct aws_event_stream_stream_message_flushed_callback *callback_data = user_data;    struct aws_event_stream_client_stream_binding *binding = callback_data->binding;     if (env && !binding->is_closed) {        napi_value params[1];        const size_t num_params = AWS_ARRAY_SIZE(params);         AWS_NAPI_CALL(env, napi_create_uint32(env, callback_data->error_code, &params[0]), { goto done; });         AWS_NAPI_ENSURE(            env,            aws_napi_dispatch_threadsafe_function(                env, callback_data->on_message_flushed, NULL, function, num_params, params));    } done:     s_aws_event_stream_stream_message_flushed_callback_destroy(callback_data);} static void s_aws_event_stream_on_stream_message_flushed(int error_code, void *user_data) {    struct aws_event_stream_stream_message_flushed_callback *callback_data = user_data;     callback_data->error_code = error_code;     /* queue a callback in node's libuv thread */    AWS_NAPI_ENSURE(NULL, aws_napi_queue_threadsafe_function(callback_data->on_message_flushed, callback_data));} napi_value aws_napi_event_stream_client_stream_send_message(napi_env env, napi_callback_info info) {    struct aws_allocator *allocator = aws_napi_get_allocator();     struct aws_event_stream_message_storage message_storage;    AWS_ZERO_STRUCT(message_storage);     napi_value node_args[3];    size_t num_args = AWS_ARRAY_SIZE(node_args);    napi_value *arg = &node_args[0];    AWS_NAPI_CALL(env, napi_get_cb_info(env, info, &num_args, node_args, NULL, NULL), {        napi_throw_error(            env, NULL, "aws_napi_event_stream_client_stream_send_message - Failed to extract parameter array");        return NULL;    });     if (num_args != AWS_ARRAY_SIZE(node_args)) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_send_message - needs exactly 
3 arguments");        return NULL;    }     struct aws_event_stream_client_stream_binding *binding = NULL;    napi_value node_binding = *arg++;    AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_send_message - Failed to extract stream binding from "            "first "            "argument");        return NULL;    });     if (binding == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_send_message - binding was null");        return NULL;    }     if (binding->is_closed || binding->stream == NULL) {        napi_throw_error(env, NULL, "aws_napi_event_stream_client_stream_send_message - connection already closed");        return NULL;    }     napi_value napi_message_options = *arg++;    napi_value napi_message = NULL;    if (aws_napi_get_named_property(            env, napi_message_options, AWS_EVENT_STREAM_PROPERTY_NAME_MESSAGE, napi_object, &napi_message) !=        AWS_NGNPR_VALID_VALUE) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_send_message - message options with invalid message "            "parameter");        return NULL;    }     struct aws_event_stream_stream_message_flushed_callback *callback_data =        aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_stream_message_flushed_callback));    callback_data->allocator = allocator;    callback_data->binding = s_aws_event_stream_client_stream_binding_acquire(binding);     if (s_aws_event_stream_message_storage_init_from_js(&message_storage, allocator, env, napi_message, binding)) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_send_message - failed to read message properties from JS "            "object");        goto error;    }     napi_value message_flushed_callback = *arg++;    AWS_NAPI_CALL(        
env,        aws_napi_create_threadsafe_function(            env,            message_flushed_callback,            "aws_event_stream_client_stream_on_message_flushed",            s_napi_on_event_stream_client_stream_message_flushed,            callback_data,            &callback_data->on_message_flushed),        {            napi_throw_error(                env,                NULL,                "aws_napi_event_stream_client_stream_send_message - failed to create threadsafe callback "                "function");            goto error;        });     struct aws_event_stream_rpc_message_args message_args = {        .headers = (struct aws_event_stream_header_value_pair *)message_storage.headers.data,        .headers_count = s_aws_array_list_length(&message_storage.headers),        .payload = message_storage.payload,        .message_type = message_storage.message_type,        .message_flags = message_storage.message_flags,    };     if (aws_event_stream_rpc_client_continuation_send_message(            binding->stream, &message_args, s_aws_event_stream_on_stream_message_flushed, callback_data)) {        napi_throw_error(            env,            NULL,            "aws_napi_event_stream_client_stream_send_message - synchronous error invoking native "            "send_message");        goto error;    }     goto done; error:     s_aws_event_stream_stream_message_flushed_callback_destroy(callback_data); done:     s_aws_event_stream_message_storage_clean_up(&message_storage);     return NULL;}