/* An actor that writes data to a DataTurbine server.
 *
 * Copyright (c) 2011 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:45:41 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33631 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.data.datasource.dataturbine;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.ecoinformatics.seek.datasource.DataSourceIcon;
import org.kepler.util.DotKeplerManager;

import com.rbnb.api.Server;
import com.rbnb.sapi.ChannelMap;
import com.rbnb.sapi.SAPIException;
import com.rbnb.sapi.Sink;
import com.rbnb.sapi.Source;

import ptolemy.actor.IOPort;
import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.DoubleToken;
import ptolemy.data.IntToken;
import ptolemy.data.Token;
import ptolemy.data.expr.FileParameter;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.MatrixType;
import ptolemy.data.type.Type;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;

/** An actor that writes data to a DataTurbine server.
 *
 *  <p>Each connected input port whose name does not start with "_" is
 *  written to the channel <i>channelContainer</i>/<i>portName</i>. Written
 *  data is buffered in a channel map and flushed to the server either after
 *  <i>flushAfterNumData</i> firings that wrote data, or periodically by a
 *  background thread driven by <i>flushAfterSeconds</i>.</p>
 *
 *  <p>The channel map, the pending-data counter and the source connection
 *  are shared between the firing thread and the background flush thread;
 *  access to them is guarded by this actor's monitor.</p>
 *
 *  @author Daniel Crawl
 *  @version $Id: DataTurbineWriter.java 33631 2015-08-24 22:45:41Z crawl $
 *
 *  TODO
 *
 *  - add input port _timestamp, date token, use as timestamp for data
 *  - add input ports _specificChannelName, _specificChannelValue for writing to
 *  a specific channel (analogous to dt reader actor)
 *  - create kar
 *
 *
 */
public class DataTurbineWriter extends TypedAtomicActor
{
    /** Construct a new DataTurbineWriter with a container and name.
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException If the actor cannot be contained
     *   by the container or a parameter cannot be created or set.
     *  @exception NameDuplicationException If the container already has an
     *   actor with this name.
     */
    public DataTurbineWriter(CompositeEntity container, String name)
        throws IllegalActionException, NameDuplicationException
    {

        super(container, name);

        _icon = new DataSourceIcon(this);

        persistDataAfterWorkflowEnd = new Parameter(this, "persistDataAfterWorkflowEnd");
        persistDataAfterWorkflowEnd.setTypeEquals(BaseType.BOOLEAN);
        persistDataAfterWorkflowEnd.setToken(BooleanToken.TRUE);

        channelContainer = new StringParameter(this, "channelContainer");

        flushAfterSeconds = new Parameter(this, "flushAfterSeconds");
        flushAfterSeconds.setTypeEquals(BaseType.INT);
        flushAfterSeconds.setToken("60");

        flushAfterNumData = new Parameter(this, "flushAfterNumData");
        flushAfterNumData.setTypeEquals(BaseType.INT);
        flushAfterNumData.setToken("100");

        serverAddress = new StringParameter(this, "serverAddress");
        serverAddress.setToken("localhost:3333");

        showChannels = new Parameter(this, "showChannels");
        showChannels.setTypeEquals(BaseType.BOOLEAN);
        showChannels.setToken(BooleanToken.TRUE);

        startServer = new Parameter(this, "startServer");
        startServer.setTypeEquals(BaseType.BOOLEAN);
        startServer.setToken(BooleanToken.FALSE);

        archiveDirectory = new FileParameter(this, "archiveDirectory");
        String dir = DotKeplerManager.getInstance().getPersistentUserDataDirString() + "dataturbine";
        archiveDirectory.setToken(dir);

    }

