Source: org/terraswarm/accessor/accessors/web/net/MQTTSubscriber.js

// Copyright (c) 2016-2017 The Regents of the University of California.
// All rights reserved.

// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.

// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.

// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.

/** Subscribe to MQTT protocol messages.
 *  MQTT is a lightweight messaging protocol.
 *  The brokerHost and brokerPort parameters specify the IP address and port
 *  of an MQTT broker, such as Mosquito.
 *  When a connection to the broker is established, a message will be produced
 *  on the connection output.
 *  To subscribe to a topic, provide the topic name to the subscribe input.
 *  If you send multiple topics to this input, it will subscribe to all the specified
 *  topics. To subscribe to all topics provided by the broker, give # as the topic name.
 *  To unsubscribe to a topic, provide the topics name to the unsubscribe input.
 *  
 *  This accessor requires the 'mqtt' module.
 *
 *  @input subscribe The topic name to which to subscribe.
 *   Use # to subscribe to all topics.
 *  @input unsubscribe A topic to unsubscribe from.
 *  @output connection Output on which a string is sent when a
 *   connection to the broker has been established.
 *  @output subscription Output on which a string is sent when a
 *   subscription is initiated or terminated.
 *  @output received Output on which received data is produced.
 *  @output receivedTopic Output indicating the topic of received data.
 *  @param brokerHost The IP address or domain name of the MQTT broker.
 *   If you don't have a local MQTT Broker, then try 
 *   iot.eclipse.org.  If you use iot.eclipse.org, then avoid
 *   using "#" as a topic.
 *  @param brokerPort The port for the MQTT broker, which defaults
 *   to 1883.
 *
 *  @accessor net/MQTTSubscriber
 *  @author Hokeun Kim, contributor: Christopher Brooks
 *  @version $$Id$$
 */

// Stop extra messages from jslint.  Note that there should be no
// space between the / and the * and global.
/*global console, exports, require */
/*jshint globalstrict: true */
"use strict";

var mqtt = require('@accessors-modules/mqtt');

exports.setup = function () {
    // Inputs and outputs
    this.input('subscribe', {
        'type': 'string',
        'value': ''
    });
    this.input('unsubscribe');
    this.output('connection', {
        spontaneous: true
    });
    this.output('subscription');
    this.output('received', {
        spontaneous: true
    });
    this.output('receivedTopic');
    this.parameter('brokerHost', {
        type: 'string',
        value: ''
    });
    this.parameter('brokerPort', {
        type: 'int',
        value: 1883
    });
};

var self;
var mqttClient;

function onMessage(topic, data) {
    self.send('received', data);
    self.send('receivedTopic', topic);
}

function onConnect() {
    self.send('connection', 'connected to broker');
    // In case there is a topic, subscribe to it.
    exports.subscribeInputHandler.call(self);
}

exports.subscribeInputHandler = function () {
    var topic = this.get('subscribe');
    if (topic === '') {
        // No topic is given.
        return;
    }
    if (mqttClient.connected) {
        mqttClient.subscribe(topic);
        this.send('subscription', 'Topic: ' + topic + ' - subscribed');
    } else {
        this.error('Client is not connected to broker, subscribe failed. Topic: ' + topic);
    }
};

exports.unsubscribeInputHandler = function () {
    var topic = this.get('unsubscribe');
    if (mqttClient.connected) {
        mqttClient.unsubscribe(topic);
        this.send('subscription', 'Topic: ' + topic + ' - unsubscribed');
    } else {
        this.error('Client is not connected to broker, unsubscribe failed. Topic: ' + topic);
    }
};

exports.initialize = function () {
    self = this;
    this.addInputHandler('subscribe', exports.subscribeInputHandler.bind(this));
    this.addInputHandler('unsubscribe', exports.unsubscribeInputHandler.bind(this));
    mqttClient = mqtt.createClient(this.getParameter('brokerPort'), this.getParameter('brokerHost'));
    mqttClient.on('connect', onConnect.bind(this));
    mqttClient.on('message', onMessage.bind(this));
    mqttClient.on('error', function(message) {
    	error(message);
    });
    mqttClient.start();
};

exports.wrapup = function () {
    if (mqttClient) {
        mqttClient.end();
    }
};