001/*
002 * Copyright (c) 2007-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-12-17 17:59:53 +0000 (Thu, 17 Dec 2015) $' 
007 * '$Revision: 34345 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.provenance.sql;
031
032import java.sql.PreparedStatement;
033import java.sql.ResultSet;
034import java.sql.SQLException;
035import java.sql.Timestamp;
036import java.util.Collections;
037import java.util.Date;
038import java.util.LinkedHashMap;
039import java.util.Map;
040import java.util.WeakHashMap;
041
042import org.kepler.provenance.FireState;
043import org.kepler.provenance.Recording;
044import org.kepler.provenance.RecordingException;
045import org.kepler.provenance.RecordingParameters;
046import org.kepler.util.sql.DatabaseFactory;
047import org.kepler.util.sql.DatabaseType;
048import org.kepler.util.sql.Schema;
049
050import ptolemy.actor.Actor;
051import ptolemy.actor.Director;
052import ptolemy.actor.FiringEvent;
053import ptolemy.actor.IOPort;
054import ptolemy.actor.IOPortEvent;
055import ptolemy.actor.IORelation;
056import ptolemy.actor.LocalClock;
057import ptolemy.actor.TypedIOPort;
058import ptolemy.actor.gui.ColorAttribute;
059import ptolemy.actor.gui.SizeAttribute;
060import ptolemy.actor.gui.WindowPropertiesAttribute;
061import ptolemy.actor.gui.style.LineStyle;
062import ptolemy.actor.gui.style.TextStyle;
063import ptolemy.actor.parameters.ParameterPort;
064import ptolemy.actor.parameters.PortParameter;
065import ptolemy.actor.sched.Scheduler;
066import ptolemy.data.StringToken;
067import ptolemy.data.Token;
068import ptolemy.data.expr.Parameter;
069import ptolemy.domains.sdf.kernel.SDFDirector;
070import ptolemy.kernel.undo.UndoStackAttribute;
071import ptolemy.kernel.util.AbstractSettableAttribute;
072import ptolemy.kernel.util.Attribute;
073import ptolemy.kernel.util.ConfigurableAttribute;
074import ptolemy.kernel.util.IllegalActionException;
075import ptolemy.kernel.util.Location;
076import ptolemy.kernel.util.NameDuplicationException;
077import ptolemy.kernel.util.Nameable;
078import ptolemy.kernel.util.NamedObj;
079import ptolemy.vergil.basic.KeplerDocumentationAttribute;
080import ptolemy.vergil.icon.BoxedValueIcon;
081import ptolemy.vergil.icon.EditIconTableau;
082import ptolemy.vergil.icon.EditorIcon;
083import ptolemy.vergil.icon.TextIcon;
084import ptolemy.vergil.icon.UpdatedValueIcon;
085import ptolemy.vergil.icon.ValueIcon;
086import ptolemy.vergil.icon.XMLIcon;
087
088
089/** SQL Implementation of Recording using SDM SPA schema.
090 *
091 * @author Daniel Crawl
092 * @version $Id: SQLRecording.java 34345 2015-12-17 17:59:53Z crawl $
093 *
094 */
095    
096public class SQLRecording extends Recording
097{
098    /** Construct a new SQLRecording. */
099    public SQLRecording() throws RecordingException
100    {
101        super();
102
103        _dbType = null;
104
105        _schema = null;
106
107        // NOTE: we use a LinkedHashMap instead of a HashMap for predictable
108        // iteration order (the order in which keys are inserted into it).
109        // The map is iterated at the end of workflow execution to record
110        // the stop of rw-firings and we want a repeatable ordering for
111        // tests.
112        _fireStateTable = //Collections.synchronizedMap(
113            new LinkedHashMap<Actor, FireState<Integer>>();//);
114
115        _entityCacheTable = Collections.synchronizedMap(
116            new WeakHashMap<Nameable, RegEntity>());
117
118        _dbReset();
119    }
120
121    /** Stop recording. Called when the Recording type changes
122     *  or ProvenanceListener removed from canvas. 
123     *  NOTE: this is NOT called when the GUI exits or when the
124     *  workflow runs from the command line.
125     *  
126     *  This is the last method called to a Recording instance.
127     */
128    @Override
129    public void disconnect() throws RecordingException
130    {
131        _dbReset();
132    } 
133    
134    // Specification
135    
136    /** Register an actor. */
137    @Override
138    public boolean regActor(Actor actor) throws RecordingException
139    {
140        //_debug("regActor(" + _getNameableFullName(actor) + ")");
141
142        int id = _regActorDirector((NamedObj)actor, true);
143
144        FireState<Integer> fs = new FireState<Integer>(actor, id);
145        _fireStateTable.put(actor, fs);
146
147        return true;
148    }
149
150    /** Register a director. */
151    @Override
152    public boolean regDirector(Director director) throws RecordingException
153    {
154        _regActorDirector(director, false);
155        return true; 
156    }
157
158    /** Register a parameter. A parameter can be any <b>entity</b>
159     * stored in the MoML that does not have its own
160     * <code>regNNN()</code> method. This can be user-level 
161     * parameters (e.g., Parameter, StringParameter, etc.) or
162     * internal to Kepler (e.g., _location, semanticType000, etc.).
163     * (A "parameter" corresponds to a property in the MoML).
164     *
165     */
166    @Override
167    public boolean regParameter(NamedObj parameter) throws RecordingException
168    {
169        boolean ignore = false;
170
171        String name = parameter.getName();
172
173        if(parameter instanceof WindowPropertiesAttribute ||
174            parameter instanceof ConfigurableAttribute ||
175            parameter instanceof Location ||
176            parameter instanceof KeplerDocumentationAttribute ||
177            parameter instanceof SizeAttribute ||
178            parameter instanceof ColorAttribute ||
179            parameter instanceof TextIcon ||
180            parameter instanceof EditIconTableau.Factory ||
181            parameter instanceof TextStyle ||
182            parameter instanceof ValueIcon ||
183            parameter instanceof XMLIcon ||
184            parameter instanceof LineStyle ||
185            parameter instanceof UndoStackAttribute ||
186            parameter instanceof BoxedValueIcon ||
187            parameter instanceof UpdatedValueIcon  ||
188            parameter instanceof EditorIcon ||
189            parameter instanceof org.kepler.moml.NamedObjIdReferralList ||
190            parameter instanceof LocalClock ||
191            parameter instanceof Scheduler ||
192            ((parameter.getContainer() instanceof SDFDirector) &&
193                        (name.equals(SDFDirector.AUTO_NAME) || name.equals(SDFDirector.UNBOUNDED_NAME))) ||
194            ((parameter.getContainer() instanceof Director) &&
195                        (name.equals("startTime") || name.equals("stopTime"))) ||                       
196            name.equals("bold") ||
197            name.equals("italic") ||
198            name.equals("fontFamily") ||
199            name.equals("textSize") ||
200            name.equals("_notDraggable") ||
201            name.indexOf("_vergil") == 0 ||
202            name.indexOf("_hide") == 0 ||
203            name.equals("_showName"))
204        {
205            ignore = true;
206        }
207        else
208        {
209            RegEntity re;
210            boolean isPort = false;
211
212            // if it's a PortParameter, we'll need to register both the
213            // port and parameter.
214            if(parameter instanceof PortParameter)
215            {
216                re = _checkEntity(parameter, RegEntity.EntityType.PortParameter);
217                isPort = true;
218            }
219            else
220            {
221                re = _checkEntity(parameter, RegEntity.EntityType.Parameter);
222            }
223
224            if(re.isNew())
225            {
226                _regParameterReal(parameter, re);
227
228                if(isPort)
229                {
230                    _regPortReal(((PortParameter)parameter).getPort(), re);
231                }
232            }
233        }
234
235        return (! ignore);
236    }
237
238    /** Register a link between two endpoints.  */
239    @Override
240    public boolean regLink(NamedObj endPoint1, NamedObj endPoint2)
241        throws RecordingException
242    {
243      
244        /*
245         _debug("regLink(" + _getNameableFullName(endPoint1) + ", " + 
246            _getNameableFullName(endPoint2) + ")");
247        */
248
249        // make sure both endpoints have been registered. 
250        RegEntity.EntityType type = RegEntity.EntityType.getType(endPoint1);
251        RegEntity re1 = _checkEntity(endPoint1, type);
252        if(re1 == null)
253        {
254            throw new RecordingException("endPoint1 is not registered.");
255        }
256        
257        type = RegEntity.EntityType.getType(endPoint2);
258        RegEntity re2 = _checkEntity(endPoint2, type);
259        if(re2 == null)
260        {
261            throw new RecordingException("endPoint2 is not registered.");
262        }
263
264        Link link = new Link(endPoint1, endPoint2);
265        
266        //_debug("linkname = " + linkName);
267
268        RegEntity reLink = _checkEntity(link, RegEntity.EntityType.Link);
269        if(reLink.isNew())
270        {
271            try
272            {
273                synchronized(_psLinkInsert)
274                {
275                    _psLinkInsert.setInt(1, reLink.getId());
276                    _psLinkInsert.setInt(2, re1.getId());
277                    _psLinkInsert.setInt(3, re2.getId());
278                    _psLinkInsert.executeUpdate();
279                }
280
281                if(_debugWriter != null)
282                {
283                    _debugWrite("INSERT INTO LINK (" + reLink.getId() +
284                        ", " + re1.getId() + ", " + re2.getId() + ")");
285                }
286            }
287            catch(SQLException e)
288            {
289                throw new RecordingException(_getExceptionMessage(e));
290            }
291        }
292        /*
293        else
294        {
295            _debug("link already recorded: " + linkName);
296        }
297        */
298
299        return true; 
300
301    }
302
303    /** Register a port or portparameter.  */
304    @Override
305    public boolean regPort(TypedIOPort port) throws RecordingException
306    {
307        //_debug("regPort(" + _getNameableFullName(port) + ")");
308        //
309
310        RegEntity re;
311        boolean isParameter = false;
312        
313        // if it's a ParameterPort, we'll need to register both the
314        // port and parameter.
315        if(port instanceof ParameterPort)
316        {
317            re = _checkEntity(port, RegEntity.EntityType.PortParameter);
318            isParameter = true;
319        }
320        else
321        {
322            re = _checkEntity(port, RegEntity.EntityType.Port);
323        }
324
325        if(re.isNew())
326        {
327            _regPortReal(port, re);
328
329            if(isParameter)
330            {
331                _regParameterReal(((ParameterPort)port).getParameter(), re);
332            }
333        }
334        
335        return true;
336    }
337   
338    /** Register a relation. */
339    @Override
340    public boolean regRelation(IORelation relation) throws RecordingException
341    {
342        //_debug("regRelation(" + _getNameableFullName(relation) + ")");
343
344        RegEntity re = _checkEntity(relation, RegEntity.EntityType.Relation);
345        if(re.isNew())
346        {
347            try
348            {
349                int width = relation.getWidth();
350                synchronized(_psRelationInsert)
351                {
352                    _psRelationInsert.setInt(1, re.getId());
353                    _psRelationInsert.setInt(2, width);
354                    _psRelationInsert.executeUpdate();
355                }
356                if(_debugWriter != null)
357                {
358                    _debugWrite("INSERT INTO RELATION (id, " +
359                        width + ")");
360                }
361            }
362            catch(SQLException e)
363            {
364                throw new RecordingException(_getExceptionMessage(e));
365            }
366            catch(IllegalActionException e)
367            {
368                throw new RecordingException(_getExceptionMessage(e));
369            }
370        }
371        
372        return true; 
373    }
374
375    /** Called before registering workflow contents. */
376    @Override
377    public void specificationStart() throws RecordingException
378    {
379        super.specificationStart();
380
381        // clear the entity cache table so that entity renames are handled
382        // FIXME this will cause performance problems on large workflows
383        _entityCacheTable.clear();
384        
385        // always clear the fire state table since the id of actors
386        // can change between workflow executions due to the actor
387        // being renamed.
388        _fireStateTable.clear();
389    }
390
391  
392    ////////////////////////////////////////////////////////////////////////
393    //// Execution interface                                            ////
394    
395    /** Record the starting of workflow execution. */
396    @Override
397    public void executionStart(Date timestamp) throws RecordingException
398    {
399        if(_wfExecId != RegEntity.UNKNOWN_ID)
400        {
401            throw new RecordingException("Workflow already running.");
402        }
403            
404        // NOTE: do not set this to false so that workflow contents
405        // will be registered again. this handles cases where entities
406        // are renamed between runs.
407        //_needWorkflowContents(false);
408
409        if(_wfUserStr == null)
410        {
411            throw new RecordingException("Need workflow user name");
412        }
413        
414        // update the db
415        try
416        {
417            synchronized(_psWorkflowExecStart)
418            {
419                _psWorkflowExecStart.setInt(1, _wfId);
420                _psWorkflowExecStart.setString(2, _wfUserStr);
421                _psWorkflowExecStart.setTimestamp(3, new Timestamp(timestamp.getTime()));
422                _wfExecId = _dbType.insert(_psWorkflowExecStart,
423                    "workflow_exec", "id");
424            }
425
426            if(_debugWriter != null)
427            {
428                _debugWrite("INSERT INTO WORKFLOW_EXEC (" + _wfId +
429                    ", " + _wfUserStr + ", curTime)");
430            }
431        }
432        catch(SQLException e)
433        {
434            throw new RecordingException(_getExceptionMessage(e));
435        }
436    }
437
438    /** Record the stopping of workflow execution. */
439    @Override
440    public void executionStop(Date timestamp) throws RecordingException
441    {
442        if(_wfExecId == RegEntity.UNKNOWN_ID)
443        {
444            throw new RecordingException("Workflow not running.");
445        }
446        try
447        {
448            // stop any actors that may be rw firing
449            for(FireState<Integer> fs : _fireStateTable.values())
450            {
451                synchronized(fs)
452                {
453                    Integer id = fs.getCurFireId(true);
454                    if(id != null)
455                    {
456                        _recordFiringEvent(fs.getActor(), 
457                            FiringEvent.AFTER_RW_FIRE, timestamp);
458                        // reset the port state
459                        // XXX what about port events between wf execs?
460                        fs.setPortLastAccess(
461                            FireState.PortAccessType.Uninitialized);
462                    }
463
464                    id = fs.getCurFireId(false);
465                    if(id != null)
466                    {
467                        _recordFiringEvent(fs.getActor(),
468                            fs.getNextStopFiringType(), timestamp);
469                    }
470                }
471            }
472
473            // 
474            synchronized(_psWorkflowExecStop)
475            {
476                _psWorkflowExecStop.setTimestamp(1, new Timestamp(timestamp.getTime()));
477                _psWorkflowExecStop.setInt(2, _wfExecId);
478                _psWorkflowExecStop.executeUpdate();
479            }
480
481            if(_debugWriter != null)
482            {
483                _debugWrite("UPDATE WORKFLOW_EXEC SET end_time = " +
484                    "curTime WHERE id = " + _wfExecId + ")");
485            }
486
487            _wfExecId = RegEntity.UNKNOWN_ID;
488        }
489        catch(SQLException e)
490        {
491            throw new RecordingException(_getExceptionMessage(e));
492        }
493    }
494    
495    /** An execution was imported. */
496    @Override
497    public void executionImported()
498    {
499        
500    }
501    
502    /** Record an actor fire event. */
503    @Override
504    public void actorFire(FiringEvent event, Date timestamp) throws RecordingException
505    {
506        //_debug("actorFire: " + event);
507        _recordFiringEvent(event.getActor(), event.getType(), timestamp);
508    }
509
510    /** Record a port event. */
511    @Override
512    public void portEvent(IOPortEvent event, Date timestamp) throws RecordingException
513    {
514        IOPort port = (TypedIOPort)event.getPort();
515
516        int type = event.getEventType();
517        boolean isRead = false;
518        boolean recordEvent = false;
519
520        switch(type)
521        {
522            case IOPortEvent.SEND_BEGIN:
523                isRead = false;
524                recordEvent = true;
525                break;
526            case IOPortEvent.GET_END:
527                isRead = true;
528                recordEvent = true;
529                break;
530            // ignore get_begin and send_end
531            case IOPortEvent.GET_BEGIN:
532            case IOPortEvent.SEND_END:
533                break;
534            // warn if received unknown type
535            default:
536                _warn("unhandled type of IOPortEvent: " + type);
537                break;
538        }
539
540        Actor actor = (Actor)port.getContainer();
541        FireState<Integer> fs = _fireStateTable.get(actor);
542
543        // XXX check for null
544        synchronized(fs)
545        {
546            // see if we need to generate rw fire event(s)
547            FireState.PortAccessType lastAccess = fs.getPortLastAccess();
548            if(lastAccess == FireState.PortAccessType.Uninitialized)
549            {
550                _recordFiringEvent(actor, FiringEvent.BEFORE_RW_FIRE, timestamp);
551            }
552            else if(lastAccess == FireState.PortAccessType.Write && isRead)
553            {
554                _recordFiringEvent(actor, FiringEvent.AFTER_RW_FIRE, timestamp);
555                _recordFiringEvent(actor, FiringEvent.BEFORE_RW_FIRE, timestamp);
556            }
557
558            // record the type of last port access for this actor
559            if(isRead)
560            {
561                fs.setPortLastAccess(FireState.PortAccessType.Read);
562            }
563            else
564            {
565                fs.setPortLastAccess(FireState.PortAccessType.Write);
566            }
567
568            if(recordEvent)
569            {
570                /*
571                 _Debug("port " + _getNameableFullName(port) + 
572                    (isRead ? " read " : " write ") +
573                    (port.isOpaque() ? "opaque " : "transparent "));
574                */
575                //_Debug("type " + port.getType());
576
577                Integer fireId = fs.getCurFireId(false);
578                if(fireId == null)
579                {
580                    fireId = RegEntity.UNKNOWN_ID; 
581                }
582               
583                // getCurFireId may return null
584                int rwfireId = RegEntity.UNKNOWN_ID;
585                Integer rwfireIdInteger = fs.getCurFireId(true);
586                if(rwfireIdInteger != null)
587                {
588                    rwfireId = rwfireIdInteger; 
589                }
590
591                // an actor may read or write a token in a non-firing
592                // method. e.g., SampleDelay writes a token in initilize().
593                if(fireId == RegEntity.UNKNOWN_ID)
594                {
595                    /*
596                    _warn("port event not in fire method for " +
597                        _getNameableFullName(actor));
598                    */
599
600                    fireId = rwfireId;
601                }
602                
603                if(event.getVectorLength() == IOPortEvent.SINGLETOKEN)
604                {
605                    _recordPortEvent(port, fireId, rwfireId, isRead, 
606                        event.getChannel(), event.getToken(),
607                        event.getReceiverPort(), timestamp);
608                }
609                else
610                {
611                    Token[] tokenArray = event.getTokenArray();
612                    for(int i = 0; i < tokenArray.length; i++)
613                    {
614                        _recordPortEvent(port, fireId, rwfireId, isRead, 
615                            event.getChannel(), tokenArray[i],
616                            event.getReceiverPort(), timestamp);
617                    }
618                }
619            }
620        }
621    }
622    
623    /** Add Parameters for ProvenanceListener. */
624    @Override
625    public RecordingParameters generateParameters(NamedObj no) 
626        throws IllegalActionException, NameDuplicationException
627    {
628        _params = new SQLRecordingParameters(no);
629        return _params;
630    }
631    
632    /** React to a change in an attribute. */
633    @Override
634    public void attributeChanged(Attribute attribute)
635        throws IllegalActionException
636    {
637        String name = attribute.getName();
638        String val;
639        
640        //_debug("begin sql attr changed: " + attribute + " hash = " + _recorder.getContainer().hashCode() );
641
642        DatabaseFactory.Parameter dbParameter =
643            DatabaseFactory.Parameter.getType(name);
644        
645        if(dbParameter != null)
646        {
647            switch(dbParameter)
648            {
649            case USER:
650                val = 
651                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
652
653                String dbUserStr = _dbParams.get(DatabaseFactory.Parameter.USER.getName());
654                if(dbUserStr == null || !val.equals(dbUserStr)) 
655                {
656                    _dbParams.put(DatabaseFactory.Parameter.USER.getName(), val);
657                    _needReconnectDB = true;
658                }
659
660                break;
661                
662            case PASSWD:
663                val = 
664                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
665
666                String dbPasswdStr = _dbParams.get(DatabaseFactory.Parameter.PASSWD.getName());
667                if(dbPasswdStr == null || !val.equals(dbPasswdStr)) 
668                {
669                    _dbParams.put(DatabaseFactory.Parameter.PASSWD.getName(), val); 
670                    _needReconnectDB = true;
671                }
672                
673                break;
674                
675            case HOST:
676                val = 
677                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
678
679                String dbHostStr = _dbParams.get(DatabaseFactory.Parameter.HOST.getName());
680                if(dbHostStr == null || !val.equals(dbHostStr)) 
681                {
682                    _dbParams.put(DatabaseFactory.Parameter.HOST.getName(), val); 
683                    _needReconnectDB = true;
684                }
685
686                break;
687
688            case NAME:
689                val = 
690                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
691
692                String dbNameStr = _dbParams.get(DatabaseFactory.Parameter.NAME.getName());
693                if(dbNameStr == null || !val.equals(dbNameStr)) 
694                {
695                    _dbParams.put(DatabaseFactory.Parameter.NAME.getName(), val); 
696                    _needReconnectDB = true;
697                }
698 
699                break;
700            
701            case TYPE:
702                val = 
703                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
704
705                if(_dbType == null || !val.equals(_dbType.getName()))
706                {
707                    if(_dbType != null)
708                    {
709                        try
710                        {
711                            _dbType.disconnect();
712                        }
713                        catch(SQLException e)
714                        {
715                            throw new IllegalActionException("Error " +
716                                "disconnecting from database: " + e.getMessage());
717                        }
718                    }
719       
720                    _dbParams.put(DatabaseFactory.Parameter.TYPE.getName(), val);
721                    _dbType = DatabaseFactory.getType(val);
722                    if(_dbType == null)
723                    {
724                        throw new IllegalActionException(
725                            "Invalid database type: " + val);
726                    }
727                    _needReconnectDB = true;
728                }
729
730                break;
731            
732            case PORT:
733                val =
734                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
735                
736                String dbPortStr = _dbParams.get(DatabaseFactory.Parameter.PORT.getName());
737                if(dbPortStr == null || !val.equals(dbPortStr)) 
738                {
739                    _dbParams.put(DatabaseFactory.Parameter.PORT.getName(), val); 
740                    _needReconnectDB = true;
741                }
742
743                break;
744                
745            case TABLEPREFIX:
746                val =
747                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
748                
749                String dbTablePrefixStr = _dbParams.get(DatabaseFactory.Parameter.TABLEPREFIX.getName());
750                if(dbTablePrefixStr == null || !val.equals(dbTablePrefixStr)) 
751                {
752                    _dbParams.put(DatabaseFactory.Parameter.TABLEPREFIX.getName(), val); 
753                    _needReconnectDB = true;
754                }
755
756                break;
757            
758            case JDBC_URL:                
759                val =
760                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
761                
762                String dbJDBCURLStr = _dbParams.get(DatabaseFactory.Parameter.JDBC_URL.getName());
763                if(dbJDBCURLStr == null || !val.equals(dbJDBCURLStr))
764                {
765                    _dbParams.put(DatabaseFactory.Parameter.JDBC_URL.getName(), val);
766                    _needReconnectDB = true;
767                }
768                
769                break;
770                
771            case CREATE_INDEXES:                
772                val =
773                    ((StringToken)((Parameter)attribute).getToken()).stringValue();
774                
775                String createIndexesStr = _dbParams.get(DatabaseFactory.Parameter.CREATE_INDEXES.getName());
776                if(createIndexesStr == null || !val.equals(createIndexesStr))
777                {
778                    _dbParams.put(DatabaseFactory.Parameter.CREATE_INDEXES.getName(), val);
779                    _needReconnectDB = true;
780                }
781                
782                break;
783                
784            default:
785                System.out.println("WARNING: unknown " +
786                    "DatabaseFactory.Parameter type: " +
787                    dbParameter.getName());
788                break;
789            }
790        }
791        else if(name.equals(SQLRecordingParameters.wfNameParamStr))
792        {
793            val = 
794                ((StringToken)((Parameter)attribute).getToken()).stringValue();
795
796            if(_wfNameStr == null || !val.equals(_wfNameStr))
797            {
798                _wfNameStr = val;
799                _needReconnectWF = true;
800                //_debug("wf name changed to " + _wfNameStr);
801            }
802        }
803        else if(name.equals("User Name"))
804        {
805            val = 
806                ((StringToken)((Parameter)attribute).getToken()).stringValue();
807           
808            // if empty, set default from system property
809            if(val.length() == 0)
810            {
811                val = System.getProperty("user.name");
812            }
813
814            if(_wfUserStr == null || !val.equals(_wfUserStr))
815            {
816                _wfUserStr = val;
817            }
818
819            // for backwards compatibility
820            String dbUserStr = _dbParams.get(DatabaseFactory.Parameter.USER.getName());
821            if(dbUserStr == null || dbUserStr.length() == 0)
822            {
823                _dbParams.put(DatabaseFactory.Parameter.USER.getName(), val);
824                _needReconnectDB = true;
825            }
826        }
827        else
828        {
829            super.attributeChanged(attribute);
830        }
831
832        // get workflow contents if we need to reconnect to db
833        // or workflow
834        if(_needReconnectDB || _needReconnectWF)
835        {
836            _needWorkflowContents(true);
837
838            // XXX
839            if(_needReconnectWF)
840            {
841                _wfReset();
842            }
843
844            // XXX need dbReset?
845        }
846        
847        //_debug("end sql attr changed: " + attribute);
848    }
849   
850    ////////////////////////////////////////////////////////////////////////
851    //// public variables                                               ////
852    
853    /** The token data is valid. */
854    public static final int DATA_VALID              = 0;
855
856    /** The token data was not recorded. */
857    public static final int DATA_NONE               = 1;
858
859    /** The token data was too large to completely store. */
860    public static final int DATA_TRUNCATED          = 2;
861 
862    // port io types
863    public enum PortDirection 
864    {
865        In,
866        Out,
867        InOut;
868
869        static public PortDirection valueOf(int id)
870        {
871            if(id < 0 || id >= values().length)
872            {
873                return null;
874            }
875            else
876            {
877                return values()[id];
878            }
879        }
880    };
881            
882    ////////////////////////////////////////////////////////////////////////
883    //// protected methods                                              ////
884
885    /** Check parameters before (re)connecting to database or workflow. */
886    protected void _checkParameters() throws IllegalActionException,
887        RecordingException
888    {
889        if(_needReconnectWF)
890        {
891            //_debug("checkParameters; need reconnectWF; name len = " +
892            //  _wfNameStr.length() + " name = " + _wfNameStr);
893            
894            _checkWorkflowName();
895        }
896
897        if(_needReconnectDB)
898        {
899            if(_dbType == null)
900            {
901                throw new RecordingException("No database connection.");
902            }
903
904            // see if the jdbc url override is empty.
905            // if so, need to check individual parameters.
906            String dbJDBCURLStr = _dbParams.get(DatabaseFactory.Parameter.JDBC_URL.getName());
907            if(dbJDBCURLStr == null || dbJDBCURLStr.length() == 0)
908            {
909                String dbUserStr = _dbParams.get(DatabaseFactory.Parameter.USER.getName());
910                if(_dbType.needUserForConnect() && 
911                    (dbUserStr == null || dbUserStr.length() == 0))
912                {
913                    throw new RecordingException("Need database user name."); 
914                }
915    
916                String dbPasswdStr = _dbParams.get(DatabaseFactory.Parameter.PASSWD.getName());
917                if(_dbType.needPasswordForConnect() && 
918                    (dbPasswdStr == null || dbPasswdStr.length() == 0))
919                {
920                    throw new RecordingException("Need database password");
921                }
922    
923                String dbHost = _dbParams.get(DatabaseFactory.Parameter.HOST.getName());
924                if(_dbType.needHostForConnect() &&
925                    (dbHost == null ||  dbHost.length() == 0))
926                {
927                    throw new RecordingException("Need database host name."); 
928                }
929
930                String dbNameStr = _dbParams.get(DatabaseFactory.Parameter.NAME.getName());
931                if(dbNameStr == null || dbNameStr.length() == 0)
932                {
933                    throw new RecordingException("Need database name."); 
934                }
935            }
936        }
937
938        /*
939        _debug("going to _connectProvStore: ");
940        _debug("wf: " + _wfNameStr);
941        _debug("user: " + _dbUserStr);
942        _debug("dbhost: " + _dbHostStr);
943        _debug("dbhost: " + _dbPortStr);
944        _debug("passwd: " + _dbPasswdStr);
945        */
946    }
947
948    /** Check validity of workflow name parameter. */
949    protected void _checkWorkflowName() throws RecordingException
950    {
951        if(_wfNameStr == null || _wfNameStr.length() == 0)
952        {
953            throw new RecordingException("Need workflow name.");
954        }
955    }
956
957    /** Create a Schema to reflect the v6 schema. */
958    protected Schema _createSchema()
959    {
960        return Schemas.createSchemaV6();
961    }
962
963    /** Reconnect to the database. */
964    protected void _reconnectDatabase(boolean resetDB) throws RecordingException
965    {
966        if(resetDB)
967        {
968            _dbReset();
969        }
970        
971        try
972        {
973            _dbType = DatabaseFactory.getConnectedDatabaseType(_dbParams, "provenance");
974            Schemas.checkVersion(_dbType, _schema);
975
976            _maxDataSize = _getMaxTokenDataSize();
977            _maxParameterValueSize = _getMaxParameterValueSize();
978
979            //System.out.println("max data size = " + _maxDataSize);
980            //System.out.println("max param value size = " + __maxParameterValueSize);
981        }
982        catch(SQLException e)
983        {
984            _errorReset();
985            throw new RecordingException("Error connecting or " +
986                "initializing database.\n" + _dbParams + "\n", e);
987        }
988
989        try
990        {
991            _nullPreparedStatements();
992            _createPreparedStatements();
993        }
994        catch(SQLException e)
995        {
996            throw new RecordingException("Error creating prepared " +
997                "statements: " + e.getMessage());
998        }
999
1000        _needReconnectDB = false;
1001    }
1002
1003    /** Get the maximum size of a parameter value that can be stored in the 
1004     *  database.
1005     */
1006    protected int _getMaxParameterValueSize() throws SQLException
1007    {
1008        return _dbType.getColumnSize("parameter", "value");
1009    }
1010
1011    /** Get the maximum size of token data that can be stored in the 
1012     *  database.
1013     */
1014    protected int _getMaxTokenDataSize() throws SQLException
1015    {
1016        return _dbType.getColumnSize("token_flow", "data");
1017    }
1018
1019    /** Re-acquire the current workflow id. */
1020    protected void _reconnectWorkflow() throws RecordingException
1021    {
1022        _wfReset();
1023
1024        _needReconnectWF = false;
1025
1026        int rc = _updateWorkflowID(false);
1027                
1028        if(rc == WORKFLOW_ALREADY_EXISTS)
1029        {
1030                        
1031        }
1032        else if(rc == WORKFLOW_DOES_NOT_EXIST)
1033        {
1034            //_debug("workflow does not exist; attempting to create " +
1035                //"new one.");
1036
1037            //XXX need to check for return value
1038            _updateWorkflowID(true);
1039        }
1040    }
1041
1042    /** Initialize the prepared statements. */
1043    protected void _createPreparedStatements() throws SQLException
1044    {
1045        // NOTE: for each PreparedStatement we check to see if
1046        // it has already been created by a subclass: it may
1047        // have a different columns for a table, or removed
1048        // the entire table.
1049
1050        if(_psWorkflowInsert == null && _schema.containsTable("workflow"))
1051        {
1052            _psWorkflowInsert = _dbType.getSQLInsert("workflow", "id",
1053                "name, version, action_id", "?, ?, ?");
1054        }
1055       
1056        if(_psWorkflowQuery == null && _schema.containsTable("workflow"))
1057        {
1058            _psWorkflowQuery = _dbType.getSQLSelect("workflow", "id",
1059                "name = ?");
1060        }
1061   
1062        if(_psActionInsert == null && _schema.containsTable("action"))
1063        {
1064            _psActionInsert = _dbType.getSQLInsert("action", "id",
1065                "parent_id, user, time", "?, ?, ?");
1066        }
1067
1068        if(_psEntityInsert == null && _schema.containsTable("entity"))
1069        {
1070            _psEntityInsert = _dbType.getSQLInsert("entity", "id",
1071                "container_id, workflow_id, type, name", "?, ?, ?, ?");
1072        }
1073            
1074        if(_psEntityQuery == null && _schema.containsTable("entity"))
1075        {
1076            _psEntityQuery = _dbType.getSQLSelect("entity", "id",
1077                "workflow_id = ? AND name = ? AND type = ?");
1078        }
1079
1080        if(_psActorInsert == null && _schema.containsTable("actor"))
1081        {
1082            _psActorInsert = _dbType.getSQLInsert("actor", "id, class", "?, ?");
1083        }
1084
1085        if(_psDirectorInsert == null && _schema.containsTable("director"))
1086        {
1087            _psDirectorInsert = _dbType.getSQLInsert("director", "id, class",
1088                "?, ?");
1089        }
1090    
1091        if(_psParameterInsert == null && _schema.containsTable("parameter"))
1092        {
1093            _psParameterInsert = _dbType.getSQLInsert("parameter",
1094                "id, type, value", "?, ?, ?");
1095        }
1096
1097        if(_psLinkInsert == null && _schema.containsTable("link"))
1098        {
1099            _psLinkInsert = _dbType.getSQLInsert("link",
1100                "id, end_point_1, end_point_2", "?, ?, ?");
1101        }
1102    
1103        if(_psPortInsert == null && _schema.containsTable("port"))
1104        {
1105            _psPortInsert = _dbType.getSQLInsert("port",
1106                "id, type, direction, multiport", "?, ?, ?, ?");
1107        }
1108    
1109        if(_psRelationInsert == null && _schema.containsTable("relation"))
1110        {
1111            _psRelationInsert = _dbType.getSQLInsert("relation", "id, width",
1112                "?, ?");
1113        }
1114                
1115        if(_psWorkflowExecStart == null &&
1116            _schema.containsTable("workflow_exec"))
1117        {
1118            String defaultTimeStr = _dbType.getDefaultTimeStr();
1119            _psWorkflowExecStart = _dbType.getSQLInsert("workflow_exec", "id",
1120                "workflow_id, user, start_time, end_time", "?, ?, ?, " +
1121                defaultTimeStr);
1122        }
1123
1124        if(_psWorkflowExecStop == null &&
1125            _schema.containsTable("workflow_exec"))
1126        {
1127            _psWorkflowExecStop = _dbType.getSQLUpdate("workflow_exec",
1128                "end_time = ?", "id = ?");
1129        }
1130        
1131        if(_psActorFireStart == null && _schema.containsTable("actor_fire"))
1132        {
1133            String defaultTimeStr = _dbType.getDefaultTimeStr();
1134            _psActorFireStart = _dbType.getSQLInsert("actor_fire", "id",
1135                "actor_id, wf_exec_id, start_time, type, end_time",
1136                "?, ?, ?, ?, " + defaultTimeStr);
1137        }
1138    
1139        if(_psActorFireStop == null && _schema.containsTable("actor_fire"))
1140        {
1141            _psActorFireStop = _dbType.getSQLUpdate("actor_fire",
1142                "end_time = ?", "id = ?");
1143        }
1144
1145        if(_psTokenFlowInsert == null && _schema.containsTable("token_flow"))
1146        {
1147            _psTokenFlowInsert = _dbType.getSQLInsert("token_flow", "id",
1148                "port_id, time, data, channel, is_read, fire_id, rw_fire_id, " +
1149                "data_description", "?, ?, ?, ?, ?, ?, ?, ?");
1150        }
1151    }
1152    
1153    /** Set our prepared statements to null. */
1154    protected void _nullPreparedStatements()
1155    {
1156        _psWorkflowInsert = null;
1157        _psWorkflowQuery = null;
1158        _psActionInsert = null;
1159        _psEntityInsert = null;
1160        _psEntityQuery = null;
1161        _psActorInsert = null;
1162        _psDirectorInsert = null;
1163        _psParameterInsert = null;
1164        _psLinkInsert = null;
1165        _psPortInsert = null;
1166        _psRelationInsert = null;
1167        _psWorkflowExecStart = null;
1168        _psWorkflowExecStop = null;
1169        _psActorFireStart = null;
1170        _psActorFireStop = null;
1171        _psTokenFlowInsert = null;
1172    }
1173
1174    /** Update workflow ID either by retrieving the id from an existing
1175     *  workflow in the database or creating a new one.
1176     *
1177     * @param isNew
1178     * @return
1179     */
1180    protected int _updateWorkflowID(boolean isNew) throws RecordingException
1181    {
1182        int retval = SUCCESS;
1183
1184        // check to see if it already exists.
1185        int id = _getWorkflowId();
1186
1187        // see if this is a new workflow
1188        if(isNew)
1189        {
1190            // see if it already exists
1191            if(id != RegEntity.UNKNOWN_ID)
1192            {
1193                retval = WORKFLOW_ALREADY_EXISTS;
1194            }
1195            else
1196            {
1197        
1198                if(_wfUserStr == null)
1199                {
1200                    throw new RecordingException("Need workflow user " +
1201                        "name");
1202                }
1203                    
1204                _addWorkflow();
1205            }
1206        }
1207        // see if workflow was not in db
1208        else if(id == RegEntity.UNKNOWN_ID)
1209        {
1210            retval = WORKFLOW_DOES_NOT_EXIST;
1211        }
1212        else
1213        {
1214            int oldWfId = _wfId;
1215
1216            // workflow exists in db; get id from query.
1217            _wfId = id;
1218
1219            // see if id is different
1220            if(oldWfId != _wfId)
1221            {
1222                _wfReset();
1223            }
1224        }
1225
1226        return retval;
1227    }
1228
1229    /** Get the internal workflow id. */
1230    protected int _getWorkflowId() throws RecordingException
1231    {
1232        // use the workflow name as the id string.
1233        return _getWorkflowId(_wfNameStr);
1234    }
1235
1236    /** Get the internal workflow id given a id string. If no workflow has
1237     *  the id string, returns RegEntity.UNKNOWN_ID.
1238     */
1239    protected int _getWorkflowId(String idStr) throws RecordingException
1240    {
1241        int retval = RegEntity.UNKNOWN_ID;
1242
1243        try
1244        {
1245            synchronized(_psWorkflowQuery)
1246            {
1247                _psWorkflowQuery.setString(1, idStr);
1248                ResultSet rs = _psWorkflowQuery.executeQuery();
1249                
1250                if(rs.next())
1251                {
1252                    retval = rs.getInt("id");
1253                }
1254                rs.close();
1255            }
1256        }
1257        catch(SQLException e)
1258        {
1259            throw new RecordingException("Error querying workflow.name: ", e);
1260        }
1261
1262        //_debug("getWorkflowId idStr = " + idStr + " found id = " + retval);
1263        //_debug(_psWorkflowQuery);
1264
1265        return retval;
1266    }
1267
1268    /** Reset when we use a different db connection. */
1269    protected void _dbReset() throws RecordingException
1270    {
1271        _wfId = RegEntity.UNKNOWN_ID;
1272        _maxDataSize = -1;
1273        _needReconnectWF = true;
1274        _needReconnectDB = true;
1275       
1276        _wfReset();
1277
1278        if(_dbType != null)
1279        {
1280            try
1281            {
1282                _dbType.disconnect();
1283            }
1284            catch(SQLException e)
1285            {
1286                throw new RecordingException("Error disconnection from " +
1287                    "database.", e);
1288            }
1289        }
1290    }
1291
1292    /** Reset when we use a different workflow. */
1293    protected void _wfReset()
1294    {
1295        _wfExecId = RegEntity.UNKNOWN_ID;
1296        _entityCacheTable.clear();
1297        _fireStateTable.clear();
1298    }
1299
1300    /** Reset when we encounter an error. */
1301    protected void _errorReset() throws RecordingException
1302    {
1303        _dbReset();
1304    }
1305
1306    /**
1307     *  This method opens the database connection if necessary.
1308     *  @param allowReconnectWF - default true, only false if you want to force no _reconnectWF, e.g
1309     *  for non-write events like delete
1310     */ 
1311    protected void _checkConnection(boolean allowReconnectWF) throws RecordingException
1312    {
1313        // see if we need to (re)connect to DB
1314        if(_needReconnectDB || _needReconnectWF)
1315        {
1316            //_debug("in checkConnection: need to reconnectdb or wf");
1317            _updateContainerName();
1318            
1319            try
1320            {
1321                _checkParameters();
1322
1323                if(_schema == null)
1324                {
1325                    _schema = _createSchema();
1326                }
1327
1328                if(_needReconnectDB)
1329                {
1330                    _reconnectDatabase(true);
1331                }
1332        
1333                if(_needReconnectWF && allowReconnectWF)
1334                {
1335                    _reconnectWorkflow();
1336                }
1337            }
1338            catch(IllegalActionException e)
1339            {
1340                throw new RecordingException(_getExceptionMessage(e));
1341            }
1342        }
1343
1344    }
1345
1346    /** Modify a NamedObj's full name. */
1347    protected String _changeEntityFullName(String name)
1348    {
1349        return name;
1350    }
1351
1352    /** Check if a NamedObj has been inserted into the entity table.
1353     * 
1354     * @param fullName fully qualified name of NamedObj.
1355     * @param displayName display name (NOTE: can be null)
1356     * @param type type of NamedObj. (NOTE: can be null)
1357     * @return
1358     */
1359    protected RegEntity _checkEntity(Nameable namedObj, RegEntity.EntityType type)
1360        throws RecordingException
1361    {
1362        RegEntity retval = null;
1363        
1364        String fullName = _getNameableFullName(namedObj);
1365        String displayName = namedObj.getDisplayName();
1366       
1367        _checkConnection(true);
1368
1369        String changedName = _changeEntityFullName(fullName);
1370        
1371        String typeStr = type.toString();
1372        
1373        //_debug("getting from cache: " + cacheName);
1374        retval = _entityCacheTable.get(namedObj);
1375        
1376        // see if it's cached
1377        if(retval != null)
1378        {
1379            //_debug("found existing id for " + _getNameableFullName(namedObj) + " : " + retval.getId());
1380            
1381            // it was cached, so no longer new.
1382            retval.setOld();
1383        }
1384        else
1385        {
1386            try
1387            {
1388                // determine containing id
1389                int cntId = _getContainerId(namedObj);
1390
1391                // see if name is in entity table
1392                synchronized(_psEntityQuery)
1393                {
1394                    _psEntityQuery.setInt(1, _wfId);
1395                    _psEntityQuery.setString(2, changedName);
1396                    _psEntityQuery.setString(3, typeStr);
1397                    ResultSet rs = _psEntityQuery.executeQuery();
1398
1399                    boolean haveResult = rs.next();
1400
1401                    if(!haveResult)
1402                    {
1403                        // not in entity table, so insert
1404                        retval = _addEntity(cntId, type, changedName, displayName,
1405                            RegEntity.UNKNOWN_ID);
1406                        
1407                        //_debug("added entity " + changedName + " id = " + retval.getId());
1408                    }
1409                    else if(haveResult)
1410                    {
1411                        retval = new RegEntity(rs.getInt("id"), false, cntId,
1412                            type);
1413                        
1414                        //_debug("queried entity " + changedName + " id = " + retval.getId());
1415                    }
1416                    
1417                    rs.close();
1418                }
1419            }
1420            catch(SQLException e)
1421            {
1422                throw new RecordingException("ERROR querying entity: ", e);
1423            }
1424
1425
1426            if(retval == null)
1427            {
1428                //throw new RecordingException("Entity " + name + 
1429                //    " not registered.");
1430            }
1431            else
1432            {
1433                //_debug("caching " + cacheName);
1434                _entityCacheTable.put(namedObj, retval);
1435            }
1436        }
1437
1438        return retval;
1439    }
1440
1441    /** Add a new row to the entity table. */
1442    protected RegEntity _addEntity(int containerId, RegEntity.EntityType type,
1443        String fullName, String displayName, int prevId)
1444        throws RecordingException, SQLException
1445    {
1446
1447        int id = RegEntity.UNKNOWN_ID;
1448        
1449        synchronized(_psEntityInsert)
1450        {
1451            _psEntityInsert.setInt(1, containerId);
1452            _psEntityInsert.setInt(2, _wfId);
1453            _psEntityInsert.setString(3, type.toString());
1454            _psEntityInsert.setString(4, fullName);
1455            id = _dbType.insert(_psEntityInsert, "entity", "id");
1456        }
1457        
1458        RegEntity retval = new RegEntity(id, true, containerId, type);
1459        
1460        //_debug("inserted " + fullName + ", id = " + id);
1461        
1462        if(_debugWriter != null)
1463        {
1464            _debugWrite("INSERT INTO ENTITY(" + containerId + ", " + _wfId +
1465                ", " + type + ", " + fullName + ")");
1466        }
1467
1468        return retval;
1469    }
1470
1471    /** Add a new row to the workflow table. */
1472    protected void _addWorkflow() throws RecordingException
1473    {
1474        try
1475        {
1476            // insert the new action table row
1477            synchronized(_psActionInsert)
1478            {
1479                _psActionInsert.setInt(1, 0);
1480                _psActionInsert.setString(2, _wfUserStr);
1481                _psActionInsert.setTimestamp(3, new Timestamp(new Date().getTime()));
1482                int actionId = _dbType.insert(_psActionInsert, "action", "id");
1483                                
1484                if(_debugWriter != null)
1485                {
1486                    _debugWrite("INSERT INTO ACTION(0, " + _wfUserStr +
1487                        ", curTime)");
1488                }
1489                
1490                //XXX should psWorkflowInsert be synchronized?
1491
1492                // insert the new workflow table row
1493                _psWorkflowInsert.setString(1, _wfNameStr);
1494                _psWorkflowInsert.setString(2, "v0");
1495                _psWorkflowInsert.setInt(3, actionId);
1496                _wfId = _dbType.insert(_psWorkflowInsert, "workflow","id");
1497                _wfReset();
1498                    
1499                if(_debugWriter != null)
1500                {
1501                    _debugWrite("INSERT INTO WORKFLOW(" + _wfNameStr +
1502                        ", v0, " + actionId + ")");
1503                }
1504            }
1505        }
1506        catch(SQLException e)
1507        {
1508            throw new RecordingException("Error adding row to workflow:", e);
1509        }
1510    }
1511
1512    /** Find the container id of an entity. If entity has no container, i.e.,
1513     *  is the top-level object, returns 0. Otherwise if the container could
1514     *  not be found, returns RegEntity.UNKNOWN_ID. */
1515    protected int _getContainerId(Nameable namedObj) 
1516        throws RecordingException
1517    {
1518        //_debug("_getContainerId(" + fullName + ")");
1519
1520        int retval = RegEntity.UNKNOWN_ID;
1521   
1522        NamedObj container = namedObj.getContainer();
1523
1524        if(container == null)
1525        {
1526            retval = 0;
1527        }
1528        else
1529        {
1530            RegEntity.EntityType containerType =
1531                RegEntity.EntityType.getType(container);
1532            RegEntity re = _checkEntity(container, containerType);
1533            if(re == null)
1534            {
1535                //throw new RecordingException("Container for " + name +
1536                //    " is not in entity table.");
1537                retval = RegEntity.UNKNOWN_ID;
1538            }
1539            else
1540            {
1541                retval = re.getId();
1542            }
1543        }
1544
1545        //_debug("getContainerId " + retval + " for " + name);
1546
1547        return retval;
1548    }
1549
1550    /** Convenience routine that regActor and regDirector can use since
1551     * the schema for actors and directors only differs by "actor" vs
1552     * "director".
1553     */
1554    protected int _regActorDirector(NamedObj namedObj, boolean actor) throws RecordingException
1555    {
1556        RegEntity.EntityType type = actor ? 
1557            RegEntity.EntityType.Actor : RegEntity.EntityType.Director;
1558        RegEntity re = _checkEntity(namedObj, type);
1559        if(re.isNew())
1560        {
1561            
1562            //_debug("is new actor " + namedObj.getFullName());
1563            
1564            PreparedStatement ps = null;
1565
1566            if(actor)
1567            {
1568                ps = _psActorInsert;
1569            }
1570            else
1571            {
1572                ps = _psDirectorInsert;
1573            }
1574            
1575            try
1576            {
1577                String className = namedObj.getClassName();
1578                
1579                synchronized(ps)
1580                {
1581                    ps.setInt(1, re.getId());
1582                    ps.setString(2, className);
1583                    ps.executeUpdate();
1584                }
1585
1586                if(_debugWriter != null)
1587                {
1588                    String table = actor ? "ACTOR" : "DIRECTOR";
1589                    _debugWrite("INSERT INTO " + table + " (" +
1590                        re.getId() + ", " + className + ")");
1591                }
1592
1593            }
1594            catch(SQLException e)
1595            {
1596                String msg = "Error registering ";
1597                if(actor)
1598                {
1599                    msg += "actor: ";
1600                }
1601                else
1602                {
1603                    msg += "director: ";
1604                }
1605
1606                if(_debugWriter != null)
1607                {
1608                    _debugWrite(msg, e);
1609                }
1610
1611                throw new RecordingException(msg, e);
1612            }
1613        }
1614
1615        return re.getId();
1616    }
1617    
1618    /** Add a port to the port table. */
1619    protected void _regPortReal(TypedIOPort port, RegEntity re)
1620        throws RecordingException
1621    {
1622        int multi = port.isMultiport() ? 1 : 0;
1623
1624        int direction;
1625        if(port.isInput() && port.isOutput())
1626        {
1627            direction = PortDirection.InOut.ordinal();
1628        }
1629        else if(port.isInput())
1630        {
1631            direction = PortDirection.In.ordinal();
1632        }
1633        else
1634        {
1635            direction = PortDirection.Out.ordinal();
1636        }
1637
1638        try
1639        {
1640            synchronized(_psPortInsert)
1641            {
1642                _psPortInsert.setInt(1, re.getId());
1643                _psPortInsert.setString(2, port.getType().toString());
1644                _psPortInsert.setInt(3, direction);
1645                _psPortInsert.setInt(4, multi);
1646                _psPortInsert.executeUpdate();
1647            }
1648
1649            if(_debugWriter != null)
1650            {
1651                _debugWrite("INSERT INTO PORT (id, type, " + direction +
1652                    ", " + multi + ")");
1653            }
1654
1655        }
1656        catch(SQLException e)
1657        {
1658            String msg = "Error inserting port " + _getNameableFullName(port) + ": ";
1659            _debugWrite(msg, e);
1660            throw new RecordingException(msg, e);
1661        }
1662    }
1663
1664    /** Add an entity to the parameter table. */
1665    protected void _regParameterReal(NamedObj parameter, RegEntity re)
1666        throws RecordingException
1667    {
1668        String className = parameter.getClassName();
1669        String valueStr = "none";
1670
1671        try
1672        {
1673            synchronized(_psParameterInsert)
1674            {
1675                //_debug("going to insert parameter " + _getNameableFullName(parameter) + " with id " + re.getId());
1676
1677                _psParameterInsert.setInt(1, re.getId());
1678                _psParameterInsert.setString(2, className);
1679
1680                if(parameter instanceof AbstractSettableAttribute)
1681                {
1682                    valueStr = ((AbstractSettableAttribute)parameter).
1683                        getValueAsString();
1684                }
1685
1686                // replace null string with empty string.
1687                if(valueStr == null)
1688                {
1689                    valueStr = "";
1690                }
1691                
1692                //_debug(_getNameableFullName(parameter) + " value length = " + valueStr.length());
1693
1694                // XXX need to set truncated bit somewhere
1695                if(valueStr.length() > _maxParameterValueSize)
1696                {
1697                    //_debug("TRUNCATING!");
1698                    valueStr = valueStr.substring(0, _maxParameterValueSize); 
1699                }
1700
1701                _psParameterInsert.setString(3, valueStr);
1702
1703                _psParameterInsert.executeUpdate();
1704            }
1705        
1706            if(_debugWriter != null)
1707            {
1708                if(className.equals("org.kepler.moml.NamedObjId"))
1709                {
1710                    _debugWrite("INSERT INTO PARAMETER (" + re.getId() +
1711                            ", " + className + ", lsid)");                    
1712                }   
1713                else
1714                {
1715                    _debugWrite("INSERT INTO PARAMETER (" + re.getId() +
1716                        ", " + className + ", " + valueStr + ")");
1717                }
1718            }
1719        }
1720        catch(SQLException e)
1721        {
1722            String msg = "Error registering parameter " + _getNameableFullName(parameter) + ": ";
1723            _debugWrite(msg, e);
1724            throw new RecordingException(msg, e);
1725        }
1726    }
1727
1728    /** Record a specific type of firing for an actor. */
1729    protected void _recordFiringEvent(Actor actor, 
1730        FiringEvent.FiringEventType type, Date timestamp) throws RecordingException
1731    {
1732        //_debug("firing event: " + type + " for " + _getNameableFullName(actor));
1733
1734        // see if it's a start or stop.
1735        FireState<Integer> fs = _fireStateTable.get(actor);
1736
1737        if(fs == null)
1738        {
1739            throw new RecordingException(
1740                "Received actor fire event for unregistered actor: " +
1741                _getNameableFullName(actor));
1742        }
1743
1744        synchronized(fs)
1745        {
1746            try
1747            {
1748                if(type.isStart())
1749                {
1750                    int fireId;
1751
1752                    synchronized(_psActorFireStart)
1753                    {
1754                        _psActorFireStart.setInt(1, fs.getActorId());
1755                        _psActorFireStart.setInt(2, _wfExecId);
1756                        _psActorFireStart.setTimestamp(3,new Timestamp(timestamp.getTime()));
1757                        _psActorFireStart.setString(4, type.getTypeName());
1758                        fireId = _dbType.insert(_psActorFireStart, "actor_fire",
1759                            "id");
1760                    }
1761
1762                    if(_debugWriter != null)
1763                    {
1764                        _debugWrite("INSERT INTO ACTOR_FIRE(" +
1765                            fs.getActorId() + ", " + _wfExecId + ", " +
1766                            "curTime, " + type.getTypeName() + ")");
1767                    }
1768
1769                    fs.fireStart(type, fireId);
1770                }
1771                else
1772                {
1773                    Integer fireId = fs.fireStop(type);
1774                    
1775                    if(fireId == null)
1776                    {
1777                        String msg = "fire id is null for " + _getNameableFullName(actor);
1778                        _debugWrite(msg);
1779                        throw new RecordingException(msg);
1780                    }
1781                    
1782                    synchronized(_psActorFireStop)
1783                    {
1784                        _psActorFireStop.setTimestamp(1, new Timestamp(timestamp.getTime()));
1785                        _psActorFireStop.setInt(2, fireId);
1786                        _psActorFireStop.executeUpdate();
1787                    }
1788
1789                    if(_debugWriter != null)
1790                    {
1791                        _debugWrite("UPDATE ACTOR_FIRE set end_time " +
1792                            "= curTime WHERE id = " + fireId + ")");
1793                    }
1794                }
1795            }
1796            catch(SQLException e)
1797            {
1798                throw new RecordingException(_getExceptionMessage(e));
1799            }
1800        }
1801    }
1802
1803    /** Record a port read or write to the token_flow table. */
1804    protected void _recordPortEvent(IOPort port, int fireId, int rwfireId, 
1805        boolean isRead, int channel, Token token, IOPort destPort, Date timestamp)
1806        throws RecordingException
1807    {
1808    
1809        String dataStr;
1810        int dataDesc;
1811
1812        String data = token.toString();
1813
1814        // see if we have data
1815        /* FindBugs: data is known to be non-null
1816        if(data == null)
1817        {
1818            // we are not recording data
1819            dataStr = "";
1820            dataDesc = DATA_NONE;
1821        }
1822        // see if the data is too large
1823        else*/ if(data.length() > _maxDataSize)
1824        {
1825            // store as much data as possible
1826            dataStr = data.substring(0, _maxDataSize);
1827            dataDesc = DATA_TRUNCATED;
1828        }
1829        else
1830        {
1831            // data not too large
1832            dataStr = data;
1833            dataDesc = DATA_VALID;
1834        }
1835        
1836        RegEntity.EntityType type;
1837        if(port instanceof ParameterPort)
1838        {
1839            type = RegEntity.EntityType.PortParameter;
1840        }
1841        else
1842        {
1843            type = RegEntity.EntityType.Port;
1844        }
1845
1846        RegEntity re = _checkEntity(port, type);
1847        if(re == null)
1848        {
1849            throw new RecordingException("Port has not been registered: " +
1850                _getNameableFullName(port));
1851        }
1852
1853        int portId = re.getId();
1854
1855        try
1856        {
1857            synchronized(_psTokenFlowInsert)
1858            {
1859                _psTokenFlowInsert.setInt(1, portId);
1860                _psTokenFlowInsert.setTimestamp(2, new Timestamp(timestamp.getTime()));
1861                _psTokenFlowInsert.setString(3, dataStr);
1862                _psTokenFlowInsert.setInt(4, channel);
1863
1864                if(isRead)
1865                {
1866                    _psTokenFlowInsert.setInt(5, 1);
1867                }
1868                else
1869                {
1870                    _psTokenFlowInsert.setInt(5, 0);
1871                }
1872
1873                _psTokenFlowInsert.setInt(6, fireId);
1874                _psTokenFlowInsert.setInt(7, rwfireId);
1875                _psTokenFlowInsert.setInt(8, dataDesc);
1876                _psTokenFlowInsert.executeUpdate();
1877            }
1878
1879            if(_debugWriter != null)
1880            {
1881                _debugWrite("INSERT INTO TOKEN_FLOW(" + portId +
1882                    ", curTime, " + dataStr + ", " + channel + ", " +
1883                    isRead + ", " + fireId + ", " + rwfireId + ", " +
1884                    dataDesc + ")");
1885            }
1886
1887        }
1888        catch(SQLException e)
1889        {
1890            _error("port = " + _getNameableFullName(port) + ", " +
1891                "portId = " + portId + ", " +
1892                "fireId = " + fireId + ", " +
1893                "rwFireId = " + rwfireId + ", " +
1894                "isRead = " + isRead + ", " +
1895                "channel = " + channel + ", " +
1896                "data = " + data);
1897            e.printStackTrace();
1898            _error("------------------");
1899            throw new RecordingException(_getExceptionMessage(e));
1900        }
1901    }
1902    
1903    ////////////////////////////////////////////////////////////////////////
1904    //// protected variables                                            ////
1905
1906    /** The schema containing table and column definitions. */
1907    protected Schema _schema;
1908
1909    /** The current workflow id. */
1910    protected int _wfId;
1911
1912    /** The current workflow execution id. */
1913    protected int _wfExecId;
1914
1915    /** The maximum size in the SQL table for data values. */
1916    protected int _maxDataSize;
1917    
1918    /** The maximum size in the SQL table for parameter values. */
1919    protected int _maxParameterValueSize;
1920
1921    /** A table to map actor to its firing state object. */
1922    protected Map<Actor, FireState<Integer>> _fireStateTable = null;
1923
1924    /** A table to cache RegEntity objects. */
1925    protected Map<Nameable,RegEntity> _entityCacheTable = null;
1926
1927    protected PreparedStatement _psWorkflowInsert;
1928    protected PreparedStatement _psWorkflowQuery;
1929    protected PreparedStatement _psActionInsert;
1930    protected PreparedStatement _psEntityInsert;
1931    protected PreparedStatement _psEntityQuery;
1932    protected PreparedStatement _psActorInsert;
1933    protected PreparedStatement _psDirectorInsert;
1934    protected PreparedStatement _psParameterInsert;
1935    protected PreparedStatement _psLinkInsert;
1936    protected PreparedStatement _psPortInsert;
1937    protected PreparedStatement _psRelationInsert;
1938    protected PreparedStatement _psWorkflowExecStart;
1939    protected PreparedStatement _psWorkflowExecStop;
1940    protected PreparedStatement _psActorFireStart;
1941    protected PreparedStatement _psActorFireStop;
1942    protected PreparedStatement _psTokenFlowInsert;
1943    
1944    protected String _wfNameStr = null;
1945    protected String _wfUserStr = "unknown";
1946
1947    /** The follow hold values from DatabaseFactory.Parameters. 
1948     *  NOTE: use a LinkedHashMap for predictable iteration order.
1949     */
1950    protected Map<String,String> _dbParams = new LinkedHashMap<String,String>();
1951    
1952    /** Connection to the database. */
1953    protected DatabaseType _dbType;
1954
1955    // return values for connect
1956    protected static final int SUCCESS                 = 0;
1957    protected static final int WORKFLOW_ALREADY_EXISTS = 1;
1958    protected static final int WORKFLOW_DOES_NOT_EXIST = 2;
1959
1960    /** The sql parameters object. */
1961    protected SQLRecordingParameters _params = null;
1962
1963    /** True when we need to reconnect to the database. */
1964    protected boolean _needReconnectDB = true;
1965
1966    /** True when we need to load workflow id from database. */
1967    protected boolean _needReconnectWF = true;
1968}