Source: org/terraswarm/accessor/accessors/web/net/SecureSubscriber.js

// Copyright (c) 2016 The Regents of the University of California.
// All rights reserved.

// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.

// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.

// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.

/** This accessor is used for subscribing secure messages over MQTt protocol,
 *  using authorization service provided by a local authorization entity.
 *
 *  This accessor requires the 'iotAuth' and 'mqtt' modules.
 *
 *  @accessor net/SecureSubscriber
 *
 *  @input subscribe The topic to subscribe.
 *  @input unsubscribe The topic to unsubscribe.
 *
 *  @output connection Output an object when a connection with a MQTT broker is established.
 *  @output subscription When subscribed, includes subscription information.
 *  @output received The received message.
 *  @output receivedTopic The topic of received message
 *
 *  @parameter {string} brokerHost The MQTT broker's IP address or domain name.
 *  @parameter {int} brokerPort The MQTT broker's port number.
 *  @parameter {string} topic The topic to publish.
 *  @parameter {int} qosLevel The minimum QoS level of MQTT to be received by the subscriber.
 *
 *  @parameter {string} subscriberName The subscriber's unique name in string.
 *  @parameter {string} authHost Auth's IP address or domain name.
 *  @parameter {int} authPort Auth's port number.
 *
 *  @parameter {string} authCertPath The path for the X.509 certificate file (in pem format)
 *   of Auth with which the publisher is registered.
 *  @parameter {string} subscriberPrivateKeyPath The path for the pem format private key of
 *   the subscriber.
 *
 *  @parameter {string} publicKeyCryptoSpec The specification for the public cryptography
 *   algorithms to be used for communication with Auth
 *  @parameter {string} distributionCryptoSpec The specification for the symmetric cryptography
 *   algorithms to be used for communication with Auth
 *  @parameter {string} sessionCryptoSpec The specification for the symmetric cryptography
 *   algorithms to be used for publishing messages from the publishers
 *
 *  @author Hokeun Kim
 *  @version $$Id$$
 */

// Stop extra messages from jslint.  Note that there should be no
// space between the / and the * and global.
/*global console, exports, require */
/*jshint globalstrict: true */
"use strict";

var iotAuth = require('iotAuth');
var mqtt = require('mqtt');

exports.setup = function () {
    // Inputs and outputs
    this.input('subscribe');
    this.input('unsubscribe');
    this.output('connection', {
        spontaneous: true
    });
    this.output('subscription');
    this.output('received', {
        spontaneous: true
    });
    this.output('receivedTopic');
    // MQTT information
    this.parameter('brokerHost', {
        type: 'string',
        value: ''
    });
    this.parameter('brokerPort', {
        type: 'int',
        value: 1883
    });
    this.parameter('qosLevel', {
        type: 'int',
        value: 0
    });

    // Subscriber information
    this.parameter('subscriberName', {
        value: '',
        type: 'string'
    });
    // For communication with Auth
    this.parameter('authHost', {
        type: 'string',
        value: 'localhost'
    });
    this.parameter('authPort', {
        value: -1,
        type: 'int'
    });
    this.parameter('authCertPath', {
        value: '',
        type: 'string'
    });
    this.parameter('subscriberPrivateKeyPath', {
        value: '',
        type: 'string'
    });
    // Spec for communication with Auth
    this.parameter('publicKeyCryptoSpec', {
        type: 'string',
        options: iotAuth.publicKeyCryptoSpecs
    });
    this.parameter('distributionCryptoSpec', {
        type: 'string',
        options: iotAuth.symmetricCryptoSpecs
    });
    // For communication with server
    this.parameter('sessionCryptoSpec', {
        type: 'string',
        options: iotAuth.symmetricCryptoSpecs
    });
};

var self;
var mqttClient;
var authPublicKey;
var subscriberPrivateKey;
var currentDistributionKey = null;
var currentSessionKeyList = [];
var subscribeSequenceNum = 0;

