001/* 002 * Copyright (c) 1998-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-08-24 22:45:41 +0000 (Mon, 24 Aug 2015) $' 007 * '$Revision: 33631 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 
027 * 028 */ 029 030package org.kepler.data.datasource.dataturbine; 031 032import java.text.ParseException; 033import java.text.SimpleDateFormat; 034import java.util.Collection; 035import java.util.Date; 036import java.util.HashMap; 037import java.util.Iterator; 038import java.util.Map; 039import java.util.Vector; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.regex.Matcher; 042 043import org.apache.commons.logging.Log; 044import org.apache.commons.logging.LogFactory; 045import org.ecoinformatics.seek.datasource.DataSourceIcon; 046 047import com.rbnb.sapi.ChannelMap; 048import com.rbnb.sapi.SAPIException; 049import com.rbnb.sapi.Sink; 050 051import ptolemy.actor.TypedIOPort; 052import ptolemy.actor.lib.LimitedFiringSource; 053import ptolemy.actor.parameters.PortParameter; 054import ptolemy.data.ArrayToken; 055import ptolemy.data.BooleanToken; 056import ptolemy.data.DoubleMatrixToken; 057import ptolemy.data.DoubleToken; 058import ptolemy.data.FloatToken; 059import ptolemy.data.IntMatrixToken; 060import ptolemy.data.IntToken; 061import ptolemy.data.LongToken; 062import ptolemy.data.RecordToken; 063import ptolemy.data.ShortToken; 064import ptolemy.data.StringToken; 065import ptolemy.data.Token; 066import ptolemy.data.UnsignedByteToken; 067import ptolemy.data.expr.Parameter; 068import ptolemy.data.type.ArrayType; 069import ptolemy.data.type.BaseType; 070import ptolemy.data.type.RecordType; 071import ptolemy.data.type.Type; 072import ptolemy.kernel.CompositeEntity; 073import ptolemy.kernel.util.IllegalActionException; 074import ptolemy.kernel.util.NameDuplicationException; 075import ptolemy.kernel.util.Workspace; 076import ptolemy.util.MessageHandler; 077 078/** 079 * The DataTurbine actor retrieves and outputs data from an RBNB DataTurbine server. 080 * Sink mode Request has been tested beneath SDF, 081 * modes Monitor and Subscribe briefly in PN. 
*
 * @author Derik Barseghian
 * @version $Id: DataTurbine.java 33631 2015-08-24 22:45:41Z crawl $
 */

public class DataTurbine extends LimitedFiringSource {

    /** Published as the "ZEROTIME" system property by the static initializer below. */
    public static final String ZEROTIME = "0";
    // NOTE(review): the logger name contains "datasource.datasource", which
    // does not match this class's package (org.kepler.data.datasource.dataturbine)
    // -- confirm whether that is intended before relying on it in logging config.
    private static Log log = LogFactory.getLog("org.kepler.data.datasource.datasource.dataturbine.DataTurbine");
    static {
        System.setProperty("ZEROTIME", ZEROTIME);
    }
    /** Choice label of the "Output Data Type" parameter: one array of records per port. */
    private static final String ARRAY_OF_X_RECORDS = "Array of x records";
    /** Choice label of the "Output Data Type" parameter: one record of two arrays per port (the default). */
    private static final String RECORD_OF_2_ARRAYS = "Record of 2 arrays";
    /** Sink mode name accepted by fire(): Monitor. */
    public static final String SINKMODE_MONITOR = "Monitor";
    /** Sink mode name accepted by fire(): Request (the default). */
    public static final String SINKMODE_REQUEST = "Request";
    /** Sink mode name accepted by fire(): Subscribe. */
    public static final String SINKMODE_SUBSCRIBE = "Subscribe";

    /** The URL to the DataTurbine Server */
    public PortParameter dataTurbineAddressInputParam;

    /**
     * Actor mode.
     * NOTE(review): this field is not assigned in the constructor shown in
     * this file section -- confirm where (or whether) it is initialized.
     */
    public Parameter actorModeInputParam;

    /** The name of the channel to output through the specificChannel output port */
    public PortParameter outputChannelPortParam;

    /**
     * The amount of time (ms) to wait for data to become available. Use 0 for
     * no delay or any negative number for an infinite delay.
     */
    public Parameter blockTimeoutParam;

    /**
     * Sink mode.
     * <ul>
     * <li>Request: Initiates a request for a specific time slice of data.</li>
     * <li>Subscribe: Starts a continuous feed of data on the specified channels
     * to this sink, for retrieval. Each block retrieved will be duration time
     * units in length.</li>
     * <li>Monitor: Similar to Subscribe, but allows for continuous frames of
     * data without gaps.</li>
     * </ul>
     */
    public Parameter sinkModeInputParam;

    /**
     * Start time for Request or Subscribe modes: either a number of seconds
     * or a date formatted as yyyy-MM-dd HH:mm:ss.
     */
    public PortParameter startTimePortParam;

    /** The duration of the request. Unit is seconds unless fetchByFrame is set. */
    public PortParameter durationPortParam;

    /** channelNames - This output port outputs all of the filtered (non-metric)
     * channel names.
     */
    public static final String CHANNEL_NAMES_OUTPUT_PORT = "channelNames";

    /**
     * For Subscribe mode: Any of "newest", "oldest", "absolute", "next", or
     * "previous". <br />
     * For Request mode:
     * <ul>
     * <li>"absolute" -- The start parameter is absolute time from midnight, Jan
     * 1st, 1970 UTC.</li>
     * <li>"newest" -- The start parameter is measured from the most recent data
     * available in the server at the time this request is received. Note that
     * for this case, the start parameter actually represents the end of the
     * duration, and positive times proceed toward oldest data.</li>
     * <li>"oldest" -- As "newest", but relative to the oldest data.</li>
     * <li>"aligned" -- As "newest", but rather than per channel, this is
     * relative to the newest for all of the channels.</li>
     * <li>"after" -- A combination between "absolute" and "newest", this flag
     * causes the server to return the newest data available after the specified
     * start time. Unlike "newest", you do not have to request the data to find
     * out that you already have it. Unlike "absolute", a gap may be inserted in
     * the data to provide you with the freshest data.</li>
     * <li>"modified" -- Similar to "after", but attempts to return a duration's
     * worth of data in a contiguous block. If the data is not available after
     * the start time, it will be taken from before the start time.</li>
     * <li>"next" - gets the data that immediately follows the time range
     * specified. This will skip over gaps.</li>
     * <li>"previous" - gets the data that immediately precedes the time range
     * specified. This will skip over gaps.</li>
     * </ul>
     */
    public Parameter referenceInputParam;

    /**
     * Format of output datapoint and timestamp pairs: Record of 2 Arrays, or an
     * Array of X Records.
     */
    public Parameter outputRecordOfArrays;

    /**
     * Will attempt to identify and pad gappy data with pairs of timestamps and
     * nils. Need at least 2 samples to be able to pad.
     */
    public Parameter tryToPad;

    /** Connection to the server; created in the constructor and closed by stop(). */
    private Sink _sink = null;
    /** Map of the filtered channels, rebuilt by getDataTurbineInfo(). */
    private ChannelMap _map = null;
    /** Map filled by the registration request in getDataTurbineInfo(). */
    private ChannelMap _registrationMap = null;


    private final static String DEFAULT_RBNB_CLIENT_NAME = "KeplerClient";
    /** Client name returned by getRBNBClientName(). */
    private String _rbnbClientName = DEFAULT_RBNB_CLIENT_NAME;

    /** Server address read in fire() (double quotes stripped); returned by getServer(). */
    private String _url = "";
    /** Channel requested on behalf of the specificChannel output port. */
    private String _specifiedOutputChannel = "";
    /** Name of the specificChannel output port. */
    private final static String SPECIFIC_CHANNEL = "specificChannel";
    /** Subscribe, Monitor, or Request */
    private String _sinkMode = SINKMODE_REQUEST;
    /** for Request or Subscribe mode */
    private String _startTime = "0";
    /** for Request or Subscribe mode */
    private String _duration = "0";
    /** for Request mode: absolute,
     * newest, oldest, aligned, after,
     * modified, next, previous
     */
    private String _reference = "absolute";
    /** Names of the output ports that have at least one sink; recomputed on every fire(). */
    private String[] _connectedOutputPortNames = null;
    /** for Request mode - time (ms) to wait
     * for data to become available. 0 is no
     * delay. negative number for infinite
     * delay.
     */
    private int _blockTimeout = 15000;
    /** Numeric start time handed to the sink; epoch seconds when the start time was given as a date. */
    private double _startTimeDouble = 0.0;
    /** Numeric duration handed to the sink, parsed from _duration in fire(). */
    private double _durationDouble = 0.0;
    // TODO: eventually allow use of different date patterns:
    private final static String pattern = "yyyy-MM-dd HH:mm:ss";
    // NOTE(review): java.text.SimpleDateFormat is documented as not
    // synchronized, and this instance is static (shared by every DataTurbine
    // actor) -- confirm actors can never use it from different threads at once.
    private final static SimpleDateFormat format = new SimpleDateFormat(pattern);
    /** Result of parsing the start time as a date in fire(). */
    private Date startDate = null;
    // NOTE(review): not used anywhere in the visible part of the file.
    private boolean reload = false;

