Source: org/terraswarm/accessor/accessors/web/hosts/node/node_modules/@accessors-modules/mqtt/mqtt.js

// Below is the copyright agreement for the Ptolemy II system.
//
// Copyright (c) 2017 The Regents of the University of California.
// All rights reserved.
//
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
//
// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.
//
//
/**
 * Module supporting the MQTT protocol of the accessors framework using
 * the MQTT module of Node.js.
 * 
 * @module mqtt
 * @author Edward A. Lee and Ravi Akella
 * @version $$Id$$
 */

// Stop extra messages from jslint.  Note that there should be no
// space between the / and the * and global.
/*globals actor, Java, module, require, util */
/*jshint globalstrict: true */
"use strict";

var nodeHost = require('@accessors-hosts/node');
var mqtt = nodeHost.installIfMissingThenRequire('mqtt');

/**
 * Create an MQTT client for the given broker. The returned client holds no
 * connection until its start() method is called.
 *
 * @param port The port number of the MQTT broker (must be a number).
 * @param host The host name of the MQTT broker (must be a string).
 * @param options Optional connection options; stored on the client and
 *  handed to the underlying MQTT module when start() is called.
 * @return A new Client instance.
 */
module.exports.createClient = function (port, host, options) {
    var client = new Client(port, host, options);
    return client;
};

/**
 * Convert an array-like sequence of numbers to a string.
 *
 * Every element is converted with String.fromCharCode(), so each element
 * becomes exactly one UTF-16 code unit in the result. No multi-byte
 * (e.g. UTF-8) decoding is performed.
 *
 * @param data An array-like object with a length and numeric elements.
 * @return The string with one character per element of data.
 */
module.exports.byteArrayToString = function (data) {
    var pieces = [];
    var count = data.length;
    for (var index = 0; index < count; index += 1) {
        pieces.push(String.fromCharCode(data[index]));
    }
    return pieces.join("");
};

////////////////////
// Construct an instance of an MQTT client.
var events = require('events');

/**
 * Construct an MQTT client. This only validates and records the broker
 * coordinates; no connection is opened here (nodeClient stays null until
 * start() creates the underlying client).
 *
 * @param port The broker port; anything but a number is rejected.
 * @param host The broker host; anything but a string is rejected.
 * @param options Optional connection options. When null or undefined, an
 *  empty object is used instead.
 * @throws A string describing the invalid argument.
 */
function Client(port, host, options) {
    var portIsNumber = (typeof port === 'number');
    if (!portIsNumber) {
        throw "Invalid MQTT broker port";
    }

    var hostIsString = (typeof host === 'string');
    if (!hostIsString) {
        throw "Invalid MQTT broker host";
    }

    // The underlying client of the MQTT module; created by start().
    this.nodeClient = null;
    this.host = host;
    this.port = port;
    // "== null" is true for both null and undefined.
    this.options = (options == null) ? {} : options;

    // FIXME: The EventEmitter constructor is intentionally not invoked
    // here; it is unresolved whether the following call is needed:
    // events.EventEmitter.call(this);
}

// Make Client an event emitter: util.inherits() links Client.prototype to
// events.EventEmitter.prototype, which is where the emit() used by the
// Client methods (and the on() used by callers) comes from.
var util = require('util');
util.inherits(Client, events.EventEmitter);

/**
 * Read-only property reporting whether the client is connected to a broker.
 * It is false while no underlying client exists (before start()), and
 * otherwise mirrors the "connected" field of the underlying client.
 */
Object.defineProperty(Client.prototype, 'connected', {
    get: function () {
        var client = this.nodeClient;
        return client ? client.connected : false;
    }
});

/**
 * End the connection to the broker. Does nothing when start() has not
 * created an underlying client yet.
 */
Client.prototype.end = function () {
    var client = this.nodeClient;
    if (!client) {
        return;
    }
    client.end();
};

