001/* An actor that writes data to a DataTurbine server.
002 * 
003 * Copyright (c) 2011 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:45:41 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33631 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.data.datasource.dataturbine;
031
032import java.io.File;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.atomic.AtomicBoolean;
039import java.util.regex.Matcher;
040import java.util.regex.Pattern;
041
042import org.ecoinformatics.seek.datasource.DataSourceIcon;
043import org.kepler.util.DotKeplerManager;
044
045import com.rbnb.api.Server;
046import com.rbnb.sapi.ChannelMap;
047import com.rbnb.sapi.SAPIException;
048import com.rbnb.sapi.Sink;
049import com.rbnb.sapi.Source;
050
051import ptolemy.actor.IOPort;
052import ptolemy.actor.TypedAtomicActor;
053import ptolemy.actor.TypedIOPort;
054import ptolemy.data.BooleanToken;
055import ptolemy.data.DoubleToken;
056import ptolemy.data.IntToken;
057import ptolemy.data.Token;
058import ptolemy.data.expr.FileParameter;
059import ptolemy.data.expr.Parameter;
060import ptolemy.data.expr.StringParameter;
061import ptolemy.data.type.BaseType;
062import ptolemy.data.type.MatrixType;
063import ptolemy.data.type.Type;
064import ptolemy.kernel.CompositeEntity;
065import ptolemy.kernel.util.Attribute;
066import ptolemy.kernel.util.IllegalActionException;
067import ptolemy.kernel.util.NameDuplicationException;
068import ptolemy.kernel.util.Workspace;
069import ptolemy.util.MessageHandler;
070
071/** An actor that writes data to a DataTurbine server.
072 * 
073 *  @author Daniel Crawl
074 *  @version $Id: DataTurbineWriter.java 33631 2015-08-24 22:45:41Z crawl $
075 * 
076 * TODO
077 * 
078 * - add input port _timestamp, date token, use as timestamp for data
079 * - add input ports _specificChannelName, _specificChannelValue for writing to
080 *      a specific channel (analogous to dt reader actor)
081 * - create kar
082 * 
083 * 
084 */
085public class DataTurbineWriter extends TypedAtomicActor
086{
    /** Construct a new DataTurbineWriter with a container and name.
     *  Creates the data source icon and all parameters of the actor and
     *  assigns their default values.
     *
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException Propagated from the superclass
     *   constructor or from creating or initializing a parameter.
     *  @exception NameDuplicationException Propagated from the superclass
     *   constructor or from creating a parameter.
     */
    public DataTurbineWriter(CompositeEntity container, String name)
        throws IllegalActionException, NameDuplicationException
    {

        super(container, name);
        
        _icon = new DataSourceIcon(this);
        
        // default: keep the written data on the server after the workflow ends
        persistDataAfterWorkflowEnd = new Parameter(this, "persistDataAfterWorkflowEnd");
        persistDataAfterWorkflowEnd.setTypeEquals(BaseType.BOOLEAN);
        persistDataAfterWorkflowEnd.setToken(BooleanToken.TRUE);
        
        // no default value; attributeChanged() rejects an empty name
        channelContainer = new StringParameter(this, "channelContainer");
        
        // default: 60 seconds
        flushAfterSeconds = new Parameter(this, "flushAfterSeconds");
        flushAfterSeconds.setTypeEquals(BaseType.INT);
        flushAfterSeconds.setToken("60");
        
        // default: flush after 100 writes
        flushAfterNumData = new Parameter(this, "flushAfterNumData");
        flushAfterNumData.setTypeEquals(BaseType.INT);
        flushAfterNumData.setToken("100");
        
        serverAddress = new StringParameter(this, "serverAddress");
        serverAddress.setToken("localhost:3333");
        
        // default: show an input port for each channel
        showChannels = new Parameter(this, "showChannels");
        showChannels.setTypeEquals(BaseType.BOOLEAN);
        showChannels.setToken(BooleanToken.TRUE);
        
        // default: do not start a server
        startServer = new Parameter(this, "startServer");
        startServer.setTypeEquals(BaseType.BOOLEAN);
        startServer.setToken(BooleanToken.FALSE);
        
        // default archive location: "dataturbine" appended to the persistent
        // user data directory string
        archiveDirectory = new FileParameter(this, "archiveDirectory");
        String dir = DotKeplerManager.getInstance().getPersistentUserDataDirString() + "dataturbine";
        archiveDirectory.setToken(dir);

    }
126    
127    /** React to a change in an attribute. */
128    @Override
129    public void attributeChanged(Attribute attribute) throws IllegalActionException
130    {
131        if(attribute == flushAfterNumData)
132        {
133            Token token = flushAfterNumData.getToken();
134            if(token != null)
135            {
136                _flushAfterNumData = ((IntToken)token).intValue();
137                if(_flushAfterNumData < 0)
138                {
139                    throw new IllegalActionException(this, "flushAfterNumData must be >= 0.");
140                }
141            }
142        }
143        else if(attribute == flushAfterSeconds)
144        {
145            Token token = flushAfterSeconds.getToken();
146            if(token != null)
147            {
148                _flushAfterSeconds = ((IntToken)token).intValue();
149                if(_flushAfterSeconds < 0)
150                {
151                    throw new IllegalActionException(this, "flushAfterSeconds must be >= 0.");
152                }
153            }
154        }
155        else if(attribute == serverAddress)
156        {
157            String addr = serverAddress.stringValue();
158            if(addr.length() == 0)
159            {
160                throw new IllegalActionException(this, "Must provide serverAddress.");
161            }
162            
163            if(_serverAddress == null || !_serverAddress.equals(addr))
164            {
165                _serverAddress = addr;
166                if(_showChannels)
167                {
168                  _reconfigureInputPorts();  
169                }
170            }
171        }
172        else if(attribute == channelContainer)
173        {
174            String container = channelContainer.stringValue();
175            
176            if(container.isEmpty() || container.contains("/"))
177            {
178                throw new IllegalActionException(this, "channelContainer cannot be empty or contain slashes.");
179            }
180
181            if(_channelContainer == null || !_channelContainer.equals(container))
182            {            
183                _channelContainer = container;
184                if(_showChannels)
185                {
186                    _reconfigureInputPorts();
187                }
188            }
189        }
190        else if(attribute == showChannels)
191        {
192            boolean val = ((BooleanToken)showChannels.getToken()).booleanValue();
193            if(val != _showChannels)
194            {
195                _showChannels = val;
196                
197                if(_showChannels)
198                {
199                    _reconfigureInputPorts();
200                }
201                else
202                {
203                    // remove all unconnected ports
204                    List<?> ports = inputPortList();
205                    for(Object obj : ports)
206                    {
207                        IOPort port = (IOPort)obj;
208                        if(port.numberOfSources() == 0)
209                        {
210                            try {
211                                port.setContainer(null);
212                            } catch (NameDuplicationException e) {
213                                throw new IllegalActionException(this, e, "Unable to remove port.");
214                            }
215                        }
216                    }
217                }
218            }
219        }
220        else
221        {
222            super.attributeChanged(attribute);
223        }
224    }
225    
226    /** Clone the actor into the specified workspace. */
227    @Override
228    public Object clone(Workspace workspace) throws CloneNotSupportedException {
229        DataTurbineWriter newObject = (DataTurbineWriter) super.clone(workspace);
230        newObject._channelMap = null;
231        newObject._flushThread = null;
232        newObject._icon = null;
233        newObject._quit = new AtomicBoolean(false);
234        newObject._server = null;
235        newObject._sink = null;
236        newObject._source = null;
237        newObject._unknownChannelTypeSet = null;
238        return newObject;
239    }
240
241    
242    /** Fire the actor. */
243    @Override
244    public void fire() throws IllegalActionException
245    {
246        super.fire();
247        
248        boolean wroteData = false;
249        
250        List<?> inputList = inputPortList();
251        for(Object obj : inputList)
252        {
253            IOPort port = (IOPort)obj;
254            
255            String portName = port.getName();
256            
257            if(portName.startsWith("_"))
258            {
259                continue;
260            }
261            
262            // see if port is connected
263            if(port.numberOfSources() > 0)
264            {
265                int index = _channelMap.GetIndex(portName);
266                if(index == -1)
267                {
268                    throw new IllegalActionException(this, "Channel " + portName + " not found in channel map");
269                }
270                
271                Token token = port.get(0);
272                Type tokenType = token.getType();
273                
274                try
275                {
276                    if(tokenType == BaseType.DOUBLE)
277                    {
278                        _channelMap.PutDataAsFloat64(index, 
279                                new double [] { ((DoubleToken)token).doubleValue() });
280                    }
281                    else if(tokenType == BaseType.INT)
282                    {
283                        _channelMap.PutDataAsInt32(index, 
284                                new int [] { ((IntToken)token).intValue() });                        
285                    }
286                    else if(tokenType instanceof MatrixType)
287                    {                        
288                        String tokenStr = token.toString();
289                        _channelMap.PutDataAsString(index, tokenStr);
290                        //_channelMap.PutDataAsByteArray(index, tokenStr.getBytes());
291                        //System.out.println(portName + " is a " + typeStr);
292                    }
293                    else
294                    {
295                        throw new IllegalActionException(this, "Unsupported data type: " + tokenType);
296                    }
297                    
298                    wroteData = true;
299                    
300                }
301                catch(SAPIException e)
302                {
303                    throw new IllegalActionException(this, e, "Error writing data.");
304                }
305            }
306        }
307        
308        if(wroteData)
309        {
310            _numDataWriten++;
311         
312            if(_numDataWriten >= _flushAfterNumData)
313            {
314                _flush();
315            }
316        }
317    }
318    
319    /** Initialize the actor. */
320    @Override
321    public void initialize() throws IllegalActionException
322    {   
323        // for any port types that were unknown during preinitialize,
324        // see if they were determined during type resolution and put the
325        // (ptolemy) type string in the channel map user info field.
326        //
327        for(String name : _unknownChannelTypeSet)
328        {
329            TypedIOPort port = (TypedIOPort) getPort(name);
330            if(port == null)
331            {
332                throw new IllegalActionException(this, "No port called " + name);
333            }
334     
335            // see if the port type is supported by dataturbine
336            Type portType = port.getType();
337            
338            if(portType == BaseType.DOUBLE ||
339                portType == BaseType.INT)
340            {
341                // this type is supported
342                continue;
343            }
344            
345            int index = _channelMap.GetIndex(name);
346            //XXX check for index = -1
347            
348            // set the ptolemy port type as user info
349            try 
350            {
351                _channelMap.PutUserInfo(index, "ptolemyType=" + port.getType().toString());
352                //System.out.println("set user info for " + name + " = " + port.getType());
353            }
354            catch (SAPIException e)
355            {
356                throw new IllegalActionException(this, e, "Error setting user info.");
357            }
358        }
359        
360        // register the channel map to save the user info fields on the server.
361        try 
362        {
363            _source.Register(_channelMap);
364        }
365        catch (SAPIException e)
366        {
367            throw new IllegalActionException(this, e, "Error registering channel map.");
368        }
369    }
370    
371    /** Preinitialize the actor. */
372    @Override
373    public void preinitialize() throws IllegalActionException
374    { 
375        super.preinitialize();
376        
377        _numDataWriten = 0;
378        _lastFlushTime = System.currentTimeMillis() / 1000;
379        
380        // see if we should start a server
381        if(((BooleanToken)startServer.getToken()).booleanValue())
382        {
383            _startServer(_serverAddress);
384        }
385        
386        try
387        {
388            // the sink may be connected in start server
389            if(_sink == null)
390            {
391                _sink = new Sink();
392                try {
393                    _sink.OpenRBNBConnection(_serverAddress, getName());
394                } catch (SAPIException e) {
395                    throw new IllegalActionException(this, e, "Error opening connection to server.");
396                }
397            }
398            
399            _source = new Source(100, "append", 10000);
400            try
401            {
402                _source.OpenRBNBConnection(_serverAddress, _channelContainer);
403            }
404            catch (SAPIException e)
405            {
406                throw new IllegalActionException(this, e, "Error opening connection to server.");
407            }
408                   
409            // initially add all input ports to unknown type set
410            _unknownChannelTypeSet = new HashSet<String>();
411            List<?> inputList = inputPortList();
412            for(Object obj : inputList)
413            {
414                IOPort port = (IOPort)obj;
415                _unknownChannelTypeSet.add(port.getName());
416            }
417       
418            // get a channel map based on the names of input ports
419            _channelMap = _getMapFromInputPorts();
420            
421            // get the user info fields
422            ChannelMap serverChannelMap = _getMapAndUserInfo(_sink);
423            
424            // get the type fields by doing a fetch
425            ChannelMap serverDataChannelMap = _getMapFromFetch(_sink, serverChannelMap.GetChannelList());
426
427            // set input port types based on channel types        
428            String[] channelList = serverChannelMap.GetChannelList();
429            for(int i = 0; i < channelList.length; i++) 
430            {
431                String name = channelList[i].substring(channelList[i].indexOf("/") + 1);
432                TypedIOPort port = (TypedIOPort) getPort(name);
433    
434                if(name.startsWith("_") || port == null || port.numberOfSources() == 0)
435                {
436                    continue;
437                }
438    
439                int typeId = serverChannelMap.GetType(i);
440                int serverDataChannelMapIndex = serverDataChannelMap.GetIndex(_channelContainer + "/" + name);
441                if(serverDataChannelMapIndex != -1)
442                {
443                    typeId = serverDataChannelMap.GetType(serverDataChannelMapIndex);
444                }
445                
446                String userInfo = serverChannelMap.GetUserInfo(i);
447                
448                // set the user info in the source channel map, otherwise
449                // when Register() is called in initialize(), we'll delete
450                // the user info.
451                if(!userInfo.isEmpty())
452                {
453                    int sourceChannelMapIndex = _channelMap.GetIndex(name);
454                    if(sourceChannelMapIndex != -1)
455                    {
456                        try
457                        {
458                            _channelMap.PutUserInfo(sourceChannelMapIndex, userInfo);
459                        }
460                        catch (SAPIException e)
461                        {
462                            throw new IllegalActionException(this, e, "Error setting user info.");
463                        }
464                    }
465                }
466                
467                // get the ptolemy type either from type field in the channel
468                // map or the user info field.
469                Type ptolemyType = _getTypeFromChannelMap(_channelContainer + "/" + name, serverDataChannelMap);
470                if(ptolemyType == null)
471                {
472                    ptolemyType = _getTypeFromUserInfo(_channelContainer + "/" + name, serverChannelMap);
473                }
474
475                // see if it's a valid ptolemy type
476                if(ptolemyType != null)
477                {
478                    port.setTypeEquals(ptolemyType);
479                    _unknownChannelTypeSet.remove(name);
480                }
481                else
482                {
483                    System.out.println("WARNING: Channel " + name + 
484                        " has unsupported or unknown DataTurbine type: " +
485                        serverChannelMap.TypeName(typeId));
486                }
487            }
488            
489            _quit.set(false);
490            _flushThread = new PeriodicFlushThread();
491            _flushThread.start();
492        }
493        catch(IllegalActionException e)
494        {
495            // stop the server ignoring any exceptions
496            _stopServerIgnoreException();
497            throw e;
498        }
499    }
500        
501    /** Cleanup after execution or error. */
502    @Override
503    public void wrapup() throws IllegalActionException
504    {    
505        super.wrapup();
506        
507        // flush any buffered data
508        _flush();
509        
510        // close connection to server
511        if(((BooleanToken)persistDataAfterWorkflowEnd.getToken()).booleanValue())
512        {
513            _source.Detach();
514        }
515        else
516        {
517            _source.CloseRBNBConnection();
518        }
519        
520        _source = null;
521
522        _sink.CloseRBNBConnection();
523        _sink = null;
524        
525        // stop server if running.
526        if(_server != null)
527        {
528            _stopServer();
529            System.out.println("in wrapup; stopped server.");
530        }
531        
532        _channelMap = null;
533        
534        _quit.set(true);
535        synchronized(_flushThread)
536        {
537            _flushThread.notify();
538        }
539        
540        try
541        {
542            _flushThread.join();
543        }
544        catch (InterruptedException e)
545        {
546            throw new IllegalActionException(this, e, "Error waiting for flush thread to stop.");
547        }
548        _flushThread = null;
549        
550    }
551
552    ///////////////////////////////////////////////////////////////////
553    ////                         public fields                     ////
554
    /** If true, data written to DataTurbine server will be accessible
     *  after workflow ends. This is a boolean that defaults to true.
     */
    public Parameter persistDataAfterWorkflowEnd;
    
    /** The number of seconds between writing data. This is an integer that
     *  must be &gt;= 0 and defaults to 60.
     */
    public Parameter flushAfterSeconds;
    
    /** The number of data points written between flushes to the server.
     *  This is an integer that must be &gt;= 0 and defaults to 100.
     */
    public Parameter flushAfterNumData;
    
    /** The hostname and port of the server. Cannot be empty. The default
     *  is "localhost:3333".
     */
    public StringParameter serverAddress;
    
    /** If true, show an input port for each channel on server. This is a
     *  boolean that defaults to true.
     */
    public Parameter showChannels;
    
    /** If true, start server if not already running (must provide
     *  archiveDirectory). This is a boolean that defaults to false.
     */
    public Parameter startServer;
    
    /** The directory containing the DataTurbine archive. This parameter is
     *  ignored if startServer is false.
     */
    public FileParameter archiveDirectory;
    
    /** Name of channel container. Cannot be empty or contain slashes. */
    public StringParameter channelContainer;
    
    /** Regex pattern of ptolemy type in user info field. Group 1 captures
     *  the text after "ptolemyType=" up to the next comma.
     */
    public final static Pattern PTOLEMY_TYPE_PATTERN = Pattern.compile(".*ptolemyType=([^,]+).*");
585
586    ///////////////////////////////////////////////////////////////////
587    ////                         private methods                   ////
588
589    /** Flush any pending data. */
590    private synchronized void _flush() throws IllegalActionException
591    {
592        if(_debugging)
593        {
594            _debug(getFullName() + " going to flush");
595        }
596        
597        _icon.setBusy();
598
599        synchronized(_numDataWriten)
600        {            
601            if(_numDataWriten > 0)
602            {
603                synchronized(_channelMap)
604                {
605                    try
606                    {
607                        _source.Flush(_channelMap);
608                    }
609                    catch (SAPIException e)
610                    {
611                        throw new IllegalActionException(this, e, "Error flushing data.");
612                    }
613                    
614                    _numDataWriten = 0;
615                    _lastFlushTime = System.currentTimeMillis() / 1000;
616                }
617            }
618        }
619        
620        _icon.setReady();
621    }
622    
623    /** Stop the server. */
624    private void _stopServer() throws IllegalActionException
625    {
626        if(_server != null)
627        {
628            try {
629                _server.stop();
630            } catch (Exception e) {
631                throw new IllegalActionException(this, e, "Error stopping DataTurbineServer.");
632            } finally {
633                _server = null;
634            }
635        }
636    }
637    
638    /** Stop the server ignoring any exceptions. */
639    private void _stopServerIgnoreException()
640    {
641        try {
642            _stopServer();
643        } catch(IllegalActionException e) {
644            System.out.println("Error stopping server: " + e.getMessage());
645        }
646        
647    }
648    
649    /** Get the channel names and corresponding ptolemy types. */
650    private Map<String,Type> _getChannelsNameAndType() throws IllegalActionException
651    {
652        Map<String,Type> retval = new HashMap<String,Type>();
653        
654        // see if server address and channel container has been set yet
655        if(_serverAddress != null && _channelContainer != null)
656        {
657            Sink sink = new Sink();
658            try
659            {
660                sink.OpenRBNBConnection(_serverAddress, getName());
661            }
662            catch (SAPIException e)
663            {
664                throw new IllegalActionException(this, e, "Error connecting to DataTurbine.");
665            }
666            
667            ChannelMap channelMap = _getMapAndUserInfo(sink);
668            String[] channels = channelMap.GetChannelList();
669            ChannelMap serverDataChannelMap = _getMapFromFetch(sink, channels);
670            for(String name : channels)
671            {
672                if(name.startsWith("_") || !name.startsWith(_channelContainer + "/"))
673                {
674                    continue;
675                }
676                
677                Type type = _getTypeFromChannelMap(name, serverDataChannelMap);
678                if(type == null)
679                {
680                    type = _getTypeFromUserInfo(name, channelMap);
681                }
682                
683                if(type == null)
684                {
685                    System.out.println("Unable to find type for channel " + name);
686                    type = BaseType.UNKNOWN;
687                }
688                
689                retval.put(name, type);
690            }
691            
692            
693            sink.CloseRBNBConnection();
694        }
695        
696        return retval;
697    }
698    
699    /** Get a ptolemy type for a channel name based on the type returned from the channel map.
700     *  NOTE: the channel map must be retrieved with a Sink.Fetch().
701     */
702    private Type _getTypeFromChannelMap(String name, ChannelMap map) throws IllegalActionException
703    {
704        int index = map.GetIndex(name);
705        if(index == -1)
706        {
707            throw new IllegalActionException(this, name + " not found in channel map");
708        }
709
710    
711        int typeId = map.GetType(index);
712        switch(typeId)
713        {
714        case ChannelMap.TYPE_FLOAT64:
715            return BaseType.DOUBLE;
716        case ChannelMap.TYPE_INT32:
717            return BaseType.INT;
718        default:
719            return null;
720        }
721    }
722    
723    /** Get a ptolemy type based on information in the user info metadata. */
724    private Type _getTypeFromUserInfo(String name, ChannelMap map) throws IllegalActionException
725    {
726        int index = map.GetIndex(name);
727        if(index == -1)
728        {
729            throw new IllegalActionException(this, name + " not found in channel map");
730        }
731        
732        String userInfo = map.GetUserInfo(index);
733        if(!userInfo.isEmpty())
734        {
735            // extract the ptolemyType from the user info
736            Matcher matcher = PTOLEMY_TYPE_PATTERN.matcher(userInfo);
737            if(matcher.matches())
738            {
739                String userInfoType = matcher.group(1);
740                return BaseType.forName(userInfoType);
741            }
742        }
743        return null;
744    }
745    
746    /** Reconfigure input ports based on DataTurbine channels. */
747    private void _reconfigureInputPorts() throws IllegalActionException
748    {
749        Map<String,Type> nameTypeMap = _getChannelsNameAndType();
750        for(Map.Entry<String, Type> entry : nameTypeMap.entrySet())
751        {
752            String name = entry.getKey();
753            // remove the container prefix from the name
754            name = name.substring(_channelContainer.length() + 1);
755            
756            // see if port already exists
757            TypedIOPort port = (TypedIOPort)getPort(name);
758            if(port == null)
759            {
760                // create the port
761                
762                
763                
764                try {
765                    port = new TypedIOPort(this, name, true, false);
766                    new Attribute(port, "_showName");
767                } catch (NameDuplicationException e) {
768                    throw new IllegalActionException(this, e, "Error creating port " + name);
769                }
770            }
771            
772            // set type
773            port.setTypeEquals(entry.getValue());
774        }
775    }
776    
777    /** Get a ChannelMap by calling Sink.Request and Fetch. */
778    private ChannelMap _getMapFromFetch(Sink sink, String[] names) throws IllegalActionException
779    {
780        boolean foundChannel = false;
781        
782        ChannelMap map = new ChannelMap();
783        for(String name : names)
784        {
785            if(name.startsWith("_"))
786            {
787                continue;
788            }
789            
790            try 
791            {
792                map.Add(name);
793                foundChannel = true;
794            }
795            catch (SAPIException e) 
796            {
797                throw new IllegalActionException(this, e, "Error adding to channel map.");
798            }
799        }
800        
801        if(foundChannel)
802        {
803            try
804            {
805                sink.Request(map, 0, 0, "oldest");
806                sink.Fetch(-1, map);
807            }
808            catch(SAPIException e)
809            {
810                throw new IllegalActionException(this, e, "Error determining types of channels.");
811            }
812        }
813        
814        return map;
815    }
816    
817    /** Get a ChannelMap and user info for existing channels on the server. */
818    private ChannelMap _getMapAndUserInfo(Sink sink) throws IllegalActionException
819    {
820        ChannelMap map;
821        try
822        {
823            sink.RequestRegistration();
824            map = sink.Fetch(-1);
825        }
826        catch(SAPIException e)
827        {
828            throw new IllegalActionException(this, e, "Error retrieving channels from server.");
829        }        
830        
831        return map;
832    }
833    
834    /** Add the names of inputs ports to a ChannelMap. */
835    private ChannelMap _getMapFromInputPorts() throws IllegalActionException
836    {
837        ChannelMap map = new ChannelMap();
838        List<?> inputList = inputPortList();
839        for(Object obj : inputList)
840        {
841            IOPort port = (IOPort)obj;
842            
843            String portName = port.getName();
844            
845            if(portName.startsWith("_") || port.numberOfSources() == 0)
846            {
847                continue;
848            }
849            
850            try
851            {
852                map.Add(portName);
853            }
854            catch (SAPIException e)
855            {
856                throw new IllegalActionException(this, e, "Error updating channel map.");
857            }
858        }
859        
860        return map;
861    }
862    
863    /** Start a DT server if not already running. */
864    private void _startServer(String addr) throws IllegalActionException
865    {
866        
867        // first see if we can connect to existing server
868        boolean alreadyRunning = true;
869        
870        _sink = new Sink();
871        try {
872            _sink.OpenRBNBConnection(addr, getName());
873        } catch (SAPIException e) {
874            alreadyRunning = false;
875            _sink = null;
876        }
877
878        if(!alreadyRunning)
879        {
880                
881            String dir = archiveDirectory.stringValue();
882            if(dir.length() == 0) {
883                throw new IllegalActionException(this, "Must provide archiveDirectory to start server.");
884            }
885            
886            // make sure directory exists
887            File file = new File(dir);
888            if(!file.exists())
889            {
890                if(!file.mkdir())
891                {
892                    throw new IllegalActionException(this, "Unable to create directory " + dir);
893                }
894            }
895            
896            String[] args = new String[] { 
897                    "-a ", addr,
898                    "-F ", // loads existing archives
899                    "-H ", dir };
900            
901            try {
902                _server = Server.launchNewServer(args);
903            } catch (Exception e) {
904                throw new IllegalActionException(this, e, "Error starting server.");
905            }
906
907            // XXX if we don't wait, sometimes we cannot connect to server
908            // should find a better way to wait until server is running
909            try {
910                Thread.sleep(1000);
911            } catch (InterruptedException e) {
912                // TODO Auto-generated catch block
913                e.printStackTrace();
914            }
915        }
916    }
917
918    /** A thread that flushes data. */
919    private class PeriodicFlushThread extends Thread
920    {
921        @Override
922        public void run()
923        {
924            while(!_quit.get())
925            {
926                synchronized(this)
927                {
928                    try
929                    {
930                        wait(_flushAfterSeconds * 1000);
931                    }
932                    catch (InterruptedException e)
933                    {
934                        MessageHandler.error("Error while waiting in flush thread.", e);
935                        return;
936                    }
937                }
938                
939                long elapsed = (System.currentTimeMillis() / 1000) - _lastFlushTime;
940                System.out.println("elapsed time is " + elapsed);
941                
942                if(elapsed > _flushAfterSeconds)
943                {
944                    try
945                    {
946                        _flush();
947                    }
948                    catch (IllegalActionException e)
949                    {
950                        MessageHandler.error("Error while flushing in flush thread.", e);
951                        return;
952                    }
953                }
954            }
955        }
956    }
957    
958    ///////////////////////////////////////////////////////////////////
959    ////                         private fields                    ////
960
961    /** A DataTurbine server object; assigned when this actor launches a server itself. */
962    private Server _server;
963    
964    /** The source object used to write to DT. */
965    private Source _source;
966    
967    /** A sink object used to get type information; also used to probe for a running server. */
968    private Sink _sink;
969    
970    /** Channel map to write data. */
971    private ChannelMap _channelMap;
972    
973    /** The number of data written since the last flush. NOTE(review): boxed Integer and misspelled name (Writen) - confirm both are intentional. */
974    private Integer _numDataWriten;
975    
976    /** The number of data to write before flushing. */
977    private int _flushAfterNumData;
978    
979    /** The amount of time, in seconds, to wait before flushing. */
980    private int _flushAfterSeconds;
981    
982    /** The name of the channel container. */
983    private String _channelContainer;
984    
985    /** A set of port names with unknown types. */
986    private Set<String> _unknownChannelTypeSet;
987    
988    /** The actor icon. */
989    private DataSourceIcon _icon;
990    
991    /** A thread to periodically flush data. */
992    private Thread _flushThread;
993    
994    /** If true, clean up threads: the periodic flush thread leaves its loop once this is set. */
995    private AtomicBoolean _quit = new AtomicBoolean(false);
996    
997    /** The time of the last flush; compared against System.currentTimeMillis() / 1000, i.e. in seconds. */
998    private long _lastFlushTime;
999    
1000    /** If true, create input ports for each channel on server (within the container). */
1001    private boolean _showChannels = false;
1002    
1003    /** The host name and port of the server. */
1004    private String _serverAddress;
1005    
1006}