// Below is the copyright agreement for the Ptolemy II system.
//
// Copyright (c) 2017 The Regents of the University of California.
// All rights reserved.
//
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
//
// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.
//
//
/**
* Module supporting the MQTT protocol of the accessors framework using
* the MQTT module of Node.js.
*
* @module mqtt
* @author Edward A. Lee and Ravi Akella
* @version $$Id$$
*/
// Stop extra messages from jslint. Note that there should be no
// space between the / and the * and global.
/*globals actor, Java, module, require, util */
/*jshint globalstrict: true */
"use strict";
var nodeHost = require('@accessors-hosts/node');
var mqtt = nodeHost.installIfMissingThenRequire('mqtt');
/**
 * Create an MQTT client for the specified broker. The returned client is
 * not connected; call its start() function to open the connection.
 *
 * @param port The port number of the MQTT broker (must be a number).
 * @param host The host name of the MQTT broker (must be a string).
 * @param options Optional connection options. These are stored on the client
 *  and handed to the underlying MQTT library when start() is called.
 * @return A new Client instance.
 */
module.exports.createClient = function (port, host, options) {
    var client = new Client(port, host, options);
    return client;
};
////////////////////
/**
 * Convert an array of numeric character codes (for example, bytes) to a string.
 * Each element is passed individually to String.fromCharCode(), so every
 * element yields exactly one UTF-16 code unit. No multi-byte (e.g. UTF-8)
 * decoding is performed.
 *
 * @param data An array-like object with a length property and indexed elements.
 * @return The string formed by concatenating the converted elements.
 */
module.exports.byteArrayToString = function (data) {
    var characters = [];
    var index = 0;
    while (index < data.length) {
        characters.push(String.fromCharCode(data[index]));
        index += 1;
    }
    return characters.join("");
};
////////////////////
// Construct an instance of an MQTT client.
var events = require('events');
/**
 * Construct an instance of an MQTT client. The constructor only records the
 * broker address and options; no connection is made until start() is called.
 *
 * @param port The port number of the MQTT broker. Must be of type number.
 * @param host The host name of the MQTT broker. Must be of type string.
 * @param options Optional connection options. If null or undefined, an empty
 *  object is used.
 * @throws The string "Invalid MQTT broker port" if port is not a number, or
 *  the string "Invalid MQTT broker host" if host is not a string.
 */
function Client(port, host, options) {
    if (typeof port !== 'number') {
        throw "Invalid MQTT broker port";
    }
    if (typeof host !== 'string') {
        throw "Invalid MQTT broker host";
    }
    // Run the EventEmitter constructor so that each client gets its own
    // emitter state instead of relying on lazy initialization through the
    // prototype chain.
    events.EventEmitter.call(this);

    // The underlying MQTT.js client; remains null until start() is called.
    this.nodeClient = null;
    this.host = host;
    this.port = port;
    this.options = (options === null || options === undefined) ? {} : options;
}
var util = require('util');
// Make Client inherit from EventEmitter so that callers can register
// handlers with on() and this module can notify them with emit()
// (the 'connect', 'close', 'error' and 'message' events emitted below).
util.inherits(Client, events.EventEmitter);
////////////////////
// The 'connected' property reports whether the client is connected to a
// broker. It is false until start() has created the underlying client; after
// that it mirrors the underlying client's own 'connected' property. Only a
// getter is defined, so the property cannot be assigned.
Object.defineProperty(Client.prototype, 'connected', {
    get: function () {
        var underlying = this.nodeClient;
        return underlying ? underlying.connected : false;
    }
});
////////////////////
/**
 * End the connection to the broker. This does nothing if start() has not
 * yet created the underlying client.
 */
Client.prototype.end = function () {
    var connection = this.nodeClient;
    if (!connection) {
        return;
    }
    connection.end();
};
////////////////////
/**
 * Subscribe to a topic using the given maximum QoS level and start getting
 * messages on that topic. Each matching message causes this client to emit
 * a 'message' event with two arguments: the topic the message was published
 * on and the message payload converted to a string.
 *
 * The topic may be an MQTT topic filter containing the wildcards '+'
 * (exactly one level) and '#' (all remaining levels). Subscribing again to
 * a topic replaces the handler installed by the earlier call, so a message
 * is reported only once per subscribed topic.
 *
 * @param topic The topic, or topic filter, to subscribe to.
 * @param options Optional object; its 'qos' field gives the maximum QoS
 *  level (defaults to 0).
 * @throws The string 'No connection. Call start().' if start() has not
 *  been called.
 */