/**
 * Subscribe to a topic using the given maximum QoS level and start
 * forwarding messages on that topic as 'message' events of this client.
 * Each event carries the topic and the payload converted to a string.
 *
 * Subscribing again to a topic that is already subscribed (for example
 * after a reconnect) replaces the previous message listener instead of
 * adding another one, so each message is emitted once per topic.
 *
 * NOTE(review): incoming topics are compared with the subscribed topic for
 * equality, so a wildcard filter ('+', '#') would presumably never match the
 * concrete topic of a received message. Confirm whether wildcards are needed.
 *
 * @param topic The topic to subscribe to.
 * @param options Optional object; its "qos" field (default 0) is the
 *  maximum QoS level requested for the subscription.
 * @throws The string 'No connection. Call start().' if start() has not
 *  been called yet.
 */
Client.prototype.subscribe = function (topic, options) {
    if (!this.nodeClient) {
        // Same error as publish() rather than a TypeError on null.
        throw 'No connection. Call start().';
    }

    var qos = (options && options.qos) ? options.qos : 0;
    var self = this;
    var nodeClient = this.nodeClient;

    // Listeners registered by earlier subscribe() calls, keyed by topic.
    // A prototype-less object avoids clashes with names such as "__proto__".
    if (!this.messageListeners) {
        this.messageListeners = Object.create(null);
    }
    var previous = this.messageListeners[topic];
    if (previous) {
        // Remove it from the client it was attached to, which may be an
        // older one if start() has been called again since.
        previous.nodeClient.removeListener('message', previous.listener);
    }

    function onMessage(mtopic, message) {
        // Loose equality is kept from the original comparison.
        if (mtopic == topic) {
            // FIXME: Only handling string messages right now. message is a Buffer.
            self.emit('message', topic, message.toString());
        }
    }
    this.messageListeners[topic] = {
        nodeClient: nodeClient,
        listener: onMessage
    };

    //console.log('Subscribing to topic: ' + topic);
    // The MQTT module takes the QoS inside an options object, not as a
    // bare number.
    nodeClient.subscribe(topic, {
        qos: qos
    });
    nodeClient.on('message', onMessage);
};

/**
 * Start the connection between this client and the broker.
 *
 * Creates the underlying client of the MQTT module for the URL
 * mqtt://host:port with the stored options, keeps it in nodeClient, and
 * re-emits its 'connect', 'close' and 'error' events on this client.
 * 'connect' and 'close' are re-emitted without arguments; 'error' carries
 * the error object.
 */
Client.prototype.start = function () {
    var self = this;
    var brokerUrl = 'mqtt://' + this.host + ':' + this.port;
    var nodeClient = mqtt.connect(brokerUrl, this.options);
    this.nodeClient = nodeClient;

    // Re-emit an argument-less event of the underlying client on this one.
    function relay(eventName) {
        nodeClient.on(eventName, function () {
            self.emit(eventName);
        });
    }

    relay('connect');
    relay('close');

    nodeClient.on('error', function (err) {
        self.emit('error', err);
    });
};

/**
 * Unsubscribe from a topic, asking the broker to stop sending messages on
 * it. Does nothing when start() has not created an underlying client yet,
 * since nothing can be subscribed in that case.
 *
 * @param topic The topic to unsubscribe from.
 */
Client.prototype.unsubscribe = function (topic) {
    var client = this.nodeClient;
    if (client) {
        //console.log('Unsubscribing from topic' + topic);
        client.unsubscribe(topic);
    }
};

/**
 * Publish an MQTT message to subscribers listening to the topic.
 *
 * @param topic The topic to publish to.
 * @param message The message payload.
 * @param opts Optional object with fields "qos" (default 0) and "retain"
 *  (default false). A function given here is treated as the callback.
 * @param callback Optional function handed to the underlying MQTT module
 *  as the completion callback of the publish.
 * @throws The string 'No connection. Call start().' if start() has not
 *  been called yet.
 */
Client.prototype.publish = function (topic, message, opts, callback) {
    if (!this.nodeClient) {
        throw 'No connection. Call start().';
    }

    // Accept the short form publish(topic, message, callback).
    if (typeof opts === 'function') {
        callback = opts;
        opts = null;
    }

    var qos = (opts && opts.qos) ? opts.qos : 0;
    var retain = Boolean(opts && opts.retain);

    // The MQTT module takes (topic, message, options, callback). Passing
    // qos and retain positionally would drop both settings and put the
    // retain flag where the callback is expected.
    this.nodeClient.publish(topic, message, {
        qos: qos,
        retain: retain
    }, callback);
};