// Copyright (c) 2015-2016 The Regents of the University of California.
// All rights reserved.
//
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
//
// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.
//
/** This accessor subscribes to a Vert.x event bus, which is a peer-to-peer
* publish-and-subscribe system. This accessor will receive data that is
* sent by any subscriber that publishes to the same address and runs in
* the same Vertx cluster (see the VertxPublish accessor).
* The Vertx cluster normally includes all machines that hear multicast
* packets sent by the machine hosting this accessor.
*
* The output produced by this accessor may be any data type supported by
* accessors that has a string representation in JSON syntax.
* The data sent by a VertxPublish accessor is first converted to a string
* in JSON format and sent to the event bus. This VertxSubscribe accessor
* will parse that string and output the data in the native format of its host.
*
* If the reply input is set to a non-empty value, then whenever this accessor
* receives a point-to-point message from the event bus (see VertxPublish),
* then it will reply with the specified message, acknowledging receipt.
* The reply can also be any data type that has a JSON string representation.
*
* The busHost input specifies the name of the network interface through
* which to connect to the Vert.x event bus cluster, and busHostPort specifies
* the port to use for this. Normally, you can leave these at their default
* values unless you need to need to use a network interface that is not
* 'localhost' or you need to use a particular port.
* These two inputs are examined only at initialization time,
* so changing them during execution of a swarmlet will have no effect.
*
* @accessor net/VertxSubscribe
* @author Patricia Derler, Edward A. Lee, Ben Zhang
* @version $$Id$$
* @input {string} address The event bus address, which is the name of the event stream
* to which to subscribe. This defaults to 'topic'.
* @output message The message received.
* @input reply The reply to send back to the sender for point-to-point messages,
* or empty to send no reply (the default).
* @parameter {string} busHost The name of the network interface to use for the Vert.x
* event bus. A blank string is interpreted as 'localhost' (the default).
* @parameter {int} busHostPort The port for the Vert.x event bus. A value of 0
* indicates to just find an available port (the default).
*/
// Stop extra messages from jslint and jshint. Note that there should
// be no space between the / and the * and global. See
// https://chess.eecs.berkeley.edu/ptexternal/wiki/Main/JSHint */
/*globals addInputHandler, exports, get, input, output, parameter, removeInputHandler, require, send */
/*jshint globalstrict: true*/
'use strict';
var eventbus = require('eventbus');
/** Set up the accessor by defining the inputs, outputs, and parameters. */
exports.setup = function () {
this.input('address', {
'value': 'topic',
'type': 'string'
});
this.output('message');
this.parameter('busHost', {
'type': 'string'
});
this.parameter('busHostPort', {
'value': 0,
'type': 'int'
});
this.input('reply', {
'value': ''
});
};
var bus, currentAddress, addressHandle, replyHandle;
var onReceived = function (msg) {
this.send('message', msg);
};
exports.initialize = function () {
var port = this.get('busHostPort');
var host = this.get('busHost');
bus = new eventbus.VertxBus({
'port': port,
'host': host
});
currentAddress = this.get('address');
bus.subscribe(currentAddress);
bus.on(this.get('address'), onReceived.bind(this));
var replyText = this.get('reply');
if (replyText !== null && replyText !== '') {
bus.setReply(replyText);
}
addressHandle = this.addInputHandler('address', function () {
var topic = this.get('address');
if (topic != currentAddress) {
bus.unsubscribe(currentAddress);
bus.subscribe(topic);
}
});
replyHandle = this.addInputHandler('reply', function () {
var replyText = this.get('reply');
if (replyText) {
bus.setReply(replyText);
} else {
bus.setReply(null);
}
});
};
exports.wrapup = function () {
bus.unsubscribe();
this.removeInputHandler('address', addressHandle);
this.removeInputHandler('reply', replyHandle);
};