Source: org/terraswarm/accessor/accessors/web/net/WebSocketClient.js

// Copyright (c) 2016-2017 The Regents of the University of California.
// All rights reserved.
//
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
//
// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.
//

/** This accessor sends and/or receives messages from a web socket at
 *  the specified host and port.
 *  <a href="https://en.wikipedia.org/wiki/WebSocket">WebSockets</a>
 *  provide full-duplex communication channels over a single TCP/IP connection.
 *  In `initialize()`, it  begins connecting to the web socket server.
 *  Once the connection is established, a `true` boolean is sent to
 *  the `connected` output.
 *  If the connection is not established immediately, the accessor will attempt to
 *  reconnect _numberOfRetries_ times at an interval of _timeBetweenRetries_.
 *
 *  Whenever an input is received on the `toSend`
 *  input, the message is sent to the socket. If the socket is not yet open,
 *  this accessor will, by default, queue the message to send when the socket opens,
 *  unless the `discardMessagesBeforeOpen` parameter is true, in which case,
 *  input messages that are received before the socket is opened will be
 *  discarded. If messages are queued and `throttleFactor` is non-zero, then
 *  whenever a message is queued to be later sent, the accessor's input handler will stall
 *  by a number of milliseconds given by the queue size times the throttleFactor.
 *  The longer the queue, the longer the stall. Note that this will likely block
 *  the host from executing, so this feature should be used with caution.
 *
 *  Whenever a message is received from the socket, that message is
 *  produced on the `'received'` output. Note that the message may actually be sent
 *  over multiple 'frames', but the frames will be aggregated and produced as one
 *  message.
 *
 *  When `wrapup()` is invoked, this accessor closes the
 *  connection.
 *
 *  If the connection is dropped midway, the swarmlet may monitor the 'connected'
 *  output for a value 'false' and attempt a reconnection by providing either a
 *  port or server input.
 *
 *  The default type for both sending and receiving
 *  is 'application/json', which allows sending and receiving anything that has
 *  a string representation in JSON. The types supported by this implementation
 *  include at least:
 *  * __application/json__: The this.send() function uses JSON.stringify() and sends the
 *    result with a UTF-8 encoding. An incoming byte stream will be parsed as JSON,
 *    and if the parsing fails, will be provided as a string interpretation of the byte
 *    stream.
 *  * __text/\*__: Any text type is sent as a string encoded in UTF-8.
 *  * __image/x__: Where __x__ is one of __jpeg__, __png__, __gif__,
 *    and more.
 *    In this case, the data passed to this.send() is assumed to be an image, as encoded
 *    on the host, and the image will be encoded as a byte stream in the specified
 *    format before sending.  A received byte stream will be decoded as an image,
 *    if possible.
 *
 *  When a model with an instance of this accessor stops executing, there
 *  are two mechanisms by which data in transit can be lost. In both cases, warning
 *  messages or error messages will be issued to the host to be displayed or otherwise
 *  handled as the host sees fit.
 *
 *  * First, there might be queued messages that were received on `toSend` but have not yet
 *    been sent, either because the socket has not yet been opened or because
 *    it was closed from the other side.
 *  * Second, a message might be received from the server after shutdown has commenced.
 *    In particular, received messages are handled asynchronously by a handler function
 *    that can be invoked at any time, and that handler might be invoked after it is no
 *    longer possible for this accessor to produce outputs (it has entered its wrapup
 *    phase of execution).
 *
 *  The server might similarly lose messages by the same two mechanisms occurring
 *  on the server side. In that case, messages will presumably be displayed on the
 *  server side.
 *
 *  Accessors that extend this one can override the `toSendInputHandler` function
 *  to customize what is sent. See `RosPublisher.js` for an example.
 *
 *  This accessor requires the 'webSocket' module.
 *
 *  @accessor net/WebSocketClient
 *  @input {string} server The IP address or domain name of server. Defaults to 'localhost'.
 *  @input {int} port The port on the server to connect to. Defaults to -1, which means
 *   wait for a non-negative input before connecting.
 *  @input toSend The data to be sent over the socket.
 *  @output {boolean} connected Output `true` on connected and `false` on disconnected.
 *  @output received The data received from the web socket server.
 *
 *  @parameter {string} receiveType The MIME type for incoming messages,
 *   which defaults to 'application/json'.
 *  @parameter {string} sendType The MIME type for outgoing messages,
 *   which defaults to 'application/json'.
 *  @parameter {int} connectTimeout The time in milliseconds to wait
 *   before giving up on a connection (default is 1000).
 *  @parameter {int} numberOfRetries The number of times to retry if
 *   a connection fails. Defaults to 5.
 *  @parameter {int} timeBetweenRetries The time between retries in milliseconds.
 *   Defaults to 500.
 *  @parameter {boolean} trustAll Whether to trust any server certificate.
 *   This defaults to false. Setting it to true means that if sslTls is set to true,
 *   then any certificate provided by the server will be trusted.
 *  @parameter {string} trustedCACertPath If sslTls is set to true and trustAll is set to false,
 *   then this option needs to specify the fully qualified filename for the file that stores
 *   the certificate of a certificate authority (CA) that this client will use to verify server
 *   certificates. This path can be any of those understood by the Ptolemy host, e.g. paths
 *   beginning with $CLASSPATH/.
 *   FIXME: Need to be a list of paths for certificates rather than a single path.
 *  @parameter {boolean} sslTls Whether SSL/TLS is enabled. This defaults to false.
 *  @parameter {boolean} discardMessagesBeforeOpen If true,
 *   then any messages received on `toSend` before the socket
 *   is open will be discarded. This defaults to false.
 *  @parameter {int} throttleFactor If non-zero, specifies a
 *   time (in milliseconds) to stall when a message is queued
 *   because the socket is not yet open. The time of the stall
 *   will be the queue size (after adding the message) times
 *   the throttleFactor. This defaults to 100. Making it non-zero
 *   causes the input handler to take time if there are pending unsent messages.
 *
 *  @author Hokeun Kim, Marcus Pan, Edward A. Lee, Matt Weber
 *  @version $$Id$$
 */

