// Copyright (c) sock-2016 The Regents of the University of California.
// All rights reserved.
//
// Permission is hereby granted, without written agreement and without
// license or royalty fees, to use, copy, modify, and distribute this
// software and its documentation for any purpose, provided that the above
// copyright notice and the following two paragraphs appear in all copies
// of this software.
//
// IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
// FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
// ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
// THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
// SUCH DAMAGE.
//
// THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
// PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
// CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
// ENHANCEMENTS, OR MODIFICATIONS.
//
/** This accessor subscribes to a pre-established ROS topic.<br>
* It communicates to ROS through the rosbridge web socket, and extends the
* WebSocketClient accessor to do so.
* It has a 'topic' parameter, that must be prefixed with a '/' eg: '/noise'.<br>.
* The other parameters configure how the data is to be received according
* to the rosbridge specification:
* https://github.com/RobotWebTools/rosbridge_suite/blob/develop/ROSBRIDGE_PROTOCOL.md#344-subscribe
*
* @accessor robotics/RosSubscriber
* @parameter {string} topic The ROS topic to subscribe to.
* @parameter {int} throttleRate The minimum amount of time (in ms)
* that must elapse between messages sent. Defaults to 0.
* @parameter {int} queueLength The ROS size of the queue to buffer messages.
* Messages are buffered as a result of the throttleRate. Defaults to 1.
* @parameter {int} fragment_size The maximum size that a message can take
* before it is to be fragmented. Defaults to 1000. Ptolemy will close the
* model if fragment size is too large (not sure what the maximum is).
* @parameter {string} compression A string to specify the compression
* scheme to be used on messages. Options are "none" (default) and "png".
* @parameter {boolean} outputCompleteResponseOnly A flag which if set to true
* will cause the accessor to delay in sending messages on the "received" port
* until it has concatenated the data fields from message fragments back into
* the original unfragmented message. Otherwise it will send the message
* fragments as they come in.
* @output {boolean} connected The status of the web socket connection.
* @output {JSON} received The data received from the web socket server.
* @author Marcus Pan, Matt Weber
* @version $$Id$$
*/
// Stop extra messages from jslint and jshint. Note that there should
// be no space between the / and the * and global. See
// https://chess.eecs.berkeley.edu/ptexternal/wiki/Main/JSHint */
/*globals console, getParameter, exports, extend, parameter, send */
/*jshint globalstrict: true*/
'use strict';
/** Sets up by accessor by inheriting inputs, outputs and parameters from setup() in WebSocketClient.<br>
* Adds a 'topic' input which is the ROS topic to subscribe to. */
exports.setup = function () {
this.extend('net/WebSocketClient');
this.parameter('topic', {
type: "string",
value: ""
});
this.parameter('throttleRate', {
type: "int",
value: 0
});
this.parameter('queueLength', {
type: "int",
value: 10
});
this.parameter('fragmentSize', {
type: "int",
value: 10000
});
this.parameter('outputCompleteResponseOnly', {
type: "boolean",
value: true
});
this.parameter('compression', {
type: "string",
value: 'none'
});
};
/** Overrides the toSendInputHandler to throw an error if called.
* A subscriber should not be publishing inputs.
*/
exports.toSendInputHandler = function () {
console.error('This is a subscriber and does not take input to publish.');
};
/** Inherits initialize from webSocketClient.
* Sends a message to rosbridge to start subscribing to the topic on input 'topic'.
*/
exports.initialize = function () {
this.exports.ssuper.initialize.call(this);
this.exports.sendToWebSocket.call(this, {
"op": "subscribe",
"topic": this.getParameter('topic'),
"throttle_rate": this.getParameter('throttleRate'),
"queue_length": this.getParameter('queueLength'),
"fragment_size": this.getParameter('fragmentSize'),
"compression": this.getParameter('compression')
});
};
/** Unsubscribe from the topic. Close websocket connections by calling wrapup of WebSocketClient */
exports.wrapup = function () {
var unsubscribe = {
"op": "unsubscribe",
"topic": this.getParameter('topic')
};
this.exports.sendToWebSocket.call(this, unsubscribe);
this.exports.ssuper.wrapup.call(this);
};
//Combines fragments into the original message. If the message is incomplete this function
//returns null. When the entire message has been received it returns the whole message.
exports.defragmentMessage = (function () {
//This closure remembers the number and content of fragments already seen.
var originalMessage = "",
fragmentCount = 0,
processMessage = function (message) {
//Check for missing fragment
if (fragmentCount !== message.num) {
console.error("Fragment " +
fragmentCount +
" of message is missing. Instead received fragment number " +
message.num);
}
//Accumulate data from fragment.
if (fragmentCount === 0) {
originalMessage = message.data;
fragmentCount += 1;
return null;
} else if (fragmentCount < message.total - 1) {
originalMessage += message.data;
fragmentCount += 1;
return null;
} else if (fragmentCount == message.total - 1) {
originalMessage += message.data;
fragmentCount = 0;
return originalMessage;
} else {
console.error("Error in reconstructing fragments. Fragment count exceeds indicated total.");
return null;
}
};
return processMessage;
})();
exports.onMessage = function (message) {
var messageToSend;
if (this.getParameter('outputCompleteResponseOnly') && message.op === "fragment") {
messageToSend = this.defragmentMessage(message);
if (messageToSend === null) {
return;
}
} else {
messageToSend = message;
}
this.send('received', messageToSend);
};