Source: org/terraswarm/accessor/accessors/web/net/VertxPublish.js

// Copyright (c) 2015-2016 The Regents of the University of California.
// All rights reserved.
//
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
//
// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.
//

/** Publish to a Vert.x event bus, which is a peer-to-peer publish-and-subscribe system.
 *  The published data will be sent to any subscriber that subscribes to the same
 *  address and runs in the same Vertx cluster (see the VertxSubscribe accessor).
 *  The Vertx cluster normally includes all machines that hear multicast packets
 *  sent by the machine hosting this accessor.
 *
 *  The input to be published can be any data type supported by accessors that
 *  has a string representation in JSON syntax. The data will be converted to a
 *  string in JSON format and sent to the event bus. The VertxSubscribe accessor
 *  will parse that string and output the data in the native format of its host.
 *
 *  If the broadcast input is set to false, then instead of broadcasting the
 *  message to all subscribers, the Vertx bus will pick exactly one subscriber
 *  and send the message to it.  The bus picks the subscribers in an approximately
 *  round-robin fashion. If the subscriber replies to this message, then that
 *  will reply (required to be also be a JSON object) will be produced on the
 *  reply output port.
 *
 *  The busHost input specifies the name of the network interface through which
 *  to connect to the Vert.x event bus cluster, and busHostPort specifies the
 *  port to use for this. Normally, you can leave these at their default values
 *  unless you need to need to use a network interface that is not 'localhost'
 *  or you need to use a particular port. These two inputs are examined only
 *  at initialization time, so changing them during execution of a swarmlet
 *  will have no effect.
 *
 *  @accessor net/VertxPublish
 *  @author Patricia Derler, Edward A. Lee, Ben Zhang
 *  @version $$Id$$
 *  @input {string} address The event bus address, which is the name of the event stream.
 *   This defaults to 'topic'.
 *  @input message The message to publish.
 *  @input {boolean} broadcast Indicator of whether to send to all subscribers or just one.
 *   This defaults to true, which means to send to all subscribers.
 *  @parameter {string} busHost The name of the network interface to use for the Vert.x
 *   event bus. A blank string is interpreted as 'localhost' (the default).
 *  @parameter {int} busHostPort The port for the Vert.x event bus. A value of 0
 *   indicates to just find an available port (the default).
 *  @output reply The reply, if any, received after a point-to-point send
 *   (where broadcast == false).
 */

// Stop extra messages from jslint and jshint.  Note that there should
// be no space between the / and the * and global. See
// https://chess.eecs.berkeley.edu/ptexternal/wiki/Main/JSHint */
/*globals addInputHandler, exports, get, input, output, parameter, removeInputHandler, require, send */
/*jshint globalstrict: true*/
'use strict';

var eventbus = require('eventbus');

/** Set up the accessor by defining the inputs and outputs. */
exports.setup = function () {
    this.input('address', {
        'value': 'topic',
        'type': 'string'
    });
    this.input('message');
    this.input('broadcast', {
        'value': true,
        'type': 'boolean'
    });
    this.parameter('busHost', {
        'type': 'string'
    });
    this.parameter('busHostPort', {
        'value': 0,
        'type': 'int'
    });
    this.output('reply');
};

// State variables.
var bus;
var handle;

var replyHandler = function (message) {
    this.send('reply', message);
};

exports.initialize = function () {
    var port = this.get('busHostPort');
    var host = this.get('busHost');
    bus = new eventbus.VertxBus({
        'port': port,
        'host': host
    });

    handle = this.addInputHandler('message', function () {
        var topic = this.get('address');
        var msg = this.get('message');
        var all = this.get('broadcast');
        if (msg) {
            if (all) {
                bus.publish(topic, msg);
            } else {
                bus.send(topic, msg, replyHandler.bind(this));
            }
        }
    });
};

exports.wrapup = function () {
    bus.unsubscribe();
    this.removeInputHandler(handle, 'message');
};