001/* SQL provenance output to v8 schema.
002
003Copyright (c) 2008-2010 The Regents of the University of California.
004All rights reserved.
005Permission is hereby granted, without written agreement and without
006license or royalty fees, to use, copy, modify, and distribute this
007software and its documentation for any purpose, provided that the above
008copyright notice and the following two paragraphs appear in all copies
009of this software.
010
011IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015SUCH DAMAGE.
016
017THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022ENHANCEMENTS, OR MODIFICATIONS.
023
024*/
025
026package org.kepler.provenance.sql;
027
028import java.io.BufferedInputStream;
029import java.io.File;
030import java.io.FileInputStream;
031import java.io.FileNotFoundException;
032import java.io.FileOutputStream;
033import java.io.IOException;
034import java.io.InputStream;
035import java.io.OutputStream;
036import java.lang.ref.WeakReference;
037import java.math.BigInteger;
038import java.security.MessageDigest;
039import java.security.NoSuchAlgorithmException;
040import java.sql.PreparedStatement;
041import java.sql.ResultSet;
042import java.sql.SQLException;
043import java.sql.Timestamp;
044import java.util.ArrayList;
045import java.util.Collections;
046import java.util.Date;
047import java.util.HashMap;
048import java.util.HashSet;
049import java.util.Iterator;
050import java.util.LinkedList;
051import java.util.List;
052import java.util.Map;
053import java.util.Set;
054import java.util.WeakHashMap;
055import java.util.jar.JarEntry;
056import java.util.regex.Matcher;
057
058import org.kepler.kar.KAREntry;
059import org.kepler.kar.KARFile;
060import org.kepler.kar.ModuleDependencyUtil;
061import org.kepler.moml.NamedObjId;
062import org.kepler.moml.NamedObjIdReferralList;
063import org.kepler.objectmanager.lsid.KeplerLSID;
064import org.kepler.provenance.FireState;
065import org.kepler.provenance.IOPortRefillEvent;
066import org.kepler.provenance.PortConnector;
067import org.kepler.provenance.ProvenanceEvent;
068import org.kepler.provenance.QueryException;
069import org.kepler.provenance.Queryable;
070import org.kepler.provenance.RecordingException;
071import org.kepler.provenance.RecordingParameters;
072import org.kepler.sms.NamedOntClass;
073import org.kepler.sms.SMSServices;
074import org.kepler.sms.SemanticType;
075import org.kepler.tagging.TagEvent;
076import org.kepler.util.FileUtil;
077import org.kepler.util.RenameUtil;
078import org.kepler.util.WorkflowRenameListener;
079import org.kepler.util.WorkflowRun;
080import org.kepler.util.sql.Schema;
081
082import ptolemy.actor.Actor;
083import ptolemy.actor.Director;
084import ptolemy.actor.FiringEvent;
085import ptolemy.actor.IOPort;
086import ptolemy.actor.IORelation;
087import ptolemy.actor.TypedIOPort;
088import ptolemy.actor.gui.EditorFactory;
089import ptolemy.actor.lib.hoc.IterateOverArray;
090import ptolemy.actor.parameters.ParameterPort;
091import ptolemy.actor.parameters.PortParameter;
092import ptolemy.actor.sched.Scheduler;
093import ptolemy.data.ArrayToken;
094import ptolemy.data.BooleanToken;
095import ptolemy.data.IntToken;
096import ptolemy.data.RecordToken;
097import ptolemy.data.StringToken;
098import ptolemy.data.Token;
099import ptolemy.data.expr.Parameter;
100import ptolemy.domains.sdf.kernel.SDFDirector;
101import ptolemy.domains.sdf.lib.SampleDelay;
102import ptolemy.kernel.attributes.VersionAttribute;
103import ptolemy.kernel.util.Attribute;
104import ptolemy.kernel.util.IllegalActionException;
105import ptolemy.kernel.util.NameDuplicationException;
106import ptolemy.kernel.util.Nameable;
107import ptolemy.kernel.util.NamedObj;
108import ptolemy.kernel.util.SingletonAttribute;
109import ptolemy.kernel.util.StringAttribute;
110import ptolemy.moml.MoMLParser;
111import ptolemy.util.MessageHandler;
112
113
114/** SQL provenance output.
115 *
116 * <b>NOTE: The v8 schema is under development and likely to change.</b>
117 *
118 * This uses version 8 of the schema. New features:
119 * <ul>
120 * <li>Saving the workflow contents for each execution.</li>
121 * <li>Saving contents of files referenced in tokens.</li>
122 * <li>Each workflow and workflow execution has unique LSID.</li>
123 * <li>Workflow name no longer required.</li>
124 * <li>Explicit mapping from token write to token read.</li>
125 * <li>Combining actor firing events into a single "firing cycle event".</li>
126 * <li>Recording execution exceptions.</li>
127 * <li>Storing associated data for executions.</li>
128 * </ul>
129 *
130 * @author Derik Barseghian, Daniel Crawl, Ben Leinfelder, Sean Riddle
131 * @version $Id: SQLRecordingV8.java 34597 2017-08-10 23:34:49Z crawl $
132 *
133 */
134    
135public class SQLRecordingV8 extends SQLRecordingV7
136{
137    public SQLRecordingV8() throws RecordingException
138    {
139        super();
140
141        _portConnector = new PortConnector<TokenInfo>();
142
143        _queryable = null;
144
145        // start with empty workflow name
146        _wfNameStr = "";
147
148        _nextExecLSIDStr = null;
149        
150        _executionLSIDtoIdMap = new WeakHashMap<KeplerLSID, Integer>();
151        
152        _v8Set.add(new WeakReference<SQLRecordingV8>(this));
153        
154        _executionHadAnError = false;
155    }
156    
157    /** Associate the contents of a file with the most recent workflow execution. */
158    @Override
159    public void addFileForLastExecution(Map<String,String> metadataMap, File file) 
160        throws RecordingException 
161    {
162        addFileForExecution(metadataMap, file, RegEntity.UNKNOWN_ID);
163    }
164    
165    /** Associate the contents of a file with the most recent workflow execution. 
166     *  If execId == RegEntity.UNKNOWN_ID, use last execution.
167     */
168    public void addFileForExecution(Map<String,String> metadataMap, File file, int execId) 
169        throws RecordingException
170    {   
171        int execIdToUse = execId;
172        
173        if (execId == RegEntity.UNKNOWN_ID){
174                if(_wfLastExecId == RegEntity.UNKNOWN_ID) 
175                {
176                        throw new RecordingException("No previous workflow execution.");
177                }
178                execIdToUse = _wfLastExecId;
179        }
180                
181        if(metadataMap.size() == 0)
182                {
183                        throw new RecordingException("Metadata map is empty.");
184                }
185        // add the file contents to data table if not already there
186        String fileMD5Str = _checkData(file);
187
188        try
189        {
190            synchronized(_psAssocDataInsert)
191            {
192                for(Map.Entry<String,String> entry : metadataMap.entrySet())
193                {
194                    _psAssocDataInsert.setString(1, fileMD5Str); 
195                    _psAssocDataInsert.setString(2, entry.getKey());
196                    _psAssocDataInsert.setString(3, entry.getValue());
197                    _psAssocDataInsert.setInt(4, execIdToUse); 
198                    _dbType.insert(_psAssocDataInsert, "associated_data", "id");
199        
200                    if(_debugWriter != null)
201                    {
202                        _debugWrite("INSERT INTO ASSOCIATED_DATA (" +
203                            fileMD5Str + ", " + entry.getKey() + ", " +
204                            entry.getValue() + ", " + execIdToUse + ")");
205                    }
206                }
207            }
208        }
209        catch(SQLException e)
210        {
211            throw new RecordingException("Error trying to insert into " +
212                "associated_data table: " + e.getMessage());
213        }
214           
215    }
216
217    /** React to a change in an attribute. */
218    @Override
219    public void attributeChanged(Attribute attribute)
220        throws IllegalActionException
221    {
222        boolean notifyParent = true;
223
224        String name = attribute.getName();
225
226        // see if the workflow name has changed
227        if(name.equals(SQLRecordingParameters.wfNameParamStr))
228        {
229            String val = 
230                ((StringToken)((Parameter)attribute).getToken()).stringValue();
231
232            try
233            {
234                // see if name changed and we're already connected to the db
235                if(!val.equals(_wfNameStr) && !_needReconnectWF)
236                {
237                    // change the wf name in the column
238                    _changeWorkflowNameColumn(val);
239                    notifyParent = false;
240                }
241                    
242            }
243            catch(RecordingException e)
244            {
245                throw new IllegalActionException(e.getMessage());
246            }
247        }
248        else if(name.equals(SQLRecordingParametersV8.maxFileIncludeSizeKBStr))
249        {
250            notifyParent = false; 
251
252            Token token = ((Parameter)attribute).getToken();
253            if(token == null)
254            {
255                _maxFileIncludeSizeKBVal = _maxFileIncludeSizeKBDefaultVal;
256            }
257            else
258            {
259                _maxFileIncludeSizeKBVal = ((IntToken)token).intValue();
260            }    
261        }
262        else if(name.equals(SQLRecordingParametersV8.execAnnotationStr))
263        {
264            notifyParent = false;
265
266            _execAnnotation = ((StringToken)((Parameter)attribute).getToken()).stringValue();
267        }
268        else if(name.equals(SQLRecordingParametersV8.nextExecLSIDStr))
269        {
270            notifyParent = false;
271            
272            _nextExecLSIDStr = ((StringToken)((Parameter)attribute).getToken()).stringValue();
273        }
274        else if(name.equals(SQLRecordingParametersV8.watchForLSIDChangesStr))
275        {
276            notifyParent = false;
277            
278            _watchForLSIDChanges = ((BooleanToken)((Parameter)attribute).getToken()).booleanValue();
279        }
280        
281        if(notifyParent)
282        {
283            super.attributeChanged(attribute);
284        }
285    }
286
287    /** Add Parameters for ProvenanceRecorder. */
288    @Override
289    public RecordingParameters generateParameters(NamedObj no)
290        throws IllegalActionException, NameDuplicationException
291    {
292        _params = new SQLRecordingParametersV8(no);
293        return _params;
294    }
295
296    /** Register a port or portparameter.  */
297    @Override
298    public boolean regPort(TypedIOPort port) throws RecordingException
299    {
300        boolean retval = super.regPort(port);
301
302        _portConnector.createConnections(port);
303
304        return retval;
305    }
306
307    /** Record the starting of workflow execution. */
308    @Override
309    public void executionStart(KeplerLSID executionLSID, Date timestamp) throws RecordingException
310    {
311        //System.out.println("execution start");
312
313        if(_wfExecId != RegEntity.UNKNOWN_ID)
314        {
315            throw new RecordingException("Workflow already running.");
316        }
317
318        // NOTE: do not set this to false so that workflow contents
319        // will be registered again. this handles cases where entities
320        // are renamed between runs.
321        //_needWorkflowContents(false);
322        
323        if(_wfUserStr == null)
324        {
325            throw new RecordingException("Need workflow user name.");
326        }
327
328        if(_machineStr == null)
329        {
330            throw new RecordingException("Need machine name.");
331        }
332
333        // update the db
334        try
335        {
336            byte[] wfBytes = _recorderContainer.exportMoML().getBytes();
337            String wfMD5Str = _checkData(wfBytes);
338
339            String annotationStr;
340            if(_execAnnotation != null)
341            {
342                annotationStr = _execAnnotation;
343            }
344            else
345            {
346                annotationStr = "";
347            }
348
349            // if _nextExecLSID defined, we should use it instead of runLSID
350            if(_nextExecLSIDStr != null && _nextExecLSIDStr.length() != 0)
351            {
352                try
353                {
354                        executionLSID = new KeplerLSID(_nextExecLSIDStr);
355                }
356                catch(Exception e)
357                {
358                    throw new RecordingException("Error converting given LSID " +
359                            _nextExecLSIDStr + " to LSID: ", e);
360                }
361                
362                // clear it so that we don't use it again.
363                _nextExecLSIDStr = null;
364            }
365
366            // get the workflow lsid
367            KeplerLSID wfLSID = NamedObjId.getIdFor(_recorderContainer.toplevel());
368
369            synchronized(_psWorkflowExecStart)
370            {
371                _psWorkflowExecStart.setInt(1, _wfId);
372                _psWorkflowExecStart.setString(2, _wfUserStr);
373                _psWorkflowExecStart.setTimestamp(3, new Timestamp(timestamp.getTime()));
374                _psWorkflowExecStart.setString(4, wfMD5Str);
375                _psWorkflowExecStart.setString(5, _machineStr);
376                _psWorkflowExecStart.setString(6, annotationStr);
377                _psWorkflowExecStart.setString(7, executionLSID.toString());
378                _psWorkflowExecStart.setString(8, wfLSID.toString());
379                        _psWorkflowExecStart.setString(9, ModuleDependencyUtil.
380                                        buildModuleDependenciesString());
381                _psWorkflowExecStart.setString(10, WorkflowRun.type.Running.toString());
382                _wfExecId = _dbType.insert(_psWorkflowExecStart,
383                    "workflow_exec", "id");
384            }
385            
386            _executionLSIDtoIdMap.put(executionLSID, _wfExecId);
387            
388            //insert any workflow tags here
389                insertWorkflowTags(_wfExecId);
390
391            if(_debugWriter != null)
392            {
393                _debugWrite("INSERT INTO WORKFLOW_EXEC (" + _wfId +
394                    ", " + _wfUserStr + ", curTime, md5, " + _machineStr +
395                    ", " + annotationStr + ", lsid)");
396            }
397
398            _updateParameterExecTable();
399            
400        }
401        catch(SQLException e)
402        {
403            throw new RecordingException(_getExceptionMessage(e));
404        }
405    }
406
407    /** An actor threw an exception.  */
408    @Override
409    public void executionError(Nameable source, Throwable throwable, KeplerLSID executionLSID)
410        throws RecordingException
411    {
412
413        int entityId = RegEntity.UNKNOWN_ID;
414        if(source != null)
415        {
416            RegEntity entity = _entityCacheTable.get(source);
417            
418            // XXX what if null?
419            if(entity != null)
420            {
421                entityId = entity.getId();
422            }
423        }
424
425        Integer execId = _executionLSIDtoIdMap.get(executionLSID);
426        if(execId == null)
427        {
428            throw new RecordingException("No id found for execution " + executionLSID);
429        }
430
431        String message = throwable.getMessage();
432        int maxLength = 
433            _schema.getTable("error").getColumn("message").getLength();
434
435        // if there is a message and it is larger than the column,
436        // truncate it.
437        if(message != null && message.length() > maxLength)
438        {
439            message = message.substring(0, maxLength);
440        }
441
442        try
443        {
444            synchronized(_psErrorInsert)
445            {
446                _psErrorInsert.setInt(1, entityId);
447                _psErrorInsert.setInt(2, execId);
448                _psErrorInsert.setString(3, message);
449                _dbType.insert(_psErrorInsert, "error", "id");
450                    
451                if(_debugWriter != null)
452                {
453                    _debugWrite("INSERT INTO ERROR(" + entityId + ", " +
454                        execId + ", " + throwable.getMessage() + ")");
455                }
456            }
457        }
458        catch(SQLException e)
459        {
460            throw new RecordingException("Error inserting into error table: ",
461                e);
462        }
463        changeExecutionType(executionLSID, WorkflowRun.type.Error);
464        _executionHadAnError = true;
465    }
466
467    /** Record the stopping of workflow execution. */
468    @Override
469    public void executionStop(KeplerLSID executionLSID, Date timestamp) throws RecordingException
470    {
471        // save the execution id.
472        _wfLastExecId = _wfExecId;
473        if (!_executionHadAnError)
474        {
475            changeExecutionType(executionLSID, WorkflowRun.type.Complete);
476        }
477
478        super.executionStop(timestamp);
479    }
480
481    /** Returns true if workflow contents should be registered. */
482    @Override
483    public boolean regContents() throws RecordingException
484    {
485        // see if we're watching for workflow LSID changes and
486        // the workflow LSID is not null
487        if(_watchForLSIDChanges && _wfLSID != null)
488        {
489            // see if the workflow LSID has changed.
490            KeplerLSID lsid = NamedObjId.getIdFor(_recorderContainer);
491            if(! lsid.equalsWithoutRevision(_wfLSID))
492            {
493                //_debug("regContents: lsid appears to have changed.");
494                //_debug("  " + _wfLSID + " --> " + lsid);
495                
496                // reconnect to the workflow so that we get the new LSID
497                _wfLSID = null;
498                _reconnectWorkflow();
499                
500                // return true so the workflow contents get registered.
501                return true;
502            }
503        }
504
505        return super.regContents();
506    }
507    
508    /** Register a link between two endpoints.  */
509    @Override
510    public boolean regLink(NamedObj endPoint1, NamedObj endPoint2)
511        throws RecordingException
512    {
513        // do nothing
514        return false;
515    }
516    
517    /** Register a parameter. A parameter can be any <b>entity</b>
518     * stored in the MoML that does not have its own
519     * <code>regNNN()</code> method. This can be user-level 
520     * parameters (e.g., Parameter, StringParameter, etc.) or
521     * internal to Kepler (e.g., _location, semanticType000, etc.).
522     * (A "parameter" corresponds to a property in the MoML).
523     *
524     */
525    @Override
526    public boolean regParameter(NamedObj parameter) throws RecordingException
527    {
528        boolean skip = false;
529        
530        String name = parameter.getName();
531        NamedObj container = parameter.getContainer();
532        
533        // filter out certain types of parameters
534        if(parameter instanceof EditorFactory ||
535            parameter instanceof Scheduler ||
536            parameter instanceof SemanticType ||
537            parameter instanceof SingletonAttribute ||
538            parameter instanceof VersionAttribute)
539        {
540            skip = true;
541        }
542        // do not record class parameters since the
543        // type is already recorded and the class lsid is not used.
544        else if(name.equals("class") &&
545                parameter instanceof StringAttribute)
546        {        
547            String val = ((StringAttribute)parameter).getExpression();
548            String containerClass =
549                parameter.getContainer().getClass().getName();
550            if(containerClass.equals(val))
551            {
552                skip = true;
553            }
554        }
555        // do not record special attributes for ports
556        else if((container instanceof IOPort) &&
557            (name.equals("tokenConsumptionRate") ||
558            name.equals("tokenProductionRate") ||
559            name.equals("_showName") ||
560            name.equals("_hide")))
561        {
562            skip = true;
563        }
564        // do not record internal parameter iterationCount in IterateOverArray
565        else if((container instanceof IterateOverArray) &&
566            name.equals("iterationCount")) {
567                skip = true;
568        }
569        // do not record AUTO and UNBOUNDED in SDF
570        else if((container instanceof SDFDirector) &&
571                        (name.equals(SDFDirector.AUTO_NAME) ||
572                        name.equals(SDFDirector.UNBOUNDED_NAME))) {
573                skip = true;
574        }
575        // do not record start and stop time in Director
576        else if((container instanceof Director) &&
577                        (name.equals("startTime") ||
578                        name.equals("stopTime"))) {
579                skip = true;
580        }
581        
582    
583        if(skip)
584        {
585            return false;
586        }
587        else
588        {
589            return super.regParameter(parameter);
590        }
591    }
592
593    /** Register a relation. */
594    @Override
595    public boolean regRelation(IORelation relation) throws RecordingException
596    {
597        // do nothing.
598        return false;
599    }
600
601    /** Record a port refill event. */
602    @Override
603    public void refillPortEvent(IOPortRefillEvent event) throws RecordingException
604    {
605        //System.out.println("got refill event: " + event);
606        _portConnector.refillId(event.getPort(), event.getChannel());
607    }
608    
609    /** Get a Queryable connected to the Recording output.
610     * @param allowReconnectWF - false if you want to force no _reconnectWF
611     * @exception QueryException may be thrown if Queryable not implemented
612     *  for the Recording type.
613     * @throws RecordingException 
614     */
615    @Override
616    public Queryable getQueryable(boolean allowReconnectWF) throws QueryException, RecordingException
617    {
618        // make sure we're connected to db
619        _checkConnection(allowReconnectWF);
620        
621        return _queryable;
622    }
623    
624    /** A workflow was renamed.
625     * 
626     * @param namedObj the workflow
627     * @param oldLSID the previous LSID
628     * @param newLSID the new LSID
629     * @param oldName the previous name
630     * @param newName the new name
631     * @throws RecordingException 
632     * @see WorkflowRenameListener
633     */
634    @Override
635    public void renamedWorkflow(NamedObj namedObj, KeplerLSID oldLSID,
636        KeplerLSID newLSID, String oldName, String newName) throws RecordingException
637    {
638        
639        _checkConnection(true);
640                
641        // make sure we're connected to the database
642        if(!_needReconnectDB && !_needReconnectWF)
643        {
644            Matcher matcher = RenameUtil.unnamedIdPattern.matcher(oldName);
645            
646            // make sure that the id matches the unnamed regex.
647            if(oldName != null)
648            {
649
650                if (newName != null)
651                {
652                        if (matcher.matches())
653                        {
654                                try 
655                                {
656                                        _changeWorkflowNameColumn(newName, oldLSID);
657                                }
658                                catch (RecordingException e)
659                                {
660                                        MessageHandler.error("Error changing workflow name.", e);
661                                }
662                        }
663                        else
664                        {
665                                // just change this workflow name
666                                _wfNameStr = newName;
667                                
668                                                // note:
669                                                // we call _changeWorkflowNameColumn, but with newLSID,
670                                                // because workflow has already been recorded at this point. 
671                                // An alternative might be to check for NamedObjIdChangeRequest
672                                                // in provenanceRecorder changeExecuted and somehow do
673                                                // the rename then, before _recordWorkflowContents.
674                                try 
675                                {
676                                        _changeWorkflowNameColumn(newName, newLSID);
677                                }
678                                catch (RecordingException e)
679                                {
680                                        MessageHandler.error("Error changing workflow name.", e);
681                                }
682                        }
683                }
684            }
685        }
686    }
687    
688    /** Delete executions given a list of execution LSIDs.
689     * 
690     * @param lsidList
691     * @return
692     * @throws RecordingException
693     */
694    public int deleteExecutions(List<KeplerLSID> lsidList) throws RecordingException 
695    {
696
697        // make sure we're connected to db
698        _checkConnection(false);
699
700        int numRowsDeleted = 0;
701
702        String wfExecStr = _dbType.getTableName("workflow_exec");
703        String dataStr = _dbType.getTableName("data");
704
705        StringBuilder lsids = new StringBuilder();
706        for (Iterator<KeplerLSID> i=lsidList.iterator(); i.hasNext(); ) 
707        {
708                lsids.append("'"+i.next().toString()+"'");
709                if(i.hasNext())
710                {
711                        lsids.append(",");
712                }
713        }
714        
715                // delete the executions
716        String deleteExecutions = "DELETE FROM " + wfExecStr + " wfe " +
717                "WHERE wfe.lsid IN (" +lsids+ ")";
718        try 
719        {               
720                        _psDeleteExecutions = _dbType.getPrepStatement(deleteExecutions);
721                numRowsDeleted = _psDeleteExecutions.executeUpdate();
722                } 
723        catch (SQLException e) {
724            throw new RecordingException("Error deleting executions:", e);
725                }
726                
727        // determine which rows of data table will become cruft
728        // after executions deletion occurs
729        String md5sToDelete = getDataMD5sToDelete(lsids);
730
731        // see if there are any rows in the data table to delete
732        if(md5sToDelete.length() > 0)
733        {
734                    // now cleanup data table
735            String deleteData = "DELETE FROM " + dataStr + " d " +
736                            "WHERE d.md5 IN (" +md5sToDelete+ ")";
737            try 
738            {                   
739                    _psDeleteData = _dbType.getPrepStatement(deleteData);
740                    _psDeleteData.executeUpdate();
741            } 
742            catch (SQLException e) {
743                    throw new RecordingException("Error deleting data:", e);
744            }           
745        }
746        
747        // tags table now automatically taken care of by cascade 
748        // now that it uses WF_EXEC_ID fk
749        deleteWorkflowsWithNoExecutions();
750
751        return numRowsDeleted;
752    }
753    
754    /** Delete workflows from the database that have no executions.
755     *  @return the number of workflows deleted.
756     */
757    public int deleteWorkflowsWithNoExecutions() throws RecordingException
758    {
759        _checkConnection(false);
760        
761
762        // see which workflows we're going to delete
763        List<Integer> wfIdsToBeDeleted = new LinkedList<Integer>();
764        synchronized(_psWorkflowIdsForNoExecutions)
765        {
766            try
767            {
768                ResultSet result = null;
769                try
770                {
771                    result = _psWorkflowIdsForNoExecutions.executeQuery();
772                    while(result.next())
773                    {
774                        wfIdsToBeDeleted.add(result.getInt(1));
775                    }
776                }
777                finally
778                {
779                    if(result != null)
780                    {
781                        result.close();
782                    }
783                }
784            }
785            catch(SQLException e)
786            {
787                throw new RecordingException("Error querying workflows to " + 
788                    "be deleted: ", e);
789            }
790        }
791        
792        // see if any other recordings have this workflow open
793        // if so, tell them to reconnect to the workflow.
794        for(Integer wfId : wfIdsToBeDeleted)
795        {
796            synchronized(_v8Set)
797            {
798                // use an iterator so we can safely remove items.
799                Iterator<WeakReference<SQLRecordingV8>> iterator = _v8Set.iterator();
800                while(iterator.hasNext())
801                {
802                    SQLRecordingV8 recording = iterator.next().get();
803                    
804                    // see if recording was garbage collected.
805                    if(recording == null)
806                    {
807                        iterator.remove();
808                    }
809                    else if(recording._wfId == wfId)
810                    {
811                        recording._needReconnectWF = true;
812                    }
813                }
814            }
815        }
816        
817        // delete the workflows.
818        // NOTE: this deletes all information about a workflow
819        // if it has no executions.
820        synchronized(_psDeleteWorkflowsForNoExecutions)
821        {
822            try
823            {                
824                int numDeleted =
825                    _psDeleteWorkflowsForNoExecutions.executeUpdate();
826                //System.out.println("deleted " + numDeleted + " workflows.");
827                return numDeleted;
828            }
829            catch(SQLException e)
830            {
831                throw new RecordingException("Unable to delete workflows " +
832                    "with no runs: ", e);
833            }
834        }
835    }
836
837    /** A tag was added, insert it */
838    @Override
839    public void tagAdded(TagEvent event) throws RecordingException
840    {
841        // make sure we're connected to db
842        _checkConnection(false);
843        
844                String urnStr = event.getTag().getConceptId();
845        String tagStr = event.getTag().toString();
846
847        // get the entity id of the tagged object.
848        NamedObj namedObj = event.getSource();
849        WorkflowRun run = null;
850        
851        if (namedObj instanceof WorkflowRun)
852        {
853                run = (WorkflowRun)namedObj;
854                boolean alreadyInserted = checkIfTagAlreadyInserted(run.getExecId(), urnStr);
855
856                if(!alreadyInserted)
857                {
858                        // TODO if we remove the caveat for not updateRevision on workflowRuns in
859                        // Tagging.tagAdded, do not rollRevision here:
860                        run.rollRevision(run.getExecLSID(), this);
861                        insertTag(WorkflowRun.TAG_TYPE_RUN, tagStr, urnStr, run.getExecId());
862                }
863
864        }
865
866    }
867    
868        /**
869         * Parse workflow moml for tags, and insert them, associated with execID 
870         * @param runLsid
871         */
872    public void insertWorkflowTags(int execId)
873    {
874                try 
875                {
876                    NamedObj workflow = _recorderContainer.toplevel();
877                        Iterator<?> iter = SMSServices.getActorSemanticTypes(workflow).iterator();
878                        while (iter.hasNext()) 
879                        {
880                                SemanticType semtype = (SemanticType) iter.next();
881                                insertTag(WorkflowRun.TAG_TYPE_WORKFLOW, semtype.getConceptName(),
882                                        semtype.getConceptId(), execId);
883                        }
884                } 
885                catch (Exception e) 
886                {
887                        // TODO Auto-generated catch block
888                        e.printStackTrace();
889                }
890    }
891    
892    /** Insert a new row into the tag table. */
893    public void insertTag(String type, String tagStr, String urnStr,
894        int execId) throws RecordingException
895    {
896                synchronized(_psTagInsert)
897                {
898                        try
899                        {
900                        _psTagInsert.setString(1, type);
901                                _psTagInsert.setString(2, tagStr);
902                                _psTagInsert.setString(3, urnStr);
903                                _psTagInsert.setInt(4, execId);                         
904                                _psTagInsert.execute();
905                                
906                if(_debugWriter != null)
907                {
908                    _debugWrite("INSERT INTO TAG(" + type + ", " + tagStr +
909                        ", " + urnStr + ", "+ execId +")");
910                }
911
912                        }
913                        catch(SQLException e)
914                        {
915                                throw new RecordingException("Could not insert tag:", e);
916                        }
917                }
918    }
919    
920    
921    public boolean checkIfTagAlreadyInserted(int execId, String urn)
922    {
923        boolean found = false;
924                // see if tag is in table        
925        synchronized(_psGetTagForExecIdAndURN)
926        {
927                try
928                {
929                        ResultSet result = null;
930                        try
931                        {
932                                _psGetTagForExecIdAndURN.setInt(1, execId);
933                                _psGetTagForExecIdAndURN.setString(2, urn);
934                                result = _psGetTagForExecIdAndURN.executeQuery();
935                                if(result.next())
936                                {
937                                        found = true;
938                                }
939                        }
940                        finally
941                        {
942                                if(result != null)
943                                {
944                                        result.close();
945                                }
946                        }
947                }
948                catch(SQLException e)
949                {
950                        e.printStackTrace();
951                }
952        }
953                return found;
954    }
955    
956    
957    /** Record a custom provenance event. */
958    @Override
959    public void customProvEvent(ProvenanceEvent event) throws RecordingException
960    {
961        if(_wfExecId == RegEntity.UNKNOWN_ID) 
962        {
963            throw new RecordingException("Workflow must be running to save associated data.");
964        }
965
966        final Map<String,String> map = event.getMap();
967        
968        if(map.isEmpty())
969        {
970            throw new RecordingException("Metadata map is empty.");
971        }
972
973        try
974        {
975            synchronized(_psAssocDataInsertNoDataId)
976            {
977                for(Map.Entry<String,String> entry : map.entrySet())
978                {
979                    _psAssocDataInsertNoDataId.setString(1, entry.getKey());
980                    _psAssocDataInsertNoDataId.setString(2, entry.getValue());
981                    _psAssocDataInsertNoDataId.setInt(3, _wfExecId); 
982                    _dbType.insert(_psAssocDataInsertNoDataId, "associated_data", "id");
983        
984                    if(_debugWriter != null)
985                    {
986                        _debugWrite("INSERT INTO ASSOCIATED_DATA (" +
987                            entry.getKey() + ", " + 
988                            entry.getValue() + ", " +
989                            _wfExecId + ")");
990                    }
991                }
992            }
993        }
994        catch(SQLException e)
995        {
996            throw new RecordingException("Error trying to insert into " +
997                "associated_data table: " + e.getMessage());
998        }
999    }
1000
1001    /** A tag was removed. */
1002    @Override
1003    public void tagRemoved(TagEvent event) throws RecordingException
1004    {
1005        // make sure we're connected to db
1006        _checkConnection(false);
1007
1008        String urnStr = event.getTag().getConceptId();
1009        NamedObj namedObj = event.getSource();
1010        
1011        // just dealing with WorkflowRuns here. We don't want to
1012        // allow the removal of a tag on a workflow that's already run
1013        //RegEntity entity = _entityCacheTable.get(namedObj);
1014                
1015        if (namedObj instanceof WorkflowRun)
1016        {
1017                WorkflowRun run = (WorkflowRun)namedObj;
1018            synchronized(_psDeleteTagForExecIdAndURN)
1019            {
1020                try
1021                {
1022                        boolean alreadyInserted = checkIfTagAlreadyInserted(run.getExecId(), urnStr);
1023                        if (alreadyInserted){
1024                                _psDeleteTagForExecIdAndURN.setInt(1, run.getExecId());
1025                                _psDeleteTagForExecIdAndURN.setString(2, urnStr);
1026                                _psDeleteTagForExecIdAndURN.execute();
1027                                run.rollRevision(run.getExecLSID(), this);
1028                        }
1029                }
1030                catch(SQLException e)
1031                {
1032                    throw new RecordingException("Error deleting tag: ", e);
1033                }
1034            }
1035        }
1036    }
1037    
1038    /** If run's exec LSID does not already exist in provenance, insert WorkflowRun
1039     *  into provenance. This is a special case for 'preview' runs, where no other
1040     *  associated data (workflow data, reporting files) are inserted.
1041     *  
1042     * @param karFile
1043     * @param run
1044     * @throws RecordingException
1045     */
1046    public boolean insertPreviewRun(WorkflowRun run) throws RecordingException
1047    {
1048        
1049        Queryable q = null;
1050        try 
1051        {
1052            q = getQueryable(false); //this will make sure we're connected to db
1053        } 
1054        catch (QueryException e1) 
1055        {
1056            // TODO Auto-generated catch block
1057            e1.printStackTrace();
1058            return false;
1059        }
1060
1061        try 
1062        {
1063                // don't insert if some form of this execution already exists in
1064                // this provenance store.
1065                
1066                // don't insert if run lsid is the same as one of the oldest referrals
1067                // of an execution in this prov store (without rev).
1068                KeplerLSID runLSID = run.getExecLSID();
1069                Integer execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision(
1070                                runLSID.toStringWithoutRevision());
1071                if (execId != null)
1072                {
1073                        _warn("SQLRecordingV8 insertHuskRun - Not inserting run! run:"+execId+
1074                                        " has the same originalExecLSID without rev:"+
1075                                        runLSID.toStringWithoutRevision());
1076                        return false;
1077                }
1078                
1079                // don't insert if one of the oldest referrals of an execution in this
1080                // prov store (without rev) is the same as the oldest referral of this run
1081                // (without rev)
1082                KeplerLSID originalExecLSID = null;
1083                List<KeplerLSID>refs = run.getReferralList();
1084                if (refs != null && !refs.isEmpty())
1085                {
1086                        originalExecLSID = refs.get(refs.size()-1);
1087                        execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision(
1088                                        originalExecLSID.toStringWithoutRevision());
1089                }
1090                if (execId != null)
1091                {
1092                        _warn("SQLRecordingV8 insertPreviewRun - Not inserting run! run:"+execId+
1093                                        " has the same originalExecLSID without rev:"+
1094                                        originalExecLSID.toStringWithoutRevision());
1095                        return false;
1096                }
1097                
1098                // don't insert if the execution originated in this prov store
1099                // (ie oldest referral (without rev) is in this prov store's execution list)
1100                if (originalExecLSID != null)
1101                {
1102                        execId = q.getExecutionForExecutionLSIDWithoutRevision(
1103                                originalExecLSID.toStringWithoutRevision());
1104                        if (execId != null)
1105                        {
1106                                _warn("SQLRecordingV8 insertPreviewRun - Not inserting run! " +
1107                                                "Run originated on this machine and still exists - run:"+
1108                                                execId+"'s lsid is the same as this run's originalExecLSID " +
1109                                                "without rev:"+originalExecLSID.toStringWithoutRevision());
1110                                return false;
1111                        }
1112                }
1113                
1114                // don't insert if the execution (without rev) is in this 
1115                // prov store's execution list
1116                execId = q.getExecutionForExecutionLSIDWithoutRevision(
1117                                run.getExecLSID().toStringWithoutRevision());
1118                if (execId != null)
1119                {
1120                        _warn("SQLRecordingV8 insertPreviewRun - Not inserting run! " +
1121                                        "run:"+ execId+"'s lsid is same as this run's lsid without rev:"+
1122                                        run.getExecLSID().toStringWithoutRevision());
1123                        return false;
1124                }
1125                
1126                
1127                Integer workflowExecId = null;
1128                        
1129                // only insert workflow if not already in provenance
1130                int workflowId = _getWorkflowId(run.getWorkflowLSID().toStringWithoutRevision());
1131                if (workflowId == RegEntity.UNKNOWN_ID)
1132                {
1133                        synchronized(_psWorkflowInsert)
1134                        {
1135                                _psWorkflowInsert.setString(1, run.getWorkflowName());
1136                                _psWorkflowInsert.setString(2, run.getWorkflowLSID().toStringWithoutRevision());
1137                                workflowId = _dbType.insert(_psWorkflowInsert, "workflow", "id");
1138                        }
1139                }
1140
1141                // now insert workflow exec start info
1142                synchronized(_psWorkflowExecStart)
1143                {
1144                        // NOTE we do not have the workflow for preview run,
1145                        // so we use fake workflow data for the Data table.
1146                        byte[] fakeWfBytes = "".getBytes();
1147                        String wfMD5Str = _checkData(fakeWfBytes);
1148
1149                        _psWorkflowExecStart.setInt(1, workflowId);
1150                        _psWorkflowExecStart.setString(2, run.getUser());
1151                        _psWorkflowExecStart.setTimestamp(3,
1152                                        new Timestamp(run.getStartTime().getTime()));
1153                        _psWorkflowExecStart.setString(4, wfMD5Str);
1154                        _psWorkflowExecStart.setString(5, run.getHostId());
1155                        _psWorkflowExecStart.setString(6, run.getAnnotation());
1156                        _psWorkflowExecStart.setString(7, run.getExecLSID().toString());
1157                        _psWorkflowExecStart.setString(8, run.getWorkflowLSID().toString());
1158                        _psWorkflowExecStart.setString(9, run.getModuleDependencies());
1159                        if (run.getErrorMessages().isEmpty()){
1160                                _psWorkflowExecStart.setString(10, WorkflowRun.type.Preview.toString());
1161                        }
1162                        else{
1163                                _psWorkflowExecStart.setString(10, WorkflowRun.type.Preview_Error.toString());
1164                        }
1165                        workflowExecId = _dbType.insert(_psWorkflowExecStart,
1166                                        "workflow_exec", "id");
1167                }
1168
1169                // if workflow exec start info insert worked, now insert exec stop info
1170                // and reporting items and error messages if they exist
1171                if (workflowExecId != RegEntity.UNKNOWN_ID)
1172                {
1173                        synchronized(_psWorkflowExecStop)
1174                        {
1175                                Timestamp endTime = new Timestamp(run.getStartTime().getTime() + 
1176                                                (run.getDuration()*numMillisecondsInASecond));
1177                                _psWorkflowExecStop.setTimestamp(1, endTime);
1178                                _psWorkflowExecStop.setInt(2, workflowExecId);
1179                                _psWorkflowExecStop.executeUpdate();
1180                        }
1181                        
1182                                KeplerLSID runLSIDBeforeExecIdChange = run.getExecLSID();
1183                        
1184                        // replace original execId. this must be before call to insertAnyHuskRunErrorMessages
1185                        run.resetExecId(workflowExecId, this);
1186
1187                        //insert any referrals
1188                        insertAnyHuskRunReferralList(run);
1189
1190                        insertAnyHuskRunErrorMessages(run);
1191
1192                        insertAnyHuskRunTags(run);
1193                }
1194                else
1195                {
1196                        return false;
1197                }
1198                
1199                return true;
1200        }
1201        catch (SQLException e) 
1202        {
1203                // TODO Auto-generated catch block
1204                e.printStackTrace();
1205                return false;
1206        } 
1207        catch (Exception e) 
1208        {
1209                // TODO Auto-generated catch block
1210                e.printStackTrace();
1211                return false;
1212        }
1213
1214    }
1215    
1216    /** If run's exec LSID does not already exist in provenance, insert WorkflowRun
1217     *  into provenance. This is a special case where the run may come from a KAR, 
1218     *  potentially from a different system.
1219     *  
1220     * @param karFile
1221     * @param run
1222     * @throws RecordingException
1223     */
1224    public boolean insertHuskRun(KARFile karFile, WorkflowRun run) throws RecordingException
1225    {
1226
1227        Queryable q = null;
1228        try 
1229        {
1230            q = getQueryable(false); //this will make sure we're connected to db
1231        } 
1232        catch (QueryException e1) 
1233        {
1234            // TODO Auto-generated catch block
1235            e1.printStackTrace();
1236            return false;
1237        }
1238
1239        try 
1240        {
1241                // don't insert if some form of this execution already exists in
1242                // this provenance store.
1243                
1244                // don't insert if run lsid is the same as one of the oldest referrals
1245                // of an execution in this prov store (without rev).
1246                KeplerLSID runLSID = run.getExecLSID();
1247                Integer execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision(
1248                                runLSID.toStringWithoutRevision());
1249                if (execId != null)
1250                {
1251                        _warn("SQLRecordingV8 insertHuskRun - Not inserting run! run:"+execId+
1252                                        " has the same originalExecLSID without rev:"+
1253                                        runLSID.toStringWithoutRevision());
1254                        return false;
1255                }
1256                
1257                // don't insert if one of the oldest referrals of an execution in this
1258                // prov store (without rev) is the same as the oldest referral of this run
1259                // (without rev)
1260                KeplerLSID originalExecLSID = null;
1261                List<KeplerLSID>refs = run.getReferralList();
1262                if (refs != null && !refs.isEmpty())
1263                {
1264                        originalExecLSID = refs.get(refs.size()-1);
1265                        execId = q.getExecutionForOldestReferralExecutionLSIDWithoutRevision(
1266                                        originalExecLSID.toStringWithoutRevision());
1267                }
1268                if (execId != null)
1269                {
1270                        _warn("SQLRecordingV8 insertHuskRun - Not inserting run! run:"+execId+
1271                                        " has the same originalExecLSID without rev:"+
1272                                        originalExecLSID.toStringWithoutRevision());
1273                        return false;
1274                }
1275                
1276                // don't insert if the execution originated in this prov store
1277                // (ie oldest referral (without rev) is in this prov store's execution list)
1278                if (originalExecLSID != null)
1279                {
1280                        execId = q.getExecutionForExecutionLSIDWithoutRevision(
1281                                originalExecLSID.toStringWithoutRevision());
1282                        if (execId != null)
1283                        {
1284                                _warn("SQLRecordingV8 insertHuskRun - Not inserting run! " +
1285                                                "Run originated on this machine and still exists - run:"+
1286                                                execId+"'s lsid is the same as this run's originalExecLSID " +
1287                                                "without rev:"+originalExecLSID.toStringWithoutRevision());
1288                                return false;
1289                        }
1290                }
1291                
1292                // don't insesrt if the execution (without rev) is in this 
1293                // prov store's execution list
1294                execId = q.getExecutionForExecutionLSIDWithoutRevision(
1295                                run.getExecLSID().toStringWithoutRevision());
1296                if (execId != null)
1297                {
1298                        _warn("SQLRecordingV8 insertHuskRun - Not inserting run! " +
1299                                        "run:"+ execId+"'s lsid is same as this run's lsid without rev:"+
1300                                        run.getExecLSID().toStringWithoutRevision());
1301                        return false;
1302                }
1303                
1304                
1305                Integer workflowExecId = null;
1306                        
1307                // only insert workflow if not already in provenance
1308                int workflowId = _getWorkflowId(run.getWorkflowLSID().toStringWithoutRevision());
1309                if (workflowId == RegEntity.UNKNOWN_ID)
1310                {
1311                        synchronized(_psWorkflowInsert)
1312                        {
1313                                _psWorkflowInsert.setString(1, run.getWorkflowName());
1314                                _psWorkflowInsert.setString(2, run.getWorkflowLSID().toStringWithoutRevision());
1315                                workflowId = _dbType.insert(_psWorkflowInsert, "workflow", "id");
1316                        }
1317                }
1318
1319                // now insert workflow exec start info
1320                synchronized(_psWorkflowExecStart)
1321                {
1322                        //byte[] wfBytes = om.getHighestObjectRevision(run.getWorkflowLSID()).exportMoML().getBytes();
1323                                List<KAREntry> karEntries = karFile.karEntries();
1324                                KAREntry workflowEntry = null;
1325                                for (KAREntry karEntry: karEntries){
1326                                        if (karEntry.getLSID().equals(run.getWorkflowLSID())){
1327                                                workflowEntry = karEntry;
1328                                                break;
1329                                        }
1330                                }
1331                                // FIXME: workflowEntry could be null
1332                                // FIXME: close workflowStream?
1333                        InputStream workflowStream = karFile.getInputStream(workflowEntry);
1334                        String workflowString = FileUtil.convertStreamToString(workflowStream);
1335                        //TODO would be better to avoid parsing the workflow (but getting it from OM
1336                        // is not desirable either, it's not always there) -- see r22173.
1337                        // Parsing can cause bug related to text provenance type -- a second prov 
1338                        // recorder is created and tries to write to the same text file.
1339                        // But discussed w/ Dan, this is rare and text type is only used for debugging. 
1340                        MoMLParser parser = new MoMLParser();
1341                                NamedObj workflow = parser.parse(workflowString);
1342                        byte[] wfBytes = workflow.exportMoML().getBytes();
1343                        
1344                        String wfMD5Str = _checkData(wfBytes);
1345
1346                        _psWorkflowExecStart.setInt(1, workflowId);
1347                        _psWorkflowExecStart.setString(2, run.getUser());
1348                        _psWorkflowExecStart.setTimestamp(3,
1349                                        new Timestamp(run.getStartTime().getTime()));
1350                        _psWorkflowExecStart.setString(4, wfMD5Str);
1351                        _psWorkflowExecStart.setString(5, run.getHostId());
1352                        _psWorkflowExecStart.setString(6, run.getAnnotation());
1353                        _psWorkflowExecStart.setString(7, run.getExecLSID().toString());
1354                        _psWorkflowExecStart.setString(8, run.getWorkflowLSID().toString());
1355                        _psWorkflowExecStart.setString(9, run.getModuleDependencies());
1356                        if (run.getErrorMessages().isEmpty()){
1357                                _psWorkflowExecStart.setString(10, WorkflowRun.type.Imported.toString());
1358                        }
1359                        else{
1360                                _psWorkflowExecStart.setString(10, WorkflowRun.type.Imported_Error.toString());
1361                        }
1362                        workflowExecId = _dbType.insert(_psWorkflowExecStart,
1363                                        "workflow_exec", "id");
1364                }
1365
1366                // if workflow exec start info insert worked, now insert exec stop info
1367                // and reporting items and error messages if they exist
1368                if (workflowExecId != RegEntity.UNKNOWN_ID)
1369                {
1370                        synchronized(_psWorkflowExecStop)
1371                        {
1372                                Timestamp endTime = new Timestamp(run.getStartTime().getTime() + 
1373                                                (run.getDuration()*numMillisecondsInASecond));
1374                                _psWorkflowExecStop.setTimestamp(1, endTime);
1375                                _psWorkflowExecStop.setInt(2, workflowExecId);
1376                                _psWorkflowExecStop.executeUpdate();
1377                        }
1378                        
1379                                KeplerLSID runLSIDBeforeExecIdChange = run.getExecLSID();
1380                        
1381                        // replace original execId. this must be before call to insertAnyHuskRunErrorMessages
1382                        run.resetExecId(workflowExecId, this);
1383
1384                        //insert any referrals
1385                        insertAnyHuskRunReferralList(run);
1386
1387                        //insert any reporting files that might be in KAR
1388                        insertAnyHuskRunReportFiles(run, runLSIDBeforeExecIdChange, karFile);
1389
1390                        insertAnyHuskRunErrorMessages(run);
1391
1392                        insertAnyHuskRunTags(run);
1393                }
1394                else
1395                {
1396                        return false;
1397                }
1398                
1399                return true;
1400        }
1401        catch (SQLException e) 
1402        {
1403                // TODO Auto-generated catch block
1404                e.printStackTrace();
1405                return false;
1406        } 
1407        catch (Exception e) 
1408        {
1409                // TODO Auto-generated catch block
1410                e.printStackTrace();
1411                return false;
1412        }
1413
1414    }
1415    
1416    
1417    public void insertAnyHuskRunReferralList(WorkflowRun run)
1418    {
1419        StringAttribute referralListAttr = (StringAttribute) run.getAttribute(NamedObjIdReferralList.NAME);
1420        if (referralListAttr != null)
1421        {
1422                try 
1423                {                       
1424                        _psChangeExecutionReferralList.setString(1, referralListAttr.getExpression());
1425                        _psChangeExecutionReferralList.setInt(2, run.getExecId());
1426                        _psChangeExecutionReferralList.execute();
1427                } 
1428                catch (Exception e) 
1429                {
1430                        // TODO Auto-generated catch block
1431                        e.printStackTrace();
1432                }
1433        }
1434        }
1435    
1436    private void insertAnyHuskRunReportFiles(WorkflowRun run, KeplerLSID runLSIDBeforeExecIdChange, KARFile karFile)
1437    {
1438        
1439        try
1440        {
1441                // get all entries that are ReportLayoutKAREntryHandler and ReportInstanceKAREntryHandler
1442                List<KAREntry> entries = karFile.karEntries();
1443                Iterator<KAREntry> itr = entries.iterator();
1444                while (itr.hasNext())
1445                {
1446                        KAREntry karEntry = itr.next();
1447                        String handler = karEntry.getHandler();
1448                                String name = karEntry.getName();
1449                                String type = karEntry.getType();
1450
1451                        //FIXME hardcoded strings
1452                        if (handler.endsWith("ReportLayoutKAREntryHandler") || 
1453                                        handler.endsWith("ReportInstanceKAREntryHandler"))
1454                        {
1455                                if (karEntry.dependsOn(runLSIDBeforeExecIdChange)){
1456                                        insertHuskRunReportFile(name, type, karFile, run.getExecId());
1457                                }
1458                        }
1459                        
1460                }
1461                } 
1462        catch (Exception e) 
1463        {
1464                        // TODO Auto-generated catch block
1465                        e.printStackTrace();
1466                }
1467    }
1468    
1469    private void insertAnyHuskRunErrorMessages(WorkflowRun run) throws RecordingException
1470    {
1471        try
1472        {
1473                Map<Integer, String> errorMsgs = run.getErrorMessages();
1474                
1475                Iterator<Integer> itr = errorMsgs.keySet().iterator();
1476                while (itr.hasNext()){
1477                        String errorMsg = errorMsgs.get(itr.next());
1478                        synchronized(_psErrorInsert)
1479                        {
1480                                //TODO I believe it makes sense to not include the original entity id
1481                                        // since we may be on a different system. But is UNKNOWN_ID what we want?
1482                                _psErrorInsert.setInt(1, RegEntity.UNKNOWN_ID); 
1483                                _psErrorInsert.setInt(2, run.getExecId());
1484                                _psErrorInsert.setString(3, errorMsg);
1485                                _dbType.insert(_psErrorInsert, "error", "id");
1486                    
1487                                if(_debugWriter != null)
1488                                {
1489                                        _debugWrite("INSERT INTO ERROR(" + RegEntity.UNKNOWN_ID + ", " +
1490                                                        run.getExecId() + ", " + errorMsg + ")");
1491                                }
1492                        }
1493                }
1494        }
1495        catch(SQLException e)
1496        {
1497            throw new RecordingException("Error inserting into error table: ",e);
1498        } 
1499        catch (RecordingException e) 
1500        {
1501                        // TODO Auto-generated catch block
1502                        e.printStackTrace();
1503                }
1504    }
1505    
1506    
1507    private void insertAnyHuskRunTags(WorkflowRun run) throws RecordingException
1508    {
1509        
1510        Map<NamedOntClass, String> tags = run.getTags();
1511        for(Map.Entry<NamedOntClass, String> entry : tags.entrySet())
1512        {
1513                NamedOntClass tag = entry.getKey();
1514            String type = entry.getValue();
1515                tag.getConceptId();
1516                type = tags.get(tag);
1517                insertTag(type, tag.toString(), tag.getConceptId(), run.getExecId());
1518        }
1519        
1520    }
1521    
1522    
1523    private void insertHuskRunReportFile(String reportItem, String type, KARFile karFile, int runExecId)
1524    {
1525        
1526                //any reason file should be gotten out of ObjectManager instead of straight from kar?
1527                //File reportLayoutFile = karFile.getKAREntry(reportLayoutLSID);
1528                //NamedObj reportLayout = ObjectManager.getInstance().getHighestObjectRevision(reportLayoutLSID);
1529        
1530                JarEntry je = karFile.getJarEntry(reportItem);
1531                Map<String, String>metadataMap = new HashMap<String, String>();
1532                byte[] buf = new byte[1024];
1533                int len = 0;
1534                
1535        try 
1536        {       
1537                File file = File.createTempFile("keplerTemp", je.getName());
1538                try(InputStream in = new BufferedInputStream(karFile.getInputStream(je));
1539                        OutputStream out = new FileOutputStream(file.getAbsolutePath());)
1540                {
1541                        while ((len = in.read(buf)) > 0)
1542                        {
1543                                out.write(buf, 0, len);
1544                        }
1545                }
1546                
1547                metadataMap.put("type", type);
1548                addFileForExecution(metadataMap, file, runExecId);
1549                boolean deleted = file.delete();
1550                if (!deleted){
1551                        System.out.println("SQLRecordingV8 WARN - unable to delete file:"+file.toString());
1552                }
1553        } 
1554        catch (IOException e) 
1555        {
1556                // TODO Auto-generated catch block
1557                e.printStackTrace();
1558        } 
1559        catch (RecordingException e) 
1560        {
1561                        // TODO Auto-generated catch block
1562                        e.printStackTrace();
1563                }
1564    }
1565    
1566
1567    //helper method determines which rows in data table can be deleted when deleting workflow executions
1568    private String getDataMD5sToDelete(StringBuilder lsids) throws RecordingException
1569    {
1570        
1571        StringBuilder md5sToDelete = new StringBuilder();
1572        
1573        String wfExecStr = _dbType.getTableName("workflow_exec");
1574        String associatedDataStr = _dbType.getTableName("associated_data");
1575        String portEventStr = _dbType.getTableName("port_event");
1576        String actorFireStr = _dbType.getTableName("actor_fire");
1577        
1578        // FIXME: declare all PreparedStatements locally,
1579        // close all PreperatedStatements and ResultSets.
1580           
1581        try 
1582        {
1583                // gather md5s (wf_contents_id) for workflow_exec
1584                String wfContentsIdsToDelete = "SELECT DISTINCT wfe.wf_contents_id FROM " + wfExecStr +
1585                        " wfe WHERE wfe.lsid IN (" +lsids+ ")";
1586                        _psGetWfContentsIdsToDeleteQuery = _dbType.getPrepStatement(wfContentsIdsToDelete);
1587                        ResultSet resultwfContents = _psGetWfContentsIdsToDeleteQuery.executeQuery();
1588                        
1589                // gather md5s (wf_contents_id) for workflow_exec that are still referred to and can't be deleted
1590                String wfContentsIdsUsedByOthers = "SELECT DISTINCT wfe.wf_contents_id FROM " + wfExecStr +
1591                        " wfe WHERE wfe.lsid NOT IN (" +lsids+ ")";
1592                _psGetWfContentsIdsToNotDeleteQuery = _dbType.getPrepStatement(wfContentsIdsUsedByOthers);
1593                ResultSet resultwfContents2 = _psGetWfContentsIdsToNotDeleteQuery.executeQuery();
1594                        
1595                        // gather md5s (data_id) from associated_data
1596                String associatedDataDataIdsToDelete = "SELECT ad.data_id FROM "+associatedDataStr+" ad WHERE ad.wf_exec_id IN " +
1597                                "(SELECT wfe.id FROM "+wfExecStr+ " wfe WHERE wfe.lsid IN (" +lsids+ "))";
1598                        _psGetAssociatedDataDataIdsToDeleteQuery = _dbType.getPrepStatement(associatedDataDataIdsToDelete);
1599                        ResultSet resultAssociatedDataDataIds = _psGetAssociatedDataDataIdsToDeleteQuery.executeQuery();
1600                        
1601                        // gather md5s (data_id) from associated_data that are still referred to and can't be deleted
1602                String associatedDataDataIdsToNotDelete = "SELECT ad.data_id FROM "+associatedDataStr+" ad WHERE ad.wf_exec_id IN " +
1603                                "(SELECT wfe.id FROM "+wfExecStr+ " wfe WHERE wfe.lsid NOT IN (" +lsids+ "))";
1604                _psGetAssociatedDataDataIdsToNotDeleteQuery = _dbType.getPrepStatement(associatedDataDataIdsToNotDelete);
1605                ResultSet resultAssociatedDataDataIds2 = _psGetAssociatedDataDataIdsToNotDeleteQuery.executeQuery();
1606                        
1607                        // gather md5s (data_id) from port_event
1608                        String portEventDataIdsToDelete = "SELECT DISTINCT pe.data_id FROM "+portEventStr+" pe WHERE pe.fire_id IN " +
1609                                        "(SELECT af.id FROM "+actorFireStr+" af WHERE af.wf_exec_id IN " +
1610                                        "(SELECT wfe.id FROM "+wfExecStr+" wfe WHERE wfe.lsid IN ("+lsids+")))";
1611                        // written this way, the query is slower.
1612                        //String portEventDataIdsToDelete2 = "SELECT DISTINCT pe.data_id FROM "+ portEventStr+" pe,"+ actorFireStr+" af,"+
1613                        //      wfExecStr+" wfe WHERE pe.fire_id = af.id AND af.wf_exec_id = wfe.id AND wfe.lsid IN ("+lsids+")";
1614                        
1615                        _psGetPortEventDataIdsToDeleteQuery = _dbType.getPrepStatement(portEventDataIdsToDelete);
1616                        ResultSet resultPortEventDataIds = _psGetPortEventDataIdsToDeleteQuery.executeQuery();
1617                        
1618                        // gather md5s (data_id) from port_event that are still referred to and can't be deleted
1619                        String portEventDataIdsToNotDelete = "SELECT DISTINCT pe.data_id FROM "+portEventStr+" pe WHERE pe.fire_id IN " +
1620                                        "(SELECT af.id FROM "+actorFireStr+" af WHERE af.wf_exec_id IN " +
1621                                                        "(SELECT wfe.id FROM "+wfExecStr+" wfe WHERE wfe.lsid NOT IN ("+lsids+")))";
1622                        // written this way, the query is slower.
1623                        //String portEventDataIdsToNotDelete2 = "SELECT DISTINCT pe.data_id FROM "+ portEventStr+" pe,"+ actorFireStr+" af,"+
1624                        //      wfExecStr+" wfe WHERE pe.fire_id = af.id AND af.wf_exec_id = wfe.id AND wfe.lsid NOT IN ("+lsids+")";
1625                        
1626                        _psGetPortEventDataIdsToNotDeleteQuery = _dbType.getPrepStatement(portEventDataIdsToNotDelete);
1627                        ResultSet resultPortEventDataIds2 = _psGetPortEventDataIdsToNotDeleteQuery.executeQuery();                      
1628                        
1629                        List<String> md5s = new ArrayList<String>();
1630            while(resultwfContents.next())
1631            {
1632                md5s.add(resultwfContents.getString("wf_contents_id"));
1633            }
1634            resultwfContents.close();
1635            while (resultwfContents2.next())
1636            {
1637                md5s.remove(resultwfContents2.getString("wf_contents_id"));
1638            }
1639            resultwfContents2.close();
1640            while(resultAssociatedDataDataIds.next())
1641            {
1642                md5s.add(resultAssociatedDataDataIds.getString("data_id"));
1643            }
1644            resultAssociatedDataDataIds.close();
1645            while (resultAssociatedDataDataIds2.next())
1646            {
1647                md5s.remove(resultAssociatedDataDataIds2.getString("data_id"));
1648            }
1649            resultAssociatedDataDataIds2.close();
1650            while(resultPortEventDataIds.next())
1651            {
1652                md5s.add(resultPortEventDataIds.getString("data_id"));
1653            }
1654            resultPortEventDataIds.close();
1655            while (resultPortEventDataIds2.next())
1656            {
1657                md5s.remove(resultPortEventDataIds2.getString("data_id"));
1658            }
1659            resultPortEventDataIds2.close();
1660            
1661            Iterator<String> itr = md5s.iterator();
1662            while (itr.hasNext())
1663            {
1664                md5sToDelete.append("'"+itr.next()+"',");
1665            }
1666            
1667            // remove trailing comma
1668            if (md5sToDelete.length() > 0)
1669            {
1670                md5sToDelete.delete(md5sToDelete.length()-1, md5sToDelete.length());
1671            }
1672                                
1673                } 
1674        catch (SQLException e) 
1675                {
1676            throw new RecordingException("Error while determining which rows in data table to delete:", e);
1677                }
1678
1679                return md5sToDelete.toString();
1680    }
1681    
1682    ////////////////////////////////////////////////////////////////////////
1683    //// protected methods                                              ////
1684
1685    /** Add a new row to the entity table. */
1686    @Override
1687    protected RegEntity _addEntity(int containerId, RegEntity.EntityType type,
1688        String fullName, String displayName, int prevId)
1689        throws RecordingException, SQLException
1690    {
1691        // see if we've recording the workflow change
1692        if(_evolId == RegEntity.UNKNOWN_ID)
1693        {
1694            _addWorkflowChange();
1695        }
1696
1697        // see if display name is different
1698        String rowDisplayName;
1699                
1700        String name = fullName.substring(fullName.lastIndexOf(".") + 1);
1701
1702        if(name.equals(displayName))
1703        {
1704            rowDisplayName = "";
1705        }
1706        else
1707        {
1708            rowDisplayName = displayName;
1709        }
1710
1711        String typeStr = type.toString();
1712        
1713        //(wf_change_id, container_id, type, name, prev_id, wf_id) " + 
1714        //System.out.println("evolId = " + _evolId);
1715        
1716        _psEntityInsert.setInt(1, _evolId);
1717        _psEntityInsert.setString(2, typeStr);
1718        _psEntityInsert.setString(3, fullName);
1719        _psEntityInsert.setString(4, rowDisplayName);
1720        _psEntityInsert.setInt(5, prevId);
1721        _psEntityInsert.setInt(6, _wfId);
1722        int newId = _dbType.insert(_psEntityInsert, "entity", "id");
1723        RegEntity retval =  new RegEntity(newId, true, containerId, type);
1724
1725        if(_debugWriter != null)
1726        {
1727            _debugWrite("INSERT INTO ENTITY (" + _evolId + ", " + type +
1728                ", " + fullName + ", " + rowDisplayName + ", " + prevId +
1729                ", " + _wfId + ")");
1730        }
1731
1732        return retval;
1733    }
1734
1735    /** Add a new row to the workflow table. */
1736    @Override
1737    protected void _addWorkflow() throws RecordingException
1738    {
1739        if(_wfLSID == null)
1740        {
1741            throw new RecordingException("No workflow LSID");
1742        }
1743
1744        //System.out.println("INSERTING new workflow " + _wfLSID.toString());
1745        
1746        try
1747        {
1748            synchronized(_psWorkflowInsert)
1749            {
1750                _psWorkflowInsert.setString(1, _wfNameStr);
1751                _psWorkflowInsert.setString(2, _wfLSID.toStringWithoutRevision());
1752                _wfId = _dbType.insert(_psWorkflowInsert, "workflow", "id");
1753                _wfReset();
1754            }
1755
1756            if(_debugWriter != null)
1757            {
1758                _debugWrite("INSERT INTO WORKFLOW(" + _wfNameStr + ", lsid)");
1759            }
1760        }
1761        catch(SQLException e)
1762        {
1763            throw new RecordingException("Error adding row to workflow:", e);
1764        }
1765    }
1766
1767    /** Create a new row in the workflow_change table. */
1768    @Override
1769    protected void _addWorkflowChange() throws RecordingException
1770    {
1771        if(_wfUserStr == null)
1772        {
1773            throw new RecordingException("Need workflow user name.");
1774        }
1775    
1776        if(_machineStr == null)
1777        {
1778            throw new RecordingException("Need machine name.");
1779        }
1780
1781        try
1782        {
1783            synchronized(_psWorkflowChangeInsert)
1784            {
1785                _psWorkflowChangeInsert.setString(1, _wfUserStr);
1786                _psWorkflowChangeInsert.setTimestamp(2, 
1787                    _workflowChangeTimeStack.peek());
1788                _psWorkflowChangeInsert.setInt(3, _wfId);
1789                _psWorkflowChangeInsert.setString(4, _machineStr);
1790                _evolId = _dbType.insert(_psWorkflowChangeInsert,
1791                    "workflow_change", "id");
1792                //System.out.println("got new evolId = " + _evolId);
1793
1794                if(_debugWriter != null)
1795                {
1796                    _debugWrite("INSERT INTO WORKFLOW_CHANGE(" + 
1797                        _wfUserStr + ", wfChangeTime, " + _wfId + ", " +
1798                        _machineStr + ")");
1799                }
1800            }
1801        }
1802        catch(SQLException e)
1803        {
1804            throw new RecordingException("Error adding to workflow_change: ",
1805                e);
1806        }
1807    }
1808
1809    /** Modify a NamedObj's full name. We remove the containing composite's
1810     *  name, including the period.
1811     */
1812    @Override
1813    protected String _changeEntityFullName(String name)
1814    {
1815        //_debug("name = " + name + "  | containerFullName  = " + _containerFullName);
1816        //_debug("  real container name = " + _recorder.getContainer().getFullName());        
1817        
1818        // see if the name is our container's 
1819        if(name.equals(_containerFullName))
1820        {
1821            // see if we have a relative name
1822            if(_containerName == null || _containerName.length() == 0)
1823            {
1824                return "";
1825            }
1826            else
1827            {
1828                return _containerFullName.substring(_containerFullName.indexOf(".", 1));
1829            }
1830        }
1831        // see if it is the Graph Editor's default blank composite.
1832        else if(name.equals(".") && _containerFullName.equals(
1833            ".configuration.effigyFactory.Graph Editor.blank"))
1834        {
1835            return "";
1836        }
1837        else
1838        {
1839            String relativeName;
1840            
1841            // NOTE: _containerFullName may not be a substring of name.
1842            // this can happen, e.g., when using the ModelReference actor
1843            // see http://bugzilla.ecoinformatics.org/show_bug.cgi?id=4359
1844            //
1845            // instead of using _containerFullName, remove everything up to
1846            // the second period. e.g., .foo.bar becomes .bar
1847            //
1848                        
1849            int index = name.indexOf(".", 1);
1850            
1851            // if there is no second period, i.e., name is the top level
1852            // composite, return an empty string.
1853            if(index == -1)
1854            {
1855                relativeName = "";
1856            }
1857            else
1858            {
1859                relativeName = name.substring(index);
1860            }
1861                
1862            //_debug("name " + name + " --> " + relativeName +
1863                //" (container name = " + _containerFullName + ")");
1864            return relativeName;
1865        }
1866    }
1867
1868    /** Check parameters before (re)connecting to database or workflow. */
1869    /*
1870    protected void _checkParameters() throws IllegalActionException,
1871        RecordingException
1872    {
1873        if((_wfNameStr == null || _wfNameStr.length() == 0) &&
1874            (_containerFullName.equals(".") || _containerFullName.equals(
1875                ".configuration.effigyFactory.Graph Editor.blank")))
1876        {
1877            _wfNameStr = TMP_START_NAME_STR + _random.nextInt(); 
1878
1879            _debug("checkParameters() setting workflow name to: "
1880                + _wfNameStr);
1881            _debug("    container is " + _containerFullName + " hash = "
1882                + _recorder.getContainer().hashCode());
1883        }
1884
1885        super._checkParameters();
1886    }
1887    */
1888    
1889    /** Check validity of workflow name parameter. */
1890    @Override
1891    protected void _checkWorkflowName() throws RecordingException
1892    {
1893        // do nothing since workflow name not required.
1894    }
1895
1896    /** Initialize the prepared statements. */
1897    @Override
1898    protected void _createPreparedStatements() throws SQLException
1899    {
1900        if(_psWorkflowInsert == null && _schema.containsTable("workflow"))
1901        {
1902            _psWorkflowInsert = _dbType.getSQLInsert("workflow", "id",
1903                "name, lsid", "?, ?");
1904        }
1905
1906        if(_psWorkflowQuery == null && _schema.containsTable("workflow"))
1907        {
1908            _psWorkflowQuery = _dbType.getSQLSelect("workflow", "id",
1909                "lsid = ?");
1910        }
1911        
1912        if(_psActorFireStart == null && _schema.containsTable("actor_fire"))
1913        {
1914            String defaultTimeStr = _dbType.getDefaultTimeStr();
1915            _psActorFireStart = _dbType.getSQLInsert("actor_fire", "id",
1916                "actor_id, wf_exec_id, start_time, end_time", "?, ?, ?, " +
1917                defaultTimeStr);
1918        }
1919
1920        if(_psEntityInsert == null && _schema.containsTable("entity"))
1921        {
1922            _psEntityInsert = _dbType.getSQLInsert("entity", "id",
1923                "wf_change_id, type, name, display, prev_id, " +
1924                "wf_id", "?, ?, ?, ?, ?, ?");
1925        }
1926
1927        if(_psWorkflowChangeInsert == null && 
1928            _schema.containsTable("workflow_change"))
1929        {
1930            _psWorkflowChangeInsert = _dbType.getSQLInsert("workflow_change",
1931                "id", "user, time, wf_id, host_id", "?, ?, ?, ?");
1932        }
1933       
1934        if(_psWorkflowExecStart == null &&
1935            _schema.containsTable("workflow_exec"))
1936        {
1937            String defaultTimeStr = _dbType.getDefaultTimeStr();
1938            _psWorkflowExecStart = _dbType.getSQLInsert("workflow_exec", "id",
1939                "wf_id, user, start_time, wf_contents_id, end_time, host_id, " +
1940                "annotation, lsid, wf_full_lsid, module_dependencies, type",
1941                "?, ?, ?, ?, " + defaultTimeStr + ", ?, ?, ?, ?, ?, ?");
1942        }
1943
1944        // port.type was removed.
1945        if(_psPortInsert == null && _schema.containsTable("port"))
1946        {
1947            _psPortInsert = _dbType.getSQLInsert("port",
1948                "id, direction, multiport", "?, ?, ?");
1949        }
1950    
1951        if(_psPortEventInsertWithChecksum == null && _schema.containsTable("port_event"))
1952        {
1953            _psPortEventInsertWithChecksum = _dbType.getSQLInsert("port_event", "id",
1954                //"channel, data_id, fire_id, file_id, port_id, rw_fire_id, " +
1955                "channel, data_id, fire_id, file_id, port_id, " +
1956                "time, type, write_event_id",  
1957                "?, ?, ?, ?, ?, ?, ?, ?");
1958        }
1959
1960        if(_psPortEventInsert == null && _schema.containsTable("port_event"))
1961        {
1962            _psPortEventInsert = _dbType.getSQLInsert("port_event", "id",
1963                "channel, data, fire_id, file_id, port_id, " +
1964                "time, type, write_event_id",  
1965                "?, ?, ?, ?, ?, ?, ?, ?");
1966        }
1967
1968        if(_psDataInsert == null && _schema.containsTable("data"))
1969        {
1970            _psDataInsert = _dbType.getSQLInsert("data",
1971                "contents, md5, truncated", "?, ?, ?");
1972        }
1973        
1974        if(_psDataMD5Query == null && _schema.containsTable("data"))
1975        {
1976            _psDataMD5Query = _dbType.getSQLSelect("data", "md5",
1977                "md5 = ?");
1978        }
1979        /*
1980        //Note. not setting these up currently, see Note1 in SQLQueryV8.
1981        if(_psGetWfContentsIdsToDeleteQuery == null && _schema.containsTable("workflow_exec"))
1982        {
1983                String wfExecStr = _dbType.getTableName("workflow_exec");
1984                String wfContentsIdsOfToDelete = "SELECT DISTINCT wfe.wf_contents_id FROM " + wfExecStr +
1985                        " wfe WHERE wfe.lsid IN (";
1986                int someLargeNumber = 10000;
1987                for (int i=0; i< someLargeNumber; i++){
1988                        wfContentsIdsOfToDelete.concat("?,?,?,?,?,?,?,?,?,?");
1989                }
1990                wfContentsIdsOfToDelete.concat(")");
1991                        _psGetWfContentsIdsToDeleteQuery = _dbType.getPrepStatement(wfContentsIdsOfToDelete);
1992        }
1993        if(_psGetWfContentsIdsToNotDeleteQuery == null && ...)
1994        {
1995        }
1996        if(_psGetAssociatedDataDataIdsToDeleteQuery == null && ...)
1997        {
1998        }
1999        if(_psGetAssociatedDataDataIdsToNotDeleteQuery == null && ...)
2000        {
2001        }
2002        if(_psGetPortEventDataIdsToDeleteQuery == null && ...)
2003        {
2004        }
2005        if(_psGetPortEventDataIdsToNotDeleteQuery == null && ...)
2006        {
2007        }
2008        */
2009        
2010        if(_psChangeWorkflowName == null && _schema.containsTable("workflow"))
2011        {
2012            _psChangeWorkflowName = _dbType.getSQLUpdate("workflow",
2013                "name = ?", "lsid = ?");
2014        }
2015        
2016        if(_psChangeExecutionLSID == null && _schema.containsTable("workflow_exec"))
2017        {
2018                _psChangeExecutionLSID = _dbType.getSQLUpdate("workflow_exec",
2019                "lsid = ?", "id = ?");
2020        }
2021
2022        if(_psChangeExecutionReferralList == null &&
2023                _schema.containsTable("workflow_exec"))
2024        {
2025                _psChangeExecutionReferralList = _dbType.getSQLUpdate("workflow_exec",
2026                "derived_from = ?", "id = ?");
2027        }
2028        
2029        if(_psChangeExecutionType == null &&
2030                _schema.containsTable("workflow_exec"))
2031        {
2032            _psChangeExecutionType = _dbType.getSQLUpdate("workflow_exec",
2033                "type = ?", "lsid = ?");
2034        }
2035
2036        if(_psAssocDataInsert == null &&
2037            _schema.containsTable("associated_data"))
2038        {
2039            _psAssocDataInsert = _dbType.getSQLInsert("associated_data", "id",
2040                "data_id, name, val, wf_exec_id", "?, ?, ?, ?");
2041        }
2042
2043        if(_psAssocDataInsertNoDataId == null &&
2044                _schema.containsTable("associated_data"))
2045        {
2046            _psAssocDataInsertNoDataId = _dbType.getSQLInsert("associated_data", "id",
2047                "name, val, wf_exec_id", "?, ?, ?");
2048        }
2049    
2050        if(_psErrorInsert == null &&
2051            _schema.containsTable("error"))
2052        {
2053            _psErrorInsert = _dbType.getSQLInsert("error", "id",
2054                "entity_id, exec_id, message", "?, ?, ?");
2055        }
2056        
2057        // tagging
2058        
2059        if(_psGetTagForExecIdAndURN == null && _schema.containsTable("tag"))
2060        {
2061                _psGetTagForExecIdAndURN = _dbType.getSQLSelect("tag", "urn", "wf_exec_id = ? and urn = ?");   
2062        }
2063        
2064        if(_psTagInsert == null && _schema.containsTable("tag"))
2065        {
2066            _psTagInsert = _dbType.getSQLInsert("tag", "type, searchstring, urn, wf_exec_id",
2067                "?,?,?,?");
2068        }
2069        
2070        if(_psDeleteTagForExecIdAndURN == null && _schema.containsTable("tag"))
2071        {
2072                _psDeleteTagForExecIdAndURN = _dbType.getSQLDelete("tag",
2073                "wf_exec_id = ? and urn = ?");
2074        }
2075        
2076        //TODO use or create a getSQLxxxx ?
2077        /*
2078        if(_psDeleteExecutions == null && 
2079                        _schema.containsTable("workflow_exec"))
2080        {
2081                String wfExecStr = _dbType.getTableName("workflow_exec");
2082                String queryStr = "DELETE FROM " +
2083                        wfExecStr + " wfe " +
2084                        "WHERE wfe.id IN (?)";
2085                _psDeleteExecutions = _dbType.getPrepStatement(queryStr);
2086        }
2087        if(_psDeleteData == null){
2088        
2089        }
2090        */
2091        
2092        if(_psParameterExecInsert == null &&
2093            _schema.containsTable("parameter_exec"))
2094        {
2095            _psParameterExecInsert = _dbType.getSQLInsert("parameter_exec",
2096                "parameter_id, wf_exec_id", "?, ?");
2097        }
2098        
2099        String wfStr = _dbType.getTableName("workflow");
2100        String wfExecStr = _dbType.getTableName("workflow_exec");
2101        
2102        if(_psDeleteWorkflowsForNoExecutions == null)
2103        {
2104            String sqlStr = "DELETE FROM " + wfStr + " wf " +
2105                "WHERE wf.id NOT IN (SELECT wf_id FROM " + wfExecStr + ")"; 
2106            _psDeleteWorkflowsForNoExecutions = _dbType.getPrepStatement(sqlStr);
2107        }
2108        
2109        if(_psWorkflowIdsForNoExecutions == null)
2110        {
2111            String sqlStr = "SELECT wf.id FROM " + wfStr + " wf " +
2112                "WHERE wf.id NOT IN (SELECT wf_id FROM " + wfExecStr + ")";
2113            _psWorkflowIdsForNoExecutions = _dbType.getPrepStatement(sqlStr);
2114        }
2115        
2116        // create the remaining prepared statements.
2117        super._createPreparedStatements();
2118    }
2119
2120    /** Create a Schema to reflect the v8 schema. */
2121    @Override
2122    protected Schema _createSchema()
2123    {
2124        return Schemas.createSchemaV8();
2125    }
2126
2127    /** Get the maximum size of token data that can be stored in the 
2128     *  database.
2129     */
2130    @Override
2131    protected int _getMaxTokenDataSize() throws SQLException
2132    {
2133        return _dbType.getColumnSize("data", "contents");
2134    }
2135
2136    /** Get the internal workflow id. */
2137    @Override
2138    protected int _getWorkflowId() throws RecordingException
2139    {
2140        if(_wfLSID == null) 
2141        {
2142            _wfLSID = NamedObjId.getIdFor(_recorderContainer);
2143        }
2144        
2145        //System.out.println("WF LSID is " + _wfLSID);
2146
2147        // use the lsid string as the string id.
2148        return _getWorkflowId(_wfLSID.toStringWithoutRevision());
2149    }   
2150
2151    /** Find the container id of an entity. Always returns
2152     *  RegEntity.UNKNOWN_ID since the V8 schema does not use the
2153     *  container id.
2154     */
2155    @Override
2156    protected int _getContainerId(Nameable namedObj) 
2157        throws RecordingException
2158    {
2159        return RegEntity.UNKNOWN_ID;
2160    }
2161    
2162    /** Set our prepared statements to null. */
2163    @Override
2164    protected void _nullPreparedStatements()
2165    {
2166        super._nullPreparedStatements();
2167
2168        _psDataInsert = null;
2169        _psDataMD5Query = null;
2170        _psPortEventInsert = null;
2171        _psPortEventInsertWithChecksum = null;
2172        _psChangeWorkflowName = null;
2173        _psChangeExecutionLSID = null;
2174        _psAssocDataInsert = null;
2175        _psAssocDataInsertNoDataId = null;
2176        _psErrorInsert = null;
2177        _psDeleteExecutions = null;
2178        _psDeleteData = null;
2179        _psGetWfContentsIdsToDeleteQuery = null;
2180        _psGetWfContentsIdsToNotDeleteQuery = null;
2181        _psGetAssociatedDataDataIdsToDeleteQuery = null;
2182        _psGetPortEventDataIdsToDeleteQuery = null;
2183        _psGetTagForExecIdAndURN = null;
2184        _psTagInsert = null;
2185        _psDeleteTagForExecIdAndURN = null;
2186        _psDeleteTags = null;
2187        _psParameterExecInsert = null;
2188        _psDeleteWorkflowsForNoExecutions = null;
2189        _psWorkflowIdsForNoExecutions = null;
2190        _psChangeExecutionType = null;
2191    }
2192
2193    /** Reconnect to the database. */
2194    @Override
2195    protected void _reconnectDatabase(boolean resetDB) throws RecordingException
2196    {
2197        super._reconnectDatabase(resetDB);
2198
2199        try
2200        {
2201            _queryable = new SQLQueryV8(_dbType);
2202        }
2203        catch(QueryException e)
2204        {
2205            throw new RecordingException("Error creating queryable: ", e);
2206        }
2207    }
2208
2209    /** Re-acquire the current workflow id. */
2210    @Override
2211    protected void _reconnectWorkflow() throws RecordingException
2212    {
2213        super._reconnectWorkflow();
2214
2215        //_debug("reconnectWorkflow()");
2216        //_debug("    wf name : " + _wfNameStr);
2217        //_debug("    _containerFullName: " + _containerFullName);
2218
2219
2220        // see if the workflow name is empty and the container has a name
2221        // (the latter means the workflow has been saved to a file).
2222        if(_wfNameStr.length() == 0 &&
2223            _containerFullName.length() > 1 && !_containerFullName.equals(
2224            ".configuration.effigyFactory.Graph Editor.blank"))
2225        {
2226            // change the workflow name to the top level
2227            // composite's name.
2228            _changeWorkflowNameColumn(_recorderContainer.getName());
2229        }       
2230     }
2231
2232    /** Change the workflow.name column for the current workflow. */
2233    protected void _changeWorkflowNameColumn(String newName)
2234        throws RecordingException
2235    {
2236        if(_wfLSID == null)
2237        {
2238            throw new RecordingException("No workflow LSID");
2239        }
2240
2241        _changeWorkflowNameColumn(newName, _wfLSID);
2242    }
2243    
2244    /** Change the workflow.name column for a specific LSID. */
2245    protected void _changeWorkflowNameColumn(String newName, KeplerLSID lsid)
2246        throws RecordingException
2247    {
2248        //_debug("    changing wf name from " + _wfNameStr + " to " + newName);
2249
2250        // change the name in the workflow table
2251        try
2252        {
2253            _psChangeWorkflowName.setString(1, newName);
2254            _psChangeWorkflowName.setString(2, lsid.toStringWithoutRevision());
2255            _psChangeWorkflowName.execute();
2256        }
2257        catch(SQLException e)
2258        {
2259            throw new RecordingException("Unable to change workflow name:", e);
2260        }
2261
2262        if(_debugWriter != null)
2263        {
2264            _debugWrite("UPDATE workflow SET name = " + newName + " " +
2265                "WHERE lsid = " + lsid.toStringWithoutRevision());
2266        }
2267
2268        _wfNameStr = newName;
2269    }
2270        
2271        /**
2272         * Change execution LSID for an execution Id. Does not change and returns
2273         * false for attempts to change to an older or current revision, or if a
2274         * QueryException, or if no such execution Id.
2275         * 
2276         * @param execId
2277         * @param newExecLSID
2278         * @return
2279         * @throws RecordingException
2280         */
2281        @Override
2282    public boolean changeExecutionLSID(int execId, KeplerLSID newExecLSID,
2283                        Queryable q) throws RecordingException 
2284        {
2285
2286                // make sure we're connected to db
2287                _checkConnection(false);
2288
2289                KeplerLSID currentLSID;
2290
2291                try 
2292                {
2293                        currentLSID = q.getExecutionLSIDForExecution(execId);
2294                } 
2295                catch (QueryException e1) 
2296                {
2297                        // TODO Auto-generated catch block
2298                        e1.printStackTrace();
2299                        return false;
2300                }
2301
2302                if (currentLSID == null) 
2303                {
2304                        return false;
2305                }
2306
2307                if (currentLSID.equalsWithoutRevision(newExecLSID))
2308                {
2309                        // disallow changing to an older exec LSID
2310                        if (currentLSID.getRevision() >= newExecLSID.getRevision()) 
2311                        {
2312                                // _warn("Current executionLSID:"+ currentLSID +
2313                                        // " >= "+newExecLSID+" not changing.");
2314                                return false;
2315                        }
2316                }
2317                else
2318                {
2319                        // TODO no need to do anything special for lsid 
2320                        // notLocalToInstance, is there?
2321                }
2322
2323                try 
2324                {
2325
2326                        _psChangeExecutionLSID.setString(1, newExecLSID.toString());
2327                        _psChangeExecutionLSID.setInt(2, execId);
2328                        _psChangeExecutionLSID.execute();
2329
2330                        if (_debugWriter != null) 
2331                        {
2332                                _debugWrite("UPDATE WORKFLOW_EXEC SET lsid = " + newExecLSID
2333                                                + " " + "WHERE id = lsid");
2334                        }
2335                        return true;
2336
2337                } 
2338                catch (SQLException e) 
2339                {
2340                        throw new RecordingException("Could not change execution lsid:", e);
2341                }
2342        }
2343        
2344        /**
2345         * Change the execution type.
2346         * @param executionLSID
2347         * @param type
2348         * @return
2349         * @throws RecordingException
2350         */
2351        public boolean changeExecutionType(KeplerLSID executionLSID, WorkflowRun.type type) throws RecordingException{
2352                try 
2353                {
2354                        _psChangeExecutionType.setString(1, type.toString());
2355                        _psChangeExecutionType.setString(2, executionLSID.toString());
2356                        _psChangeExecutionType.execute();
2357                        
2358                        if (_debugWriter != null) 
2359                        {
2360                                _debugWrite("UPDATE WORKFLOW_EXEC SET type = " + type.toString()
2361                                                + " " + "WHERE lsid = lsid");
2362                        }
2363                        return true;
2364                        
2365                } 
2366                catch (SQLException e) 
2367                {
2368                        throw new RecordingException("Could not change execution type:", e);
2369
2370                }
2371        }
2372
2373    /** Add a port to the port table. */
2374    @Override
2375    protected void _regPortReal(TypedIOPort port, RegEntity re)
2376        throws RecordingException
2377    {
2378        boolean multi = port.isMultiport();
2379
2380        int direction;
2381        if(port.isInput() && port.isOutput())
2382        {
2383            direction = PortDirection.InOut.ordinal();
2384        }
2385        else if(port.isInput())
2386        {
2387            direction = PortDirection.In.ordinal();
2388        }
2389        else
2390        {
2391            direction = PortDirection.Out.ordinal();
2392        }
2393
2394        try
2395        {
2396            synchronized(_psPortInsert)
2397            {
2398                _psPortInsert.setInt(1, re.getId());
2399                _psPortInsert.setInt(2, direction);
2400                _psPortInsert.setBoolean(3, multi);
2401                _psPortInsert.executeUpdate();
2402            }
2403
2404            if(_debugWriter != null)
2405            {
2406                _debugWrite("INSERT INTO PORT (id, " + direction +
2407                    ", " + multi + ")");
2408            }
2409
2410        }
2411        catch(SQLException e)
2412        {
2413            throw new RecordingException(_getExceptionMessage(e));
2414        }
2415    }
2416
2417
2418    /** Record a specific type of firing for an actor. */
2419    @Override
2420    protected void _recordFiringEvent(Actor actor, 
2421        FiringEvent.FiringEventType type, Date timestamp) throws RecordingException
2422    {
2423        /*
2424        System.out.println("firing event: " + type +
2425                " for " + _getNameableFullName(actor) +
2426                " at " + timestamp);
2427        */
2428        
2429        // see if it's a start or stop.
2430        FireState<Integer> fs = _fireStateTable.get(actor);
2431
2432        // make sure the fire state exists.
2433        if(fs == null)
2434        {   
2435            throw new RecordingException(
2436                "Received actor fire event for unregistered actor: " +
2437                _getNameableFullName(actor));
2438        }
2439
2440        synchronized(fs)
2441        {
2442            // get the last type of firing start
2443            FiringEvent.FiringEventType lastStartType =
2444                fs.getLastStartFireType();
2445
2446            // see if current firing is new iteration:
2447            // NOTE: PN does not report iterate firings so the iteration
2448            // may begin with prefire if the last type of firing was not
2449            // iterate.
2450            if(type == FiringEvent.BEFORE_ITERATE ||
2451                (type == FiringEvent.BEFORE_PREFIRE &&
2452                lastStartType != FiringEvent.BEFORE_ITERATE))
2453            {
2454                int fireId;
2455
2456                //_debug("start firing: " + type + " for " + _getNameableFullName(actor));
2457
2458                try
2459                {
2460                    synchronized(_psActorFireStart)
2461                    {
2462                        _psActorFireStart.setInt(1, fs.getActorId());
2463                        _psActorFireStart.setInt(2, _wfExecId);
2464                        _psActorFireStart.setTimestamp(3, new Timestamp(timestamp.getTime()));
2465                        fireId = _dbType.insert(_psActorFireStart,
2466                            "actor_fire", "id");
2467                    }
2468
2469                }
2470                catch(SQLException e)
2471                {
2472                    String message = "Unable to record start of actor " +
2473                        "firing: " + e.getMessage();
2474                    _debugWrite(message);
2475                    throw new RecordingException(message);
2476                }
2477
2478                /*
2479                System.out.println("started firing " + fireId + ": " +
2480                        " for " + _getNameableFullName(actor) +
2481                        " at " + timestamp);
2482                 */
2483                
2484                if(_debugWriter != null)
2485                {
2486                    _debugWrite("INSERT INTO ACTOR_FIRE(" +
2487                        fs.getActorId() + ", " + _wfExecId + ", " +
2488                        "curTime)");
2489                }
2490
2491                fs.fireStart(type, fireId);
2492            }
2493            // see if current firing is end of iteration:
2494            else if(type == FiringEvent.AFTER_ITERATE ||
2495                (type == FiringEvent.AFTER_POSTFIRE &&
2496                lastStartType == FiringEvent.BEFORE_PREFIRE))
2497            {
2498                //_debug("end firing: " + type + " for " + _getNameableFullName(actor));
2499               
2500                Integer fireIdInteger;
2501                // NOTE: if the type is a AFTER_POSTFIRE and last start
2502                // type is BEFORE_PREFIRE, we are running in PN.
2503                // in this case, tell the fireState to stop firing using
2504                // AFTER_PREFIRE instead of AFTER_POSTFIRE, since we never
2505                // told the fireState about the BEFORE_POSTFIRE.
2506                if(type == FiringEvent.AFTER_POSTFIRE)
2507                {
2508                    fireIdInteger = fs.fireStop(FiringEvent.AFTER_PREFIRE);
2509                }
2510                else
2511                {
2512                    fireIdInteger = fs.fireStop(type);
2513                }
2514
2515                int fireId = RegEntity.UNKNOWN_ID;
2516                if(fireIdInteger != null)
2517                {
2518                    fireId = fireIdInteger;
2519                }
2520                
2521                int tries = 2;
2522                SQLException exception = null;
2523                while(tries > 0)
2524                {
2525                    synchronized(_psActorFireStop)
2526                    {
2527                        try
2528                        {
2529                            tries--;
2530                            exception = null;
2531                            _recordActorFireStop(fireId, actor, timestamp);
2532                            break;
2533                        }
2534                        catch(SQLException e)
2535                        {
2536                            exception = e;  
2537                        }
2538                    }
2539                    
2540                    if(exception != null && tries > 0)
2541                    {
2542                        _reconnectDatabase(false);
2543                    }
2544                }
2545                
2546                if(exception != null)
2547                {
2548                    throw new RecordingException("Unable to record " +
2549                            "end of actor firing: ", exception);                    
2550                }
2551            }
2552            /*
2553            else
2554            {
2555                _debug("ignoring firing: " + type + " for " +
2556                    _getNameableFullName(actor));
2557            }
2558            */
2559        }
2560    }
2561    
2562    /** Record the end of an actor fire and attempt to serialize the actor's
2563     *  state.
2564     */
2565    protected void _recordActorFireStop(int fireId, Actor actor, Date timestamp) throws SQLException
2566    {
2567        
2568        /*
2569        System.out.println("stop firing " + fireId + ": " +
2570                " for " + _getNameableFullName(actor) +
2571                " at " + timestamp);
2572         */
2573        
2574        synchronized(_psActorFireStop)
2575        {
2576            _psActorFireStop.setTimestamp(1, new Timestamp(timestamp.getTime()));
2577            _psActorFireStop.setInt(2, fireId);
2578            _psActorFireStop.executeUpdate();
2579        }
2580
2581        if(_debugWriter != null)
2582        {
2583            try {
2584                _debugWrite("UPDATE ACTOR_FIRE set end_time " +
2585                    "= curTime WHERE id = " + fireId + ")");
2586            } catch (RecordingException e) {
2587                System.out.println("Error writing to debug writer: " + e.getMessage());
2588            }
2589        }
2590        
2591        if(_stateSerializer != null)
2592        {
2593            byte[] state = _stateSerializer.serializeActor(actor);
2594            if(state != null)
2595            {
2596                /* XXX todo once state table is created
2597                PreparedStatement _psActorStateInsert = _dbType.getSQLInsert("actor_state", "id, state", "?, ?");
2598                synchronized(_psActorStateInsert)
2599                {
2600                    _psActorStateInsert.setInt(1, fireId);
2601                    _psActorStateInsert.setBytes(2, state);
2602                    _psActorStateInsert.executeUpdate();
2603                }
2604                */
2605                
2606                if(_debugWriter != null)
2607                {
2608                    try {
2609                        _debugWrite("INSERT INTO ACTOR_STATE(" + fireId + ", " + String.valueOf(state) + ")");
2610                    } catch (RecordingException e) {
2611                        System.out.println("Error writing to debug writer: " + e.getMessage());
2612                    }
2613                }
2614            }
2615        }
2616    }
2617
2618    /** Record a specific type of firing for an actor. */
2619    @Override
2620    protected void _recordPortEvent(IOPort port, int fireId, int rwfireId, 
2621        boolean isRead, int channel, Token token, IOPort destPort, Date timestamp)
2622        throws RecordingException
2623    {
2624        //if(isRead) _debug("port event read " + _getNameableFullName(port));
2625        //else _debug("port event write " + _getNameableFullName(port));
2626         
2627        RegEntity.EntityType type;
2628        
2629        if(port instanceof ParameterPort)
2630        {
2631            type = RegEntity.EntityType.PortParameter;
2632        }
2633        else
2634        {
2635            type = RegEntity.EntityType.Port;
2636        }
2637        
2638        RegEntity re = _checkEntity(port, type);
2639
2640        if(re == null)
2641        {
2642            throw new RecordingException("Port has not been registered: " +
2643                _getNameableFullName(port));
2644        }
2645
2646        // an actor may read or write a port in a non-fire method, e.g.,
2647        // SampleDelay. in this case, generate fake before and after fire
2648        // events since the firing id is a foreign key.
2649        boolean fakeFired = false;
2650        if(fireId == RegEntity.UNKNOWN_ID)
2651        {
2652            fakeFired = true;
2653            Actor actor = (Actor)port.getContainer();
2654            Director director = actor.getDirector();
2655            actorFire(new FiringEvent(director, actor, FiringEvent.BEFORE_ITERATE),
2656                    timestamp);
2657            
2658            // now get the firing id
2659            FireState<Integer> fs = _fireStateTable.get(actor);
2660            fireId = fs.getCurFireId();
2661          
2662            // XXX print a warning if the actor is not sampledelay
2663            if(! (actor instanceof SampleDelay))
2664            {
2665                _warn("actor port event in non-fire method: " + actor.getClass());
2666            }
2667            
2668        }
2669        
2670        // get the port id
2671        int portId = re.getId();
2672       
2673        String dataMD5Str = null;
2674        String fileMD5Str = "";
2675        String typeStr = null;
2676        String tokenValue = null;
2677
2678        int writeId = -1;
2679  
2680        // if this is a read event, retrieve the write even id that
2681        // produced this token
2682        if(isRead)
2683        {
2684            TokenInfo tokenInfo = _portConnector.getNextId(port, channel);
2685            writeId = tokenInfo.writeId;
2686            // NOTE: these are not necessary for reads since they are
2687            // in the row for the write event.
2688            //dataMD5Str = tokenInfo.dataMD5;
2689            //fileMD5Str = tokenInfo.fileMD5;
2690            //tokenValue = tokenInfo.tokenValue;
2691            //typeStr = token.getClass().getName();
2692        }
2693        else
2694        {
2695            typeStr = token.getClass().getName();
2696
2697            tokenValue = token.toString();
2698            
2699            // only checksum the data is larger than port_event.data column
2700            if(tokenValue.length() > MAX_PORT_EVENT_DATA_LENGTH) {
2701                dataMD5Str = _checkData(token.toString().getBytes());
2702            }
2703            
2704            if(_maxFileIncludeSizeKBVal > 0)
2705            {
2706                fileMD5Str = _checkForFileData(token);    
2707            }
2708        }
2709
2710        // insert the new row in port_event table.
2711        try
2712        {
2713            if(dataMD5Str == null) {
2714                synchronized(_psPortEventInsert)
2715                {
2716                    _psPortEventInsert.setInt(1, channel);
2717                    _psPortEventInsert.setString(2, tokenValue);
2718                    _psPortEventInsert.setInt(3, fireId);
2719                    _psPortEventInsert.setString(4, fileMD5Str);
2720                    _psPortEventInsert.setInt(5, portId);
2721                    _psPortEventInsert.setTimestamp(6, new Timestamp(timestamp.getTime()));
2722                    _psPortEventInsert.setString(7, typeStr);
2723                    _psPortEventInsert.setInt(8, writeId);
2724    
2725                    // save the value we wrote into the write_event_id column
2726                    // for debug writer.
2727                    int writeEventId = writeId;
2728    
2729                    writeId = _dbType.insert(_psPortEventInsert, "port_event", "id");
2730    
2731                    if(_debugWriter != null)
2732                    {
2733                        _debugWrite("INSERT INTO PORT_EVENT(" +
2734                            channel + ", " +
2735                            tokenValue + ", " +
2736                            fireId + ", " +
2737                            fileMD5Str + ", " +
2738                            portId + ", " +
2739                            "curTime, " +
2740                            typeStr + ", " +
2741                            writeEventId + ")");
2742                    }
2743                }
2744
2745            } else {
2746                synchronized(_psPortEventInsertWithChecksum)
2747                {
2748                    _psPortEventInsertWithChecksum.setInt(1, channel);
2749                    _psPortEventInsertWithChecksum.setString(2, dataMD5Str);
2750                    _psPortEventInsertWithChecksum.setInt(3, fireId);
2751                    _psPortEventInsertWithChecksum.setString(4, fileMD5Str);
2752                    _psPortEventInsertWithChecksum.setInt(5, portId);
2753                    _psPortEventInsertWithChecksum.setTimestamp(6, new Timestamp(timestamp.getTime()));
2754                    _psPortEventInsertWithChecksum.setString(7, typeStr);
2755                    _psPortEventInsertWithChecksum.setInt(8, writeId);
2756    
2757                    // save the value we wrote into the write_event_id column
2758                    // for debug writer.
2759                    int writeEventId = writeId;
2760    
2761                    writeId = _dbType.insert(_psPortEventInsertWithChecksum, "port_event",
2762                        "id");
2763    
2764                    if(_debugWriter != null)
2765                    {
2766                        _debugWrite("INSERT INTO PORT_EVENT(" +
2767                            channel + ", " +
2768                            dataMD5Str + ", " +
2769                            fireId + ", " +
2770                            fileMD5Str + ", " +
2771                            portId + ", " +
2772                            "curTime, " +
2773                            typeStr + ", " +
2774                            writeEventId + ")");
2775                    }
2776                }   
2777            }
2778            
2779            // if this is a write event, update the queues for each
2780            // of the receiving ports
2781            if(!isRead)
2782            {
2783                //_debug("updating any queues connected to " + _getNameableFullName(port));
2784                            
2785                TokenInfo tokenInfo = new TokenInfo(writeId, dataMD5Str,
2786                    fileMD5Str, tokenValue);
2787
2788                if(destPort == null)
2789                {
2790                    _portConnector.sendIdToConnections(port, channel,
2791                        tokenInfo);
2792                }
2793                else
2794                {
2795                    _portConnector.receiveId(destPort, channel, tokenInfo);
2796                }
2797            }
2798        }
2799        catch(SQLException e)
2800        {
2801            throw new RecordingException(_getExceptionMessage(e));
2802        }
2803
2804        if(fakeFired)
2805        {
2806            Actor actor = (Actor)port.getContainer();
2807            Director director = actor.getDirector();
2808            actorFire(new FiringEvent(director, actor, FiringEvent.AFTER_ITERATE));
2809        }
2810    }
2811
2812    /** Adds data to the data table if not already there.
2813     *  @return the md5 of the data
2814     */
2815    protected String _checkData(byte[] data) throws RecordingException
2816    {
2817        boolean truncated = false;
2818
2819        // see if data is too large
2820        if(data.length > _maxDataSize)
2821        {
2822            truncated = true;
2823        }
2824
2825        // compute checksum
2826        String retval = _getMD5(data);
2827
2828        boolean found = false;
2829
2830        try
2831        {
2832            // query table for checksum
2833            synchronized(_psDataMD5Query)
2834            {
2835                _psDataMD5Query.setString(1, retval);
2836                ResultSet result = _psDataMD5Query.executeQuery();
2837        
2838                if(result.next())
2839                {
2840                    found = true;
2841                }
2842        
2843                result.close();
2844            }
2845        }
2846        catch(SQLException e)
2847        {
2848            throw new RecordingException("Unable to query data table: " +
2849                e.getMessage());
2850        }
2851
2852        // if not found insert
2853        if(!found)
2854        {
2855            try
2856            {
2857                synchronized(_psDataInsert)
2858                {
2859                    _psDataInsert.setBytes(1, data);
2860                    _psDataInsert.setString(2, retval);
2861                    _psDataInsert.setBoolean(3, truncated);
2862                    _psDataInsert.executeUpdate();
2863                }
2864                
2865                if(_debugWriter != null)
2866                {
2867                    // both data and md5 can change depending on time, so don't output.
2868                    _debugWrite("INSERT INTO DATA(" +
2869                        "data, " +
2870                        "md5, " +
2871                        truncated + ")");
2872                }
2873            }
2874            catch(SQLException e)
2875            {
2876                throw new RecordingException("Unable to insert into data " +
2877                    "table: " + e.getMessage());
2878            }
2879        }
2880        return retval;
2881    }
2882
2883    /** Adds data to the data table if not already there.
2884     *  @return the md5 of the data
2885     */
2886    protected String _checkData(File file) throws RecordingException
2887    {
2888        String retval = "";
2889        int fileBytes = Long.valueOf(file.length()).intValue();
2890        FileInputStream stream = null;
2891
2892        try
2893        {
2894            try
2895            {
2896                stream = new FileInputStream(file);                    
2897            }
2898            catch(FileNotFoundException e)
2899            {
2900                throw new RecordingException("File " + file.getAbsolutePath() +
2901                " not found: ", e);
2902            }
2903
2904            byte[] data = new byte[fileBytes];
2905
2906            try
2907            {
2908                if(stream.read(data) != fileBytes)
2909                {
2910                    throw new RecordingException("Unable to read entire " +
2911                        "file: " + file.getAbsolutePath());
2912                }
2913            }
2914            catch(IOException e)
2915            {
2916                throw new RecordingException("Error reading file " +
2917                    file.getAbsolutePath() + ": ", e);
2918            }
2919                
2920            retval = _checkData(data); 
2921        }
2922        finally
2923        {
2924            if(stream != null)
2925            {
2926                try
2927                {
2928                    stream.close();
2929                }
2930                catch(IOException e)
2931                {
2932                    throw new RecordingException("Error closing file " +
2933                        file.getAbsolutePath() + ": ", e);
2934                }
2935            }
2936        }
2937        
2938        return retval;
2939    }
2940
2941    /** See if a token contains a valid filename.
2942     *  @return If token contains a valid filename, and its size is less than
2943     *  minFileIncludeKB, then returns the MD5 of the file's contents.
2944     */
2945    protected String _checkForFileData(Token token) throws RecordingException
2946    {
2947        String retval = "";
2948
2949        ArrayToken arrayToken = null;
2950        RecordToken recordToken = null;
2951
2952        if(token instanceof StringToken)
2953        {
2954            String possibleFilePath = ((StringToken)token).stringValue();
2955            File possibleFile = new File(possibleFilePath);
2956
2957            //_debug("seeing if file: " + possibleFile.getAbsolutePath());
2958
2959            int fileBytes = Long.valueOf(possibleFile.length()).intValue();
2960            float fileKiloBytes = 0;
2961            if(fileBytes > 1023)
2962            {
2963                fileKiloBytes = Float.valueOf(fileBytes) / 1024;
2964            }
2965
2966            // see if file is a file (not a directory), is readable, 
2967            // and is less than the max size.
2968            if(possibleFile.isFile() && possibleFile.canRead() &&
2969                fileKiloBytes <= _maxFileIncludeSizeKBVal)
2970            {
2971                /*
2972                _debug("    file is readable and under max (size = " +
2973                    fileKiloBytes + " max = " + _maxFileIncludeSizeKBVal +
2974                    "); going to include");
2975                */
2976
2977                retval = _checkData(possibleFile);
2978            }
2979        }
2980        //if ArrayToken or RecordToken recurse to check elements
2981        else if (token instanceof ArrayToken){
2982                arrayToken = ((ArrayToken)token);
2983                for (int i=0; i<arrayToken.length(); i++){
2984                        final Token aToken = arrayToken.getElement(i);
2985                        if (!aToken.isNil()){
2986                        _checkForFileData(aToken);
2987                        }
2988                }
2989        }
2990        else if (token instanceof RecordToken){
2991                recordToken = ((RecordToken)token);
2992                Iterator<?> recordTokenItr = recordToken.labelSet().iterator();
2993                while (recordTokenItr.hasNext()){
2994                        final String label = (String)recordTokenItr.next();
2995                        final Token aToken = recordToken.get(label);
2996                        if (!aToken.isNil()){
2997                                _checkForFileData(aToken);
2998                        }
2999                }
3000        }
3001
3002        return retval;
3003    }
3004
3005    /** Reset when we use a different db connection. */
3006    @Override
3007    protected void _dbReset() throws RecordingException
3008    {
3009        _wfLSID = _containerLSID;
3010        super._dbReset();
3011    }
3012
3013    /** Reset when we use a different workflow. */
3014    @Override
3015    protected void _wfReset()
3016    {
3017        _wfLastExecId = RegEntity.UNKNOWN_ID;
3018        super._wfReset();
3019    }
3020
3021    /** Add all the parameter ids to the parameter_exec table. */
3022    protected void _updateParameterExecTable() throws RecordingException
3023    {
3024        // XXX synchronize access to _entityCacheTable
3025        for(Map.Entry<Nameable,RegEntity> entry : _entityCacheTable.entrySet())
3026        {
3027            Nameable nameable = entry.getKey();
3028            RegEntity entity = entry.getValue();
3029            RegEntity.EntityType type = entity.getType();
3030            
3031            // add to parameter exec table if a parameter or unconnected
3032            // port parameter. don't add connected port parameters since
3033            // their values come from the port.
3034            if(type == RegEntity.EntityType.Parameter ||
3035                ((nameable instanceof PortParameter) &&
3036                ((PortParameter)nameable).getPort().numberOfSources() == 0))
3037            {
3038                try
3039                {                    
3040                    _psParameterExecInsert.setInt(1, entity.getId());
3041                    _psParameterExecInsert.setInt(2, _wfExecId);
3042                    
3043                    _psParameterExecInsert.executeUpdate();
3044  
3045                    if(_debugWriter != null)
3046                    {
3047                        // both data and md5 can change depending on time, so don't output.
3048                        /* XXX
3049                        _debugWrite("INSERT INTO PARAMETER_EXEC(" +
3050                            entity.getId() + ", " + _wfExecId + ")");
3051                        */
3052                    }
3053                }
3054                catch(SQLException e)
3055                {
3056                    throw new RecordingException("Error updating parameter_exec table: ", e);
3057                }
3058            }
3059        }
3060    }
3061    
3062    ////////////////////////////////////////////////////////////////////////
3063    //// protected classes                                              ////
3064
3065    /** The class used in the port queues to contain the port event write id
3066     *  and data md5.
3067     */
3068    protected static class TokenInfo 
3069    {
3070        /** Construct a new PortQueueEntry for a write id and md5 of the
3071         *  data.
3072         */
3073        TokenInfo(int writeId, String dataMD5, String fileMD5, String tokenValue)
3074        {
3075            this.writeId = writeId;
3076            this.dataMD5 = dataMD5;
3077            this.fileMD5 = fileMD5;
3078            this.tokenValue = tokenValue;
3079        }
3080        
3081        /** Get a string representation. */
3082        @Override
3083        public String toString()
3084        {
3085            return "writeId = " + writeId; // + " data md5 = " + _dataMD5; 
3086        }
3087
3088        /** The MD5 checksum. */
3089        public String dataMD5;
3090
3091        /** The file MD5 checksum. */
3092        public String fileMD5;
3093
3094        /** The port event write id. */
3095        public int writeId;
3096        
3097        public String tokenValue;
3098    }
3099
3100    /** A subclass of SQLRecordingParameters that adds more parameters. */
3101    protected static class SQLRecordingParametersV8 
3102        extends SQLRecordingParameters
3103    {
3104        /** Construct a new SQLRecordParametersV8. */
3105        protected SQLRecordingParametersV8(NamedObj no)
3106            throws IllegalActionException, NameDuplicationException
3107        {
3108            super(no);
3109            addIntParameter(maxFileIncludeSizeKBStr, _maxFileIncludeSizeKBDefaultVal);
3110            addStringParameter(execAnnotationStr);
3111            addStringParameter(nextExecLSIDStr);
3112            addBooleanParameter(watchForLSIDChangesStr, true);
3113        }
3114
3115        /** Name of parameter that specifies maximum size, in KB, of file
3116         *  to include in provenance.
3117         */
3118        protected static final String maxFileIncludeSizeKBStr =
3119            "maxFileInclusionSizeKB";
3120
3121        /** Name of execution annotation parameter. */
3122        protected static final String execAnnotationStr = "execAnnotation";
3123        
3124        /** Name of next execution LSID parameter. */
3125        protected static final String nextExecLSIDStr = "nextExecLSID";
3126        
3127        /** Name of watch for LSID changes parameter. */
3128        protected static final String watchForLSIDChangesStr = "watchForLSIDChanges";
3129    }
3130
3131    ////////////////////////////////////////////////////////////////////////
3132    //// protected variables                                            ////
3133
3134    /** Class for token dependencies. */
3135    protected PortConnector<TokenInfo> _portConnector;
3136
3137    // prepared statements
3138    protected PreparedStatement _psDataInsert;
3139    protected PreparedStatement _psDataMD5Query;
3140    protected PreparedStatement _psPortEventInsert;
3141    protected PreparedStatement _psPortEventInsertWithChecksum;
3142    protected PreparedStatement _psChangeWorkflowName;
3143    protected PreparedStatement _psChangeExecutionLSID;
3144    protected PreparedStatement _psChangeExecutionReferralList;
3145    protected PreparedStatement _psChangeExecutionType;
3146    protected PreparedStatement _psAssocDataInsert;
3147    protected PreparedStatement _psAssocDataInsertNoDataId;
3148    protected PreparedStatement _psErrorInsert;
3149    protected PreparedStatement _psDeleteExecutions;
3150    protected PreparedStatement _psDeleteData;
3151    protected PreparedStatement _psGetWfContentsIdsToDeleteQuery;
3152    protected PreparedStatement _psGetWfContentsIdsToNotDeleteQuery;
3153    protected PreparedStatement _psGetAssociatedDataDataIdsToDeleteQuery;
3154    protected PreparedStatement _psGetAssociatedDataDataIdsToNotDeleteQuery;
3155    protected PreparedStatement _psGetPortEventDataIdsToDeleteQuery;
3156    protected PreparedStatement _psGetPortEventDataIdsToNotDeleteQuery;
3157    protected PreparedStatement _psGetTagForExecIdAndURN;
3158    protected PreparedStatement _psTagInsert;
3159    protected PreparedStatement _psDeleteTagForExecIdAndURN;
3160    protected PreparedStatement _psDeleteTags;
3161    protected PreparedStatement _psParameterExecInsert;
3162    protected PreparedStatement _psDeleteWorkflowsForNoExecutions;
3163    protected PreparedStatement _psWorkflowIdsForNoExecutions;
3164    
3165    /** Queryable connection. */
3166    protected Queryable _queryable;
3167    
3168    /** The most recently completed workflow execution id. */
3169    protected int _wfLastExecId;
3170
3171    /** Value of execution annotation parameter. */
3172    protected String _execAnnotation;
3173
3174    /** Value of next execution LSID parameter. */
3175    protected String _nextExecLSIDStr;
3176        
3177    /** The current workflow id. */
3178    protected KeplerLSID _wfLSID;
3179
3180    /** True if executionError is called */
3181    protected boolean _executionHadAnError;
3182
3183    protected Map<KeplerLSID, Integer> _executionLSIDtoIdMap;
3184    
3185    private static final int numMillisecondsInASecond = 1000;
3186
3187    ////////////////////////////////////////////////////////////////////////
3188    //// private methods                                                ////
3189        
3190    /** Compute an MD5 checksum from a byte array. */
3191    private String _getMD5(byte[] data) throws RecordingException
3192    {
3193        String retval = null;
3194        try
3195        {
3196            MessageDigest digest = MessageDigest.getInstance("MD5");
3197            digest.update(data);
3198            byte[] md5sum = digest.digest();
3199         
3200            //XXX necessary?
3201            BigInteger bigInt = new BigInteger(1, md5sum);
3202            retval = bigInt.toString(16);
3203                    
3204            // make sure it's 32 chars
3205            while(retval.length() < 32)
3206            {
3207                retval = "0" + retval;
3208            }
3209        }
3210        catch (NoSuchAlgorithmException e)
3211        {
3212            throw new RecordingException("MD5 digest does not exist.");
3213        }
3214        return retval;    
3215    }
3216  
3217    ////////////////////////////////////////////////////////////////////////
3218    //// private variables                                              ////
3219
3220    /** Maximum size, in kilobytes, of files to include in provenance. */
3221    private int _maxFileIncludeSizeKBVal;
3222
3223    /** The default value for maximum size of file for inclusion. */
3224    private final static int _maxFileIncludeSizeKBDefaultVal = 1024;
3225
3226    /** A synchronized collection of instances of this class. */
3227    private static Set<WeakReference<SQLRecordingV8>> _v8Set = 
3228        Collections.synchronizedSet(new HashSet<WeakReference<SQLRecordingV8>>());
3229
3230    /** If true, when the workflow LSID changes, reconnect to workflow. */
3231    private boolean _watchForLSIDChanges;
3232    
3233    /** The maximum length of the data stored in the port_event table.
3234     *  Larger data is stored in the data table.
3235     */
3236    public final static int MAX_PORT_EVENT_DATA_LENGTH = 4096;
3237    
3238}