    /** React to a change in an attribute.
     *
     *  <p>Validates and caches <i>flushAfterNumData</i>,
     *  <i>flushAfterSeconds</i>, <i>serverAddress</i> and
     *  <i>channelContainer</i>. When <i>showChannels</i> is enabled, a change
     *  of server address or channel container re-creates the input ports
     *  from the channels found on the server. When <i>showChannels</i> is
     *  switched off, every input port without a source is removed.</p>
     *
     *  @param attribute The attribute that changed.
     *  @exception IllegalActionException If the new value is invalid or the
     *   ports cannot be reconfigured.
     */
    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException
    {
        if(attribute == flushAfterNumData)
        {
            Token token = flushAfterNumData.getToken();
            if(token != null)
            {
                // validate before caching so that a rejected value is never used
                int value = ((IntToken)token).intValue();
                if(value < 0)
                {
                    throw new IllegalActionException(this, "flushAfterNumData must be >= 0.");
                }
                _flushAfterNumData = value;
            }
        }
        else if(attribute == flushAfterSeconds)
        {
            Token token = flushAfterSeconds.getToken();
            if(token != null)
            {
                int value = ((IntToken)token).intValue();
                if(value < 0)
                {
                    throw new IllegalActionException(this, "flushAfterSeconds must be >= 0.");
                }
                _flushAfterSeconds = value;
            }
        }
        else if(attribute == serverAddress)
        {
            String addr = serverAddress.stringValue();
            if(addr.length() == 0)
            {
                throw new IllegalActionException(this, "Must provide serverAddress.");
            }

            if(_serverAddress == null || !_serverAddress.equals(addr))
            {
                _serverAddress = addr;
                if(_showChannels)
                {
                    _reconfigureInputPorts();
                }
            }
        }
        else if(attribute == channelContainer)
        {
            String container = channelContainer.stringValue();

            if(container.isEmpty() || container.contains("/"))
            {
                throw new IllegalActionException(this, "channelContainer cannot be empty or contain slashes.");
            }

            if(_channelContainer == null || !_channelContainer.equals(container))
            {
                _channelContainer = container;
                if(_showChannels)
                {
                    _reconfigureInputPorts();
                }
            }
        }
        else if(attribute == showChannels)
        {
            boolean val = ((BooleanToken)showChannels.getToken()).booleanValue();
            if(val != _showChannels)
            {
                _showChannels = val;

                if(_showChannels)
                {
                    _reconfigureInputPorts();
                }
                else
                {
                    // remove all unconnected ports; iterate over a copy since
                    // removing a port changes the actor's port list.
                    List<Object> ports = new ArrayList<Object>(inputPortList());
                    for(Object obj : ports)
                    {
                        IOPort port = (IOPort)obj;
                        if(port.numberOfSources() == 0)
                        {
                            try
                            {
                                port.setContainer(null);
                            }
                            catch (NameDuplicationException e)
                            {
                                throw new IllegalActionException(this, e, "Unable to remove port.");
                            }
                        }
                    }
                }
            }
        }
        else
        {
            super.attributeChanged(attribute);
        }
    }

