001/* Play a Recording from a Queryable.
002 *
003 * Copyright (c) 2015 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2017-07-07 16:27:30 +0000 (Fri, 07 Jul 2017) $' 
008 * '$Revision: 34587 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.provenance;
031
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Collections;
035import java.util.Comparator;
036import java.util.Date;
037import java.util.HashMap;
038import java.util.HashSet;
039import java.util.Iterator;
040import java.util.LinkedList;
041import java.util.List;
042import java.util.Map;
043import java.util.Set;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.ExecutorService;
046import java.util.concurrent.Executors;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.TimeoutException;
049
050import javax.swing.SwingUtilities;
051
052import org.kepler.objectmanager.lsid.KeplerLSID;
053
054import ptolemy.actor.Actor;
055import ptolemy.actor.AtomicActor;
056import ptolemy.actor.CompositeActor;
057import ptolemy.actor.Director;
058import ptolemy.actor.FiringEvent;
059import ptolemy.actor.IOPort;
060import ptolemy.actor.IOPortEvent;
061import ptolemy.actor.IOPortEventListener;
062import ptolemy.actor.IORelation;
063import ptolemy.actor.Manager;
064import ptolemy.actor.NoRoomException;
065import ptolemy.actor.Receiver;
066import ptolemy.actor.TypedIOPort;
067import ptolemy.actor.gui.Configuration;
068import ptolemy.actor.gui.Effigy;
069import ptolemy.actor.gui.ModelDirectory;
070import ptolemy.actor.gui.PtolemyEffigy;
071import ptolemy.actor.parameters.PortParameter;
072import ptolemy.data.Token;
073import ptolemy.data.expr.Parameter;
074import ptolemy.domains.pn.kernel.PNDirector;
075import ptolemy.kernel.CompositeEntity;
076import ptolemy.kernel.Entity;
077import ptolemy.kernel.Relation;
078import ptolemy.kernel.util.Attribute;
079import ptolemy.kernel.util.IllegalActionException;
080import ptolemy.kernel.util.NameDuplicationException;
081import ptolemy.kernel.util.NamedObj;
082import ptolemy.vergil.basic.KeplerDocumentationAttribute;
083import ptolemy.vergil.kernel.attributes.TextAttribute;
084
085/** A class to convert provenance formats by reading from a
086 *  a Queryable and writing to a Recording.
087 * 
088 *  @see Queryable
089 *  @see Recording
090 *  
091 *  @author Daniel Crawl
092 *  @version $Id: RecordPlayer.java 34587 2017-07-07 16:27:30Z crawl $
093 * 
094 */
095public class RecordPlayer {
096
097    /** Create a new RecordPlayer using a Queryable as the source
098     *  and no Recording output. This is useful if actors within
099     *  workflow will be re-executed.
100     *  @see RecordPlayer#setExecuteGraphicalActors
101     */
102    public RecordPlayer(Queryable queryable) {
103        this(queryable, null);
104    }
105    
106    /** Create a new RecordPlayer using a Queryable as the source,
107     *  and Recording for the output.
108     */
109    public RecordPlayer(Queryable queryable, Recording recording) {
110        _queryable = queryable;
111        _recording = recording;
112    }
113            
114    /** Close any effigies created while replaying workflows. */
115    public static void closeAllEffigies() throws IllegalActionException, NameDuplicationException {
116        synchronized(_effigies) {
117            for(Effigy effigy : _effigies.values()) {
118                effigy.closeTableaux();
119                effigy.setContainer(null);
120            }
121        }
122    }
123    
124    /** Re-execute an actor.
125     * @param className the class name of the actor to re-execute.
126     */
127    public void executeActor(String className) {
128        _executeActorNames.add(className);
129    }
130    
131    /** Get the queryable. */
132    public Queryable getQueryable() {
133        return _queryable;
134    }
135    
136    /** Play a specific execution. */
137    public void play(Integer execId) throws QueryException, RecordingException {
138
139        CompositeActor workflow;
140        try {
141            workflow = (CompositeActor) _queryable.getWorkflowForExecution(execId);
142        } catch (Exception e) {
143            throw new RecordingException("Error parsing workflow.", e);
144        }
145
146        play(execId, workflow);
147    }
148    
149    /** Play a specific execution for a workflow. */
150    public void play(Integer execId, final CompositeActor workflow) throws QueryException, RecordingException {
151        
152        // port events must be played if executing actors.
153        if(!_playPortEvents && 
154                (!_executeActorNames.isEmpty() || !_executeActorPackages.isEmpty())) {
155            throw new QueryException("Must play port events if executing actors.");
156        }
157        
158        _workflow = workflow;
159        
160        KeplerLSID execLSID = _queryable.getExecutionLSIDForExecution(execId);
161        if(execLSID == null) {
162            throw new RecordingException("Could not get execution LSID.");
163        }
164
165        // NOTE: if the workflow has never been executed, we 
166        // create receivers in all the actors so that if the
167        // output Recording is using a PortConnector, then
168        // PortConnector.sendIdToConnections() will find all
169        // the connected ports. 
170       
171        Manager manager = workflow.getManager();
172        if(manager == null) {
173            try {
174                manager = new Manager(workflow.workspace(), "_Manager");
175               workflow.setManager(manager);
176            } catch (IllegalActionException e) {
177                throw new QueryException("Error setting manager.", e);
178            }
179        }
180        if(manager.getPreinitializeVersion() != workflow.workspace().getVersion()) {
181            
182            // when creating receivers, an exception can sometimes occur due
183            // to not being able to uniquely infer the width of relations.
184            // to avoid this, create a top level parameter _defaultInferredWidthTo1
185            // with value to true.
186            // see ptolemy.actor.RelationWidthInference
187            
188            Attribute defaultWidth = workflow.getAttribute("_defaultInferredWidthTo1");
189            if(defaultWidth == null) {
190                try {
191                    Parameter defaultWidthParameter = new Parameter(workflow, "_defaultInferredWidthTo1");
192                    defaultWidthParameter.setToken("true");
193                    defaultWidthParameter.setPersistent(false);
194                } catch(IllegalActionException | NameDuplicationException e) {
195                    throw new QueryException("Error creating parameter to set default relation width.", e);
196                }
197            }
198            
199            List<Actor> actorList = workflow.entityList(Actor.class);
200            while(!actorList.isEmpty()) {
201                Actor actor = actorList.remove(0);
202                // create receivers in atomic actors, and opaque composite actors.
203                if(actor instanceof AtomicActor ||
204                    (actor instanceof CompositeActor && ((CompositeActor)actor).isOpaque())) {
205                    try {
206                       //System.out.println("creating receivers for " + actor);
207                       actor.createReceivers();
208                    } catch (IllegalActionException e) {
209                       throw new QueryException("Error creating receivers for " +
210                           actor.getFullName(), e);
211                   }
212               }
213               // if the actor is a composite actor, add any contained
214               // actors so we create their receivers.
215               if(actor instanceof CompositeActor) {
216                   actorList.addAll(((CompositeActor)actor).entityList(Actor.class));
217                }
218            }
219        }
220        
221        Date[] workflowExecutionTimestamps = null;
222        
223        List<ProvenanceRecorder> recorders = _workflow.attributeList(ProvenanceRecorder.class);
224        if(recorders.isEmpty()) {
225            Map<String,String> defaults = ProvenanceRecorder.getDefaultsMap();
226            try {
227                _recorder = new ProvenanceRecorder(_workflow, defaults.get("Provenance Recorder Name"));
228            } catch (IllegalActionException | NameDuplicationException e) {
229                throw new QueryException("Error creating default provenance recorder in workflow.", e);
230            }
231        } else {
232            _recorder = recorders.get(0);
233        }
234
235        /*
236        try {
237            _recorder.recordingType.setToken(ProvenanceRecorder.DEFAULT_REC_TYPE_IGNORE);
238        } catch (IllegalActionException e) {
239            throw new RecordingException("Error turning off provenance recording.", e);
240        }
241        */
242
243        if(_recording != null) {
244
245            // NOTE: must add the recording to the recorder since the
246            // recorder access the recorder in _recorderContainer.
247            _recorder.addPiggyback(_recording);
248                    
249            // TODO set the user and machine names, tags, etc.        
250            
251            if(_recording.regContents()) {
252                
253                // specification start
254                _recording.specificationStart();
255            
256                // reg contents
257                _recordContainerContents(_workflow);
258                
259                // specification stop
260                _recording.specificationStop();
261            }
262            
263            // TODO play evolutions
264            
265            workflowExecutionTimestamps = _queryable.getTimestampsForExecution(execId);
266    
267            // execution start
268            if(workflowExecutionTimestamps == null || workflowExecutionTimestamps.length != 2 ||
269                    workflowExecutionTimestamps[0] == null) {
270                System.err.println("WARNING: could not query start/stop timestamps for execution.");
271                _recording.executionStart(execLSID);
272            } else {
273                _recording.executionStart(execLSID, workflowExecutionTimestamps[0]);
274            }
275        }
276        
277        boolean createdEffigies = false;
278        
279        if(!_executeActorNames.isEmpty() || !_executeActorPackages.isEmpty()) {
280
281            if(_recorder != null) {
282                workflow.removeInitializable(_recorder);
283            }
284            
285            _createEffigyIfNecessary(execId);
286            createdEffigies = true;
287            
288            try {
289                workflow.preinitialize();
290            } catch (IllegalActionException e) {
291                System.err.println("Error preinitializing the workflow: " + e.getMessage());
292            }
293            
294            try {
295                workflow.initialize();
296            } catch (IllegalActionException e) {
297                System.err.println("Error initializing the workflow: " + e.getMessage());
298            }
299        }
300        
301        try {
302            // replay the execution events (actor fires and port read/writes)
303            _replayExecution(execId, execLSID);
304            
305            if(!_executeActorNames.isEmpty() || !_executeActorPackages.isEmpty()) {
306                
307                // call wrapup without a timeout of 5 seconds since
308                // NondeterministicMerge can block in wrapup.
309                ExecutorService service = Executors.newSingleThreadExecutor();
310                try {
311                    service.submit(new Runnable() {
312                        @Override
313                        public void run() {
314                            try {
315                                workflow.wrapup();
316                            } catch (IllegalActionException e) {
317                                System.err.println("Error wrapping-up the workflow: " + e.getMessage());
318                            }
319                        }
320                    }).get(5000, TimeUnit.MILLISECONDS);
321                } catch (InterruptedException | ExecutionException | TimeoutException e) {
322                    System.err.println("WARNING: Timeout wrapping up replay.");
323                } finally {
324                    service.shutdownNow();
325                }                
326            }
327            
328            if(_recording != null) {
329                // execution stop
330                if(workflowExecutionTimestamps == null || workflowExecutionTimestamps[1] == null) {
331                    _recording.executionStop(execLSID);
332                } else {
333                    _recording.executionStop(execLSID, workflowExecutionTimestamps[1]);
334                }
335            
336                // see if there was an error.
337                if(_queryable.isErrorForExecution(execLSID)) {
338        
339                    String errorStr = _queryable.getErrorForExecution(execLSID);
340                    
341                    // TODO need to get the error source and throwable
342                    Throwable throwable = new Throwable(errorStr);
343                    _recording.executionError(_workflow, throwable, execLSID);
344                }
345            }
346        } finally {
347            // if we are replaying actors, set any created effigies to
348            // be not modified. this prevents a 'Save Changes?' dialog from
349            // appearing when closing windows created by replayed actors.
350            if(createdEffigies) {
351                final Effigy effigy = _effigies.get(execId);
352                if(effigy != null) {
353                    // NOTE: perform this in the event thread in case there
354                    // are effigies about to be created (new windows).
355                    SwingUtilities.invokeLater(new Runnable() {
356                        @Override
357                        public void run() {
358                            _setEffigiesToNotModified(effigy);
359                        }
360                    });
361                }
362            }
363        }
364    }
365    
366    /** Execute graphical actors in workflow. */
367    public void setExecuteGraphicalActors(boolean execute) {
368        
369        // TODO find a list of graphical actors
370        
371        if(execute) {
372            _executeActorPackages.add("ptolemy.actor.lib.gui");            
373            _executeActorPackages.add("org.kepler.actor.gui");
374            _executeActorNames.add("pl.psnc.kepler.common.actor.MultipleTabDisplay");
375        } else {           
376            _executeActorPackages.remove("ptolemy.actor.lib.gui");            
377            _executeActorPackages.remove("org.kepler.actor.gui");            
378            _executeActorNames.remove("pl.psnc.kepler.common.actor.MultipleTabDisplay");
379        }
380    }
381    
382    /** Set if port events are to be played. */
383    public void setPlayPortEvents(boolean playPortEvents) {
384        _playPortEvents = playPortEvents;
385    }
386
387    ///////////////////////////////////////////////////////////////////
388    ////                       private methods                     ////
389
390    /** Create an effigy for a workflow execution if one does not already exist. */
391    private void _createEffigyIfNecessary(Integer execId) {
392        
393        synchronized(_effigies) {
394        
395            PtolemyEffigy effigy = _effigies.remove(execId);
396            if(effigy != null) {
397                //System.out.println("found existing cached effigy for run " + execId + "; closing open tableaux");
398                // close existing windows opened.
399                effigy.closeTableaux();
400                effigy.setModel(null);
401                try {
402                    effigy.setContainer(null);
403                    //System.out.println("removing effigy " + effigy.getFullName());
404                } catch (IllegalActionException | NameDuplicationException e) {
405                    System.err.println("Error removing effigy: " + e.getMessage());
406                }
407            }
408            
409            // see if the Configuration has an effigy for the workflow.
410            effigy = (PtolemyEffigy) Configuration.findEffigy(_workflow);
411            if(effigy != null) {
412                // close existing windows opened.
413                effigy.closeTableaux();
414                effigy.setModel(null);
415                try {
416                    effigy.setContainer(null);
417                    //System.out.println("removing effigy " + effigy.getFullName());
418                } catch (IllegalActionException | NameDuplicationException e) {
419                    System.err.println("Error removing effigy: " + e.getMessage());
420                }
421            }
422                        
423            // make sure there is at least one Configuration. there are none when
424            // running headless.
425            if(!Configuration.configurations().isEmpty()) {
426                ptolemy.actor.gui.Configuration configuration = 
427                        (ptolemy.actor.gui.Configuration) Configuration.configurations().iterator().next();
428
429                try {
430                    ModelDirectory directory = configuration.getDirectory();
431                    //System.out.println("Creating effigy for " +
432                        //_workflow.getName() + " in " + directory.getFullName());
433                    
434                    effigy = new PtolemyEffigy(directory,
435                            directory.uniqueName(_workflow.getName()));
436                    //System.out.println("created effigy " + effigy.getFullName());
437                    effigy.setModel(_workflow);                                                
438                    effigy.identifier.setExpression(effigy.getName());
439                } catch(IllegalActionException | NameDuplicationException e) {
440                    System.err.println("Error creating effigy: " + e.getMessage());
441                    return;
442                }
443            }
444            
445            if(effigy != null) {
446                _effigies.put(execId, effigy);
447            }
448        }
449    }
450    
451    /** Output the model contents.
452     *  TODO copied from ProvenanceRecorder
453     */ 
454    private void _recordContainerContents(NamedObj namedObj)
455        throws RecordingException
456    {
457        // whether to recurse based on Recording register method.
458        boolean recurse = true;
459                
460        if(namedObj instanceof Actor)
461        {            
462            recurse = _recording.regActor((Actor)namedObj);
463        }
464        else if(namedObj instanceof Director)
465        {
466            recurse = _recording.regDirector((Director)namedObj);       
467        }
468        else if(namedObj instanceof TypedIOPort)
469        {
470            TypedIOPort port = (TypedIOPort)namedObj;
471            _recording.regPort(port);
472            
473            // do not record information about attributes contained by the port
474            recurse = false;
475        }
476        else if(namedObj instanceof IORelation)
477        {
478            IORelation rel = (IORelation)namedObj;
479            recurse = _recording.regRelation(rel);
480        }
481        else if(namedObj != _recorder &&
482            // NOTE: do not register a connected port parameter as a parameter
483            // since during execution, reading a token by the port generates
484            // both port and value change events. value change events are
485            // treated as a workflow evolution, which can be expensive.
486            (!(namedObj instanceof PortParameter) || 
487            ((PortParameter)namedObj).getPort().numberOfSources() == 0))
488        {
489            recurse = _recording.regParameter(namedObj);
490        }
491        else // namedObj == _recorder
492        {
493            recurse = false;
494        }
495
496        if(recurse)
497        {
498            // recursively register contents of the current namedObj
499            List<NamedObj> contentsList = new LinkedList<NamedObj>();
500
501            boolean isPLOpaque = false;
502
503            if(namedObj instanceof CompositeEntity)
504            {
505                CompositeEntity composite = (CompositeEntity)namedObj;
506                contentsList.addAll(composite.entityList());
507
508                // NOTE: we add the relations last since for each one,
509                // regLink() is called and both ends of the link must be
510                // registered first.
511                contentsList.addAll(composite.relationList());
512
513                // check to see if named obj contains an instance of
514                // provenance listener
515                List<?> provs = namedObj.attributeList(ProvenanceRecorder.class);
516                if(provs != null && provs.size() > 0 && namedObj != namedObj.toplevel())
517                {
518                    isPLOpaque = true;
519                }
520                else if((namedObj instanceof CompositeActor) &&
521                    !ProvenanceRecorder.containsSupportedDirector((CompositeActor)namedObj))
522                {
523                    isPLOpaque = true;
524                }
525                
526            }
527
528            // add the ports to the beginning
529            if(namedObj instanceof Entity)
530            {
531                contentsList.addAll(0, ((Entity<?>)namedObj).portList());
532            }
533
534            // add the attributes to the beginning
535            contentsList.addAll(0, namedObj.attributeList());
536
537            //System.out.println("is pl opaque: " + isPLOpaque);
538            
539            for(NamedObj containedNamedObj : contentsList)
540            {
541                if(isPLOpaque &&
542                    // do not record contents of these types of objects:
543                    ((containedNamedObj instanceof Actor) ||
544                     (containedNamedObj instanceof Director) ||
545                     (containedNamedObj instanceof IORelation) ||
546                     (containedNamedObj instanceof TextAttribute) ||
547                     (containedNamedObj instanceof ProvenanceRecorder) ||
548                     (containedNamedObj instanceof
549                        KeplerDocumentationAttribute)))
550                {
551                    //_Debug("not recursing down " +
552                        //containedNamedObj.getFullName());
553                }
554                else
555                {
556                    _recordContainerContents(containedNamedObj);
557                }
558            }
559
560            // now that all the ports and relations have been registered
561            // (if namedObj is a CompositeEntity), it is safe to register
562            // the links (since each end point must already registered).
563
564            if(!isPLOpaque && (namedObj instanceof CompositeEntity))
565            {
566                // iterate over the contained relations
567                List<?> relationList = ((CompositeEntity)namedObj).relationList();
568                Iterator<?> relationIter = relationList.iterator();
569                while(relationIter.hasNext())
570                {
571                    Relation curRelation = (Relation)relationIter.next();
572
573                    ///if(!adding)
574                    //{
575                        //containedName = oldName + "." +
576                            //curRelation.getName();
577                    //}
578                    _recordLinksInRelation(curRelation);
579                }
580            }
581        }
582    }
583
584    /** Record all the links within a relation.
585     *  TODO copied from ProvenanceRecorder
586     */
587    private void _recordLinksInRelation(Relation relation)
588        throws RecordingException
589    {
590        // iterate over the object linked in the current relation
591        List<?> linkedObjList = relation.linkedObjectsList();
592        Iterator<?> linkedObjsIter = linkedObjList.iterator();
593        while(linkedObjsIter.hasNext())
594        {
595            NamedObj linkedObj = (NamedObj)linkedObjsIter.next();
596
597            // see if current relation is linked to a port
598            if(linkedObj instanceof TypedIOPort)
599            {
600                _recording.regLink(linkedObj, relation);                 
601            }
602            // otherwise the link is between two relations.
603            // we do not want to register this link twice,
604            // e.g., a <--> b and b <--> a
605            // so only perform registration if the current
606            // relation's name lexigraphically precedes the
607            // linked relation's name.
608            else if(relation.getName().compareTo(
609                    linkedObj.getName()) < 0)
610            {
611                _recording.regLink(relation, linkedObj);                
612            }
613        }
614    }
615    
616    /** Replay a workflow execution. */
617    private void _replayExecution(int execId, KeplerLSID execLSID) throws QueryException, RecordingException {        
618        
619        List<Event> events = new ArrayList<Event>();
620        
621        long startNano = System.nanoTime();
622        
623        // collect the provenance events to replay. this consists of
624        // firing start, firing stop, port read, and port write events.
625        
626        System.out.print("Replaying..collecting:..");
627        
628        // collect the firings
629        int eventCount = 0;
630        List<Integer> firings = _queryable.getActorFirings(execId);
631        for(Integer firingId : firings) {
632                        
633            Date[] timestamps = _queryable.getTimestampsForActorFiring(firingId);
634            if(timestamps == null || timestamps.length != 2 ||
635                timestamps[0] == null || timestamps[1] == null) {
636                System.err.println("WARNING: missing timestamps for firing: " + firingId);
637                continue;
638            }
639            
640            // add the firing start
641            Event event = new Event(EventType.FiringStart, timestamps[0]);
642            event.firingId = firingId;
643            events.add(event);
644            
645            eventCount++;
646            if(eventCount % 10000 == 0) {
647                System.out.print(eventCount + "..");
648            }
649
650            // add the firing stop
651            event = new Event(EventType.FiringStop, timestamps[1]);            
652            event.firingId = firingId;
653            events.add(event);
654            
655            eventCount++;
656            if(eventCount % 10000 == 0) {
657                System.out.print(eventCount + "..");
658            }
659
660        }
661        
662        // collect the port events
663        if(_playPortEvents) {
664            List<Integer> tokens = _queryable.getTokensForExecution(execId, false);
665            for(Integer tokenId : tokens) {
666                
667                // add the port write for this token
668                Date writeTimestamp = _queryable.getTimestampForTokenWrite(tokenId);
669                if(writeTimestamp == null) {
670                    System.err.println("WARNING: missing timestamp for token write: " + tokenId);
671                    continue;
672                }
673                
674                Event event = new Event(EventType.PortWrite, writeTimestamp);
675                event.portEventId = tokenId;
676    
677                List<Integer> writeFirings = _queryable.getActorFiringForToken(tokenId, false);
678                event.firingId = writeFirings.get(0);
679                
680                events.add(event);
681    
682                eventCount++;
683                if(eventCount % 10000 == 0) {
684                    System.out.print(eventCount + "..");
685                }
686    
687                // add any port reads for this token
688                Date[] timestamps = _queryable.getTimestampsForTokenRead(tokenId);
689                if(timestamps != null && timestamps.length > 0) {
690                    List<Integer> readFirings = _queryable.getActorFiringForToken(tokenId, true);
691                    if(timestamps.length != readFirings.size()) {
692                        System.err.println("WARNING: number of timestamps for token read " +
693                                "does not match number of firings for token read.");
694                        continue;
695                    }
696                    Integer[] readFiringArray = readFirings.toArray(
697                        new Integer[readFirings.size()]);
698                    Arrays.sort(readFiringArray);
699                    
700                    int i = 0;
701                    for(Date readTimestamp : timestamps) {
702                        event = new Event(EventType.PortRead, readTimestamp);
703                        event.portEventId = tokenId;
704                        event.firingId = readFiringArray[i];
705                        i++;
706                        events.add(event);
707                        
708                        eventCount++;
709                        if(eventCount % 10000 == 0) {
710                            System.out.print(eventCount + "..");
711                        }
712                    }
713                }            
714            }
715        }
716        
717        // sort the events.
718        // we perform 3 sorts to get the events in order that:
719        //
720        // 1. the events are sorted chronologically (primary key)
721        //
722        // 2. events can occur at the same time, so sort on
723        // firing id (secondary key).
724        //
725        // 3. events can occur at the same time and with the
726        // same firing id, e.g., a port write and firing stop,
727        // so sort on event type order (tertiary key).
728        //
729        // the 3 sorts are performed sorting the least-significant key
730        // first: tertiary key (event type), secondary key (firing id),
731        // and primary key (event timestamp).
732        
733        System.out.print("sorting..");
734        
735        if(_playPortEvents) {
736            Collections.sort(events, new Comparator<Event>() {
737                @Override
738                public int compare(Event o1, Event o2) {
739                    return o1.type.compareTo(o2.type);
740                }
741            });        
742        }
743        
744        //System.out.print("sort2..");
745
746        Collections.sort(events, new Comparator<Event>() {
747            @Override
748            public int compare(Event o1, Event o2) {
749                return o1.firingId.compareTo(o2.firingId);
750            }
751        });
752
753        //System.out.print("sort3..");
754
755        Collections.sort(events, new Comparator<Event>() {
756            @Override
757            public int compare(Event o1, Event o2) {
758                return o1.timestamp.compareTo(o2.timestamp);
759            }
760        });
761
762        System.out.print("processing " + events.size() + "events..");
763
764        /*
765        for(Event event : events) {
766            System.out.println(event);
767        }
768        */
769        
770        // replay the events
771        
772        Map<Integer, Firing> firingMap = new HashMap<Integer, Firing>();
773        eventCount = 0;
774        for(Event event : events) {
775            
776            eventCount++;
777            if(eventCount % 1000 == 0) {
778                System.out.print(eventCount + "..");
779            }
780
781            switch(event.type) {
782            case FiringStart:
783                Firing firing = new Firing(event.firingId, event.timestamp);
784                firingMap.put(event.firingId, firing);
785                break;
786            case FiringStop:
787                firing = firingMap.remove(event.firingId);
788                firing.stop(event.timestamp);
789                break;
790            case PortRead:
791                firing = firingMap.get(event.firingId);
792                if(firing == null) {
793                    System.out.println("null firing " + event.firingId +
794                        " for pe " + event.timestamp);
795                }
796                firing.replayPortEvent(event.portEventId, true, event.timestamp);                
797                break;
798            case PortWrite:
799                firing = firingMap.get(event.firingId);
800                /*if(firing == null) {
801                    System.out.println("null firing " + event.firingId +
802                            " for pe " + event.timestamp);
803                }*/
804                firing.replayPortEvent(event.portEventId, false, event.timestamp);
805                break;
806            default:
807                System.err.println("WARNING: unknown type of event: " + event.type);
808                break;            
809            }                            }
810        
811        if(!firingMap.isEmpty()) {
812            System.err.println("WARNING: Firings not stopped: " + firingMap.size());
813        }
814        
815        System.out.print("done.");
816        double elapsed = (System.nanoTime() - startNano) / 1000000;
817        if(elapsed >= 5000) {
818            System.out.print(" elapsed time = " + elapsed + " ms");
819        }
820        System.out.println();
821    }
822
823    /** Set an effigy and any contained effigies to be not modified. */
824    private void _setEffigiesToNotModified(Effigy effigy) {
825        //System.out.println("setting not modified for : " + effigy.getFullName());
826        effigy.setModified(false);
827        for(Effigy containedEffigy : effigy.entityList(Effigy.class)) {
828            _setEffigiesToNotModified(containedEffigy);
829        }
830    }
831
832    /** A provenance event to replay. */
833    private static class Event {
834        
835        /** Create a new event with a type and timestamp. */
836        public Event(EventType type, Date timestamp) {
837            this.type = type;
838            this.timestamp = timestamp;
839        }
840
841        /** Get a string representation of the event. */
842        @Override
843        public String toString() {
844            return timestamp.toString() + " " + type.toString() + " f=" + firingId;
845        }
846        
847        /** The type of event. */
848        public EventType type;
849        
850        /** The firing id. If the event is a port read or write,
851         *  this value is the firing id in which the port event
852         *  occurs.
853         */
854        public Integer firingId;
855        
856        /** The port event id. If the event is a firing start or
857         *  stop, this is null.
858         */
859        public Integer portEventId;
860        
861        /** The timestamp of the event. */
862        public Date timestamp;
863    }
864    
865    /** A class that represents an actor firing. */
866    private class Firing {
867        
868        public Firing(Integer firingId, Date startTimestamp) throws QueryException, RecordingException {  
869            
870            _firingId = firingId;
871            
872            String actorName = _queryable.getActorName(firingId);
873            //System.out.println(firingId + " " + actorName);
874            
875            if(actorName == null) {
876                throw new QueryException(
877                    "WARNING: Could not find actor name for firing " +
878                    firingId);
879            }
880                        
881            // remove the leading "."
882            actorName = actorName.substring(1);
883            
884            // start firing
885            _actor = (Actor) _workflow.getEntity(actorName);
886            if(_actor == null) {
887                System.err.println("WARNING: could not find actor " + actorName +
888                        " in workflow.");
889                return;
890            }
891            
892            //System.out.println("    " + startTimestamp +
893                //" start firing " + firingId +
894                //": " + _actor.getFullName());
895
896            _director = _actor.getDirector();
897            if(_recording != null) {
898                FiringEvent firingEvent = new FiringEvent(_director,
899                    _actor,
900                    FiringEvent.BEFORE_ITERATE);
901                _recording.actorFire(firingEvent, startTimestamp);
902            }
903            
904            if(_executeActorNames.contains(_actor.getClass().getName()) ||
905                _executeActorPackages.contains(_actor.getClass().getPackage().getName())) {
906                _executeActor = true;                                
907            }
908            
909            if(_actor.getExecutiveDirector() instanceof PNDirector) {
910                _directorIsPN = true;
911            }
912        }
913        
914        /** Replay a port event and stop the actor firing if there are
915         *  no more port events that occurred during this firing.
916         *  @param tokenId the port event id to replay
917         *  @param read If true, the port event is a read, otherwise
918         *  port event is a write.
919         *  @return Returns true if there are no more port events
920         *  for this firing. Returns false if there are more port events.
921         */
922        public void replayPortEvent(Integer tokenId, boolean read, Date timestamp)
923            throws RecordingException, QueryException {
924            
925            Token token = _queryable.getToken(tokenId);
926            
927            if(token == null) {
928                System.err.println("WARNING: could not get token " +
929                    (read ? "read." : "written."));
930                return;
931            }
932                            
933            String portName = null;
934            if(read) {
935                portName = _queryable.getInputRoleForTokenAndFireId(tokenId, _firingId);
936            } else {
937                portName = _queryable.getOutputRoleToken(tokenId);
938            }
939            
940            if(portName == null) {
941                System.err.println("WARNING: could not get port for token " +
942                    (read ? "read " : "written ") + " " + tokenId);
943                return;
944            }
945            
946            Integer channel = _queryable.getChannelForToken(tokenId, read, _firingId);
947            if(channel == null) {
948                System.err.println("WARNING: could not get channel for token " +
949                    (read ? "read." : "written."));
950                channel = 0;
951            }
952            
953            // portName is the fully-qualified name; get just the 
954            // last part
955            portName = portName.substring(portName.lastIndexOf('.') + 1);
956            IOPort port = (IOPort) ((Entity<?>) _actor).getPort(portName);
957        
958            //System.out.println("    " + timestamp +
959                //(read ? " read " : " write ") +
960                //"firing " + _firingId); 
961        
962            List<IOPortEventListener> portListeners = port.getIOPortEventListeners();
963            if(_recording != null || !portListeners.isEmpty()) {
964                IOPortEvent event = new IOPortEvent(port,
965                        read ? IOPortEvent.GET_BEGIN : IOPortEvent.SEND_BEGIN,
966                        channel,
967                        true,
968                        read ? null : token); 
969                
970                if(_recording != null) {
971                    _recording.portEvent(event, timestamp);
972                }
973                
974                for(IOPortEventListener listener: portListeners) {
975                    try {
976                        listener.portEvent(event);
977                    } catch (IllegalActionException e) {
978                        System.err.println("Port event error: " + e.getMessage());
979                    }
980                }
981
982                event = new IOPortEvent(port,
983                        read ? IOPortEvent.GET_END : IOPortEvent.SEND_END,
984                        channel,
985                        true,
986                        token);     
987                
988                if(_recording != null) {
989                    _recording.portEvent(event, timestamp);
990                }
991
992                for(IOPortEventListener listener: portListeners) {
993                    try {
994                        listener.portEvent(event);
995                    } catch (IllegalActionException e) {
996                        System.err.println("Port event error: " + e.getMessage());
997                    }
998                }
999
1000            }
1001            
1002            if(_executeActor && read) {
1003                                                
1004                Receiver[][] receivers = port.getReceivers();
1005                if(receivers[channel] == null) {
1006                    System.err.println("missing receiver for channel " + channel + " in " + port.getFullName());
1007                } else {
1008                    try {
1009                        //System.out.println("putting token inside " + port.getFullName());
1010                        receivers[channel][0].put(token);
1011                    } catch (NoRoomException | IllegalActionException e) {
1012                        System.err.println("Error putting token in " + port.getFullName() + ": " + e.getMessage());
1013                    }
1014                }
1015                
1016            }
1017        }
1018              
1019        public void stop(Date stopTimestamp) throws RecordingException {
1020            
1021            if(_executeActor) {
1022                
1023                // if the director is PN, set a time out before calling
1024                // iterate, since the last iteration will block waiting
1025                // for input tokens that will never arrive.
1026                if(_directorIsPN) {
1027                    ExecutorService service = Executors.newSingleThreadExecutor();
1028                    try {
1029                        service.submit(new Runnable() {
1030                            @Override
1031                            public void run() {
1032                                try {
1033                                    //System.out.println("executing actor " + _actor);
1034                                    _actor.iterate(1);
1035                                } catch (IllegalActionException e) {
1036                                    System.err.println("Error iterating " + _actor.getFullName() + ": " + e.getMessage());
1037                                }                            }
1038                        }).get(_ACTOR_ITERATE_TIMEOUT, TimeUnit.MILLISECONDS);
1039                    } catch (InterruptedException | ExecutionException | TimeoutException e) {
1040                        // do nothing since this always occurs during the last iteration
1041                        //System.err.println("WARNING: Timeout iterating actor.");
1042                    } finally {
1043                        service.shutdownNow();
1044                    }                
1045                } else {
1046                    try {
1047                        //System.out.println("executing actor " + _actor);
1048                        _actor.iterate(1);
1049                    } catch (IllegalActionException e) {
1050                        System.err.println("Error iterating " + _actor.getFullName() + ": " + e.getMessage());
1051                    }
1052                }
1053            }
1054
1055            //System.out.println("    " + stopTimestamp + 
1056                //" stop firing " + _firingId +
1057                //": " + _actor.getFullName());
1058
1059            if(_recording != null) {
1060                FiringEvent firingEvent = new FiringEvent(_director,
1061                    _actor, FiringEvent.AFTER_ITERATE);
1062                _recording.actorFire(firingEvent, stopTimestamp);
1063            }
1064        }
1065        
1066        /** Print the actor name. */
1067        @Override
1068        public String toString() {
1069            return _actor.getName();// +
1070                    //" r=" + _readTokens.size() +
1071                    //" w=" + _writeTokens.size();            
1072        }        
1073
1074        /** The firing id of this firing. */
1075        private Integer _firingId;
1076                
1077        /** The firing actor. */
1078        private Actor _actor;
1079        
1080        /** The director of the firing actor. */
1081        private Director _director;
1082        
1083        /** If true, execute this actor and place any
1084         *  tokens this actor reads into the input port receivers.
1085         */
1086        private boolean _executeActor = false;
1087        
1088        /** If true, executive director of actor is PN. */
1089        private boolean _directorIsPN = false;
1090    }
1091
1092    ///////////////////////////////////////////////////////////////////
1093    ////                       private variables                   ////
1094
1095    /** The input queryable containing the provenance. */
1096    private Queryable _queryable;
1097    
1098    /** The output recording. */
1099    private Recording _recording;
1100
1101    /** The workflow read from provenance. */
1102    private CompositeActor _workflow;
1103    
1104    /** The provenance recorder in the workflow. */
1105    private ProvenanceRecorder _recorder;
1106
1107    /** Type of replay event. NOTE: the declaration order determines
1108     *  how the events are sorted: events occurring at the same time
1109     *  with the same firing id are sorted based on the type declaration
1110     *  order. Events at the same time and with the same firing id
1111     *  (same actor) must have firing start before any port events before
1112     *  firing stop. By convention port reads occur before port writes,
1113     *  but this is not strictly necessary.
1114     */
1115    private enum EventType { FiringStart, PortRead, PortWrite, FiringStop };
1116
1117    /** A set of actor class names to re-execute. */
1118    private final Set<String> _executeActorNames = new HashSet<String>();
1119
1120    /** A set of actor package names to re-execute. */
1121    private final Set<String> _executeActorPackages = new HashSet<String>();
1122
1123    /** A mapping of execution id to effigy. */
1124    private static final Map<Integer,PtolemyEffigy> _effigies =
1125        new HashMap<Integer,PtolemyEffigy>();
1126    
1127    /** If true, port events are played. */
1128    private boolean _playPortEvents = true;
1129    
1130    /** Timeout (in ms) to wait for an actor to iterate. */
1131    private static final int _ACTOR_ITERATE_TIMEOUT = 2000;
1132}