Client.prototype.subscribe = function (topic, options) {
    if (!this.nodeClient) {
        throw 'No connection. Call start().';
    }
    var qos = (options && options.qos) ? options.qos : 0;
    var self = this;
    var nodeClient = this.nodeClient;

    // Return true if the topic of a received message matches the subscribed
    // topic filter, following the MQTT rules for '+' and '#'.
    function topicMatches(filter, actual) {
        // Loose equality is deliberate: it keeps every match that the
        // earlier exact comparison accepted.
        /*jshint eqeqeq: false */
        if (actual == filter) {
            return true;
        }
        if (typeof filter !== 'string' || typeof actual !== 'string') {
            return false;
        }
        var filterLevels = filter.split('/');
        var actualLevels = actual.split('/');
        // Topics beginning with '$' are not matched by a filter whose first
        // level is a wildcard.
        if (actual.charAt(0) === '$' &&
                (filterLevels[0] === '#' || filterLevels[0] === '+')) {
            return false;
        }
        for (var i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i] === '#') {
                // Matches this level and everything below it, including
                // the parent level itself ('a/#' matches 'a').
                return true;
            }
            if (i >= actualLevels.length) {
                return false;
            }
            if (filterLevels[i] !== '+' && filterLevels[i] !== actualLevels[i]) {
                return false;
            }
        }
        return filterLevels.length === actualLevels.length;
    }

    // Remember the handler installed for each topic so that subscribing to
    // the same topic again does not stack up handlers and emit duplicates.
    if (!this._messageListeners) {
        this._messageListeners = Object.create(null);
    }
    var previous = this._messageListeners[topic];
    if (previous) {
        previous.client.removeListener('message', previous.listener);
    }

    var listener = function (mtopic, message) {
        if (topicMatches(topic, mtopic)) {
            // FIXME: Only handling string messages right now. message is a Buffer.
            self.emit('message', mtopic, message.toString());
        }
    };
    this._messageListeners[topic] = {
        client: nodeClient,
        listener: listener
    };

    // MQTT.js takes the QoS level inside an options object.
    nodeClient.subscribe(topic, {
        qos: qos
    });
    nodeClient.on('message', listener);
};
/**
 * Start the connection between this client and the broker. The broker is
 * addressed as 'mqtt://host:port' using the host and port given to the
 * constructor, and the stored options are passed to the MQTT library.
 * The underlying client's 'connect' and 'close' events are re-emitted by
 * this client without arguments; its 'error' event is re-emitted with the
 * error as the argument.
 */
Client.prototype.start = function () {
    var client = this;
    var brokerUrl = 'mqtt://' + client.host + ':' + client.port;
    var connection = mqtt.connect(brokerUrl, client.options);
    client.nodeClient = connection;

    connection.on('connect', function () {
        client.emit('connect');
    });
    connection.on('close', function () {
        client.emit('close');
    });
    connection.on('error', function (error) {
        client.emit('error', error);
    });
};
/**
 * Unsubscribe from a topic, asking the broker to stop sending messages on
 * it. If start() has not been called there can be no subscription, so this
 * does nothing.
 *
 * @param topic The topic to unsubscribe from.
 */
Client.prototype.unsubscribe = function (topic) {
    var connection = this.nodeClient;
    if (connection) {
        connection.unsubscribe(topic);
    }
};
/**
 * Publish an MQTT message to subscribers listening to the topic.
 *
 * @param topic The topic to publish on.
 * @param message The message payload.
 * @param opts Optional object with fields 'qos' (QoS level, defaults to 0)
 *  and 'retain' (whether the broker should retain the message, defaults to
 *  false).
 * @param callback Optional function that is handed to the MQTT library's
 *  publish() call; it is ignored unless it is a function.
 * @throws The string 'No connection. Call start().' if start() has not
 *  been called.
 */
Client.prototype.publish = function (topic, message, opts, callback) {
    if (!this.nodeClient) {
        throw 'No connection. Call start().';
    }
    // MQTT.js takes qos and retain inside an options object, followed by an
    // optional callback, rather than as separate positional arguments.
    var publishOptions = {
        qos: (opts && opts.qos) ? opts.qos : 0,
        retain: Boolean(opts && opts.retain)
    };
    if (typeof callback === 'function') {
        this.nodeClient.publish(topic, message, publishOptions, callback);
    } else {
        this.nodeClient.publish(topic, message, publishOptions);
    }
};