/*
callbackParameters = {
        keyId,
        topic,
        encryptedMessage
};
*/
function sessionKeyResponseCallback(status, distributionKey, sessionKeyList, callbackParameters) {
    if (status.error) {
        console.log(status.error);
        console.log('session key request failed...');
        return;
    }
    console.log('session key request succeeded');

    if (distributionKey) {
        console.log('Updating to a new distribution key key');
        currentDistributionKey = distributionKey;
    }

    console.log('received ' + sessionKeyList.length + ' session keys');
    for (var i = 0; i < sessionKeyList.length; i++) {
        if (sessionKeyList[i].id === callbackParameters.keyId) {
            console.log('key id is as expected!');
            currentSessionKeyList.push(sessionKeyList[i]);
            var ret = iotAuth.decryptSecurePublishedMessage(callbackParameters.encryptedMessage,
                self.getParameter('sessionCryptoSpec'), sessionKeyList[i]);
            if (!ret.success) {
                self.error('Error in decrypting published message - Details: ' + ret.error);
                return;
            }
            console.log('Received sequence number: ' + ret.sequenceNum);
            self.send('received', mqtt.byteArrayToString(ret.message));
            self.send('receivedTopic', callbackParameters.topic);
        } else {
            self.error('key id is not as expected!');
            return;
        }
    }
}

function onMessage(topic, data) {
    var ret = iotAuth.getKeyIdOfSecurePublishedMessage(data);
    if (!ret.success) {
        self.error('Error in published message - Details: ' + ret.error);
        return;
    }
    for (var i = 0; i < currentSessionKeyList.length; i++) {
        if (currentSessionKeyList[i].id === ret.keyId) {
            console.log('Session key is available!');
            ret = iotAuth.decryptSecurePublishedMessage(ret.encryptedMessage,
                self.getParameter('sessionCryptoSpec'), currentSessionKeyList[i]);
            if (!ret.success) {
                self.error('Error in decrypting published message - Details: ' + ret.error);
                return;
            }
            console.log('Received sequence number: ' + ret.sequenceNum);
            self.send('received', mqtt.byteArrayToString(ret.message));
            self.send('receivedTopic', topic);
            return;
        }
    }
    console.log('Session key for published message is not available, sending session key request..');
    var options = {
        authHost: self.getParameter('authHost'),
        authPort: self.getParameter('authPort'),
        entityName: self.getParameter('subscriberName'),
        numKeysPerRequest: 1,
        purpose: {
            keyId: ret.keyId
        },
        distributionKey: currentDistributionKey,
        distributionCryptoSpec: self.getParameter('distributionCryptoSpec'),
        publicKeyCryptoSpec: self.getParameter('publicKeyCryptoSpec'),
        authPublicKey: authPublicKey,
        entityPrivateKey: subscriberPrivateKey
    };
    var callbackParameters = {
        keyId: ret.keyId,
        topic: topic,
        encryptedMessage: ret.encryptedMessage
    };
    iotAuth.sendSessionKeyRequest(options, sessionKeyResponseCallback, callbackParameters);
}

function onConnect() {
    self.send('connection', 'connected to broker');
}

exports.subscribeInputHandler = function () {
    var topic = this.get('subscribe');
    if (mqttClient.connected) {
        mqttClient.subscribe(topic);
        self.send('subscription', 'Topic: ' + topic + ' - subscribed');
    } else {
        self.error('Client is not connected to broker, subscribe failed. Topic: ' + topic);
    }
};

exports.unsubscribeInputHandler = function () {
    var topic = this.get('unsubscribe');
    if (mqttClient.connected) {
        mqttClient.unsubscribe(topic);
        self.send('subscription', 'Topic: ' + topic + ' - unsubscribed');
    } else {
        self.error('Client is not connected to broker, unsubscribe failed. Topic: ' + topic);
    }
};

exports.initialize = function () {
    self = this;

    currentDistributionKey = null;
    currentSessionKeyList = [];
    subscribeSequenceNum = 0;
    authPublicKey = iotAuth.loadPublicKey(this.getParameter('authCertPath'));
    subscriberPrivateKey = iotAuth.loadPrivateKey(this.getParameter('subscriberPrivateKeyPath'));

    this.addInputHandler('subscribe', exports.subscribeInputHandler.bind(this));
    this.addInputHandler('unsubscribe', exports.unsubscribeInputHandler.bind(this));
    mqttClient = mqtt.createClient(
        self.getParameter('brokerPort'),
        self.getParameter('brokerHost'), {
            rawBytes: true
        });
    mqttClient.on('connect', onConnect);
    mqttClient.on('message', onMessage);
    mqttClient.start();
};

exports.wrapup = function () {
    mqttClient.end();
};