    /** Number of channels reported by the registration request. */
    private int _numChans = 0;
    /** Number of channels returned by the sample fetch in getDataTurbineInfo(). */
    private int _numChans2 = 0;
    // Per-channel registration metadata, indexed 0.._numChans-1.
    private String[] _chanNames = null;
    private String[] _chanTypes = null;
    private double[] _chanDurations = null;
    private String[] _chanMimeTypes = null;
    // Metadata of the channels that passed the filter in getDataTurbineInfo()
    // (negative duration or a leading underscore excludes a channel). The
    // arrays are sized by the filter count but filled from the sample fetch,
    // so trailing entries may be unset if the fetch returned fewer channels.
    private String[] _filteredChanNames = null;
    private String[] _filteredChanTypes = null;
    private double[] _filteredChanDurations = null;
    private String[] _filteredChanMimeTypes = null;
    private double[] _filteredChanStartTimes = null;
    /** Channel name -> ptolemy type string extracted from the channel's user info. */
    private Map<String,String> _userInfoTypesMap = new HashMap<String,String>();

    /** Current value of the "Output Data Type" parameter, refreshed in fire(). */
    public String outputRecOfArrays = RECORD_OF_2_ARRAYS;

    // presumably the field labels of the output record -- verify against the
    // output code further down the file.
    private final static String[] labels = { "timestamps", "data" };
    private Token[] values = new Token[labels.length];
    /** Current value of the tryToPad parameter, refreshed in fire(). */
    public boolean paddingOn = false;
    /** padding is not always possible. */
    public boolean willPad = paddingOn;

    /** Icon switched between busy and ready around the server work in fire(). */
    private DataSourceIcon _icon;

    /** for use with Monitor mode, presumably value
     * doesn't matter since gapControl logic is not
     * yet implemented in DT
     */
    private int gapControl = 0;

    /** If true, director told us to stop firing. */
    private AtomicBoolean _stopRequested = new AtomicBoolean(false);

