001/*
002 * Copyright (c) 1998-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-08-24 22:45:41 +0000 (Mon, 24 Aug 2015) $' 
007 * '$Revision: 33631 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.data.datasource.dataturbine;
031
032import java.text.ParseException;
033import java.text.SimpleDateFormat;
034import java.util.Collection;
035import java.util.Date;
036import java.util.HashMap;
037import java.util.Iterator;
038import java.util.Map;
039import java.util.Vector;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.regex.Matcher;
042
043import org.apache.commons.logging.Log;
044import org.apache.commons.logging.LogFactory;
045import org.ecoinformatics.seek.datasource.DataSourceIcon;
046
047import com.rbnb.sapi.ChannelMap;
048import com.rbnb.sapi.SAPIException;
049import com.rbnb.sapi.Sink;
050
051import ptolemy.actor.TypedIOPort;
052import ptolemy.actor.lib.LimitedFiringSource;
053import ptolemy.actor.parameters.PortParameter;
054import ptolemy.data.ArrayToken;
055import ptolemy.data.BooleanToken;
056import ptolemy.data.DoubleMatrixToken;
057import ptolemy.data.DoubleToken;
058import ptolemy.data.FloatToken;
059import ptolemy.data.IntMatrixToken;
060import ptolemy.data.IntToken;
061import ptolemy.data.LongToken;
062import ptolemy.data.RecordToken;
063import ptolemy.data.ShortToken;
064import ptolemy.data.StringToken;
065import ptolemy.data.Token;
066import ptolemy.data.UnsignedByteToken;
067import ptolemy.data.expr.Parameter;
068import ptolemy.data.type.ArrayType;
069import ptolemy.data.type.BaseType;
070import ptolemy.data.type.RecordType;
071import ptolemy.data.type.Type;
072import ptolemy.kernel.CompositeEntity;
073import ptolemy.kernel.util.IllegalActionException;
074import ptolemy.kernel.util.NameDuplicationException;
075import ptolemy.kernel.util.Workspace;
076import ptolemy.util.MessageHandler;
077
078/**
079 * The DataTurbine actor retrieves and outputs data from an RBNB DataTurbine server. 
080 * Sink mode Request has been tested beneath SDF,
081 * modes Monitor and Subscribe briefly in PN.
082 * 
083 * @author Derik Barseghian
084 * @version $Id: DataTurbine.java 33631 2015-08-24 22:45:41Z crawl $
085 */
086
087public class DataTurbine extends LimitedFiringSource {
088
089        public static final String ZEROTIME = "0";
090        private static Log log = LogFactory.getLog("org.kepler.data.datasource.datasource.dataturbine.DataTurbine");
091        static {
092          System.setProperty("ZEROTIME", ZEROTIME);
093        }
094        private static final String ARRAY_OF_X_RECORDS = "Array of x records";
095        private static final String RECORD_OF_2_ARRAYS = "Record of 2 arrays";
096        public static final String SINKMODE_MONITOR = "Monitor";
097        public static final String SINKMODE_REQUEST = "Request";
098        public static final String SINKMODE_SUBSCRIBE = "Subscribe";
099
100        /** The URL to the DataTurbine Server */
101        public PortParameter dataTurbineAddressInputParam;
102
103        /** Actor mode */
104        public Parameter actorModeInputParam;
105        
106        /** The name of the channel to output through the specifiedChannel output port */
107        public PortParameter outputChannelPortParam;
108        
109        /**
110         * The amount of time (ms) to wait for data to become available. Use 0 for
111         * no delay or any negative number for an infinite delay.
112         */
113        public Parameter blockTimeoutParam;
114
115        /**
116         * Sink mode.
117         * <ul>
118         * <li>Request: Initiates a request for a specific time slice of data.</li>
119         * <li>Subscribe: Starts a continuous feed of data on the specified channels
120         * to this sink, for retrieval. Each block retrieved will be duration time
121         * units in length.</li>
122         * <li>Monitor: Similar to Subscribe, but allows for continuous frames of
123         * data without gaps.</li>
124         * </ul>
125         */
126        public Parameter sinkModeInputParam;
127
128        /** Start time for Request or Subscribe modes. seconds or Date: yyyy-MM-dd HH:mm:ss */
129        public PortParameter startTimePortParam;
130
131        /** The duration of the request. Unit is seconds unless fetchByFrame is set. */
132        public PortParameter durationPortParam;
133        
134        /** channelNames - This output port outputs all of the filtered (non-metric)
135         * channel names. 
136         */
137        public static final String CHANNEL_NAMES_OUTPUT_PORT = "channelNames";
138
139        /**
140         * For Subscribe mode: Any of "newest", "oldest", "absolute", "next", or
141         * "previous". <br />
142         * For Request mode:
143         * <ul>
144         * <li>"absolute" -- The start parameter is absolute time from midnight, Jan
145         * 1st, 1970 UTC.</li>
146         * <li>"newest" -- The start parameter is measured from the most recent data
147         * available in the server at the time this request is received. Note that
148         * for this case, the start parameter actually represents the end of the
149         * duration, and positive times proceed toward oldest data.</li>
150         * <li>"oldest" -- As "newest", but relative to the oldest data.</li>
151         * <li>"aligned" -- As "newest", but rather than per channel, this is
152         * relative to the newest for all of the channels.</li>
153         * <li>"after" -- A combination between "absolute" and "newest", this flag
154         * causes the server to return the newest data available after the specified
155         * start time. Unlike "newest", you do not have to request the data to find
156         * out that you already have it. Unlike "absolute", a gap may be inserted in
157         * the data to provide you with the freshest data.</li>
158         * <li>"modified" -- Similar to "after", but attempts to return a duration's
159         * worth of data in a contiguous block. If the data is not available after
160         * the start time, it will be taken from before the start time.</li>
161         * <li>"next" - gets the data that immediately follows the time range
162         * specified. This will skip over gaps.</li>
163         * <li>"previous" - get the data that immediately preceeds the time range
164         * specified. This will skip over gaps.</li>
165         * </ul>
166         * 
167         * */
168        public Parameter referenceInputParam;
169
170        /**
171         * Format of output datapoint and timestamp pairs: Record of 2 Arrays, or an
172         * Array of X Records.
173         */
174        public Parameter outputRecordOfArrays;
175
176        /**
177         * Will attempt to identify and pad gappy data with pairs of timestamps and
178         * nils. Need at least 2 samples to be able to pad.
179         */
180        public Parameter tryToPad;
181
182        private Sink _sink = null;
183        private ChannelMap _map = null;
184        private ChannelMap _registrationMap = null;
185
186        
187        private final static String DEFAULT_RBNB_CLIENT_NAME = "KeplerClient";
188        private String _rbnbClientName = DEFAULT_RBNB_CLIENT_NAME;
189
190        private String _url = "";
191        private String _specifiedOutputChannel = "";
192        /** specificChannel*/
193        private final static String SPECIFIC_CHANNEL = "specificChannel";
194        /** Subscribe, Monitor, or Request */
195        private String _sinkMode = SINKMODE_REQUEST;
196        /** for Request or Subscribe mode */
197        private String _startTime = "0";
198        /** for Request or Subscribe mode */
199        private String _duration = "0";
200        /** for Request mode: absolute,
201        * newest, oldest, aligned, after,
202        * modified, next, previous 
203        */
204        private String _reference = "absolute";
205        /** for Request mode */
206        private String[] _connectedOutputPortNames = null;
207        /** for Request mode - time (ms) to wait
208        * for data to become available. 0 is no
209        * delay. negative number for infinite
210        * delay.
211        */
212        private int _blockTimeout = 15000; 
213        private double _startTimeDouble = 0.0;
214        private double _durationDouble = 0.0;
215        // TODO: eventually allow use of different date patterns:
216        private final static String pattern = "yyyy-MM-dd HH:mm:ss";
217        private final static SimpleDateFormat format = new SimpleDateFormat(pattern);
218        private Date startDate = null;
219        private boolean reload = false;
220
221        private int _numChans = 0;
222        private int _numChans2 = 0;
223        private String[] _chanNames = null;
224        private String[] _chanTypes = null;
225        private double[] _chanDurations = null;
226        private String[] _chanMimeTypes = null;
227        private String[] _filteredChanNames = null;
228        private String[] _filteredChanTypes = null;
229        private double[] _filteredChanDurations = null;
230        private String[] _filteredChanMimeTypes = null;
231        private double[] _filteredChanStartTimes = null;
232        private Map<String,String> _userInfoTypesMap = new HashMap<String,String>();
233        
234        public String outputRecOfArrays = RECORD_OF_2_ARRAYS;
235
236        private final static String[] labels = { "timestamps", "data" };
237        private Token[] values = new Token[labels.length];
238        public boolean paddingOn = false;
239        /** padding is not always possible. */
240        public boolean willPad = paddingOn; 
241
242        private DataSourceIcon _icon;
243
244        /** for use with Monitor mode, presumably value
245        * doesn't matter since gapControl logic is not
246        * yet implemented in DT
247        */
248        private int gapControl = 0;
249        
250    /** If true, director told us to stop firing. */
251    private AtomicBoolean _stopRequested = new AtomicBoolean(false);
252
253        /**
254         * Construct a DataTurbine source with the given container and name.
255         * 
256         * @param name
257         *            The name of this actor.
258         * @exception IllegalActionException
259         *                If the entity cannot be contained by the proposed
260         *                container.
261         * @exception NameDuplicationException
262         *                If the container already has an actor with this name.
263         */
264        public DataTurbine(CompositeEntity container, String name)
265                        throws NameDuplicationException, IllegalActionException {
266                super(container, name);
267
268                _icon = new DataSourceIcon(this);
269
270                dataTurbineAddressInputParam = new PortParameter(this, "DataTurbine Address");
271                dataTurbineAddressInputParam.setStringMode(true);
272                dataTurbineAddressInputParam.setTypeEquals(BaseType.STRING);
273                dataTurbineAddressInputParam.getPort().setTypeEquals(BaseType.STRING);
274                
275                outputChannelPortParam = new PortParameter(this, 
276                                "specificChannel Name");
277                outputChannelPortParam.setStringMode(true);
278                outputChannelPortParam.setTypeEquals(BaseType.STRING);
279                outputChannelPortParam.getPort().setTypeEquals(BaseType.STRING);
280                
281                sinkModeInputParam = new Parameter(this, "Sink Mode");
282                sinkModeInputParam.setStringMode(true);
283                sinkModeInputParam.setTypeEquals(BaseType.STRING);
284                sinkModeInputParam.addChoice(SINKMODE_REQUEST);
285                sinkModeInputParam.addChoice(SINKMODE_MONITOR);
286                sinkModeInputParam.addChoice(SINKMODE_SUBSCRIBE);
287                sinkModeInputParam.setExpression(SINKMODE_REQUEST);
288                
289                startTimePortParam = new PortParameter(this,
290                                "Start Time (for Request or Subscribe modes)", 
291                                new StringToken(""));
292                startTimePortParam.setStringMode(true);
293                startTimePortParam.setTypeEquals(BaseType.STRING);
294                startTimePortParam.getPort().setTypeEquals(BaseType.STRING);
295                
296                durationPortParam = new PortParameter(this,
297                                "Duration (for Request or Subscribe modes)",
298                                new StringToken(""));
299                durationPortParam.setStringMode(true);
300                durationPortParam.setTypeEquals(BaseType.STRING);
301                durationPortParam.getPort().setTypeEquals(BaseType.STRING);
302                
303                referenceInputParam = new Parameter(this,
304                                "Reference (for Request or Subscribe modes)");
305                referenceInputParam.setStringMode(true);
306                referenceInputParam.setTypeEquals(BaseType.STRING);
307                referenceInputParam.addChoice("absolute");
308                referenceInputParam.addChoice("newest");
309                referenceInputParam.addChoice("oldest");
310                referenceInputParam.addChoice("aligned   (Request mode only)");
311                referenceInputParam.addChoice("after   (Request mode only)");
312                referenceInputParam.addChoice("modified   (Request mode only)");
313                referenceInputParam.addChoice("next");
314                referenceInputParam.addChoice("previous");
315                referenceInputParam.setExpression("absolute");
316                blockTimeoutParam = new Parameter(this,
317                                "Block Timeout (ms) (for Fetch)", new IntToken(15000));
318
319                // note: keep this outputRecordOfArrays param initialization after
320                // RBNBurlInputParam - this will avoid relation-breaking problem 
321                // during reload events.
322                // outputRecordOfArrays:
323                //    RECORD_OF_2_ARRAYS - each output port will output a record of 
324                //                                                 2 arrays (data and timestamps)
325                //    ARRAY_OF_X_RECORDS - each output port will output an array of
326                //                                                 records (each record a data and timestamp)
327                outputRecordOfArrays = new Parameter(this, "Output Data Type");
328                outputRecordOfArrays.setStringMode(true);
329                outputRecordOfArrays.setTypeEquals(BaseType.STRING);
330                outputRecordOfArrays.addChoice(RECORD_OF_2_ARRAYS);
331                outputRecordOfArrays.addChoice(ARRAY_OF_X_RECORDS);
332                outputRecordOfArrays.setExpression(RECORD_OF_2_ARRAYS);
333
334                tryToPad = new Parameter(this, "Pad data gaps with nils");
335                tryToPad.setTypeEquals(BaseType.BOOLEAN);
336                tryToPad.setExpression("false");
337
338                _sink = new Sink();
339                _map = new ChannelMap();
340                _registrationMap = new ChannelMap();
341
342                _attachText("_iconDescription", "<svg>\n" + "<rect x=\"0\" y=\"0\" "
343                                + "width=\"60\" height=\"20\" " + "style=\"fill:white\"/>\n"
344                                + "</svg>\n");
345        }
346        
347
348        /**
349         * Send the token in the value parameter to the output.
350         * 
351         * @exception IllegalActionException
352         *                If it is thrown by the send() method sending out the
353         *                token.
354         */
355        @Override
356    public void fire() throws IllegalActionException {
357                super.fire();
358
359                outputRecOfArrays = ((StringToken)outputRecordOfArrays.getToken()).stringValue();
360
361                paddingOn = ((BooleanToken) tryToPad.getToken()).booleanValue();
362                willPad = paddingOn;
363
364                _url = ((StringToken)dataTurbineAddressInputParam.getToken()).stringValue();
365                _url = _url.replaceAll("\"", "");
366
367                _blockTimeout = Integer.parseInt(blockTimeoutParam.getToken()
368                                .toString());
369
370                _sinkMode = ((StringToken)sinkModeInputParam.getToken()).stringValue();
371
372                if (!(_sinkMode.equals(SINKMODE_MONITOR) || _sinkMode.equals(SINKMODE_REQUEST) || _sinkMode
373                                .equals(SINKMODE_SUBSCRIBE))) {
374                        throw new IllegalActionException(this,
375                                        "Error. sinkMode must be Monitor, Request or Subscribe.");
376                }
377
378                if ((_sinkMode.equals(SINKMODE_SUBSCRIBE) || _sinkMode
379                                .equals(SINKMODE_REQUEST))) {
380                        startTimePortParam.update();
381                        _startTime = ((StringToken)startTimePortParam.getToken()).stringValue();
382                        _startTime = _startTime.replaceAll("\"", "");
383                        
384                        durationPortParam.update();
385                        _duration = ((StringToken)durationPortParam.getToken()).stringValue();
386                        _duration = _duration.replaceAll("\"", "");
387                        _reference = ((StringToken)referenceInputParam.getToken()).stringValue();
388                        _reference = _reference.replaceAll("\\s*\\(Request mode only\\)", "");
389                        
390                        if (_startTime == null || _startTime.equals("")) {
391                                throw new IllegalActionException(this,
392                                                "DataTurbine actor must specify a Start Time for "+ _sinkMode + " Sink Mode.");
393                        }
394                        if (_duration == null || _duration.equals("")) {
395                                throw new IllegalActionException(this,
396                                                "DataTurbine actor must specify a Duration for "+ _sinkMode + " Sink Mode.");
397                        }
398                        if (_reference == null || _reference.equals("")) {
399                                throw new IllegalActionException(this,
400                                                "DataTurbine actor must specify a Reference for "+ _sinkMode + " Sink Mode.");
401                        }
402                        
403                        try {
404                                startDate = format.parse(_startTime);
405                                _startTimeDouble = startDate.getTime() / 1000;
406                        } catch (ParseException pe) {
407                                //throw new IllegalActionException(this, "ParseException " + pe);
408                                // allow users to also specify startTime as seconds, since this
409                                // is more natural when using "newest"
410                                try{
411                                        _startTimeDouble = new Double(_startTime);
412                                }
413                                catch(NumberFormatException nfe){
414                                        throw new IllegalActionException(this, "Start Time must be number (seconds) or Date formatted: "+pattern);
415                                }
416                        }
417                }
418                
419                outputChannelPortParam.update();
420
421                // count connected output ports
422                int itr = 0;
423                Iterator<?> q = this.outputPortList().iterator();
424                while (q.hasNext()) {
425                        TypedIOPort port = (TypedIOPort) q.next();
426                        if (port.numberOfSinks() > 0) {
427                                itr++;
428                        }
429                }
430                _connectedOutputPortNames = new String[itr];
431
432                // gather connected output port names
433                itr = 0;
434                q = this.outputPortList().iterator();
435                while (q.hasNext()) {
436                        TypedIOPort port = (TypedIOPort) q.next();
437                        if (port.numberOfSinks() > 0) {
438                                _connectedOutputPortNames[itr] = port.getName();
439                                itr++;
440                        }
441                }
442
443                if (_sinkMode.equals(SINKMODE_REQUEST) || _sinkMode.equals(SINKMODE_SUBSCRIBE)) {
444                        _durationDouble = Double.parseDouble(_duration);
445                }
446
447                _icon.setBusy();
448
449                // connect to dataturbine.
450                try {
451                        openDataTurbine();
452                } catch (Exception e) {
453                        throw new IllegalActionException(this,
454                                        "ERROR opening DataTurbine connection from fire()");
455                }
456
457                // get dataturbine metadata.
458                getDataTurbineInfo();
459
460                try {
461                        
462                        // output the channel names
463                        outputChannelNames();
464                        
465                        // add requested channels to userMap.
466                        ChannelMap userChanMap = getUserChannelMap(_connectedOutputPortNames);
467                        if (userChanMap.NumberOfChannels() > 0) {
468                                if (_sinkMode.equals(SINKMODE_REQUEST)) {
469                                        // request data in userMap.
470                                        // System.out.println("----- about to sink.Request(userChanMap, "
471                                        // + _startTimeDouble + ", " + _durationDouble + ", "
472                                        // + _reference + ") -----");
473                                        _sink.Request(userChanMap, _startTimeDouble,
474                                                        _durationDouble, _reference);
475
476                                        // fetch data in userMap.
477                                        // System.out.println("----- fetching data, using timeout:"
478                                        // + _blockTimeout + " -----");
479                                        _sink.Fetch(_blockTimeout, userChanMap);
480
481                                        // send data out output ports.
482                                        // System.out.println("----- about to call outputData -----");
483                                        outputData(userChanMap);
484                                } else if (_sinkMode.equals(SINKMODE_MONITOR)) {
485                                        // setup Monitor
486                                        // System.out.println("----- about to sink.Monitor(userChanMap, "
487                                        // + gapControl + ") -----");
488                                        _sink.Monitor(userChanMap, gapControl);
489                                        do {
490                                                // fetch data in userMap.
491                                                // System.out.println("----- fetching data, using timeout:"
492                                                // + _blockTimeout + " -----");
493                                                _sink.Fetch(_blockTimeout, userChanMap);
494
495                                                // send data out output ports.
496                                                // System.out.println("----- about to call outputData -----");
497                                                outputData(userChanMap);
498                                        } while (!_stopRequested.get());
499                                } else if (_sinkMode.equals(SINKMODE_SUBSCRIBE)) {
500                                        // setup Subscribe
501                                        // System.out
502                                        // .println("----- about to sink.Subscribe(userChanMap, "
503                                        // + _startTimeDouble + ", " + _durationDouble
504                                        // + ", " + _reference + ") -----");
505                                        _sink.Subscribe(userChanMap, _startTimeDouble,
506                                                        _durationDouble, _reference);
507                                        do {
508                                                // fetch data in userMap.
509                                                // System.out.println("----- fetching data, using timeout:"
510                                                // + _blockTimeout + " -----");
511                                                _sink.Fetch(_blockTimeout, userChanMap);
512
513                                                // send data out output ports.
514                                                // System.out.println("----- about to call outputData -----");
515                                                outputData(userChanMap);
516                                        } while (!_stopRequested.get());
517                                }
518                        }
519
520                        _icon.setReady();
521                } catch (SAPIException sapie) {
522                    // ignore errors if we've stopped.
523                    if(! _stopRequested.get())
524                    {
525                        log.error("DataTurbine actor Error: during fire:" + sapie);
526                        _icon.setReady();
527                        log.debug("disconnect from DataTurbine");
528                        _sink.CloseRBNBConnection();
529                        throw new IllegalActionException(this, sapie, "Error during fire()");
530                    }
531                } catch (Exception e){
532                        log.error("DataTurbine actor Error: during fire:" + e);
533                        _icon.setReady();
534                        log.debug("disconnect from DataTurbine");
535                        _sink.CloseRBNBConnection();
536                        throw new IllegalActionException(this, e, "Error during fire()");
537                } 
538
539                // disconnect from DataTurbine in wrapup() so same connection can be shared
540                // across a workflow execution.
541        }// fire
542                
543        /** Reset the stop requested boolean. */
544        @Override
545    public void preinitialize() throws IllegalActionException {
546            super.preinitialize();
547            _stopRequested.set(false);
548        }
549        
550        /** The director told us to stop firing immediately. */
551        @Override
552    public void stop() {
553                super.stop();
554        _stopRequested.set(true);
555                log.debug("disconnect from DataTurbine");
556        _sink.CloseRBNBConnection();
557        }
558        
559        /**
560         * Sets all the object variables by gathering info from the RBNB server.
561         * (fire() and attributeChanged() should probably be the only methods to
562         * call getDataTurbineInfo.)
563         * 
564         * @throws IllegalActionException
565         */
566        private boolean getDataTurbineInfo() throws IllegalActionException {
567
568                int chanType = 0;
569                // filtered lists will exclude channels with duration 0
570                // and those with names that begin with an underscore
571                int filteredListLength = 0;
572
573                try{
574                        openDataTurbine();
575
576                        _map.Clear();
577                        _registrationMap.Clear();
578
579                        _sink.RequestRegistration(_registrationMap);
580                        _sink.Fetch(_blockTimeout, _registrationMap);
581                        _numChans = _registrationMap.NumberOfChannels();
582                        _chanNames = new String[_numChans];
583                        _chanTypes = new String[_numChans];
584                        _chanDurations = new double[_numChans];
585                        _chanMimeTypes = new String[_numChans];
586                        
587                        _userInfoTypesMap.clear();
588                        
589                        // Determine length of the filtered list. Add all chans to _map. Set
590                        // _chanNames[]
591                        for (int i = 0; i < _numChans; i++) {
592                                _chanNames[i] = _registrationMap.GetName(i);
593                                _chanDurations[i] = _registrationMap.GetTimeDuration(i);
594                                chanType = _registrationMap.GetType(i);
595                                _chanTypes[i] = _registrationMap.TypeName(chanType);
596                                _chanMimeTypes[i] = _registrationMap.GetMime(i);
597                                
598                                String userInfo = _registrationMap.GetUserInfo(i);
599                                if(! userInfo.isEmpty()) {
600                        Matcher matcher = DataTurbineWriter.PTOLEMY_TYPE_PATTERN.matcher(userInfo);
601                        if(matcher.matches()) {
602                            _userInfoTypesMap.put(_chanNames[i], matcher.group(1));
603                        }
604                                }
605                                    
606                                if (_chanDurations[i] < 0 || _chanNames[i].matches("_.*")) {
607                                        // System.out.println("WARNING: chan " + i + " named: " +
608                                        // _chanNames[i] + " duration: " + _chanDurations[i] +
609                                        // " will not be added");
610                                } else {
611                                        _map.Add(_chanNames[i]);
612                                        // System.out.println("added chan " + i + ": " +
613                                        // _chanNames[i] + " _chanType: " + _chanTypes[i] +
614                                        // "_chanDuration: " + _chanDurations[i]);
615                                        filteredListLength++;
616                                }
617                        }
618
619                        _filteredChanNames = new String[filteredListLength];
620                        _filteredChanTypes = new String[filteredListLength];
621                        _filteredChanDurations = new double[filteredListLength];
622                        _filteredChanMimeTypes = new String[filteredListLength];
623                        _filteredChanStartTimes = new double[filteredListLength];
624
625                        // Just get one small chunk of data so we are able to check the data
626                        // types. If server contains no non-metrics channels, do not make
627                        // (illegal) empty request.
628                        if (filteredListLength > 0) {
629                                _sink.Request(_map, 0, 1, "oldest");
630
631                                // Fetch data. Channels with duration 0 will not be returned.
632                                _sink.Fetch(_blockTimeout, _map);
633                                _numChans2 = _map.NumberOfChannels();
634
635                                boolean didfetchtimeout = _map.GetIfFetchTimedOut();
636                                if (didfetchtimeout) {
637                                        log
638                                                        .warn("WARNING: fetch timed out. Try increasing the blockTimeOut value");
639                                        if (filteredListLength != _numChans2) {
640                                                log.error("DataTurbine actor Error: only " + _numChans2
641                                                                + " channels were returned");
642                                                // throw error
643                                        }
644                                }
645
646                                Double smallestDuration = null;
647                                Double earliestStartTime = null;
648                                for (int i = 0; i < _numChans2; i++) {
649                                        chanType = _map.GetType(i);
650                                        _filteredChanTypes[i] = _map.TypeName(chanType);
651                                        _filteredChanNames[i] = _map.GetName(i);
652                                        _filteredChanDurations[i] = _map.GetTimeDuration(i);
653                                        // TODO: durations apparently change after a fetch as well?
654                                        // do they really change to sample duration?
655                                        _filteredChanMimeTypes[i] = _map.GetMime(i);
656                                        _filteredChanStartTimes[i] = _map.GetTimeStart(i);
657                                        
658                                        if (i == 0){
659                                                smallestDuration = _filteredChanDurations[i];
660                                                earliestStartTime = _filteredChanStartTimes[i];
661                                        }
662                                        else{
663                                                if (_filteredChanDurations[i] < smallestDuration){
664                                                        smallestDuration = _filteredChanDurations[i];
665                                                }
666                                                if (_filteredChanStartTimes[i] < earliestStartTime){
667                                                        earliestStartTime = _filteredChanStartTimes[i];
668                                                }
669                                        }
670                                        
671                                        //System.out.println("_filteredChanNames: " +
672                                        //_filteredChanNames[i] + " _filteredChanType: " +
673                                        //_filteredChanTypes[i] + " _filteredChanDuration: " +
674                                        //_filteredChanDurations[i] + " _filteredChanStartTime: " + 
675                                        //_filteredChanStartTimes[i]);
676                                }
677                                
678                                // try to be helpful, if blank, set duration & startTime to smallest & earliest
679                                if (smallestDuration != null){
680                                        if (((StringToken)durationPortParam.getToken()).stringValue().equals("")){
681                                                durationPortParam.setToken(smallestDuration.toString());
682                                                _duration = smallestDuration.toString();
683                                        }
684                                }
685                                if (earliestStartTime != null){
686                                        if (((StringToken)startTimePortParam.getToken()).stringValue().equals("")){
687                                                long l = (new Double(earliestStartTime)).longValue();
688                                                Date earliestDate = new Date(l*1000);
689                                                String startDateString = format.format(earliestDate);
690                                                startTimePortParam.setToken(startDateString);
691                                                _startTime = earliestStartTime.toString();
692                                        }
693                                }
694                                                
695                        }
696
697                        return true;
698                } catch (Exception e) {
699                        throw new IllegalActionException(this, e,
700                                        "Problem opening DataTurbine connection in getDataTurbineInfo()");
701                }
702
703        }
704
705        /**
706         * Return a channel map that contains only channels with duration >=0 and
707         * that were requested.
708         * 
709         * @return chanMap The ChannelMap.
710         */
711        public ChannelMap getUserChannelMap(String[] requestedChannels)
712                        throws SAPIException {
713
714                ChannelMap chanMap = new ChannelMap();
715
716                Iterator<?> i = this.outputPortList().iterator();
717                while (i.hasNext()) {
718                        TypedIOPort port = (TypedIOPort) i.next();
719                        String outputPortName = port.getName();
720                        for (int j = 0; j < requestedChannels.length; j++) {
721                                if (outputPortName.equals(requestedChannels[j])) {
722                                        if (outputPortName.equals(SPECIFIC_CHANNEL)
723                                                        && _specifiedOutputChannel != null) {
724                                                chanMap.Add(_specifiedOutputChannel);
725                                        } else {
726                                                // if channel is already present, its current index is
727                                                // returned, and no other action is taken.
728                                                chanMap.Add(outputPortName);
729                                        }
730                                        break;
731                                }
732                        }
733                }
734
735                return chanMap;
736        }
737
738        /**
739         * Get the server name and port string.
740         * 
741         * @return server URL
742         */
743        public String getServer() {
744                return _url;
745        }
746
747        /**
748         * Get the name of this DataTurbine client.
749         * 
750         * @return client name
751         */
752        public String getRBNBClientName() {
753                return _rbnbClientName;
754        }
755
756        /**
757         * Output the channel names through the CHANNEL_NAMES_OUTPUT_PORT
758         * as an ArrayToken.
759         */
760        public void outputChannelNames(){
761                Iterator<?> j = this.outputPortList().iterator();
762                while (j.hasNext()) {
763                        TypedIOPort port = (TypedIOPort) j.next();
764                        String currPortName = port.getName();
765                        if (currPortName.equals(CHANNEL_NAMES_OUTPUT_PORT)){
766                                for (int i=0; i< _connectedOutputPortNames.length; i++){
767                                        if (_connectedOutputPortNames[i].equals(CHANNEL_NAMES_OUTPUT_PORT)){
768                                                String chanNames = "{";
769                                                for (int h=0; h < _filteredChanNames.length; h++){
770                                                        if (h != _filteredChanNames.length-1){
771                                                                chanNames = chanNames.concat("\""+ _filteredChanNames[h] + "\"" + ",");
772                                                        }
773                                                        else{
774                                                                chanNames = chanNames.concat("\""+ _filteredChanNames[h] + "\"");
775                                                        }
776                                                }
777                                                chanNames = chanNames.concat("}");
778                                                ArrayToken at;
779                                                try {
780                                                        at = new ArrayToken(chanNames);
781                                                        port.send(0, at);
782                                                } catch (IllegalActionException e) {
783                                                        // TODO Auto-generated catch block
784                                                        e.printStackTrace();
785                                                }
786                                        }
787                                }
788                        }
789                }
790                
791        }
792        
793        /**
794         * Push the data out the output ports.
795         * 
         * @param cmap the ChannelMap whose channel data is pushed to the output ports
797         */
798        public void outputData(ChannelMap cmap) throws IllegalActionException {
799
800                // TODO: check if we should be accessing the map in outputData, or
801                // instead referencing eg _chanNames
802
803                // The RBNB data types:
804                // UNKNOWN = 0.
805                // TYPE_FROM_INPUT = 1. Take the data type from the input data payload.
806                // The resulting data type value will be one of the other types.
807                // TYPE_BOOLEAN = 2. Boolean (8-bit, one byte)
808                // TYPE_INT8 = 3. 8-bit (one byte) signed integer data type
809                // TYPE_INT16 = 4. 16-bit (two byte) signed integer data type.
810                // TYPE_INT32 = 5; 32-bit (four byte) signed integer data type.
811                // TYPE_INT64 = 6. 64-bit (eight byte) signed integer data type
812                // TYPE_FLOAT32 = 7. 32-bit (four byte) floating point data type
813                // TYPE_FLOAT64 = 8. 64-bit (eight byte) floating point data type.
814                // TYPE_STRING = 9. Character string data type.
815                // TYPE_BYTEARRAY = 10. Byte array data type.
816                // TYPE_USER = 11. User registration type.
817
818                int outputPortListSize = this.outputPortList().size();
819
820                // When RBNB returns no channels, output NIL tokens out every port
821                if (cmap.NumberOfChannels() == 0) {
822                        // System.out
823                        // .println("* outputting nils * - Your request returned no channels");
824                        Iterator<?> j = this.outputPortList().iterator();
825                        while (j.hasNext()) {
826                                TypedIOPort port = (TypedIOPort) j.next();
827                                //String currPortName = port.getName();
828                                // System.out.println("* outputting nils * - Your request returned no data for channel: "
829                                // + currPortName);
830                                outputNils(port);
831                        }
832                } else {
833                        // When only some channels have data, output NILs on those
834                        // channels that have no data.
835                        boolean[] portHasData = new boolean[outputPortListSize];
836                        Iterator<?> z = this.outputPortList().iterator();
837                        int h = -1;
838                        while (z.hasNext()) {
839                                h++;
840                                TypedIOPort port = (TypedIOPort) z.next();
841                                String outputPortName = port.getName();
842                                portHasData[h] = false; // initialize false
843                                
844                                int index = -1;
845                                //for the specificChannel output port we use the index of the specified channel
846                                if (outputPortName.equals(SPECIFIC_CHANNEL) && _specifiedOutputChannel != null){
847                                        index = cmap.GetIndex(_specifiedOutputChannel);
848                                }
849                                else{
850                                        index = cmap.GetIndex(outputPortName);
851                                }
852                                if (index != -1){
853                                        portHasData[h] = true;
854                                        int chanType = cmap.GetType(index);
855                                        String chanTypeName = cmap.TypeName(chanType);
856
857                                        double[] someTimes = cmap.GetTimes(index);
858                                        Token someTimeTokens[] = new Token[someTimes.length];
859
860                                        for (int j = 0; j < someTimeTokens.length; j++) {
861                                                someTimeTokens[j] = new DoubleToken(someTimes[j]);
862                                        }
863
864                                        Token someDataTokens[] = new Token[someTimes.length];
865                                        Token paddedTimeTokens[] = null;
866                                        Token paddedDataTokens[] = null;
867
868                                        Vector<Object> timeAndDataVector = new Vector<Object>();
869                                        Vector<Object> vectorOfArrays = new Vector<Object>();
870
871                                        if (chanTypeName.equals("Float32")) {
872                                                float[] someData = cmap.GetDataAsFloat32(index);
873                                                checkLengths(someData.length, someTimes.length);
874
875                                                if (paddingOn && someData.length == 1) {
876                                                        willPad = false;
877                                                        log
878                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
879                                                }
880
881                                                if (willPad) {
882                                                        for (int j = 0; j < someData.length; j++) {
883                                                                timeAndDataVector.add(someTimes[j]);
884                                                                timeAndDataVector.add(someData[j]);
885                                                        }
886                                                        vectorOfArrays = doPadding(timeAndDataVector,
887                                                                        chanTypeName);
888                                                        paddedTimeTokens = (Token[]) vectorOfArrays
889                                                                        .get(0);
890                                                        paddedDataTokens = (Token[]) vectorOfArrays
891                                                                        .get(1);
892                                                } else {
893                                                        for (int j = 0; j < someData.length; j++) {
894                                                                someDataTokens[j] = new FloatToken(
895                                                                                someData[j]);
896                                                        }
897                                                }
898
899                                        } else if (chanTypeName.equals("Float64")) {
900                                                double[] someData = cmap.GetDataAsFloat64(index);
901                                                checkLengths(someData.length, someTimes.length);
902                                                if (paddingOn && someData.length == 1) {
903                                                        willPad = false;
904                                                        log
905                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
906                                                }
907
908                                                if (willPad) {
909                                                        for (int j = 0; j < someData.length; j++) {
910                                                                timeAndDataVector.add(someTimes[j]);
911                                                                timeAndDataVector.add(someData[j]);
912                                                        }
913                                                        vectorOfArrays = doPadding(timeAndDataVector,
914                                                                        chanTypeName);
915                                                        paddedTimeTokens = (Token[]) vectorOfArrays
916                                                                        .get(0);
917                                                        paddedDataTokens = (Token[]) vectorOfArrays
918                                                                        .get(1);
919                                                } else {
920                                                        for (int j = 0; j < someData.length; j++) {
921                                                                someDataTokens[j] = new DoubleToken(
922                                                                                someData[j]);
923                                                        }
924                                                }
925                                        } else if (chanTypeName.equals("String")) {
926                                                String[] someData = cmap.GetDataAsString(index);
927                                                checkLengths(someData.length, someTimes.length);
928                                                if (paddingOn && someData.length == 1) {
929                                                        willPad = false;
930                                                        log
931                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
932                                                }
933
934                                                if (willPad) {
935                                                        for (int j = 0; j < someData.length; j++) {
936                                                                timeAndDataVector.add(someTimes[j]);
937                                                                timeAndDataVector.add(someData[j]);
938                                                        }
939                                                        vectorOfArrays = doPadding(timeAndDataVector,
940                                                                        chanTypeName);
941                                                        paddedTimeTokens = (Token[]) vectorOfArrays
942                                                                        .get(0);
943                                                        paddedDataTokens = (Token[]) vectorOfArrays
944                                                                        .get(1);
945                                                } else {
946                                                    String userInfoType = _userInfoTypesMap.get(outputPortName);
947                                                        for (int j = 0; j < someData.length; j++) {
948                                                            // see if the user info field contained a ptolemy type
949                                                            // if not, then put into string token
950                                                            if(userInfoType == null) {
951                                                                someDataTokens[j] = new StringToken(
952                                                                        someData[j]);
953                                                            } else if(userInfoType.equals("[int]")) {
954                                                                someDataTokens[j] = new IntMatrixToken(someData[j]);
955                                } else if(userInfoType.equals("[double]")) {
956                                    someDataTokens[j] = new DoubleMatrixToken(someData[j]);
957                                                            } else {
958                                                                MessageHandler.error("Channel " + outputPortName + 
959                                                                    " has unsupported ptolemy type: " + userInfoType);
960                                                            }
961                                                        }
962                                                }
963                                        } else if (chanTypeName.equals("Int8")) {
964                                                byte[] someData = cmap.GetDataAsInt8(index);
965                                                checkLengths(someData.length, someTimes.length);
966                                                if (paddingOn && someData.length == 1) {
967                                                        willPad = false;
968                                                        log
969                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
970                                                }
971
972                                                if (willPad) {
973                                                        for (int j = 0; j < someData.length; j++) {
974                                                                timeAndDataVector.add(someTimes[j]);
975                                                                timeAndDataVector.add(someData[j]);
976                                                        }
977                                                        vectorOfArrays = doPadding(timeAndDataVector,
978                                                                        chanTypeName);
979                                                        paddedTimeTokens = (Token[]) vectorOfArrays
980                                                                        .get(0);
981                                                        paddedDataTokens = (Token[]) vectorOfArrays
982                                                                        .get(1);
983                                                } else {
984                                                        for (int j = 0; j < someData.length; j++) {
985                                                                someDataTokens[j] = new UnsignedByteToken(
986                                                                                someData[j]);
987                                                        }
988                                                }
989                                        } else if (chanTypeName.equals("Int16")) {
990                                                short[] someData = cmap.GetDataAsInt16(index);
991                                                checkLengths(someData.length, someTimes.length);
992                                                if (paddingOn && someData.length == 1) {
993                                                        willPad = false;
994                                                        log
995                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
996                                                }
997
998                                                if (willPad) {
999                                                        for (int j = 0; j < someData.length; j++) {
1000                                                                timeAndDataVector.add(someTimes[j]);
1001                                                                timeAndDataVector.add(someData[j]);
1002                                                        }
1003                                                        vectorOfArrays = doPadding(timeAndDataVector,
1004                                                                        chanTypeName);
1005                                                        paddedTimeTokens = (Token[]) vectorOfArrays
1006                                                                        .get(0);
1007                                                        paddedDataTokens = (Token[]) vectorOfArrays
1008                                                                        .get(1);
1009                                                } else {
1010                                                        for (int j = 0; j < someData.length; j++) {
1011                                                                someDataTokens[j] = new ShortToken(
1012                                                                                someData[j]);
1013                                                        }
1014                                                }
1015                                        } else if (chanTypeName.equals("Int32")) {
1016                                                int[] someData = cmap.GetDataAsInt32(index);
1017                                                checkLengths(someData.length, someTimes.length);
1018                                                if (paddingOn && someData.length == 1) {
1019                                                        willPad = false;
1020                                                        log
1021                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
1022                                                }
1023
1024                                                if (willPad) {
1025                                                        for (int j = 0; j < someData.length; j++) {
1026                                                                timeAndDataVector.add(someTimes[j]);
1027                                                                timeAndDataVector.add(someData[j]);
1028                                                        }
1029                                                        vectorOfArrays = doPadding(timeAndDataVector,
1030                                                                        chanTypeName);
1031                                                        paddedTimeTokens = (Token[]) vectorOfArrays
1032                                                                        .get(0);
1033                                                        paddedDataTokens = (Token[]) vectorOfArrays
1034                                                                        .get(1);
1035                                                } else {
1036                                                        for (int j = 0; j < someData.length; j++) {
1037                                                                someDataTokens[j] = new IntToken(
1038                                                                                someData[j]);
1039                                                        }
1040                                                }
1041                                        } else if (chanTypeName.equals("Int64")) {
1042                                                long[] someData = cmap.GetDataAsInt64(index);
1043                                                checkLengths(someData.length, someTimes.length);
1044                                                if (paddingOn && someData.length == 1) {
1045                                                        willPad = false;
1046                                                        log
1047                                                                        .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1.");
1048                                                }
1049
1050                                                if (willPad) {
1051                                                        for (int j = 0; j < someData.length; j++) {
1052                                                                timeAndDataVector.add(someTimes[j]);
1053                                                                timeAndDataVector.add(someData[j]);
1054                                                        }
1055                                                        vectorOfArrays = doPadding(timeAndDataVector,
1056                                                                        chanTypeName);
1057                                                        paddedTimeTokens = (Token[]) vectorOfArrays
1058                                                                        .get(0);
1059                                                        paddedDataTokens = (Token[]) vectorOfArrays
1060                                                                        .get(1);
1061                                                } else {
1062                                                        for (int j = 0; j < someData.length; j++) {
1063                                                                someDataTokens[j] = new LongToken(
1064                                                                                someData[j]);
1065                                                        }
1066                                                }
1067                                        }
1068                                        if (!chanTypeName.equals("ByteArray")) {
1069                                                // System.out
1070                                                // .println("about to send dataTokens out of port: "
1071                                                // + currPortName);
1072                                                try {
1073                                                        if (outputRecOfArrays
1074                                                                        .equals(RECORD_OF_2_ARRAYS)) {
1075
1076                                                                if (willPad) {
1077                                                                        ArrayToken atSomeTimeTokens = new ArrayToken(
1078                                                                                        paddedTimeTokens);
1079                                                                        ArrayToken atSomeDataTokens = new ArrayToken(
1080                                                                                        paddedDataTokens);
1081                                                                        Token[] dataValues = {
1082                                                                                        atSomeTimeTokens,
1083                                                                                        atSomeDataTokens };
1084                                                                        RecordToken recToken = new RecordToken(
1085                                                                                        labels, dataValues);
1086                                                                        port.send(0, recToken);
1087                                                                } else {
1088                                                                        ArrayToken atSomeTimeTokens = new ArrayToken(
1089                                                                                        someTimeTokens);
1090                                                                        ArrayToken atSomeDataTokens = new ArrayToken(
1091                                                                                        someDataTokens);
1092                                                                        Token[] dataValues = {
1093                                                                                        atSomeTimeTokens,
1094                                                                                        atSomeDataTokens };
1095                                                                        RecordToken recToken = new RecordToken(
1096                                                                                        labels, dataValues);
1097                                                                        port.send(0, recToken);
1098                                                                }
1099                                                        } else {
1100
1101                                                                if (willPad) {
1102                                                                        RecordToken[] recTokens = new RecordToken[paddedTimeTokens.length];
1103                                                                        for (int k = 0; k < paddedTimeTokens.length; k++) {
1104                                                                                values[0] = paddedTimeTokens[k];
1105                                                                                values[1] = paddedDataTokens[k];
1106                                                                                recTokens[k] = new RecordToken(
1107                                                                                                labels, values);
1108                                                                        }
1109                                                                        ArrayToken at = new ArrayToken(
1110                                                                                        recTokens);
1111                                                                        port.send(0, at);
1112                                                                } else {
1113                                                                        RecordToken[] recTokens = new RecordToken[someTimeTokens.length];
1114                                                                        for (int k = 0; k < someTimeTokens.length; k++) {
1115                                                                                values[0] = someTimeTokens[k];
1116                                                                                values[1] = someDataTokens[k];
1117                                                                                recTokens[k] = new RecordToken(
1118                                                                                                labels, values);
1119                                                                        }
1120                                                                        ArrayToken at = new ArrayToken(
1121                                                                                        recTokens);
1122                                                                        port.send(0, at);
1123                                                                }
1124                                                        }
1125                                                } catch (Exception e) {
1126                                                        throw new IllegalActionException(this, e,
1127                                                                        "Exception trying to send dataTokens"
1128                                                                                        + " out of port: "
1129                                                                                        + outputPortName);
1130
1131                                                }
1132                                        } else if (chanTypeName.equals("ByteArray")) { 
1133                                                // TODO: padding for byte channel
1134                                                byte[][] someData = cmap.GetDataAsByteArray(index);
1135                                                Token[][] someByteDataTokens = new Token[someData.length][];
1136                                                checkLengths(someData.length, someTimes.length);
1137
1138                                                for (int j = 0; j < someData.length; j++) {
1139                                                        someByteDataTokens[j] = new Token[someData[j].length];
1140                                                        // System.out.println("debug Tokens: someData["
1141                                                        // + j + "]: " + someData[j] + " someTimes[" + j
1142                                                        // + "]: " +someTimes[j]);
1143                                                        for (int k = 0; k < someData[j].length; k++) {
1144                                                                try {
1145                                                                        someByteDataTokens[j][k] = new UnsignedByteToken(
1146                                                                                        someData[j][k]);
1147                                                                } catch (Exception e) {
1148                                                                        throw new IllegalActionException(this,
1149                                                                                        e,
1150                                                                                        "Exception trying to create and assign an UnsignedByteToken.");
1151                                                                }
1152                                                        }
1153                                                }
1154                                                try {
1155                                                        if (outputRecOfArrays
1156                                                                        .equals(RECORD_OF_2_ARRAYS)) {
1157                                                                ArrayToken[] atSomeDataTokens = new ArrayToken[someTimeTokens.length];
1158                                                                for (int k = 0; k < someTimeTokens.length; k++) {
1159                                                                        atSomeDataTokens[k] = new ArrayToken(
1160                                                                                        someByteDataTokens[k]);
1161                                                                }
1162                                                                ArrayToken atSomeTimeTokens = new ArrayToken(
1163                                                                                someTimeTokens);
1164                                                                ArrayToken atatSomeDataTokens = new ArrayToken(
1165                                                                                atSomeDataTokens);
1166                                                                Token[] dataValues = { atSomeTimeTokens,
1167                                                                                atatSomeDataTokens };
1168                                                                RecordToken recToken = new RecordToken(
1169                                                                                labels, dataValues);
1170                                                                port.send(0, recToken);
1171                                                        } else {
1172                                                                RecordToken[] recTokens = new RecordToken[someTimeTokens.length];
1173                                                                for (int k = 0; k < someTimeTokens.length; k++) {
1174                                                                        values[0] = someTimeTokens[k];
1175                                                                        values[1] = new ArrayToken(
1176                                                                                        someByteDataTokens[k]);
1177                                                                        recTokens[k] = new RecordToken(labels,
1178                                                                                        values);
1179                                                                }
1180                                                                ArrayToken at = new ArrayToken(recTokens);
1181                                                                port.send(0, at);
1182                                                        }
1183                                                } catch (Exception e) {
1184                                                        throw new IllegalActionException(this, e,
1185                                                                        "Exception trying to send out port 0.");
1186                                                }
1187                                        }
1188                                }
1189                                if (!portHasData[h]) {
1190                                        if (!outputPortName.equals(CHANNEL_NAMES_OUTPUT_PORT)){
1191                                                        //System.out.println("* outputting nils * - Your request returned no data for channel: "
1192                                                        //              + currPortName);
1193                                                        outputNils(port);
1194                                        }
1195                                }
1196                        } // end while z
1197
1198                }
1199
1200        } // outputData()
1201
1202        /**
1203         * 
1204         * @param timeAndDataVector
1205         * @param chanTypeName
1206         * @return
1207         */
1208        public Vector<Object> doPadding(Vector<Object> timeAndDataVector,
1209                        String chanTypeName) {
1210
1211                // System.out.println("Will try to pad any gaps...");
1212
1213                Token paddedTimeTokens[] = null;
1214                Token paddedDataTokens[] = null;
1215
1216                timeAndDataVector = fillGaps(timeAndDataVector);
1217                paddedTimeTokens = new Token[timeAndDataVector.size() / 2];
1218                paddedDataTokens = new Token[timeAndDataVector.size() / 2];
1219
1220                int itr = -1;
1221                for (int j = 0; j < timeAndDataVector.size() - 1; j += 2) {
1222                        itr++;
1223                        paddedTimeTokens[itr] = new DoubleToken((Double) timeAndDataVector
1224                                        .get(j));
1225
1226                        // TODO: surely there's a better way of doing this:
1227                        String t = timeAndDataVector.get(j + 1).toString();
1228                        if (t.equals("nil")) {
1229                                paddedDataTokens[itr] = Token.NIL;
1230                        } else {
1231                                if (chanTypeName.equals("Float32")) {
1232                                        paddedDataTokens[itr] = new FloatToken(
1233                                                        (Float) timeAndDataVector.get(j + 1));
1234                                } else if (chanTypeName.equals("Float64")) {
1235                                        paddedDataTokens[itr] = new DoubleToken(
1236                                                        (Double) timeAndDataVector.get(j + 1));
1237                                } else if (chanTypeName.equals("String")) {
1238                                        paddedDataTokens[itr] = new StringToken(
1239                                                        (String) timeAndDataVector.get(j + 1));
1240                                } else if (chanTypeName.equals("Int8")) {
1241                                        paddedDataTokens[itr] = new UnsignedByteToken(
1242                                                        (Byte) timeAndDataVector.get(j + 1));
1243                                } else if (chanTypeName.equals("Int16")) {
1244                                        paddedDataTokens[itr] = new ShortToken(
1245                                                        (Short) timeAndDataVector.get(j + 1));
1246                                } else if (chanTypeName.equals("Int32")) {
1247                                        paddedDataTokens[itr] = new IntToken(
1248                                                        (Integer) timeAndDataVector.get(j + 1));
1249                                } else if (chanTypeName.equals("Int64")) {
1250                                        paddedDataTokens[itr] = new LongToken(
1251                                                        (Long) timeAndDataVector.get(j + 1));
1252                                } else {
1253                                        log
1254                                                        .error("DataTurbine Actor: ERROR. padding for this datatype not yet implemented!");
1255                                }
1256                        }
1257                }
1258
1259                Vector<Object> vectorOfArrays = new Vector<Object>();
1260                vectorOfArrays.add(paddedTimeTokens);
1261                vectorOfArrays.add(paddedDataTokens);
1262
1263                return vectorOfArrays;
1264        }
1265
1266        public Vector<Object> fillGaps(Vector<Object> timeAndDataVector) {
1267
1268                long sampleInterval = guessSamplingRate(timeAndDataVector);
1269
1270                // TODO: track down adcp binary chan bug (unless fixed, will require an
1271                // if here on the +1)
1272
1273                // note: duration is number of seconds that might contain data, not
1274                // number of data points.
1275
1276                double durationDouble = (double) _durationDouble;
1277                double sampleIntervalDouble = (double) sampleInterval;
1278                // double firstTimeSample = (Double) timeAndDataVector.get(0);
1279                // double missingFront = firstTimeSample - _startTimeDouble;
1280
1281                double dnumSamples = durationDouble / sampleIntervalDouble;
1282                // TODO: double check this round
1283                int numSamples = (int) Math.round(dnumSamples);
1284                if (sampleIntervalDouble > durationDouble) {
1285                        log
1286                                        .error("DataTurbine Actor: ERROR: sampleIntervalDouble > durationDouble, something went wrong");
1287                }
1288
1289                // System.out.println("durationDouble: " + durationDouble +
1290                // " sampleIntervalDouble: " + sampleIntervalDouble + " numSamples:"+
1291                // numSamples);
1292
1293                Vector<Object> paddedTimeAndDataVector = null;
1294
1295                if (numSamples != timeAndDataVector.size() / 2) {
1296                        // System.out.println(numSamples + " != " +
1297                        // timeAndDataVector.size()/2);
1298                        int numMissing = numSamples - (timeAndDataVector.size() / 2);
1299                        log
1300                                        .debug("DataTurbine Actor: Padding - requested timeslice missing "
1301                                                        + numMissing
1302                                                        + " samples, padding using sampleInterval of majority:"
1303                                                        + sampleInterval + "sec");
1304
1305                        // it will get larger
1306                        paddedTimeAndDataVector = new Vector<Object>(numSamples * 2);
1307                        paddedTimeAndDataVector = padGaps(sampleInterval, numMissing,
1308                                        timeAndDataVector);
1309
1310                        return paddedTimeAndDataVector;
1311                }
1312
1313                log.debug("DataTurbine Actor: No need to pad");
1314                return timeAndDataVector;
1315        }
1316
1317        // Guess Sampling Rate
1318        //
1319        // TODO: fix math - this will not work for faster than 1sps sampling!
1320        //
1321        public long guessSamplingRate(Vector<Object> timeAndDataVector) {
1322
1323                Map hashmap = new HashMap(); // hash table
1324                double timesDifference;
1325                long roundedTimesDiff; // used for rounded value
1326                boolean keyExists;
1327                for (int j = 0; j < timeAndDataVector.size() - 2; j += 2) {
1328                        timesDifference = (Double) timeAndDataVector.get(j + 2)
1329                                        - (Double) timeAndDataVector.get(j);
1330                        roundedTimesDiff = Math.round(timesDifference);
1331                        // System.out.println("timesDifference:" + timesDifference +
1332                        // "roundedTimesDiff:" + roundedTimesDiff);
1333                        keyExists = hashmap.containsKey(roundedTimesDiff);
1334                        if (keyExists) {
1335                                int hashValue = (Integer) hashmap.get(roundedTimesDiff);
1336                                hashmap.put(Long.valueOf(roundedTimesDiff), Integer
1337                                                .valueOf(++hashValue));
1338                                hashValue = (Integer) hashmap.get(roundedTimesDiff);
1339                        } else {
1340                                hashmap.put(Long.valueOf(roundedTimesDiff), Integer.valueOf(1));
1341                                int hashValue = (Integer) hashmap.get(roundedTimesDiff);
1342                        }
1343                }
1344
1345                int maxHashValue = 0;
1346                long keyOfMax = 0l;
1347                Iterator hashItr = hashmap.keySet().iterator();
1348                while (hashItr.hasNext()) {
1349                        long hashKey = (Long) hashItr.next();
1350                        int hashValue = (Integer) hashmap.get(hashKey);
1351                        if (hashValue > maxHashValue) {
1352                                maxHashValue = hashValue;
1353                                keyOfMax = hashKey;
1354                        }
1355                        // System.out.println(""+hashKey+ " => "+hashValue + "times");
1356                }
1357                long sampleInterval = keyOfMax;
1358
1359                return sampleInterval;
1360        }
1361
1362        public Vector<Object> padGaps(long sampleInterval, int numMissing,
1363                        Vector<Object> timeAndDataVector) {
1364                // int newLength = timeAndDataVector.size() + numMissing * 2;
1365
1366                double time, fakeTimeStamp;
1367
1368                double firstTimeStamp = (Double) timeAndDataVector.get(0);
1369                double lastTimeStamp = (Double) timeAndDataVector.get(timeAndDataVector
1370                                .size() - 2);
1371                // double firstPossibleTimeStamp = _startTimeDouble;
1372                double lastPossibleTimeStamp = _startTimeDouble + _durationDouble;
1373
1374                // System.out.println("firstPossibleTimeStamp" + firstPossibleTimeStamp
1375                // + " firstTimeStamp " + firstTimeStamp +
1376                // " lastTimeStamp: " + lastTimeStamp + " lastPossibleTimeStamp: " +
1377                // lastPossibleTimeStamp);
1378
1379                // insert - fill any gaps between first and last obtained samples
1380                int itr = -1;
1381                log.debug("padGaps numMissing:"+numMissing);
1382                for (int i = 0; i < timeAndDataVector.size(); i += 2) { // dynamic end
1383                        // condition
1384                        itr++;
1385                        fakeTimeStamp = firstTimeStamp + sampleInterval * itr;
1386                        time = (Double) timeAndDataVector.get(i);
1387                        if (fakeTimeStamp != time) {
1388                                // System.out.println("inserting fakeTimeStamp: " +
1389                                // fakeTimeStamp);
1390                                timeAndDataVector.add(i, Token.NIL);
1391                                timeAndDataVector.add(i, fakeTimeStamp);
1392                                numMissing--;
1393                                if (numMissing < 0) {
1394                                        log
1395                                                        .error("DataTurbine Actor Error: tried to insert too many samples");
1396                                }
1397                        }
1398                }
1399
1400                // append - if needed, fill between last obtained sample and last
1401                // possible time
1402                if (numMissing != 0) {
1403                        itr = 1;
1404                        fakeTimeStamp = lastTimeStamp + sampleInterval * itr;
1405                        while (fakeTimeStamp <= lastPossibleTimeStamp) {
1406                                // System.out.println("appending fakeTimeStamp: " +
1407                                // fakeTimeStamp);
1408                                timeAndDataVector.add(fakeTimeStamp);
1409                                timeAndDataVector.add(Token.NIL);
1410                                numMissing--;
1411
1412                                itr++;
1413                                fakeTimeStamp = lastTimeStamp + sampleInterval * itr;
1414                        }
1415                }
1416
1417                // prepend - if needed, fill between first possible time to first
1418                // obtained time
1419                itr = 0;
1420                while (numMissing > 0) {
1421                        itr++;
1422                        fakeTimeStamp = firstTimeStamp - sampleInterval * itr;
1423                        if (fakeTimeStamp < _startTimeDouble) { // sanity-check
1424                                log
1425                                                .error("ERROR! Tried to prepend before requested start time, something wrong!");
1426                                // throw exception
1427                                numMissing = 0;
1428                        } else {
1429                                // System.out.println("prepending fakeTimeStamp: " +
1430                                // fakeTimeStamp);
1431                                timeAndDataVector.add(0, Token.NIL);
1432                                timeAndDataVector.add(0, fakeTimeStamp);
1433                                numMissing--;
1434                                // sanity check:
1435                                if (numMissing == 0) {
1436                                        double tmp = fakeTimeStamp - sampleInterval;
1437                                        // System.out.println("fakeTimeStamp - sampleInterval " +
1438                                        // tmp + " _startTimeDouble " + _startTimeDouble);
1439                                }
1440                        }
1441                }
1442
1443                return timeAndDataVector;
1444        }
1445
1446        /**
1447         * Output nils for a given port.
1448         * 
1449         * @param TypedIOPort
1450         *            port
1451         */
1452        public void outputNils(TypedIOPort port) {
1453
1454                Type currPortType = port.getType();
1455                // String currPortName = port.getName();
1456                Token[] nilTokenArray = new Token[1];
1457                nilTokenArray[0] = Token.NIL;
1458
1459                RecordToken[] recTokens = new RecordToken[1];
1460
1461                try {
1462
1463                        ArrayToken atSomeTimeTokens = new ArrayToken(nilTokenArray);
1464
1465                        // hardcode:
1466                        if ((currPortType.toString().contains(" = arrayType(unsignedByte)") && outputRecOfArrays
1467                                        .equals(ARRAY_OF_X_RECORDS))
1468                                        || (currPortType.toString().contains(
1469                                                        " = arrayType(arrayType(unsignedByte))") && outputRecOfArrays
1470                                                        .equals(RECORD_OF_2_ARRAYS))) {
1471
1472                                // System.out.println("about to send * NIL dataTokens * out of BYTE port: "+currPortName);
1473                                if (outputRecOfArrays.equals(RECORD_OF_2_ARRAYS)) {
1474                                        ArrayToken[] atSomeDataTokens = new ArrayToken[1];
1475                                        atSomeDataTokens[0] = new ArrayToken(nilTokenArray);
1476                                        ArrayToken atatSomeDataTokens = new ArrayToken(
1477                                                        atSomeDataTokens);
1478                                        values[0] = atSomeTimeTokens;
1479                                        values[1] = atatSomeDataTokens;
1480                                        RecordToken recToken = new RecordToken(labels, values);
1481                                        port.send(0, recToken);
1482                                } else {
1483                                        values[0] = Token.NIL;
1484                                        values[1] = new ArrayToken(nilTokenArray);
1485                                        recTokens[0] = new RecordToken(labels, values);
1486                                        ArrayToken at = new ArrayToken(recTokens);
1487                                        port.send(0, at);
1488                                }
1489                        } else {
1490
1491                                // System.out.println("about to send * NIL dataTokens * out of port: "+currPortName);
1492                                if (outputRecOfArrays.equals(RECORD_OF_2_ARRAYS)) {
1493                                        ArrayToken atSomeDataTokens = new ArrayToken(nilTokenArray);
1494                                        values[0] = atSomeTimeTokens;
1495                                        values[1] = atSomeDataTokens;
1496                                        RecordToken recToken = new RecordToken(labels, values);
1497                                        port.send(0, recToken);
1498                                } else {
1499                                        values[0] = Token.NIL;
1500                                        values[1] = Token.NIL;
1501                                        recTokens[0] = new RecordToken(labels, values);
1502                                        ArrayToken at = new ArrayToken(recTokens);
1503                                        port.send(0, at);
1504                                }
1505                        }
1506                } catch (Exception e) {
1507                        log.error("DataTurbine actor: exception trying to sendout port 0"
1508                                        + e);
1509                }
1510        }
1511
1512        /**
1513         * Compare two array lengths.
1514         * 
1515         * @param int arraylength1, int arraylength2
1516         * @throws IllegalActionException
1517         */
1518        void checkLengths(int dataLength, int timesLength)
1519                        throws IllegalActionException {
1520                if (dataLength != timesLength) {
1521                        throw new IllegalActionException(
1522                                        this,
1523                                        "Error. someData.length != someTimes.length, This shouldn't be. Check your DataTurbine server data");
1524                }
1525        }
1526
        /**
         * Post fire the actor. Return false to indicate that the process has
         * finished. If it returns true, the process will continue indefinitely.
         * (Disabled implementation, kept for reference:)
         * public boolean postfire() throws IllegalActionException { if
         * (_sinkMode.equals(SINKMODE_MONITOR) ){ return true; } else { return false; } }
         */
1533
1534        /**
1535         * Open connection to DataTurbine server.
1536         * 
1537         * @throws SAPIException
1538         * @throws IllegalActionException 
1539         * 
1540         */
1541        public boolean openDataTurbine() throws SAPIException, IllegalActionException {
1542                try {
1543                        if (getServer() != null && !getServer().equals("") 
1544                                        && getRBNBClientName() != null && !getRBNBClientName().equals("")){
1545                                if (_sink.VerifyConnection()) {
1546                                        return true;
1547                                }
1548                                log.debug("open DataTurbine connection. OpenRBNBConnection(" + 
1549                                                getServer() + "," + getRBNBClientName() + ")");
1550                                _sink.OpenRBNBConnection(getServer(), getRBNBClientName());
1551                                if (!_sink.VerifyConnection()) {
1552                                        log.error("Error trying to open DataTurbine connection");
1553                                        return false;
1554                                }
1555                                return true;
1556                        }
1557                } catch (SAPIException sap) {
1558                        log.warn("\nFailed to connect to "+getServer()+"\n"+
1559                                "Verify server URL. Enter empty string for none.\n");
1560                        throw new SAPIException("\nFailed to connect to "+getServer()+"\n"+
1561                                "Verify server URL. Enter empty string for none.\n");
1562                }
1563                return false;
1564        }
1565
1566        /**
1567         * Reconfigure actor when certain attributes change.
1568         * 
1569         * @param attribute
1570         *            The changed Attribute.
1571         * @throws ptolemy.kernel.util.IllegalActionException
1572         * 
1573         */
1574        @Override
1575    public void attributeChanged(ptolemy.kernel.util.Attribute attribute)
1576                        throws ptolemy.kernel.util.IllegalActionException {
1577                
1578            boolean outputChannelChanged = false;
1579            
1580                if (attribute == dataTurbineAddressInputParam) {
1581                        if (dataTurbineAddressInputParam != null && dataTurbineAddressInputParam.getToken() != null){
1582                                String tmpUrl = ((StringToken)dataTurbineAddressInputParam.getToken()).stringValue();
1583                                tmpUrl = tmpUrl.replaceAll("\"", "");
1584                        
1585                                if (!_url.equals(tmpUrl)) {
1586                                        _url = tmpUrl;
1587                                        reload = true;
1588                                }
1589                        }
1590                }
1591                else if (attribute == outputChannelPortParam){
1592                        if (outputChannelPortParam != null && outputChannelPortParam.getToken() != null) {
1593
1594                                String tmpSpecifiedOutputChannel = ((StringToken) outputChannelPortParam
1595                                                .getToken()).stringValue();
1596                                if ((tmpSpecifiedOutputChannel != null && 
1597                                                !_specifiedOutputChannel.equals(tmpSpecifiedOutputChannel))) {
1598                                        _specifiedOutputChannel = tmpSpecifiedOutputChannel;
1599                                        outputChannelChanged = true;
1600
1601                                }
1602                        }
1603                } else if (attribute == outputRecordOfArrays) {
1604                        String tmpoutputRecOfArrays = ((StringToken)outputRecordOfArrays.getToken()).stringValue();
1605                        if (tmpoutputRecOfArrays != null && !outputRecOfArrays.equals(tmpoutputRecOfArrays)) {
1606                                reload = true;
1607                                outputRecOfArrays = tmpoutputRecOfArrays;
1608
1609                        }
1610                } else if (attribute == tryToPad) {
1611                        boolean tmpPaddingOn = ((BooleanToken) tryToPad.getToken())
1612                                        .booleanValue();
1613
1614                        if (paddingOn != tmpPaddingOn) {
1615                                paddingOn = tmpPaddingOn;
1616                                willPad = paddingOn;
1617                        }
1618                } else{
1619                        super.attributeChanged(attribute);
1620                }
1621
1622                if(outputChannelChanged) {
1623                    getDataTurbineInfo();
1624            }
1625                else if (reload) {
1626                        try {
1627                                reload = false;
1628                                log.debug("Fetching info from DataTurbine: " + _url);
1629                                // first make sure we can connect
1630                                // if already connected to a server, disconnect first
1631                        log.debug("disconnect from DataTurbine");
1632                                _sink.CloseRBNBConnection();
1633                                boolean opened = openDataTurbine();
1634                                if (opened){
1635                                        boolean gotInfo = getDataTurbineInfo();
1636                                        if (gotInfo){
1637                                                configureOutputPorts(null);
1638                                        }
1639                                }
1640                                else{
1641                                        // should only occur when the user manually changes to blank
1642                                        // (and not also during instantiation).
1643                                        if (attribute == dataTurbineAddressInputParam){
1644                                                removeAllOutputPorts();
1645                                        }
1646                                }
1647                        }
1648                        catch (SAPIException e) {
1649                                e.printStackTrace();
1650                                throw new IllegalActionException("\nFailed to connect to "+getServer()+"\n"+
1651                                                "Verify server URL. Enter empty string for none.\n");
1652                        }
1653                        catch (Exception e) {
1654                                e.printStackTrace();
1655                                throw new IllegalActionException(this, e,
1656                                                "Error opening DataTurbine connection.");
1657                        }
1658                }
1659        }
1660        
1661        /** Clone the actor into the specified workspace. */
1662        @Override
1663    public Object clone(Workspace workspace) throws CloneNotSupportedException {
1664            DataTurbine newObject = (DataTurbine) super.clone(workspace);
1665        newObject._icon = null;
1666            newObject._map = null;
1667            newObject._registrationMap = null;
1668            newObject._sink = null;         
1669            newObject._stopRequested = new AtomicBoolean(false);
1670            newObject._userInfoTypesMap = new HashMap<String,String>();
1671            newObject.values = new Token[labels.length];
1672            return newObject;
1673        }
1674
1675        /**
1676         * Remove all existing output ports.
1677         * 
1678         * @throws ptolemy.kernel.util.IllegalActionException
1679         */
1680        private void removeAllOutputPorts() throws IllegalActionException {
1681                Iterator<?> i = this.outputPortList().iterator();
1682                while (i.hasNext()) {
1683                        TypedIOPort port = (TypedIOPort) i.next();
1684                        String currPortName = port.getName();
1685                        try {
1686                                port.setContainer(null);
1687                        } catch (Exception ex) {
1688                                throw new IllegalActionException(this, ex,
1689                                                "Error removing port: " + currPortName);
1690                        }
1691                }
1692        }
1693
1694        /**
1695         * Remove all ports with names not in the selected vector.
1696         * @param nonRemovePortName
1697         * @throws IllegalActionException
1698         */
1699        void removeOtherOutputPorts(Collection<?> nonRemovePortName)
1700                        throws IllegalActionException {
1701                // Use toArray() to make a deep copy of this.portList().
1702                // Do this to prevent ConcurrentModificationExceptions.
1703
1704                TypedIOPort[] l = new TypedIOPort[0];
1705                l = (TypedIOPort[]) this.portList().toArray(l);
1706
1707                for (int i = 0; i < l.length; i++) {
1708                        TypedIOPort port = l[i];
1709                        if (port == null || port.isInput()) {
1710                                continue;
1711                        }
1712                        String currPortName = port.getName();
1713                        if (!nonRemovePortName.contains(currPortName)) {
1714                                try {
1715                                        port.setContainer(null);
1716                                } catch (Exception ex) {
1717                                        throw new IllegalActionException(this, ex,
1718                                                        "Error removing port: " + currPortName);
1719                                }
1720                        }
1721                }
1722        }
1723
1724        /**
1725         * Reconfigure output ports.
1726         * 
1727         * Some channels may have a period in their name. Ptolemy does not allow
1728         * ports to have periods in their name, so replace . with PERIOD
1729         * 
1730         * @param requestedChanNames
1731         *            A list of channels from which the actor will request data, and
1732         *            will therefore need output ports for. If null, use all
1733         *            filteredChanNames.
1734         * @throws ptolemy.kernel.util.IllegalActionException
1735         * 
1736         */
1737        private void configureOutputPorts(String[] requestedChanNames)
1738                        throws IllegalActionException {
1739
1740                Vector<String> portsToKeep = new Vector<String>();
1741                
1742                // add the output port that simply outputs all filtered
1743                // channel names
1744                addPort(CHANNEL_NAMES_OUTPUT_PORT, "String");
1745                portsToKeep.add(CHANNEL_NAMES_OUTPUT_PORT);
1746                
1747                // add the specific channel output port
1748                addPort(SPECIFIC_CHANNEL,"String");
1749                portsToKeep.add(SPECIFIC_CHANNEL);
1750
1751                if (_filteredChanNames.length != _filteredChanTypes.length) {
1752                        throw new IllegalActionException(this,
1753                                        "ERROR filteredNames.length:" + _filteredChanNames.length
1754                                                        + " and filteredTypes.length:"
1755                                                        + _filteredChanTypes.length + " need to match!");
1756                }
1757
1758                for (int z = 0; z < _filteredChanNames.length; z++) {
1759                        if (_filteredChanNames[z] == null) {
1760                                continue;
1761                        }
1762                        if (requestedChanNames == null) {
1763                                addPort(_filteredChanNames[z].replaceAll("\\.", "PERIOD"),
1764                                                _filteredChanTypes[z]);
1765                                portsToKeep.add(_filteredChanNames[z].replaceAll("\\.", "PERIOD"));
1766                        } else {
1767                                for (int i = 0; i < requestedChanNames.length; i++) {
1768                                        if (_filteredChanNames[z].equals(requestedChanNames[i])) {
1769                                                addPort(_filteredChanNames[z].replaceAll("\\.", "PERIOD"),
1770                                                                _filteredChanTypes[z]);
1771                                                portsToKeep.add(_filteredChanNames[z].replaceAll("\\.",
1772                                                                "PERIOD"));
1773                                                break;
1774                                        }
1775                                }
1776                        }
1777                }
1778                
1779                removeOtherOutputPorts(portsToKeep);
1780        }
1781
1782        /**
1783         * Add an output port to DataTurbine actor if it does not already exist and
1784         * set the type appropriately.
1785         * 
1786         * @param aPortName
1787         *            The name of the port.
1788         * @param aPortType
1789         *            The type of the port.
1790         * 
1791         */
1792        private void addPort(String aPortName, String rbnbType) {
1793                try {
1794                        TypedIOPort port = (TypedIOPort) this.getPort(aPortName);
1795                        boolean aIsNew = (port == null);
1796                        if (aIsNew) {
1797                                port = new TypedIOPort(this, aPortName, false, true);
1798                        }
1799                        setPortType(aPortName, rbnbType, port);
1800                } catch (Exception e) {
1801                        log.error("DataTurbine Error. Trouble making port: " + aPortName
1802                                        + " with type: " + rbnbType);
1803                }
1804        }
1805
1806        /**
1807         * Set the output port type based on outputRecOfArrays boolean
1808         * 
1809         * @param aPortName
1810         *            The name of the port.
1811         * @param aPortType
1812         *            The type of the port.
1813         * 
1814         */
1815        private void setPortType(String aPortName, String rbnbType, TypedIOPort port) {
1816
1817                Type[] types = new Type[labels.length];
1818
1819                String rbnbUserInfoType = _userInfoTypesMap.get(aPortName);
1820                
1821                //special case for output port that simply outputs channel names
1822                if (aPortName.equals(CHANNEL_NAMES_OUTPUT_PORT)){
1823                        ArrayType at = new ArrayType(BaseType.STRING);
1824                        port.setTypeEquals(at);
1825                }
1826                else if (outputRecOfArrays.equals(RECORD_OF_2_ARRAYS)) {
1827                        // System.out.println("Setting port " + aPortName +
1828                        // " to send out a Record of arrays");
1829                    
1830            types[0] = new ArrayType(BaseType.DOUBLE);
1831
1832                    if(rbnbUserInfoType != null) {
1833                        types[1] = new ArrayType(BaseType.forName(rbnbUserInfoType)); 
1834                    } else if (rbnbType.equals("Float32")) {
1835                                types[1] = new ArrayType(BaseType.FLOAT);
1836                        } else if (rbnbType.equals("Float64")) {
1837                                types[1] = new ArrayType(BaseType.DOUBLE);
1838                        } else if (rbnbType.equals("String")) {
1839                                types[1] = new ArrayType(BaseType.STRING);
1840                        } else if (rbnbType.equals("Int8")) {
1841                                types[1] = new ArrayType(BaseType.UNSIGNED_BYTE);
1842                        } else if (rbnbType.equals("Int16")) {
1843                                types[1] = new ArrayType(BaseType.SHORT);
1844                        } else if (rbnbType.equals("Int32")) {
1845                                types[1] = new ArrayType(BaseType.INT);
1846                        } else if (rbnbType.equals("Int64")) {
1847                                types[1] = new ArrayType(BaseType.LONG);
1848                        } else if (rbnbType.equals("Unknown")) {
1849                                types[1] = new ArrayType(BaseType.UNKNOWN);
1850                        } else if (rbnbType.equals("ByteArray")) {
1851                                types[1] = new ArrayType(new ArrayType(BaseType.UNSIGNED_BYTE));
1852                        }
1853                        else if (rbnbType.equals("User")) {
1854                            System.out.println("type is User for " + aPortName);
1855                        } else {
1856                                log.error("DataTurbine actor Error: trouble making port: "
1857                                                + aPortName + " unhandled DataTurbine type");
1858                        }
1859                        RecordType rt = new RecordType(labels, types);
1860                        port.setTypeEquals(rt);
1861                } else {
1862                    
1863            types[0] = BaseType.DOUBLE;
1864
1865                        // System.out.println("Setting " + aPortName +
1866                        // " to send out an Array of records");
1867            if(rbnbUserInfoType != null) {
1868                types[1] = BaseType.forName(rbnbUserInfoType); 
1869            } else if (rbnbType.equals("Float32")) {
1870                                types[1] = BaseType.FLOAT;
1871                        } else if (rbnbType.equals("Float64")) {
1872                                types[1] = BaseType.DOUBLE;
1873                        } else if (rbnbType.equals("String")) {
1874                                types[1] = BaseType.STRING;
1875                        } else if (rbnbType.equals("Int8")) {
1876                                types[1] = BaseType.UNSIGNED_BYTE;
1877                        } else if (rbnbType.equals("Int16")) {
1878                                types[1] = BaseType.SHORT;
1879                        } else if (rbnbType.equals("Int32")) {
1880                                types[1] = BaseType.INT;
1881                        } else if (rbnbType.equals("Int64")) {
1882                                types[1] = BaseType.LONG;
1883                        } else if (rbnbType.equals("Unknown")) {
1884                                types[1] = BaseType.UNKNOWN;
1885                        } else if (rbnbType.equals("ByteArray")) {
1886                                types[1] = new ArrayType(BaseType.UNSIGNED_BYTE);
1887                        }
1888                        // else if (rbnbType.equals("User")){
1889                        // ??
1890                        // }
1891                        else {
1892                                log.error("DataTurbine actor Error: trouble making port: "
1893                                                + aPortName + " unhandled DataTurbine type");
1894                        }
1895
1896                        RecordType rt = new RecordType(labels, types);
1897                        ArrayType at = new ArrayType(rt);
1898                        port.setTypeEquals(at);
1899                }
1900
1901        }
1902
        /**
         * Wrap up the actor: run the superclass wrapup, then close the sink's
         * connection to the DataTurbine server.
         * 
         * @throws IllegalActionException
         *             If the superclass wrapup throws it.
         */
        @Override
    public void wrapup() throws IllegalActionException {
                super.wrapup();
                log.debug("disconnect from DataTurbine");
                _sink.CloseRBNBConnection();
        }
1909
1910
1911}