    /** Clone the actor into the specified workspace. All connection and
     *  execution state (channel map, flush thread, server, sink, source,
     *  pending-data counter) is reset in the clone. The clone has no icon
     *  reference; {@link #_flush()} tolerates that.
     *  @param workspace The workspace for the new object.
     *  @return A new DataTurbineWriter.
     *  @exception CloneNotSupportedException If a derived class contains
     *   an attribute that cannot be cloned.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException
    {
        DataTurbineWriter newObject = (DataTurbineWriter) super.clone(workspace);
        newObject._channelMap = null;
        newObject._flushThread = null;
        newObject._icon = null;
        newObject._numDataWritten = 0;
        newObject._quit = new AtomicBoolean(false);
        newObject._server = null;
        newObject._sink = null;
        newObject._source = null;
        newObject._unknownChannelTypeSet = null;
        return newObject;
    }


    /** Fire the actor. Read one token from each connected input port whose
     *  name does not start with "_" and put it in the channel map: double
     *  and int tokens are written as 64-bit floats and 32-bit ints, matrix
     *  tokens are written as their string representation. If at least one
     *  token was written, the pending-data counter is incremented and the
     *  data is flushed once the counter reaches <i>flushAfterNumData</i>.
     *  @exception IllegalActionException If a port has no matching channel,
     *   a token has an unsupported type, or writing/flushing fails.
     */
    @Override
    public void fire() throws IllegalActionException
    {
        super.fire();

        boolean wroteData = false;

        List<?> inputList = inputPortList();
        for(Object obj : inputList)
        {
            IOPort port = (IOPort)obj;

            String portName = port.getName();

            // skip reserved ports and ports that are not connected
            if(portName.startsWith("_") || port.numberOfSources() == 0)
            {
                continue;
            }

            // read the token before taking the lock so that a blocking read
            // cannot stall the flush thread.
            Token token = port.get(0);
            Type tokenType = token.getType();

            // the flush thread flushes the same channel map under this
            // monitor (see _flush()), so writes must hold it too.
            synchronized(this)
            {
                int index = _channelMap.GetIndex(portName);
                if(index == -1)
                {
                    throw new IllegalActionException(this, "Channel " + portName + " not found in channel map");
                }

                try
                {
                    if(tokenType == BaseType.DOUBLE)
                    {
                        _channelMap.PutDataAsFloat64(index,
                            new double [] { ((DoubleToken)token).doubleValue() });
                    }
                    else if(tokenType == BaseType.INT)
                    {
                        _channelMap.PutDataAsInt32(index,
                            new int [] { ((IntToken)token).intValue() });
                    }
                    else if(tokenType instanceof MatrixType)
                    {
                        String tokenStr = token.toString();
                        _channelMap.PutDataAsString(index, tokenStr);
                    }
                    else
                    {
                        throw new IllegalActionException(this, "Unsupported data type: " + tokenType);
                    }
                }
                catch(SAPIException e)
                {
                    throw new IllegalActionException(this, e, "Error writing data.");
                }
            }

            wroteData = true;
        }

        if(wroteData)
        {
            synchronized(this)
            {
                _numDataWritten++;

                if(_numDataWritten >= _flushAfterNumData)
                {
                    _flush();
                }
            }
        }
    }

    /** Initialize the actor. For every port whose type was still unknown
     *  after preinitialize() and whose resolved type is neither double nor
     *  int, store the Ptolemy type string in the user info field of the
     *  corresponding channel; then register the channel map with the server.
     *  @exception IllegalActionException If a port recorded in
     *   preinitialize() no longer exists, or the user info cannot be set,
     *   or the channel map cannot be registered.
     */
    @Override
    public void initialize() throws IllegalActionException
    {
        super.initialize();

        // for any port types that were unknown during preinitialize,
        // see if they were determined during type resolution and put the
        // (ptolemy) type string in the channel map user info field.
        for(String name : _unknownChannelTypeSet)
        {
            TypedIOPort port = (TypedIOPort) getPort(name);
            if(port == null)
            {
                throw new IllegalActionException(this, "No port called " + name);
            }

            // see if the port type is supported by dataturbine
            Type portType = port.getType();

            if(portType == BaseType.DOUBLE ||
                portType == BaseType.INT)
            {
                // this type is supported
                continue;
            }

            int index = _channelMap.GetIndex(name);
            if(index == -1)
            {
                // the unknown-type set holds every input port, but the
                // channel map only holds connected ports not starting with
                // "_" (see _getMapFromInputPorts()); nothing to annotate.
                continue;
            }

            // set the ptolemy port type as user info
            try
            {
                _channelMap.PutUserInfo(index, "ptolemyType=" + portType.toString());
            }
            catch (SAPIException e)
            {
                throw new IllegalActionException(this, e, "Error setting user info.");
            }
        }

        // register the channel map to save the user info fields on the server.
        try
        {
            _source.Register(_channelMap);
        }
        catch (SAPIException e)
        {
            throw new IllegalActionException(this, e, "Error registering channel map.");
        }
    }