// Stop extra messages from jslint and jshint.  Note that there should
// be no space between the / and the * and global. See
// https://chess.eecs.berkeley.edu/ptexternal/wiki/Main/JSHint */
/*global console, error, exports, require */
/*jshint globalstrict: true*/
'use strict';
/*jslint plusplus: true */

var WebSocket = require('@accessors-modules/web-socket-client');
// The WebSocket.Client created by the most recent connect() call; null until
// connect() creates one.
var client = null;
// Messages passed to sendToWebSocket() while no client exists. onOpen()
// sends them in order and then empties the queue.
var pendingSends = [];
// Server and port of the most recent connection request. connect() compares
// against these to ignore duplicate requests; they are reset to null by
// onClose(), by the client's 'error' handler, and by a negative port input.
var previousServer, previousPort;
// Set to true by initialize() and to false by wrapup(). onClose() produces
// 'connected' = false only while this is true.
var running = false;

// When true, the functions below log progress messages via console.log().
var debug = false;

/** Set up the accessor by defining the parameters, inputs, and outputs. */
exports.setup = function () {
    this.input('server', {
        type: 'string',
        value: 'localhost'
    });
    this.input('port', {
        type: 'int',
        value: -1
    });
    this.input('toSend');
    this.output('connected', {
        type: 'boolean'
    });
    this.output('received');

    this.parameter('receiveType', {
        type: 'string',
        value: 'application/json'
    });
    this.parameter('sendType', {
        type: 'string',
        value: 'application/json'
    });
    this.parameter('connectTimeout', {
        value: 1000,
        type: "int"
    });
    this.parameter('numberOfRetries', {
        type: 'int',
        value: 5
    });
    this.parameter('timeBetweenRetries', {
        type: 'int',
        value: 500
    });
    this.parameter('trustAll', {
        type: 'boolean',
        value: false
    });
    this.parameter('trustedCACertPath', {
        type: 'string',
        value: ''
    });
    this.parameter('sslTls', {
        type: 'boolean',
        value: false
    });
    this.parameter('discardMessagesBeforeOpen', {
        type: 'boolean',
        value: false
    });
    this.parameter('throttleFactor', {
        type: 'int',
        value: 100
    });

    // Attempt to add a list of options for types, but do not error out
    // if the socket module is not supported by the host.
    try {
        this.parameter('receiveType', {
            options: WebSocket.supportedReceiveTypes()
        });
        this.parameter('sendType', {
            options: WebSocket.supportedSendTypes()
        });
    } catch (err) {
        this.error(err);
    }
};

/** Set up input handlers, and if the current value of the 'port' input is
 *  non-negative, initiate a connection to the server using the
 *  current parameter values, and
 *  set up handlers for for establishment of the connection, incoming data,
 *  errors, and closing from the server.
 */
exports.initialize = function () {
    if (debug) {
        console.log(this.accessorName + ': WebSockClient.js: initialize()');
    }
    this.addInputHandler('server', this.exports.connect.bind(this));
    this.addInputHandler('port', this.exports.connect.bind(this));
    this.addInputHandler('toSend', this.exports.toSendInputHandler.bind(this));
    running = true;
    this.exports.connect.call(this);
};

/** Initiate a connection to the server using the current parameter values,
 *  set up handlers for for establishment of the connection, incoming data,
 *  errors, and closing from the server, and set up a handler for inputs
 *  on the toSend() input port.
 */