    /**
     * Construct a DataTurbine source with the given container and name.
     * 
     * @param container
     *            The container, passed on to the superclass constructor.
     * @param name
     *            The name of this actor.
     * @exception IllegalActionException
     *                If the entity cannot be contained by the proposed
     *                container.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
263 */ 264 public DataTurbine(CompositeEntity container, String name) 265 throws NameDuplicationException, IllegalActionException { 266 super(container, name); 267 268 _icon = new DataSourceIcon(this); 269 270 dataTurbineAddressInputParam = new PortParameter(this, "DataTurbine Address"); 271 dataTurbineAddressInputParam.setStringMode(true); 272 dataTurbineAddressInputParam.setTypeEquals(BaseType.STRING); 273 dataTurbineAddressInputParam.getPort().setTypeEquals(BaseType.STRING); 274 275 outputChannelPortParam = new PortParameter(this, 276 "specificChannel Name"); 277 outputChannelPortParam.setStringMode(true); 278 outputChannelPortParam.setTypeEquals(BaseType.STRING); 279 outputChannelPortParam.getPort().setTypeEquals(BaseType.STRING); 280 281 sinkModeInputParam = new Parameter(this, "Sink Mode"); 282 sinkModeInputParam.setStringMode(true); 283 sinkModeInputParam.setTypeEquals(BaseType.STRING); 284 sinkModeInputParam.addChoice(SINKMODE_REQUEST); 285 sinkModeInputParam.addChoice(SINKMODE_MONITOR); 286 sinkModeInputParam.addChoice(SINKMODE_SUBSCRIBE); 287 sinkModeInputParam.setExpression(SINKMODE_REQUEST); 288 289 startTimePortParam = new PortParameter(this, 290 "Start Time (for Request or Subscribe modes)", 291 new StringToken("")); 292 startTimePortParam.setStringMode(true); 293 startTimePortParam.setTypeEquals(BaseType.STRING); 294 startTimePortParam.getPort().setTypeEquals(BaseType.STRING); 295 296 durationPortParam = new PortParameter(this, 297 "Duration (for Request or Subscribe modes)", 298 new StringToken("")); 299 durationPortParam.setStringMode(true); 300 durationPortParam.setTypeEquals(BaseType.STRING); 301 durationPortParam.getPort().setTypeEquals(BaseType.STRING); 302 303 referenceInputParam = new Parameter(this, 304 "Reference (for Request or Subscribe modes)"); 305 referenceInputParam.setStringMode(true); 306 referenceInputParam.setTypeEquals(BaseType.STRING); 307 referenceInputParam.addChoice("absolute"); 308 referenceInputParam.addChoice("newest"); 
309 referenceInputParam.addChoice("oldest"); 310 referenceInputParam.addChoice("aligned (Request mode only)"); 311 referenceInputParam.addChoice("after (Request mode only)"); 312 referenceInputParam.addChoice("modified (Request mode only)"); 313 referenceInputParam.addChoice("next"); 314 referenceInputParam.addChoice("previous"); 315 referenceInputParam.setExpression("absolute"); 316 blockTimeoutParam = new Parameter(this, 317 "Block Timeout (ms) (for Fetch)", new IntToken(15000)); 318 319 // note: keep this outputRecordOfArrays param initialization after 320 // RBNBurlInputParam - this will avoid relation-breaking problem 321 // during reload events. 322 // outputRecordOfArrays: 323 // RECORD_OF_2_ARRAYS - each output port will output a record of 324 // 2 arrays (data and timestamps) 325 // ARRAY_OF_X_RECORDS - each output port will output an array of 326 // records (each record a data and timestamp) 327 outputRecordOfArrays = new Parameter(this, "Output Data Type"); 328 outputRecordOfArrays.setStringMode(true); 329 outputRecordOfArrays.setTypeEquals(BaseType.STRING); 330 outputRecordOfArrays.addChoice(RECORD_OF_2_ARRAYS); 331 outputRecordOfArrays.addChoice(ARRAY_OF_X_RECORDS); 332 outputRecordOfArrays.setExpression(RECORD_OF_2_ARRAYS); 333 334 tryToPad = new Parameter(this, "Pad data gaps with nils"); 335 tryToPad.setTypeEquals(BaseType.BOOLEAN); 336 tryToPad.setExpression("false"); 337 338 _sink = new Sink(); 339 _map = new ChannelMap(); 340 _registrationMap = new ChannelMap(); 341 342 _attachText("_iconDescription", "<svg>\n" + "<rect x=\"0\" y=\"0\" " 343 + "width=\"60\" height=\"20\" " + "style=\"fill:white\"/>\n" 344 + "</svg>\n"); 345 } 346 347 348 /** 349 * Send the token in the value parameter to the output. 350 * 351 * @exception IllegalActionException 352 * If it is thrown by the send() method sending out the 353 * token. 
354 */ 355 @Override 356 public void fire() throws IllegalActionException { 357 super.fire(); 358 359 outputRecOfArrays = ((StringToken)outputRecordOfArrays.getToken()).stringValue(); 360 361 paddingOn = ((BooleanToken) tryToPad.getToken()).booleanValue(); 362 willPad = paddingOn; 363 364 _url = ((StringToken)dataTurbineAddressInputParam.getToken()).stringValue(); 365 _url = _url.replaceAll("\"", ""); 366 367 _blockTimeout = Integer.parseInt(blockTimeoutParam.getToken() 368 .toString()); 369 370 _sinkMode = ((StringToken)sinkModeInputParam.getToken()).stringValue(); 371 372 if (!(_sinkMode.equals(SINKMODE_MONITOR) || _sinkMode.equals(SINKMODE_REQUEST) || _sinkMode 373 .equals(SINKMODE_SUBSCRIBE))) { 374 throw new IllegalActionException(this, 375 "Error. sinkMode must be Monitor, Request or Subscribe."); 376 } 377 378 if ((_sinkMode.equals(SINKMODE_SUBSCRIBE) || _sinkMode 379 .equals(SINKMODE_REQUEST))) { 380 startTimePortParam.update(); 381 _startTime = ((StringToken)startTimePortParam.getToken()).stringValue(); 382 _startTime = _startTime.replaceAll("\"", ""); 383 384 durationPortParam.update(); 385 _duration = ((StringToken)durationPortParam.getToken()).stringValue(); 386 _duration = _duration.replaceAll("\"", ""); 387 _reference = ((StringToken)referenceInputParam.getToken()).stringValue(); 388 _reference = _reference.replaceAll("\\s*\\(Request mode only\\)", ""); 389 390 if (_startTime == null || _startTime.equals("")) { 391 throw new IllegalActionException(this, 392 "DataTurbine actor must specify a Start Time for "+ _sinkMode + " Sink Mode."); 393 } 394 if (_duration == null || _duration.equals("")) { 395 throw new IllegalActionException(this, 396 "DataTurbine actor must specify a Duration for "+ _sinkMode + " Sink Mode."); 397 } 398 if (_reference == null || _reference.equals("")) { 399 throw new IllegalActionException(this, 400 "DataTurbine actor must specify a Reference for "+ _sinkMode + " Sink Mode."); 401 } 402 403 try { 404 startDate = 
format.parse(_startTime); 405 _startTimeDouble = startDate.getTime() / 1000; 406 } catch (ParseException pe) { 407 //throw new IllegalActionException(this, "ParseException " + pe); 408 // allow users to also specify startTime as seconds, since this 409 // is more natural when using "newest" 410 try{ 411 _startTimeDouble = new Double(_startTime); 412 } 413 catch(NumberFormatException nfe){ 414 throw new IllegalActionException(this, "Start Time must be number (seconds) or Date formatted: "+pattern); 415 } 416 } 417 } 418 419 outputChannelPortParam.update(); 420 421 // count connected output ports 422 int itr = 0; 423 Iterator<?> q = this.outputPortList().iterator(); 424 while (q.hasNext()) { 425 TypedIOPort port = (TypedIOPort) q.next(); 426 if (port.numberOfSinks() > 0) { 427 itr++; 428 } 429 } 430 _connectedOutputPortNames = new String[itr]; 431 432 // gather connected output port names 433 itr = 0; 434 q = this.outputPortList().iterator(); 435 while (q.hasNext()) { 436 TypedIOPort port = (TypedIOPort) q.next(); 437 if (port.numberOfSinks() > 0) { 438 _connectedOutputPortNames[itr] = port.getName(); 439 itr++; 440 } 441 } 442 443 if (_sinkMode.equals(SINKMODE_REQUEST) || _sinkMode.equals(SINKMODE_SUBSCRIBE)) { 444 _durationDouble = Double.parseDouble(_duration); 445 } 446 447 _icon.setBusy(); 448 449 // connect to dataturbine. 450 try { 451 openDataTurbine(); 452 } catch (Exception e) { 453 throw new IllegalActionException(this, 454 "ERROR opening DataTurbine connection from fire()"); 455 } 456 457 // get dataturbine metadata. 458 getDataTurbineInfo(); 459 460 try { 461 462 // output the channel names 463 outputChannelNames(); 464 465 // add requested channels to userMap. 466 ChannelMap userChanMap = getUserChannelMap(_connectedOutputPortNames); 467 if (userChanMap.NumberOfChannels() > 0) { 468 if (_sinkMode.equals(SINKMODE_REQUEST)) { 469 // request data in userMap. 
470 // System.out.println("----- about to sink.Request(userChanMap, " 471 // + _startTimeDouble + ", " + _durationDouble + ", " 472 // + _reference + ") -----"); 473 _sink.Request(userChanMap, _startTimeDouble, 474 _durationDouble, _reference); 475 476 // fetch data in userMap. 477 // System.out.println("----- fetching data, using timeout:" 478 // + _blockTimeout + " -----"); 479 _sink.Fetch(_blockTimeout, userChanMap); 480 481 // send data out output ports. 482 // System.out.println("----- about to call outputData -----"); 483 outputData(userChanMap); 484 } else if (_sinkMode.equals(SINKMODE_MONITOR)) { 485 // setup Monitor 486 // System.out.println("----- about to sink.Monitor(userChanMap, " 487 // + gapControl + ") -----"); 488 _sink.Monitor(userChanMap, gapControl); 489 do { 490 // fetch data in userMap. 491 // System.out.println("----- fetching data, using timeout:" 492 // + _blockTimeout + " -----"); 493 _sink.Fetch(_blockTimeout, userChanMap); 494 495 // send data out output ports. 496 // System.out.println("----- about to call outputData -----"); 497 outputData(userChanMap); 498 } while (!_stopRequested.get()); 499 } else if (_sinkMode.equals(SINKMODE_SUBSCRIBE)) { 500 // setup Subscribe 501 // System.out 502 // .println("----- about to sink.Subscribe(userChanMap, " 503 // + _startTimeDouble + ", " + _durationDouble 504 // + ", " + _reference + ") -----"); 505 _sink.Subscribe(userChanMap, _startTimeDouble, 506 _durationDouble, _reference); 507 do { 508 // fetch data in userMap. 509 // System.out.println("----- fetching data, using timeout:" 510 // + _blockTimeout + " -----"); 511 _sink.Fetch(_blockTimeout, userChanMap); 512 513 // send data out output ports. 514 // System.out.println("----- about to call outputData -----"); 515 outputData(userChanMap); 516 } while (!_stopRequested.get()); 517 } 518 } 519 520 _icon.setReady(); 521 } catch (SAPIException sapie) { 522 // ignore errors if we've stopped. 523 if(! 
_stopRequested.get()) 524 { 525 log.error("DataTurbine actor Error: during fire:" + sapie); 526 _icon.setReady(); 527 log.debug("disconnect from DataTurbine"); 528 _sink.CloseRBNBConnection(); 529 throw new IllegalActionException(this, sapie, "Error during fire()"); 530 } 531 } catch (Exception e){ 532 log.error("DataTurbine actor Error: during fire:" + e); 533 _icon.setReady(); 534 log.debug("disconnect from DataTurbine"); 535 _sink.CloseRBNBConnection(); 536 throw new IllegalActionException(this, e, "Error during fire()"); 537 } 538 539 // disconnect from DataTurbine in wrapup() so same connection can be shared 540 // across a workflow execution. 541 }// fire 542 543 /** Reset the stop requested boolean. */ 544 @Override 545 public void preinitialize() throws IllegalActionException { 546 super.preinitialize(); 547 _stopRequested.set(false); 548 } 549 550 /** The director told us to stop firing immediately. */ 551 @Override 552 public void stop() { 553 super.stop(); 554 _stopRequested.set(true); 555 log.debug("disconnect from DataTurbine"); 556 _sink.CloseRBNBConnection(); 557 } 558 559 /** 560 * Sets all the object variables by gathering info from the RBNB server. 561 * (fire() and attributeChanged() should probably be the only methods to 562 * call getDataTurbineInfo.) 
563 * 564 * @throws IllegalActionException 565 */ 566 private boolean getDataTurbineInfo() throws IllegalActionException { 567 568 int chanType = 0; 569 // filtered lists will exclude channels with duration 0 570 // and those with names that begin with an underscore 571 int filteredListLength = 0; 572 573 try{ 574 openDataTurbine(); 575 576 _map.Clear(); 577 _registrationMap.Clear(); 578 579 _sink.RequestRegistration(_registrationMap); 580 _sink.Fetch(_blockTimeout, _registrationMap); 581 _numChans = _registrationMap.NumberOfChannels(); 582 _chanNames = new String[_numChans]; 583 _chanTypes = new String[_numChans]; 584 _chanDurations = new double[_numChans]; 585 _chanMimeTypes = new String[_numChans]; 586 587 _userInfoTypesMap.clear(); 588 589 // Determine length of the filtered list. Add all chans to _map. Set 590 // _chanNames[] 591 for (int i = 0; i < _numChans; i++) { 592 _chanNames[i] = _registrationMap.GetName(i); 593 _chanDurations[i] = _registrationMap.GetTimeDuration(i); 594 chanType = _registrationMap.GetType(i); 595 _chanTypes[i] = _registrationMap.TypeName(chanType); 596 _chanMimeTypes[i] = _registrationMap.GetMime(i); 597 598 String userInfo = _registrationMap.GetUserInfo(i); 599 if(! 
userInfo.isEmpty()) { 600 Matcher matcher = DataTurbineWriter.PTOLEMY_TYPE_PATTERN.matcher(userInfo); 601 if(matcher.matches()) { 602 _userInfoTypesMap.put(_chanNames[i], matcher.group(1)); 603 } 604 } 605 606 if (_chanDurations[i] < 0 || _chanNames[i].matches("_.*")) { 607 // System.out.println("WARNING: chan " + i + " named: " + 608 // _chanNames[i] + " duration: " + _chanDurations[i] + 609 // " will not be added"); 610 } else { 611 _map.Add(_chanNames[i]); 612 // System.out.println("added chan " + i + ": " + 613 // _chanNames[i] + " _chanType: " + _chanTypes[i] + 614 // "_chanDuration: " + _chanDurations[i]); 615 filteredListLength++; 616 } 617 } 618 619 _filteredChanNames = new String[filteredListLength]; 620 _filteredChanTypes = new String[filteredListLength]; 621 _filteredChanDurations = new double[filteredListLength]; 622 _filteredChanMimeTypes = new String[filteredListLength]; 623 _filteredChanStartTimes = new double[filteredListLength]; 624 625 // Just get one small chunk of data so we are able to check the data 626 // types. If server contains no non-metrics channels, do not make 627 // (illegal) empty request. 628 if (filteredListLength > 0) { 629 _sink.Request(_map, 0, 1, "oldest"); 630 631 // Fetch data. Channels with duration 0 will not be returned. 632 _sink.Fetch(_blockTimeout, _map); 633 _numChans2 = _map.NumberOfChannels(); 634 635 boolean didfetchtimeout = _map.GetIfFetchTimedOut(); 636 if (didfetchtimeout) { 637 log 638 .warn("WARNING: fetch timed out. 
Try increasing the blockTimeOut value"); 639 if (filteredListLength != _numChans2) { 640 log.error("DataTurbine actor Error: only " + _numChans2 641 + " channels were returned"); 642 // throw error 643 } 644 } 645 646 Double smallestDuration = null; 647 Double earliestStartTime = null; 648 for (int i = 0; i < _numChans2; i++) { 649 chanType = _map.GetType(i); 650 _filteredChanTypes[i] = _map.TypeName(chanType); 651 _filteredChanNames[i] = _map.GetName(i); 652 _filteredChanDurations[i] = _map.GetTimeDuration(i); 653 // TODO: durations apparently change after a fetch as well? 654 // do they really change to sample duration? 655 _filteredChanMimeTypes[i] = _map.GetMime(i); 656 _filteredChanStartTimes[i] = _map.GetTimeStart(i); 657 658 if (i == 0){ 659 smallestDuration = _filteredChanDurations[i]; 660 earliestStartTime = _filteredChanStartTimes[i]; 661 } 662 else{ 663 if (_filteredChanDurations[i] < smallestDuration){ 664 smallestDuration = _filteredChanDurations[i]; 665 } 666 if (_filteredChanStartTimes[i] < earliestStartTime){ 667 earliestStartTime = _filteredChanStartTimes[i]; 668 } 669 } 670 671 //System.out.println("_filteredChanNames: " + 672 //_filteredChanNames[i] + " _filteredChanType: " + 673 //_filteredChanTypes[i] + " _filteredChanDuration: " + 674 //_filteredChanDurations[i] + " _filteredChanStartTime: " + 675 //_filteredChanStartTimes[i]); 676 } 677 678 // try to be helpful, if blank, set duration & startTime to smallest & earliest 679 if (smallestDuration != null){ 680 if (((StringToken)durationPortParam.getToken()).stringValue().equals("")){ 681 durationPortParam.setToken(smallestDuration.toString()); 682 _duration = smallestDuration.toString(); 683 } 684 } 685 if (earliestStartTime != null){ 686 if (((StringToken)startTimePortParam.getToken()).stringValue().equals("")){ 687 long l = (new Double(earliestStartTime)).longValue(); 688 Date earliestDate = new Date(l*1000); 689 String startDateString = format.format(earliestDate); 690 
startTimePortParam.setToken(startDateString); 691 _startTime = earliestStartTime.toString(); 692 } 693 } 694 695 } 696 697 return true; 698 } catch (Exception e) { 699 throw new IllegalActionException(this, e, 700 "Problem opening DataTurbine connection in getDataTurbineInfo()"); 701 } 702 703 } 704 705 /** 706 * Return a channel map that contains only channels with duration >=0 and 707 * that were requested. 708 * 709 * @return chanMap The ChannelMap. 710 */ 711 public ChannelMap getUserChannelMap(String[] requestedChannels) 712 throws SAPIException { 713 714 ChannelMap chanMap = new ChannelMap(); 715 716 Iterator<?> i = this.outputPortList().iterator(); 717 while (i.hasNext()) { 718 TypedIOPort port = (TypedIOPort) i.next(); 719 String outputPortName = port.getName(); 720 for (int j = 0; j < requestedChannels.length; j++) { 721 if (outputPortName.equals(requestedChannels[j])) { 722 if (outputPortName.equals(SPECIFIC_CHANNEL) 723 && _specifiedOutputChannel != null) { 724 chanMap.Add(_specifiedOutputChannel); 725 } else { 726 // if channel is already present, its current index is 727 // returned, and no other action is taken. 728 chanMap.Add(outputPortName); 729 } 730 break; 731 } 732 } 733 } 734 735 return chanMap; 736 } 737 738 /** 739 * Get the server name and port string. 740 * 741 * @return server URL 742 */ 743 public String getServer() { 744 return _url; 745 } 746 747 /** 748 * Get the name of this DataTurbine client. 749 * 750 * @return client name 751 */ 752 public String getRBNBClientName() { 753 return _rbnbClientName; 754 } 755 756 /** 757 * Output the channel names through the CHANNEL_NAMES_OUTPUT_PORT 758 * as an ArrayToken. 
759 */ 760 public void outputChannelNames(){ 761 Iterator<?> j = this.outputPortList().iterator(); 762 while (j.hasNext()) { 763 TypedIOPort port = (TypedIOPort) j.next(); 764 String currPortName = port.getName(); 765 if (currPortName.equals(CHANNEL_NAMES_OUTPUT_PORT)){ 766 for (int i=0; i< _connectedOutputPortNames.length; i++){ 767 if (_connectedOutputPortNames[i].equals(CHANNEL_NAMES_OUTPUT_PORT)){ 768 String chanNames = "{"; 769 for (int h=0; h < _filteredChanNames.length; h++){ 770 if (h != _filteredChanNames.length-1){ 771 chanNames = chanNames.concat("\""+ _filteredChanNames[h] + "\"" + ","); 772 } 773 else{ 774 chanNames = chanNames.concat("\""+ _filteredChanNames[h] + "\""); 775 } 776 } 777 chanNames = chanNames.concat("}"); 778 ArrayToken at; 779 try { 780 at = new ArrayToken(chanNames); 781 port.send(0, at); 782 } catch (IllegalActionException e) { 783 // TODO Auto-generated catch block 784 e.printStackTrace(); 785 } 786 } 787 } 788 } 789 } 790 791 } 792 793 /** 794 * Push the data out the output ports. 795 * 796 * @param ChannelMap 797 */ 798 public void outputData(ChannelMap cmap) throws IllegalActionException { 799 800 // TODO: check if we should be accessing the map in outputData, or 801 // instead referencing eg _chanNames 802 803 // The RBNB data types: 804 // UNKNOWN = 0. 805 // TYPE_FROM_INPUT = 1. Take the data type from the input data payload. 806 // The resulting data type value will be one of the other types. 807 // TYPE_BOOLEAN = 2. Boolean (8-bit, one byte) 808 // TYPE_INT8 = 3. 8-bit (one byte) signed integer data type 809 // TYPE_INT16 = 4. 16-bit (two byte) signed integer data type. 810 // TYPE_INT32 = 5; 32-bit (four byte) signed integer data type. 811 // TYPE_INT64 = 6. 64-bit (eight byte) signed integer data type 812 // TYPE_FLOAT32 = 7. 32-bit (four byte) floating point data type 813 // TYPE_FLOAT64 = 8. 64-bit (eight byte) floating point data type. 814 // TYPE_STRING = 9. Character string data type. 815 // TYPE_BYTEARRAY = 10. 
Byte array data type. 816 // TYPE_USER = 11. User registration type. 817 818 int outputPortListSize = this.outputPortList().size(); 819 820 // When RBNB returns no channels, output NIL tokens out every port 821 if (cmap.NumberOfChannels() == 0) { 822 // System.out 823 // .println("* outputting nils * - Your request returned no channels"); 824 Iterator<?> j = this.outputPortList().iterator(); 825 while (j.hasNext()) { 826 TypedIOPort port = (TypedIOPort) j.next(); 827 //String currPortName = port.getName(); 828 // System.out.println("* outputting nils * - Your request returned no data for channel: " 829 // + currPortName); 830 outputNils(port); 831 } 832 } else { 833 // When only some channels have data, output NILs on those 834 // channels that have no data. 835 boolean[] portHasData = new boolean[outputPortListSize]; 836 Iterator<?> z = this.outputPortList().iterator(); 837 int h = -1; 838 while (z.hasNext()) { 839 h++; 840 TypedIOPort port = (TypedIOPort) z.next(); 841 String outputPortName = port.getName(); 842 portHasData[h] = false; // initialize false 843 844 int index = -1; 845 //for the specificChannel output port we use the index of the specified channel 846 if (outputPortName.equals(SPECIFIC_CHANNEL) && _specifiedOutputChannel != null){ 847 index = cmap.GetIndex(_specifiedOutputChannel); 848 } 849 else{ 850 index = cmap.GetIndex(outputPortName); 851 } 852 if (index != -1){ 853 portHasData[h] = true; 854 int chanType = cmap.GetType(index); 855 String chanTypeName = cmap.TypeName(chanType); 856 857 double[] someTimes = cmap.GetTimes(index); 858 Token someTimeTokens[] = new Token[someTimes.length]; 859 860 for (int j = 0; j < someTimeTokens.length; j++) { 861 someTimeTokens[j] = new DoubleToken(someTimes[j]); 862 } 863 864 Token someDataTokens[] = new Token[someTimes.length]; 865 Token paddedTimeTokens[] = null; 866 Token paddedDataTokens[] = null; 867 868 Vector<Object> timeAndDataVector = new Vector<Object>(); 869 Vector<Object> vectorOfArrays = new 
Vector<Object>(); 870 871 if (chanTypeName.equals("Float32")) { 872 float[] someData = cmap.GetDataAsFloat32(index); 873 checkLengths(someData.length, someTimes.length); 874 875 if (paddingOn && someData.length == 1) { 876 willPad = false; 877 log 878 .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1."); 879 } 880 881 if (willPad) { 882 for (int j = 0; j < someData.length; j++) { 883 timeAndDataVector.add(someTimes[j]); 884 timeAndDataVector.add(someData[j]); 885 } 886 vectorOfArrays = doPadding(timeAndDataVector, 887 chanTypeName); 888 paddedTimeTokens = (Token[]) vectorOfArrays 889 .get(0); 890 paddedDataTokens = (Token[]) vectorOfArrays 891 .get(1); 892 } else { 893 for (int j = 0; j < someData.length; j++) { 894 someDataTokens[j] = new FloatToken( 895 someData[j]); 896 } 897 } 898 899 } else if (chanTypeName.equals("Float64")) { 900 double[] someData = cmap.GetDataAsFloat64(index); 901 checkLengths(someData.length, someTimes.length); 902 if (paddingOn && someData.length == 1) { 903 willPad = false; 904 log 905 .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1."); 906 } 907 908 if (willPad) { 909 for (int j = 0; j < someData.length; j++) { 910 timeAndDataVector.add(someTimes[j]); 911 timeAndDataVector.add(someData[j]); 912 } 913 vectorOfArrays = doPadding(timeAndDataVector, 914 chanTypeName); 915 paddedTimeTokens = (Token[]) vectorOfArrays 916 .get(0); 917 paddedDataTokens = (Token[]) vectorOfArrays 918 .get(1); 919 } else { 920 for (int j = 0; j < someData.length; j++) { 921 someDataTokens[j] = new DoubleToken( 922 someData[j]); 923 } 924 } 925 } else if (chanTypeName.equals("String")) { 926 String[] someData = cmap.GetDataAsString(index); 927 checkLengths(someData.length, someTimes.length); 928 if (paddingOn && someData.length == 1) { 929 willPad = false; 930 log 931 .warn("DataTurbine actor - Warning: cannot pad. 
Need at least 2 samples, your request returned 1."); 932 } 933 934 if (willPad) { 935 for (int j = 0; j < someData.length; j++) { 936 timeAndDataVector.add(someTimes[j]); 937 timeAndDataVector.add(someData[j]); 938 } 939 vectorOfArrays = doPadding(timeAndDataVector, 940 chanTypeName); 941 paddedTimeTokens = (Token[]) vectorOfArrays 942 .get(0); 943 paddedDataTokens = (Token[]) vectorOfArrays 944 .get(1); 945 } else { 946 String userInfoType = _userInfoTypesMap.get(outputPortName); 947 for (int j = 0; j < someData.length; j++) { 948 // see if the user info field contained a ptolemy type 949 // if not, then put into string token 950 if(userInfoType == null) { 951 someDataTokens[j] = new StringToken( 952 someData[j]); 953 } else if(userInfoType.equals("[int]")) { 954 someDataTokens[j] = new IntMatrixToken(someData[j]); 955 } else if(userInfoType.equals("[double]")) { 956 someDataTokens[j] = new DoubleMatrixToken(someData[j]); 957 } else { 958 MessageHandler.error("Channel " + outputPortName + 959 " has unsupported ptolemy type: " + userInfoType); 960 } 961 } 962 } 963 } else if (chanTypeName.equals("Int8")) { 964 byte[] someData = cmap.GetDataAsInt8(index); 965 checkLengths(someData.length, someTimes.length); 966 if (paddingOn && someData.length == 1) { 967 willPad = false; 968 log 969 .warn("DataTurbine actor - Warning: cannot pad. 
Need at least 2 samples, your request returned 1."); 970 } 971 972 if (willPad) { 973 for (int j = 0; j < someData.length; j++) { 974 timeAndDataVector.add(someTimes[j]); 975 timeAndDataVector.add(someData[j]); 976 } 977 vectorOfArrays = doPadding(timeAndDataVector, 978 chanTypeName); 979 paddedTimeTokens = (Token[]) vectorOfArrays 980 .get(0); 981 paddedDataTokens = (Token[]) vectorOfArrays 982 .get(1); 983 } else { 984 for (int j = 0; j < someData.length; j++) { 985 someDataTokens[j] = new UnsignedByteToken( 986 someData[j]); 987 } 988 } 989 } else if (chanTypeName.equals("Int16")) { 990 short[] someData = cmap.GetDataAsInt16(index); 991 checkLengths(someData.length, someTimes.length); 992 if (paddingOn && someData.length == 1) { 993 willPad = false; 994 log 995 .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1."); 996 } 997 998 if (willPad) { 999 for (int j = 0; j < someData.length; j++) { 1000 timeAndDataVector.add(someTimes[j]); 1001 timeAndDataVector.add(someData[j]); 1002 } 1003 vectorOfArrays = doPadding(timeAndDataVector, 1004 chanTypeName); 1005 paddedTimeTokens = (Token[]) vectorOfArrays 1006 .get(0); 1007 paddedDataTokens = (Token[]) vectorOfArrays 1008 .get(1); 1009 } else { 1010 for (int j = 0; j < someData.length; j++) { 1011 someDataTokens[j] = new ShortToken( 1012 someData[j]); 1013 } 1014 } 1015 } else if (chanTypeName.equals("Int32")) { 1016 int[] someData = cmap.GetDataAsInt32(index); 1017 checkLengths(someData.length, someTimes.length); 1018 if (paddingOn && someData.length == 1) { 1019 willPad = false; 1020 log 1021 .warn("DataTurbine actor - Warning: cannot pad. 
Need at least 2 samples, your request returned 1."); 1022 } 1023 1024 if (willPad) { 1025 for (int j = 0; j < someData.length; j++) { 1026 timeAndDataVector.add(someTimes[j]); 1027 timeAndDataVector.add(someData[j]); 1028 } 1029 vectorOfArrays = doPadding(timeAndDataVector, 1030 chanTypeName); 1031 paddedTimeTokens = (Token[]) vectorOfArrays 1032 .get(0); 1033 paddedDataTokens = (Token[]) vectorOfArrays 1034 .get(1); 1035 } else { 1036 for (int j = 0; j < someData.length; j++) { 1037 someDataTokens[j] = new IntToken( 1038 someData[j]); 1039 } 1040 } 1041 } else if (chanTypeName.equals("Int64")) { 1042 long[] someData = cmap.GetDataAsInt64(index); 1043 checkLengths(someData.length, someTimes.length); 1044 if (paddingOn && someData.length == 1) { 1045 willPad = false; 1046 log 1047 .warn("DataTurbine actor - Warning: cannot pad. Need at least 2 samples, your request returned 1."); 1048 } 1049 1050 if (willPad) { 1051 for (int j = 0; j < someData.length; j++) { 1052 timeAndDataVector.add(someTimes[j]); 1053 timeAndDataVector.add(someData[j]); 1054 } 1055 vectorOfArrays = doPadding(timeAndDataVector, 1056 chanTypeName); 1057 paddedTimeTokens = (Token[]) vectorOfArrays 1058 .get(0); 1059 paddedDataTokens = (Token[]) vectorOfArrays 1060 .get(1); 1061 } else { 1062 for (int j = 0; j < someData.length; j++) { 1063 someDataTokens[j] = new LongToken( 1064 someData[j]); 1065 } 1066 } 1067 } 1068 if (!chanTypeName.equals("ByteArray")) { 1069 // System.out 1070 // .println("about to send dataTokens out of port: " 1071 // + currPortName); 1072 try { 1073 if (outputRecOfArrays 1074 .equals(RECORD_OF_2_ARRAYS)) { 1075 1076 if (willPad) { 1077 ArrayToken atSomeTimeTokens = new ArrayToken( 1078 paddedTimeTokens); 1079 ArrayToken atSomeDataTokens = new ArrayToken( 1080 paddedDataTokens); 1081 Token[] dataValues = { 1082 atSomeTimeTokens, 1083 atSomeDataTokens }; 1084 RecordToken recToken = new RecordToken( 1085 labels, dataValues); 1086 port.send(0, recToken); 1087 } else { 1088 
ArrayToken atSomeTimeTokens = new ArrayToken( 1089 someTimeTokens); 1090 ArrayToken atSomeDataTokens = new ArrayToken( 1091 someDataTokens); 1092 Token[] dataValues = { 1093 atSomeTimeTokens, 1094 atSomeDataTokens }; 1095 RecordToken recToken = new RecordToken( 1096 labels, dataValues); 1097 port.send(0, recToken); 1098 } 1099 } else { 1100 1101 if (willPad) { 1102 RecordToken[] recTokens = new RecordToken[paddedTimeTokens.length]; 1103 for (int k = 0; k < paddedTimeTokens.length; k++) { 1104 values[0] = paddedTimeTokens[k]; 1105 values[1] = paddedDataTokens[k]; 1106 recTokens[k] = new RecordToken( 1107 labels, values); 1108 } 1109 ArrayToken at = new ArrayToken( 1110 recTokens); 1111 port.send(0, at); 1112 } else { 1113 RecordToken[] recTokens = new RecordToken[someTimeTokens.length]; 1114 for (int k = 0; k < someTimeTokens.length; k++) { 1115 values[0] = someTimeTokens[k]; 1116 values[1] = someDataTokens[k]; 1117 recTokens[k] = new RecordToken( 1118 labels, values); 1119 } 1120 ArrayToken at = new ArrayToken( 1121 recTokens); 1122 port.send(0, at); 1123 } 1124 } 1125 } catch (Exception e) { 1126 throw new IllegalActionException(this, e, 1127 "Exception trying to send dataTokens" 1128 + " out of port: " 1129 + outputPortName); 1130 1131 } 1132 } else if (chanTypeName.equals("ByteArray")) { 1133 // TODO: padding for byte channel 1134 byte[][] someData = cmap.GetDataAsByteArray(index); 1135 Token[][] someByteDataTokens = new Token[someData.length][]; 1136 checkLengths(someData.length, someTimes.length); 1137 1138 for (int j = 0; j < someData.length; j++) { 1139 someByteDataTokens[j] = new Token[someData[j].length]; 1140 // System.out.println("debug Tokens: someData[" 1141 // + j + "]: " + someData[j] + " someTimes[" + j 1142 // + "]: " +someTimes[j]); 1143 for (int k = 0; k < someData[j].length; k++) { 1144 try { 1145 someByteDataTokens[j][k] = new UnsignedByteToken( 1146 someData[j][k]); 1147 } catch (Exception e) { 1148 throw new IllegalActionException(this, 1149 
e, 1150 "Exception trying to create and assign an UnsignedByteToken."); 1151 } 1152 } 1153 } 1154 try { 1155 if (outputRecOfArrays 1156 .equals(RECORD_OF_2_ARRAYS)) { 1157 ArrayToken[] atSomeDataTokens = new ArrayToken[someTimeTokens.length]; 1158 for (int k = 0; k < someTimeTokens.length; k++) { 1159 atSomeDataTokens[k] = new ArrayToken( 1160 someByteDataTokens[k]); 1161 } 1162 ArrayToken atSomeTimeTokens = new ArrayToken( 1163 someTimeTokens); 1164 ArrayToken atatSomeDataTokens = new ArrayToken( 1165 atSomeDataTokens); 1166 Token[] dataValues = { atSomeTimeTokens, 1167 atatSomeDataTokens }; 1168 RecordToken recToken = new RecordToken( 1169 labels, dataValues); 1170 port.send(0, recToken); 1171 } else { 1172 RecordToken[] recTokens = new RecordToken[someTimeTokens.length]; 1173 for (int k = 0; k < someTimeTokens.length; k++) { 1174 values[0] = someTimeTokens[k]; 1175 values[1] = new ArrayToken( 1176 someByteDataTokens[k]); 1177 recTokens[k] = new RecordToken(labels, 1178 values); 1179 } 1180 ArrayToken at = new ArrayToken(recTokens); 1181 port.send(0, at); 1182 } 1183 } catch (Exception e) { 1184 throw new IllegalActionException(this, e, 1185 "Exception trying to send out port 0."); 1186 } 1187 } 1188 } 1189 if (!portHasData[h]) { 1190 if (!outputPortName.equals(CHANNEL_NAMES_OUTPUT_PORT)){ 1191 //System.out.println("* outputting nils * - Your request returned no data for channel: " 1192 // + currPortName); 1193 outputNils(port); 1194 } 1195 } 1196 } // end while z 1197 1198 } 1199 1200 } // outputData() 1201 1202 /** 1203 * 1204 * @param timeAndDataVector 1205 * @param chanTypeName 1206 * @return 1207 */ 1208 public Vector<Object> doPadding(Vector<Object> timeAndDataVector, 1209 String chanTypeName) { 1210 1211 // System.out.println("Will try to pad any gaps..."); 1212 1213 Token paddedTimeTokens[] = null; 1214 Token paddedDataTokens[] = null; 1215 1216 timeAndDataVector = fillGaps(timeAndDataVector); 1217 paddedTimeTokens = new Token[timeAndDataVector.size() / 
2]; 1218 paddedDataTokens = new Token[timeAndDataVector.size() / 2]; 1219 1220 int itr = -1; 1221 for (int j = 0; j < timeAndDataVector.size() - 1; j += 2) { 1222 itr++; 1223 paddedTimeTokens[itr] = new DoubleToken((Double) timeAndDataVector 1224 .get(j)); 1225 1226 // TODO: surely there's a better way of doing this: 1227 String t = timeAndDataVector.get(j + 1).toString(); 1228 if (t.equals("nil")) { 1229 paddedDataTokens[itr] = Token.NIL; 1230 } else { 1231 if (chanTypeName.equals("Float32")) { 1232 paddedDataTokens[itr] = new FloatToken( 1233 (Float) timeAndDataVector.get(j + 1)); 1234 } else if (chanTypeName.equals("Float64")) { 1235 paddedDataTokens[itr] = new DoubleToken( 1236 (Double) timeAndDataVector.get(j + 1)); 1237 } else if (chanTypeName.equals("String")) { 1238 paddedDataTokens[itr] = new StringToken( 1239 (String) timeAndDataVector.get(j + 1)); 1240 } else if (chanTypeName.equals("Int8")) { 1241 paddedDataTokens[itr] = new UnsignedByteToken( 1242 (Byte) timeAndDataVector.get(j + 1)); 1243 } else if (chanTypeName.equals("Int16")) { 1244 paddedDataTokens[itr] = new ShortToken( 1245 (Short) timeAndDataVector.get(j + 1)); 1246 } else if (chanTypeName.equals("Int32")) { 1247 paddedDataTokens[itr] = new IntToken( 1248 (Integer) timeAndDataVector.get(j + 1)); 1249 } else if (chanTypeName.equals("Int64")) { 1250 paddedDataTokens[itr] = new LongToken( 1251 (Long) timeAndDataVector.get(j + 1)); 1252 } else { 1253 log 1254 .error("DataTurbine Actor: ERROR. 
padding for this datatype not yet implemented!"); 1255 } 1256 } 1257 } 1258 1259 Vector<Object> vectorOfArrays = new Vector<Object>(); 1260 vectorOfArrays.add(paddedTimeTokens); 1261 vectorOfArrays.add(paddedDataTokens); 1262 1263 return vectorOfArrays; 1264 } 1265 1266 public Vector<Object> fillGaps(Vector<Object> timeAndDataVector) { 1267 1268 long sampleInterval = guessSamplingRate(timeAndDataVector); 1269 1270 // TODO: track down adcp binary chan bug (unless fixed, will require an 1271 // if here on the +1) 1272 1273 // note: duration is number of seconds that might contain data, not 1274 // number of data points. 1275 1276 double durationDouble = (double) _durationDouble; 1277 double sampleIntervalDouble = (double) sampleInterval; 1278 // double firstTimeSample = (Double) timeAndDataVector.get(0); 1279 // double missingFront = firstTimeSample - _startTimeDouble; 1280 1281 double dnumSamples = durationDouble / sampleIntervalDouble; 1282 // TODO: double check this round 1283 int numSamples = (int) Math.round(dnumSamples); 1284 if (sampleIntervalDouble > durationDouble) { 1285 log 1286 .error("DataTurbine Actor: ERROR: sampleIntervalDouble > durationDouble, something went wrong"); 1287 } 1288 1289 // System.out.println("durationDouble: " + durationDouble + 1290 // " sampleIntervalDouble: " + sampleIntervalDouble + " numSamples:"+ 1291 // numSamples); 1292 1293 Vector<Object> paddedTimeAndDataVector = null; 1294 1295 if (numSamples != timeAndDataVector.size() / 2) { 1296 // System.out.println(numSamples + " != " + 1297 // timeAndDataVector.size()/2); 1298 int numMissing = numSamples - (timeAndDataVector.size() / 2); 1299 log 1300 .debug("DataTurbine Actor: Padding - requested timeslice missing " 1301 + numMissing 1302 + " samples, padding using sampleInterval of majority:" 1303 + sampleInterval + "sec"); 1304 1305 // it will get larger 1306 paddedTimeAndDataVector = new Vector<Object>(numSamples * 2); 1307 paddedTimeAndDataVector = padGaps(sampleInterval, 
numMissing, 1308 timeAndDataVector); 1309 1310 return paddedTimeAndDataVector; 1311 } 1312 1313 log.debug("DataTurbine Actor: No need to pad"); 1314 return timeAndDataVector; 1315 } 1316 1317 // Guess Sampling Rate 1318 // 1319 // TODO: fix math - this will not work for faster than 1sps sampling! 1320 // 1321 public long guessSamplingRate(Vector<Object> timeAndDataVector) { 1322 1323 Map hashmap = new HashMap(); // hash table 1324 double timesDifference; 1325 long roundedTimesDiff; // used for rounded value 1326 boolean keyExists; 1327 for (int j = 0; j < timeAndDataVector.size() - 2; j += 2) { 1328 timesDifference = (Double) timeAndDataVector.get(j + 2) 1329 - (Double) timeAndDataVector.get(j); 1330 roundedTimesDiff = Math.round(timesDifference); 1331 // System.out.println("timesDifference:" + timesDifference + 1332 // "roundedTimesDiff:" + roundedTimesDiff); 1333 keyExists = hashmap.containsKey(roundedTimesDiff); 1334 if (keyExists) { 1335 int hashValue = (Integer) hashmap.get(roundedTimesDiff); 1336 hashmap.put(Long.valueOf(roundedTimesDiff), Integer 1337 .valueOf(++hashValue)); 1338 hashValue = (Integer) hashmap.get(roundedTimesDiff); 1339 } else { 1340 hashmap.put(Long.valueOf(roundedTimesDiff), Integer.valueOf(1)); 1341 int hashValue = (Integer) hashmap.get(roundedTimesDiff); 1342 } 1343 } 1344 1345 int maxHashValue = 0; 1346 long keyOfMax = 0l; 1347 Iterator hashItr = hashmap.keySet().iterator(); 1348 while (hashItr.hasNext()) { 1349 long hashKey = (Long) hashItr.next(); 1350 int hashValue = (Integer) hashmap.get(hashKey); 1351 if (hashValue > maxHashValue) { 1352 maxHashValue = hashValue; 1353 keyOfMax = hashKey; 1354 } 1355 // System.out.println(""+hashKey+ " => "+hashValue + "times"); 1356 } 1357 long sampleInterval = keyOfMax; 1358 1359 return sampleInterval; 1360 } 1361 1362 public Vector<Object> padGaps(long sampleInterval, int numMissing, 1363 Vector<Object> timeAndDataVector) { 1364 // int newLength = timeAndDataVector.size() + numMissing * 2; 1365 
1366 double time, fakeTimeStamp; 1367 1368 double firstTimeStamp = (Double) timeAndDataVector.get(0); 1369 double lastTimeStamp = (Double) timeAndDataVector.get(timeAndDataVector 1370 .size() - 2); 1371 // double firstPossibleTimeStamp = _startTimeDouble; 1372 double lastPossibleTimeStamp = _startTimeDouble + _durationDouble; 1373 1374 // System.out.println("firstPossibleTimeStamp" + firstPossibleTimeStamp 1375 // + " firstTimeStamp " + firstTimeStamp + 1376 // " lastTimeStamp: " + lastTimeStamp + " lastPossibleTimeStamp: " + 1377 // lastPossibleTimeStamp); 1378 1379 // insert - fill any gaps between first and last obtained samples 1380 int itr = -1; 1381 log.debug("padGaps numMissing:"+numMissing); 1382 for (int i = 0; i < timeAndDataVector.size(); i += 2) { // dynamic end 1383 // condition 1384 itr++; 1385 fakeTimeStamp = firstTimeStamp + sampleInterval * itr; 1386 time = (Double) timeAndDataVector.get(i); 1387 if (fakeTimeStamp != time) { 1388 // System.out.println("inserting fakeTimeStamp: " + 1389 // fakeTimeStamp); 1390 timeAndDataVector.add(i, Token.NIL); 1391 timeAndDataVector.add(i, fakeTimeStamp); 1392 numMissing--; 1393 if (numMissing < 0) { 1394 log 1395 .error("DataTurbine Actor Error: tried to insert too many samples"); 1396 } 1397 } 1398 } 1399 1400 // append - if needed, fill between last obtained sample and last 1401 // possible time 1402 if (numMissing != 0) { 1403 itr = 1; 1404 fakeTimeStamp = lastTimeStamp + sampleInterval * itr; 1405 while (fakeTimeStamp <= lastPossibleTimeStamp) { 1406 // System.out.println("appending fakeTimeStamp: " + 1407 // fakeTimeStamp); 1408 timeAndDataVector.add(fakeTimeStamp); 1409 timeAndDataVector.add(Token.NIL); 1410 numMissing--; 1411 1412 itr++; 1413 fakeTimeStamp = lastTimeStamp + sampleInterval * itr; 1414 } 1415 } 1416 1417 // prepend - if needed, fill between first possible time to first 1418 // obtained time 1419 itr = 0; 1420 while (numMissing > 0) { 1421 itr++; 1422 fakeTimeStamp = firstTimeStamp - 
sampleInterval * itr; 1423 if (fakeTimeStamp < _startTimeDouble) { // sanity-check 1424 log 1425 .error("ERROR! Tried to prepend before requested start time, something wrong!"); 1426 // throw exception 1427 numMissing = 0; 1428 } else { 1429 // System.out.println("prepending fakeTimeStamp: " + 1430 // fakeTimeStamp); 1431 timeAndDataVector.add(0, Token.NIL); 1432 timeAndDataVector.add(0, fakeTimeStamp); 1433 numMissing--; 1434 // sanity check: 1435 if (numMissing == 0) { 1436 double tmp = fakeTimeStamp - sampleInterval; 1437 // System.out.println("fakeTimeStamp - sampleInterval " + 1438 // tmp + " _startTimeDouble " + _startTimeDouble); 1439 } 1440 } 1441 } 1442 1443 return timeAndDataVector; 1444 } 1445 1446 /** 1447 * Output nils for a given port. 1448 * 1449 * @param TypedIOPort 1450 * port 1451 */ 1452 public void outputNils(TypedIOPort port) { 1453 1454 Type currPortType = port.getType(); 1455 // String currPortName = port.getName(); 1456 Token[] nilTokenArray = new Token[1]; 1457 nilTokenArray[0] = Token.NIL; 1458 1459 RecordToken[] recTokens = new RecordToken[1]; 1460 1461 try { 1462 1463 ArrayToken atSomeTimeTokens = new ArrayToken(nilTokenArray); 1464 1465 // hardcode: 1466 if ((currPortType.toString().contains(" = arrayType(unsignedByte)") && outputRecOfArrays 1467 .equals(ARRAY_OF_X_RECORDS)) 1468 || (currPortType.toString().contains( 1469 " = arrayType(arrayType(unsignedByte))") && outputRecOfArrays 1470 .equals(RECORD_OF_2_ARRAYS))) { 1471 1472 // System.out.println("about to send * NIL dataTokens * out of BYTE port: "+currPortName); 1473 if (outputRecOfArrays.equals(RECORD_OF_2_ARRAYS)) { 1474 ArrayToken[] atSomeDataTokens = new ArrayToken[1]; 1475 atSomeDataTokens[0] = new ArrayToken(nilTokenArray); 1476 ArrayToken atatSomeDataTokens = new ArrayToken( 1477 atSomeDataTokens); 1478 values[0] = atSomeTimeTokens; 1479 values[1] = atatSomeDataTokens; 1480 RecordToken recToken = new RecordToken(labels, values); 1481 port.send(0, recToken); 1482 } else { 
1483 values[0] = Token.NIL; 1484 values[1] = new ArrayToken(nilTokenArray); 1485 recTokens[0] = new RecordToken(labels, values); 1486 ArrayToken at = new ArrayToken(recTokens); 1487 port.send(0, at); 1488 } 1489 } else { 1490 1491 // System.out.println("about to send * NIL dataTokens * out of port: "+currPortName); 1492 if (outputRecOfArrays.equals(RECORD_OF_2_ARRAYS)) { 1493 ArrayToken atSomeDataTokens = new ArrayToken(nilTokenArray); 1494 values[0] = atSomeTimeTokens; 1495 values[1] = atSomeDataTokens; 1496 RecordToken recToken = new RecordToken(labels, values); 1497 port.send(0, recToken); 1498 } else { 1499 values[0] = Token.NIL; 1500 values[1] = Token.NIL; 1501 recTokens[0] = new RecordToken(labels, values); 1502 ArrayToken at = new ArrayToken(recTokens); 1503 port.send(0, at); 1504 } 1505 } 1506 } catch (Exception e) { 1507 log.error("DataTurbine actor: exception trying to sendout port 0" 1508 + e); 1509 } 1510 } 1511 1512 /** 1513 * Compare two array lengths. 1514 * 1515 * @param int arraylength1, int arraylength2 1516 * @throws IllegalActionException 1517 */ 1518 void checkLengths(int dataLength, int timesLength) 1519 throws IllegalActionException { 1520 if (dataLength != timesLength) { 1521 throw new IllegalActionException( 1522 this, 1523 "Error. someData.length != someTimes.length, This shouldn't be. Check your DataTurbine server data"); 1524 } 1525 } 1526 1527 /** 1528 * Post fire the actor. Return false to indicated that the process has 1529 * finished. If it returns true, the process will continue indefinitely. 1530 * public boolean postfire() throws IllegalActionException { if 1531 * (_sinkMode.equals(SINKMODE_MONITOR) ){ return true; } else { return false; } } 1532 */ 1533 1534 /** 1535 * Open connection to DataTurbine server. 
1536 * 1537 * @throws SAPIException 1538 * @throws IllegalActionException 1539 * 1540 */ 1541 public boolean openDataTurbine() throws SAPIException, IllegalActionException { 1542 try { 1543 if (getServer() != null && !getServer().equals("") 1544 && getRBNBClientName() != null && !getRBNBClientName().equals("")){ 1545 if (_sink.VerifyConnection()) { 1546 return true; 1547 } 1548 log.debug("open DataTurbine connection. OpenRBNBConnection(" + 1549 getServer() + "," + getRBNBClientName() + ")"); 1550 _sink.OpenRBNBConnection(getServer(), getRBNBClientName()); 1551 if (!_sink.VerifyConnection()) { 1552 log.error("Error trying to open DataTurbine connection"); 1553 return false; 1554 } 1555 return true; 1556 } 1557 } catch (SAPIException sap) { 1558 log.warn("\nFailed to connect to "+getServer()+"\n"+ 1559 "Verify server URL. Enter empty string for none.\n"); 1560 throw new SAPIException("\nFailed to connect to "+getServer()+"\n"+ 1561 "Verify server URL. Enter empty string for none.\n"); 1562 } 1563 return false; 1564 } 1565 1566 /** 1567 * Reconfigure actor when certain attributes change. 1568 * 1569 * @param attribute 1570 * The changed Attribute. 
1571 * @throws ptolemy.kernel.util.IllegalActionException 1572 * 1573 */ 1574 @Override 1575 public void attributeChanged(ptolemy.kernel.util.Attribute attribute) 1576 throws ptolemy.kernel.util.IllegalActionException { 1577 1578 boolean outputChannelChanged = false; 1579 1580 if (attribute == dataTurbineAddressInputParam) { 1581 if (dataTurbineAddressInputParam != null && dataTurbineAddressInputParam.getToken() != null){ 1582 String tmpUrl = ((StringToken)dataTurbineAddressInputParam.getToken()).stringValue(); 1583 tmpUrl = tmpUrl.replaceAll("\"", ""); 1584 1585 if (!_url.equals(tmpUrl)) { 1586 _url = tmpUrl; 1587 reload = true; 1588 } 1589 } 1590 } 1591 else if (attribute == outputChannelPortParam){ 1592 if (outputChannelPortParam != null && outputChannelPortParam.getToken() != null) { 1593 1594 String tmpSpecifiedOutputChannel = ((StringToken) outputChannelPortParam 1595 .getToken()).stringValue(); 1596 if ((tmpSpecifiedOutputChannel != null && 1597 !_specifiedOutputChannel.equals(tmpSpecifiedOutputChannel))) { 1598 _specifiedOutputChannel = tmpSpecifiedOutputChannel; 1599 outputChannelChanged = true; 1600 1601 } 1602 } 1603 } else if (attribute == outputRecordOfArrays) { 1604 String tmpoutputRecOfArrays = ((StringToken)outputRecordOfArrays.getToken()).stringValue(); 1605 if (tmpoutputRecOfArrays != null && !outputRecOfArrays.equals(tmpoutputRecOfArrays)) { 1606 reload = true; 1607 outputRecOfArrays = tmpoutputRecOfArrays; 1608 1609 } 1610 } else if (attribute == tryToPad) { 1611 boolean tmpPaddingOn = ((BooleanToken) tryToPad.getToken()) 1612 .booleanValue(); 1613 1614 if (paddingOn != tmpPaddingOn) { 1615 paddingOn = tmpPaddingOn; 1616 willPad = paddingOn; 1617 } 1618 } else{ 1619 super.attributeChanged(attribute); 1620 } 1621 1622 if(outputChannelChanged) { 1623 getDataTurbineInfo(); 1624 } 1625 else if (reload) { 1626 try { 1627 reload = false; 1628 log.debug("Fetching info from DataTurbine: " + _url); 1629 // first make sure we can connect 1630 // if 
already connected to a server, disconnect first 1631 log.debug("disconnect from DataTurbine"); 1632 _sink.CloseRBNBConnection(); 1633 boolean opened = openDataTurbine(); 1634 if (opened){ 1635 boolean gotInfo = getDataTurbineInfo(); 1636 if (gotInfo){ 1637 configureOutputPorts(null); 1638 } 1639 } 1640 else{ 1641 // should only occur when the user manually changes to blank 1642 // (and not also during instantiation). 1643 if (attribute == dataTurbineAddressInputParam){ 1644 removeAllOutputPorts(); 1645 } 1646 } 1647 } 1648 catch (SAPIException e) { 1649 e.printStackTrace(); 1650 throw new IllegalActionException("\nFailed to connect to "+getServer()+"\n"+ 1651 "Verify server URL. Enter empty string for none.\n"); 1652 } 1653 catch (Exception e) { 1654 e.printStackTrace(); 1655 throw new IllegalActionException(this, e, 1656 "Error opening DataTurbine connection."); 1657 } 1658 } 1659 } 1660 1661 /** Clone the actor into the specified workspace. */ 1662 @Override 1663 public Object clone(Workspace workspace) throws CloneNotSupportedException { 1664 DataTurbine newObject = (DataTurbine) super.clone(workspace); 1665 newObject._icon = null; 1666 newObject._map = null; 1667 newObject._registrationMap = null; 1668 newObject._sink = null; 1669 newObject._stopRequested = new AtomicBoolean(false); 1670 newObject._userInfoTypesMap = new HashMap<String,String>(); 1671 newObject.values = new Token[labels.length]; 1672 return newObject; 1673 } 1674 1675 /** 1676 * Remove all existing output ports. 
1677 * 1678 * @throws ptolemy.kernel.util.IllegalActionException 1679 */ 1680 private void removeAllOutputPorts() throws IllegalActionException { 1681 Iterator<?> i = this.outputPortList().iterator(); 1682 while (i.hasNext()) { 1683 TypedIOPort port = (TypedIOPort) i.next(); 1684 String currPortName = port.getName(); 1685 try { 1686 port.setContainer(null); 1687 } catch (Exception ex) { 1688 throw new IllegalActionException(this, ex, 1689 "Error removing port: " + currPortName); 1690 } 1691 } 1692 } 1693 1694 /** 1695 * Remove all ports with names not in the selected vector. 1696 * @param nonRemovePortName 1697 * @throws IllegalActionException 1698 */ 1699 void removeOtherOutputPorts(Collection<?> nonRemovePortName) 1700 throws IllegalActionException { 1701 // Use toArray() to make a deep copy of this.portList(). 1702 // Do this to prevent ConcurrentModificationExceptions. 1703 1704 TypedIOPort[] l = new TypedIOPort[0]; 1705 l = (TypedIOPort[]) this.portList().toArray(l); 1706 1707 for (int i = 0; i < l.length; i++) { 1708 TypedIOPort port = l[i]; 1709 if (port == null || port.isInput()) { 1710 continue; 1711 } 1712 String currPortName = port.getName(); 1713 if (!nonRemovePortName.contains(currPortName)) { 1714 try { 1715 port.setContainer(null); 1716 } catch (Exception ex) { 1717 throw new IllegalActionException(this, ex, 1718 "Error removing port: " + currPortName); 1719 } 1720 } 1721 } 1722 } 1723 1724 /** 1725 * Reconfigure output ports. 1726 * 1727 * Some channels may have a period in their name. Ptolemy does not allow 1728 * ports to have periods in their name, so replace . with PERIOD 1729 * 1730 * @param requestedChanNames 1731 * A list of channels from which the actor will request data, and 1732 * will therefore need output ports for. If null, use all 1733 * filteredChanNames. 
1734 * @throws ptolemy.kernel.util.IllegalActionException 1735 * 1736 */ 1737 private void configureOutputPorts(String[] requestedChanNames) 1738 throws IllegalActionException { 1739 1740 Vector<String> portsToKeep = new Vector<String>(); 1741 1742 // add the output port that simply outputs all filtered 1743 // channel names 1744 addPort(CHANNEL_NAMES_OUTPUT_PORT, "String"); 1745 portsToKeep.add(CHANNEL_NAMES_OUTPUT_PORT); 1746 1747 // add the specific channel output port 1748 addPort(SPECIFIC_CHANNEL,"String"); 1749 portsToKeep.add(SPECIFIC_CHANNEL); 1750 1751 if (_filteredChanNames.length != _filteredChanTypes.length) { 1752 throw new IllegalActionException(this, 1753 "ERROR filteredNames.length:" + _filteredChanNames.length 1754 + " and filteredTypes.length:" 1755 + _filteredChanTypes.length + " need to match!"); 1756 } 1757 1758 for (int z = 0; z < _filteredChanNames.length; z++) { 1759 if (_filteredChanNames[z] == null) { 1760 continue; 1761 } 1762 if (requestedChanNames == null) { 1763 addPort(_filteredChanNames[z].replaceAll("\\.", "PERIOD"), 1764 _filteredChanTypes[z]); 1765 portsToKeep.add(_filteredChanNames[z].replaceAll("\\.", "PERIOD")); 1766 } else { 1767 for (int i = 0; i < requestedChanNames.length; i++) { 1768 if (_filteredChanNames[z].equals(requestedChanNames[i])) { 1769 addPort(_filteredChanNames[z].replaceAll("\\.", "PERIOD"), 1770 _filteredChanTypes[z]); 1771 portsToKeep.add(_filteredChanNames[z].replaceAll("\\.", 1772 "PERIOD")); 1773 break; 1774 } 1775 } 1776 } 1777 } 1778 1779 removeOtherOutputPorts(portsToKeep); 1780 } 1781 1782 /** 1783 * Add an output port to DataTurbine actor if it does not already exist and 1784 * set the type appropriately. 1785 * 1786 * @param aPortName 1787 * The name of the port. 1788 * @param aPortType 1789 * The type of the port. 
1790 * 1791 */ 1792 private void addPort(String aPortName, String rbnbType) { 1793 try { 1794 TypedIOPort port = (TypedIOPort) this.getPort(aPortName); 1795 boolean aIsNew = (port == null); 1796 if (aIsNew) { 1797 port = new TypedIOPort(this, aPortName, false, true); 1798 } 1799 setPortType(aPortName, rbnbType, port); 1800 } catch (Exception e) { 1801 log.error("DataTurbine Error. Trouble making port: " + aPortName 1802 + " with type: " + rbnbType); 1803 } 1804 } 1805 1806 /** 1807 * Set the output port type based on outputRecOfArrays boolean 1808 * 1809 * @param aPortName 1810 * The name of the port. 1811 * @param aPortType 1812 * The type of the port. 1813 * 1814 */ 1815 private void setPortType(String aPortName, String rbnbType, TypedIOPort port) { 1816 1817 Type[] types = new Type[labels.length]; 1818 1819 String rbnbUserInfoType = _userInfoTypesMap.get(aPortName); 1820 1821 //special case for output port that simply outputs channel names 1822 if (aPortName.equals(CHANNEL_NAMES_OUTPUT_PORT)){ 1823 ArrayType at = new ArrayType(BaseType.STRING); 1824 port.setTypeEquals(at); 1825 } 1826 else if (outputRecOfArrays.equals(RECORD_OF_2_ARRAYS)) { 1827 // System.out.println("Setting port " + aPortName + 1828 // " to send out a Record of arrays"); 1829 1830 types[0] = new ArrayType(BaseType.DOUBLE); 1831 1832 if(rbnbUserInfoType != null) { 1833 types[1] = new ArrayType(BaseType.forName(rbnbUserInfoType)); 1834 } else if (rbnbType.equals("Float32")) { 1835 types[1] = new ArrayType(BaseType.FLOAT); 1836 } else if (rbnbType.equals("Float64")) { 1837 types[1] = new ArrayType(BaseType.DOUBLE); 1838 } else if (rbnbType.equals("String")) { 1839 types[1] = new ArrayType(BaseType.STRING); 1840 } else if (rbnbType.equals("Int8")) { 1841 types[1] = new ArrayType(BaseType.UNSIGNED_BYTE); 1842 } else if (rbnbType.equals("Int16")) { 1843 types[1] = new ArrayType(BaseType.SHORT); 1844 } else if (rbnbType.equals("Int32")) { 1845 types[1] = new ArrayType(BaseType.INT); 1846 } else if 
(rbnbType.equals("Int64")) { 1847 types[1] = new ArrayType(BaseType.LONG); 1848 } else if (rbnbType.equals("Unknown")) { 1849 types[1] = new ArrayType(BaseType.UNKNOWN); 1850 } else if (rbnbType.equals("ByteArray")) { 1851 types[1] = new ArrayType(new ArrayType(BaseType.UNSIGNED_BYTE)); 1852 } 1853 else if (rbnbType.equals("User")) { 1854 System.out.println("type is User for " + aPortName); 1855 } else { 1856 log.error("DataTurbine actor Error: trouble making port: " 1857 + aPortName + " unhandled DataTurbine type"); 1858 } 1859 RecordType rt = new RecordType(labels, types); 1860 port.setTypeEquals(rt); 1861 } else { 1862 1863 types[0] = BaseType.DOUBLE; 1864 1865 // System.out.println("Setting " + aPortName + 1866 // " to send out an Array of records"); 1867 if(rbnbUserInfoType != null) { 1868 types[1] = BaseType.forName(rbnbUserInfoType); 1869 } else if (rbnbType.equals("Float32")) { 1870 types[1] = BaseType.FLOAT; 1871 } else if (rbnbType.equals("Float64")) { 1872 types[1] = BaseType.DOUBLE; 1873 } else if (rbnbType.equals("String")) { 1874 types[1] = BaseType.STRING; 1875 } else if (rbnbType.equals("Int8")) { 1876 types[1] = BaseType.UNSIGNED_BYTE; 1877 } else if (rbnbType.equals("Int16")) { 1878 types[1] = BaseType.SHORT; 1879 } else if (rbnbType.equals("Int32")) { 1880 types[1] = BaseType.INT; 1881 } else if (rbnbType.equals("Int64")) { 1882 types[1] = BaseType.LONG; 1883 } else if (rbnbType.equals("Unknown")) { 1884 types[1] = BaseType.UNKNOWN; 1885 } else if (rbnbType.equals("ByteArray")) { 1886 types[1] = new ArrayType(BaseType.UNSIGNED_BYTE); 1887 } 1888 // else if (rbnbType.equals("User")){ 1889 // ?? 
1890 // } 1891 else { 1892 log.error("DataTurbine actor Error: trouble making port: " 1893 + aPortName + " unhandled DataTurbine type"); 1894 } 1895 1896 RecordType rt = new RecordType(labels, types); 1897 ArrayType at = new ArrayType(rt); 1898 port.setTypeEquals(at); 1899 } 1900 1901 } 1902 1903 @Override 1904 public void wrapup() throws IllegalActionException { 1905 super.wrapup(); 1906 log.debug("disconnect from DataTurbine"); 1907 _sink.CloseRBNBConnection(); 1908 } 1909 1910 1911}