    /** Preinitialize the actor. Optionally start a server, open the sink
     *  and source connections, build the channel map from the connected
     *  input ports, set the types of input ports from the channels that
     *  already exist in the channel container on the server, and start the
     *  periodic flush thread.
     *  @exception IllegalActionException If the server cannot be started or
     *   contacted, or channel information cannot be retrieved. A server
     *   started by this actor is stopped before the exception propagates.
     */
    @Override
    public void preinitialize() throws IllegalActionException
    {
        super.preinitialize();

        _numDataWritten = 0;
        _lastFlushTime = System.currentTimeMillis() / 1000;

        // see if we should start a server
        if(((BooleanToken)startServer.getToken()).booleanValue())
        {
            _startServer(_serverAddress);
        }

        try
        {
            // the sink may be connected in start server
            if(_sink == null)
            {
                _sink = new Sink();
                try
                {
                    _sink.OpenRBNBConnection(_serverAddress, getName());
                }
                catch (SAPIException e)
                {
                    throw new IllegalActionException(this, e, "Error opening connection to server.");
                }
            }

            _source = new Source(100, "append", 10000);
            try
            {
                _source.OpenRBNBConnection(_serverAddress, _channelContainer);
            }
            catch (SAPIException e)
            {
                throw new IllegalActionException(this, e, "Error opening connection to server.");
            }

            // initially add all input ports to unknown type set
            _unknownChannelTypeSet = new HashSet<String>();
            List<?> inputList = inputPortList();
            for(Object obj : inputList)
            {
                IOPort port = (IOPort)obj;
                _unknownChannelTypeSet.add(port.getName());
            }

            // get a channel map based on the names of input ports
            _channelMap = _getMapFromInputPorts();

            // get the user info fields
            ChannelMap serverChannelMap = _getMapAndUserInfo(_sink);

            // get the type fields by doing a fetch
            ChannelMap serverDataChannelMap = _getMapFromFetch(_sink, serverChannelMap.GetChannelList());

            // set input port types based on channel types
            String containerPrefix = _channelContainer + "/";
            String[] channelList = serverChannelMap.GetChannelList();
            for(int i = 0; i < channelList.length; i++)
            {
                String channel = channelList[i];

                // only channels inside our container correspond to our
                // ports; a same-named channel in another container must not
                // influence the port type.
                if(!channel.startsWith(containerPrefix))
                {
                    continue;
                }

                String name = channel.substring(containerPrefix.length());
                TypedIOPort port = (TypedIOPort) getPort(name);

                if(name.startsWith("_") || port == null || port.numberOfSources() == 0)
                {
                    continue;
                }

                int typeId = serverChannelMap.GetType(i);
                int serverDataChannelMapIndex = serverDataChannelMap.GetIndex(channel);
                if(serverDataChannelMapIndex != -1)
                {
                    typeId = serverDataChannelMap.GetType(serverDataChannelMapIndex);
                }

                String userInfo = serverChannelMap.GetUserInfo(i);

                // set the user info in the source channel map, otherwise
                // when Register() is called in initialize(), we'll delete
                // the user info.
                if(!userInfo.isEmpty())
                {
                    int sourceChannelMapIndex = _channelMap.GetIndex(name);
                    if(sourceChannelMapIndex != -1)
                    {
                        try
                        {
                            _channelMap.PutUserInfo(sourceChannelMapIndex, userInfo);
                        }
                        catch (SAPIException e)
                        {
                            throw new IllegalActionException(this, e, "Error setting user info.");
                        }
                    }
                }

                // get the ptolemy type either from type field in the channel
                // map or the user info field. The fetched map is only
                // consulted when it actually contains the channel
                // (presumably it can be missing when the channel has no data
                // yet -- TODO confirm); otherwise fall back to user info
                // instead of failing.
                Type ptolemyType = null;
                if(serverDataChannelMapIndex != -1)
                {
                    ptolemyType = _getTypeFromChannelMap(channel, serverDataChannelMap);
                }
                if(ptolemyType == null)
                {
                    ptolemyType = _getTypeFromUserInfo(channel, serverChannelMap);
                }

                // see if it's a valid ptolemy type
                if(ptolemyType != null)
                {
                    port.setTypeEquals(ptolemyType);
                    _unknownChannelTypeSet.remove(name);
                }
                else
                {
                    System.out.println("WARNING: Channel " + name +
                        " has unsupported or unknown DataTurbine type: " +
                        serverChannelMap.TypeName(typeId));
                }
            }

            _quit.set(false);
            _flushThread = new PeriodicFlushThread();
            _flushThread.start();
        }
        catch(IllegalActionException e)
        {
            // stop the server ignoring any exceptions
            _stopServerIgnoreException();
            throw e;
        }
    }