exports.connect = function () {
    if (debug) {
        console.log(this.accessorName + ': WebSockClient.js: connect()');
    }
    // Note that if 'server' and 'port' both receive new data in the same
    // reaction, then this will be invoked twice. But we only want to open
    // the socket once.  This is fairly tricky.

    var portValue = this.get('port'),
        serverValue = null;
    if (portValue < 0) {
        // No port is specified. This could be a signal to close a previously
        // open socket.
        if (client) {
            client.close();
        }
        previousPort = null;
        previousServer = null;
        //
        console.log(this.accessorName + ': WebSocketClient.js: connect(): portValue: ' + portValue +
            ', which is less than 0. This could be a signal to close a previously open socket.' +
            '  Returning.');
        return;
    }

    serverValue = this.get('server');
    if (previousServer === serverValue && previousPort === portValue) {
        // A request to open a client for this server/port pair has already
        // been made and has not yet been closed or failed with an error.
        return;
    }
    // Record the host/port pair that we are now opening.
    previousServer = serverValue;
    previousPort = portValue;

    if (client) {
        // Either the host or the port has changed. Close the previous socket.
        client.close();
    }

    if (debug) {
        console.log(this.accessorName + ': WebSockClient.js: connect() calling new WebSocket.Client()');
    }
    client = new WebSocket.Client({
        'host': this.get('server'),
        'port': this.get('port'),
        'receiveType': this.getParameter('receiveType'),
        'sendType': this.getParameter('sendType'),
        'connectTimeout': this.getParameter('connectTimeout'),
        'numberOfRetries': this.getParameter('numberOfRetries'),
        'timeBetweenRetries': this.getParameter('timeBetweenRetries'),
        'trustAll': this.getParameter('trustAll'),
        'trustedCACertPath': this.getParameter('trustedCACertPath'),
        'sslTls': this.getParameter('sslTls'),
        'discardMessagesBeforeOpen': this.getParameter('discardMessagesBeforeOpen'),
        'throttleFactor': this.getParameter('throttleFactor')
    });

    // Using 'this.exports' rather than just 'exports' below allows these
    // functions to be overridden in derived accessors.
    client.on('open', this.exports.onOpen.bind(this));
    client.on('message', this.exports.onMessage.bind(this));
    client.on('close', this.exports.onClose.bind(this));

    client.on('error', function (message) {
        previousServer = null;
        previousPort = null;
        console.log(this.accessorName + ': WebSocketClient.js: Error: ' + message);
    });

    client.open();
    if (debug) {
        console.log(this.accessorName + ': WebSockClient.js: connect() done');
    }
};

/** Handles input on 'toSend'. */
exports.toSendInputHandler = function () {
    this.exports.sendToWebSocket.call(this, this.get('toSend'));
};

/** Sends JSON data to the web socket. */
exports.sendToWebSocket = function (data) {
    // May be receiving inputs before client has been set.
    if (client) {
        client.send(data);
    } else {
        if (!this.getParameter('discardMessagesBeforeOpen')) {
            pendingSends.push(data);
        } else {
            console.log(this.accessorName + 'WebSocketClient.js: Discarding data because socket is not open.');
        }
    }
};

/** Executes once  web socket establishes a connection.
 *  Sets 'connected' output to true.
 */
exports.onOpen = function () {
    var i;
    if (debug) {
        console.log(this.accessorName + ': WebSocketClient.js: onOpen(): Status: Connection established');
    }
    this.send('connected', true);

    // If there are pending sends, send them now.
    // Note this implementation requires that the host invoke
    // this callback function atomically w.r.t. the input handler
    // that adds messages to the pendingSends queue.
    for (i = 0; i < pendingSends.length; i += 1) {
        client.send(pendingSends[i]);
    }
    pendingSends = [];
};

/** Send false to 'connected' output.
 *  This will be called if either side closes the connection.
 */
exports.onClose = function () {
    previousServer = null;
    previousPort = null;

    if (debug) {
        console.log(this.accessorName + ': WebSocketClient.js onClose(): Status: Connection closed.');
    }

    // NOTE: Even if running is true, it can occur that it is too late
    // to send the message (the wrapup process has been started), in which case
    // the message may not be received.
    if (running) {
        this.send('connected', false);
    }
};

/** Send the message received from web socket to the 'received' output. */
exports.onMessage = function (message) {
    this.send('received', message);
};

/** Close the web socket connection. */
exports.wrapup = function () {
    running = false;
    if (client) {
        client.close();
        if (debug) {
            console.log(this.accessorName + 'WebSocketClient.js: Status: Connection closed in wrapup.');
        }
    }
};