    /** Cleanup after execution or error. Stop the flush thread, flush any
     *  buffered data, close the connections and stop the server if this
     *  actor started one. Every step tolerates state that was never set up
     *  (e.g. because preinitialize() failed), and later steps run even if an
     *  earlier one throws.
     *  @exception IllegalActionException If stopping the thread, flushing,
     *   or stopping the server fails.
     */
    @Override
    public void wrapup() throws IllegalActionException
    {
        super.wrapup();

        try
        {
            // stop the flush thread before tearing down the objects it uses
            _stopFlushThread();
        }
        finally
        {
            try
            {
                // flush any buffered data
                _flush();
            }
            finally
            {
                try
                {
                    _closeConnections();
                }
                finally
                {
                    // stop server if running.
                    if(_server != null)
                    {
                        _stopServer();
                        System.out.println("in wrapup; stopped server.");
                    }
                }
            }
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         public fields                     ////

    /** If true, data written to DataTurbine server will be accessible
     *  after workflow ends.
     */
    public Parameter persistDataAfterWorkflowEnd;

    /** The number of seconds between writing data. Must be >= 0. */
    public Parameter flushAfterSeconds;

    /** The number of data points between writing data. Must be >= 0. */
    public Parameter flushAfterNumData;

    /** The hostname and port of the server. */
    public StringParameter serverAddress;

    /** If true, show an input port for each channel on server. */
    public Parameter showChannels;

    /** If true, start server if not already running (must provide archiveDirectory). */
    public Parameter startServer;

    /** The directory containing the DataTurbine archive. This parameter is
     *  ignored if startServer is false.
     */
    public FileParameter archiveDirectory;

    /** Name of channel container. Cannot have slashes. */
    public StringParameter channelContainer;

    /** Regex pattern of ptolemy type in user info field. */
    public final static Pattern PTOLEMY_TYPE_PATTERN = Pattern.compile(".*ptolemyType=([^,]+).*");

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Flush any pending data to the server and reset the pending-data
     *  counter and last-flush time. Does nothing if no data is pending or
     *  the connection has not been set up. The icon, when present, shows
     *  "busy" for the duration of the call and is reset even on failure.
     *  Holds this actor's monitor, which also guards writes in fire().
     */
    private synchronized void _flush() throws IllegalActionException
    {
        if(_debugging)
        {
            _debug(getFullName() + " going to flush");
        }

        // the icon reference is null in cloned actors (see clone()).
        if(_icon != null)
        {
            _icon.setBusy();
        }

        try
        {
            if(_numDataWritten > 0 && _source != null && _channelMap != null)
            {
                try
                {
                    _source.Flush(_channelMap);
                }
                catch (SAPIException e)
                {
                    throw new IllegalActionException(this, e, "Error flushing data.");
                }

                _numDataWritten = 0;
                _lastFlushTime = System.currentTimeMillis() / 1000;
            }
        }
        finally
        {
            if(_icon != null)
            {
                _icon.setReady();
            }
        }
    }

    /** Ask the flush thread to quit and wait until it has terminated.
     *  Does nothing if no flush thread exists.
     */
    private void _stopFlushThread() throws IllegalActionException
    {
        Thread thread = _flushThread;
        if(thread == null)
        {
            return;
        }

        // set the flag before notifying: the thread re-checks it while
        // holding its monitor, so the wake-up cannot be lost.
        _quit.set(true);
        synchronized(thread)
        {
            thread.notifyAll();
        }

        try
        {
            thread.join();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalActionException(this, e, "Error waiting for flush thread to stop.");
        }
        finally
        {
            _flushThread = null;
        }
    }

    /** Close the source and sink connections, if open, and drop the channel
     *  map. The source is detached rather than closed when
     *  <i>persistDataAfterWorkflowEnd</i> is true.
     */
    private synchronized void _closeConnections() throws IllegalActionException
    {
        try
        {
            if(_source != null)
            {
                if(((BooleanToken)persistDataAfterWorkflowEnd.getToken()).booleanValue())
                {
                    _source.Detach();
                }
                else
                {
                    _source.CloseRBNBConnection();
                }
            }
        }
        finally
        {
            _source = null;

            if(_sink != null)
            {
                _sink.CloseRBNBConnection();
                _sink = null;
            }

            _channelMap = null;
        }
    }

    /** Stop the server if this actor started one. The server reference is
     *  cleared even if stopping fails.
     */
    private void _stopServer() throws IllegalActionException
    {
        if(_server != null)
        {
            try
            {
                _server.stop();
            }
            catch (Exception e)
            {
                throw new IllegalActionException(this, e, "Error stopping DataTurbineServer.");
            }
            finally
            {
                _server = null;
            }
        }
    }

    /** Stop the server ignoring any exceptions; failures are only printed. */
    private void _stopServerIgnoreException()
    {
        try
        {
            _stopServer();
        }
        catch(IllegalActionException e)
        {
            System.out.println("Error stopping server: " + e.getMessage());
        }

    }

    /** Get the channel names and corresponding ptolemy types for the
     *  channels inside the channel container. Uses a temporary sink
     *  connection that is always closed before returning.
     *  @return A map from full channel name (container/name) to type;
     *   empty if the server address or channel container is not yet set.
     */
    private Map<String,Type> _getChannelsNameAndType() throws IllegalActionException
    {
        Map<String,Type> retval = new HashMap<String,Type>();

        // see if server address and channel container has been set yet
        if(_serverAddress != null && _channelContainer != null)
        {
            Sink sink = new Sink();
            try
            {
                sink.OpenRBNBConnection(_serverAddress, getName());
            }
            catch (SAPIException e)
            {
                throw new IllegalActionException(this, e, "Error connecting to DataTurbine.");
            }

            try
            {
                String containerPrefix = _channelContainer + "/";
                ChannelMap channelMap = _getMapAndUserInfo(sink);
                String[] channels = channelMap.GetChannelList();
                ChannelMap serverDataChannelMap = _getMapFromFetch(sink, channels);
                for(String name : channels)
                {
                    if(name.startsWith("_") || !name.startsWith(containerPrefix))
                    {
                        continue;
                    }

                    Type type = _getTypeFromChannelMap(name, serverDataChannelMap);
                    if(type == null)
                    {
                        type = _getTypeFromUserInfo(name, channelMap);
                    }

                    if(type == null)
                    {
                        System.out.println("Unable to find type for channel " + name);
                        type = BaseType.UNKNOWN;
                    }

                    retval.put(name, type);
                }
            }
            finally
            {
                // release the connection even when a lookup above fails
                sink.CloseRBNBConnection();
            }
        }

        return retval;
    }

    /** Get a ptolemy type for a channel name based on the type returned from the channel map.
     *  NOTE: the channel map must be retrieved with a Sink.Fetch().
     *  @param name The full channel name.
     *  @param map The fetched channel map.
     *  @return DOUBLE for 64-bit float channels, INT for 32-bit int
     *   channels, otherwise null.
     *  @exception IllegalActionException If the channel is not in the map.
     */
    private Type _getTypeFromChannelMap(String name, ChannelMap map) throws IllegalActionException
    {
        int index = map.GetIndex(name);
        if(index == -1)
        {
            throw new IllegalActionException(this, name + " not found in channel map");
        }

        int typeId = map.GetType(index);
        switch(typeId)
        {
        case ChannelMap.TYPE_FLOAT64:
            return BaseType.DOUBLE;
        case ChannelMap.TYPE_INT32:
            return BaseType.INT;
        default:
            return null;
        }
    }

    /** Get a ptolemy type based on information in the user info metadata.
     *  @param name The full channel name.
     *  @param map A channel map carrying user info.
     *  @return The type named by the "ptolemyType=" entry as resolved by
     *   BaseType.forName(), or null if the user info is empty or has no
     *   such entry.
     *  @exception IllegalActionException If the channel is not in the map.
     */
    private Type _getTypeFromUserInfo(String name, ChannelMap map) throws IllegalActionException
    {
        int index = map.GetIndex(name);
        if(index == -1)
        {
            throw new IllegalActionException(this, name + " not found in channel map");
        }

        String userInfo = map.GetUserInfo(index);
        if(!userInfo.isEmpty())
        {
            // extract the ptolemyType from the user info
            Matcher matcher = PTOLEMY_TYPE_PATTERN.matcher(userInfo);
            if(matcher.matches())
            {
                String userInfoType = matcher.group(1);
                return BaseType.forName(userInfoType);
            }
        }
        return null;
    }

    /** Reconfigure input ports based on DataTurbine channels: create an
     *  input port (with its name shown) for every channel in the container
     *  that has no port yet, and set each such port's type.
     */
    private void _reconfigureInputPorts() throws IllegalActionException
    {
        Map<String,Type> nameTypeMap = _getChannelsNameAndType();
        for(Map.Entry<String, Type> entry : nameTypeMap.entrySet())
        {
            String name = entry.getKey();
            // remove the container prefix from the name
            name = name.substring(_channelContainer.length() + 1);

            // see if port already exists
            TypedIOPort port = (TypedIOPort)getPort(name);
            if(port == null)
            {
                // create the port
                try
                {
                    port = new TypedIOPort(this, name, true, false);
                    new Attribute(port, "_showName");
                }
                catch (NameDuplicationException e)
                {
                    throw new IllegalActionException(this, e, "Error creating port " + name);
                }
            }

            // set type
            port.setTypeEquals(entry.getValue());
        }
    }

    /** Get a ChannelMap by calling Sink.Request and Fetch for the given
     *  channel names, skipping names that start with "_". No request is
     *  made if no channel remains.
     *  @param sink An open sink connection.
     *  @param names The full channel names.
     *  @return The channel map after the fetch.
     */
    private ChannelMap _getMapFromFetch(Sink sink, String[] names) throws IllegalActionException
    {
        boolean foundChannel = false;

        ChannelMap map = new ChannelMap();
        for(String name : names)
        {
            if(name.startsWith("_"))
            {
                continue;
            }

            try
            {
                map.Add(name);
                foundChannel = true;
            }
            catch (SAPIException e)
            {
                throw new IllegalActionException(this, e, "Error adding to channel map.");
            }
        }

        if(foundChannel)
        {
            try
            {
                sink.Request(map, 0, 0, "oldest");
                sink.Fetch(-1, map);
            }
            catch(SAPIException e)
            {
                throw new IllegalActionException(this, e, "Error determining types of channels.");
            }
        }

        return map;
    }

    /** Get a ChannelMap and user info for existing channels on the server
     *  by requesting the registration and fetching the result.
     *  @param sink An open sink connection.
     *  @return The fetched channel map.
     */
    private ChannelMap _getMapAndUserInfo(Sink sink) throws IllegalActionException
    {
        ChannelMap map;
        try
        {
            sink.RequestRegistration();
            map = sink.Fetch(-1);
        }
        catch(SAPIException e)
        {
            throw new IllegalActionException(this, e, "Error retrieving channels from server.");
        }

        return map;
    }

    /** Add the names of input ports to a ChannelMap. Ports whose name
     *  starts with "_" and ports without a source are left out.
     *  @return The new channel map.
     */
    private ChannelMap _getMapFromInputPorts() throws IllegalActionException
    {
        ChannelMap map = new ChannelMap();
        List<?> inputList = inputPortList();
        for(Object obj : inputList)
        {
            IOPort port = (IOPort)obj;

            String portName = port.getName();

            if(portName.startsWith("_") || port.numberOfSources() == 0)
            {
                continue;
            }

            try
            {
                map.Add(portName);
            }
            catch (SAPIException e)
            {
                throw new IllegalActionException(this, e, "Error updating channel map.");
            }
        }

        return map;
    }

    /** Start a DT server if not already running. First try to connect a
     *  sink to the address; if that succeeds the sink is kept for later use
     *  and no server is started. Otherwise launch a new server using
     *  <i>archiveDirectory</i>, creating the directory if necessary.
     *  @param addr The server address.
     *  @exception IllegalActionException If no archive directory is given,
     *   it cannot be created, or the server cannot be launched.
     */
    private void _startServer(String addr) throws IllegalActionException
    {

        // first see if we can connect to existing server
        boolean alreadyRunning = true;

        _sink = new Sink();
        try
        {
            _sink.OpenRBNBConnection(addr, getName());
        }
        catch (SAPIException e)
        {
            // connection failure is taken to mean that no server is running
            alreadyRunning = false;
            _sink = null;
        }

        if(!alreadyRunning)
        {

            String dir = archiveDirectory.stringValue();
            if(dir.length() == 0)
            {
                throw new IllegalActionException(this, "Must provide archiveDirectory to start server.");
            }

            // make sure directory exists, creating missing parents as well
            File file = new File(dir);
            if(!file.exists() && !file.mkdirs())
            {
                throw new IllegalActionException(this, "Unable to create directory " + dir);
            }

            String[] args = new String[] {
                "-a ", addr,
                "-F ", // loads existing archives
                "-H ", dir };

            try
            {
                _server = Server.launchNewServer(args);
            }
            catch (Exception e)
            {
                throw new IllegalActionException(this, e, "Error starting server.");
            }

            // XXX if we don't wait, sometimes we cannot connect to server
            // should find a better way to wait until server is running
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException e)
            {
                // keep the interrupt visible to the caller instead of
                // swallowing it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** A thread that flushes data whenever more than
     *  <i>flushAfterSeconds</i> seconds have passed since the last flush.
     *  The thread waits on its own monitor and exits when the quit flag is
     *  set and it is notified (see _stopFlushThread()). With
     *  <i>flushAfterSeconds</i> equal to 0 the wait has no timeout, so the
     *  thread only wakes up when notified.
     */
    private class PeriodicFlushThread extends Thread
    {
        @Override
        public void run()
        {
            while(!_quit.get())
            {
                synchronized(this)
                {
                    // re-check under the monitor: the quit flag is set
                    // before the notification is sent, so checking here
                    // guarantees the notification is not missed.
                    if(_quit.get())
                    {
                        break;
                    }

                    try
                    {
                        // long arithmetic avoids int overflow for large values
                        wait(_flushAfterSeconds * 1000L);
                    }
                    catch (InterruptedException e)
                    {
                        MessageHandler.error("Error while waiting in flush thread.", e);
                        return;
                    }
                }

                if(_quit.get())
                {
                    // wrapup() performs the final flush itself
                    break;
                }

                long elapsed = (System.currentTimeMillis() / 1000) - _lastFlushTime;
                if(_debugging)
                {
                    _debug("elapsed time is " + elapsed);
                }

                if(elapsed > _flushAfterSeconds)
                {
                    try
                    {
                        _flush();
                    }
                    catch (IllegalActionException e)
                    {
                        MessageHandler.error("Error while flushing in flush thread.", e);
                        return;
                    }
                }
            }
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private fields                    ////

    /** A DataTurbine server object. */
    private Server _server;

    /** An object to write to DT. */
    private Source _source;

    /** A sink object for getting type information. */
    private Sink _sink;

    /** Channel map to write data. */
    private ChannelMap _channelMap;

    /** The number of data written since the last flush. Guarded by this
     *  actor's monitor while the flush thread is running.
     */
    private int _numDataWritten;

    /** The number of data to write before flushing. */
    private int _flushAfterNumData;

    /** The amount of time to wait before flushing. Read by the flush thread. */
    private volatile int _flushAfterSeconds;

    /** The name of the channel container. */
    private String _channelContainer;

    /** A set of port names with unknown types. */
    private Set<String> _unknownChannelTypeSet;

    /** The actor icon; null in cloned actors. */
    private DataSourceIcon _icon;

    /** A thread to periodically flush data. */
    private Thread _flushThread;

    /** If true, cleanup threads. */
    private AtomicBoolean _quit = new AtomicBoolean(false);

    /** The time of the last flush, in seconds. Read by the flush thread. */
    private volatile long _lastFlushTime;

    /** If true, create input ports for each channel on server (within the container). */
    private boolean _showChannels = false;

    /** The host name and port of the server. */
    private String _serverAddress;

}