001/* Play a Recording from a Queryable. 002 * 003 * Copyright (c) 2015 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2017-07-07 16:27:30 +0000 (Fri, 07 Jul 2017) $' 008 * '$Revision: 34587 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.provenance; 031 032import java.util.ArrayList; 033import java.util.Arrays; 034import java.util.Collections; 035import java.util.Comparator; 036import java.util.Date; 037import java.util.HashMap; 038import java.util.HashSet; 039import java.util.Iterator; 040import java.util.LinkedList; 041import java.util.List; 042import java.util.Map; 043import java.util.Set; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.ExecutorService; 046import java.util.concurrent.Executors; 047import java.util.concurrent.TimeUnit; 048import java.util.concurrent.TimeoutException; 049 050import javax.swing.SwingUtilities; 051 052import org.kepler.objectmanager.lsid.KeplerLSID; 053 054import ptolemy.actor.Actor; 055import ptolemy.actor.AtomicActor; 056import ptolemy.actor.CompositeActor; 057import ptolemy.actor.Director; 058import ptolemy.actor.FiringEvent; 059import ptolemy.actor.IOPort; 060import ptolemy.actor.IOPortEvent; 061import ptolemy.actor.IOPortEventListener; 062import ptolemy.actor.IORelation; 063import ptolemy.actor.Manager; 064import ptolemy.actor.NoRoomException; 065import ptolemy.actor.Receiver; 066import ptolemy.actor.TypedIOPort; 067import ptolemy.actor.gui.Configuration; 068import ptolemy.actor.gui.Effigy; 069import ptolemy.actor.gui.ModelDirectory; 070import ptolemy.actor.gui.PtolemyEffigy; 071import ptolemy.actor.parameters.PortParameter; 072import ptolemy.data.Token; 073import ptolemy.data.expr.Parameter; 074import ptolemy.domains.pn.kernel.PNDirector; 075import ptolemy.kernel.CompositeEntity; 076import ptolemy.kernel.Entity; 077import ptolemy.kernel.Relation; 078import ptolemy.kernel.util.Attribute; 079import ptolemy.kernel.util.IllegalActionException; 080import ptolemy.kernel.util.NameDuplicationException; 081import ptolemy.kernel.util.NamedObj; 082import ptolemy.vergil.basic.KeplerDocumentationAttribute; 083import ptolemy.vergil.kernel.attributes.TextAttribute; 084 085/** A class to convert provenance formats by reading from a 086 * a Queryable and writing to a Recording. 087 * 088 * @see Queryable 089 * @see Recording 090 * 091 * @author Daniel Crawl 092 * @version $Id: RecordPlayer.java 34587 2017-07-07 16:27:30Z crawl $ 093 * 094 */ 095public class RecordPlayer { 096 097 /** Create a new RecordPlayer using a Queryable as the source 098 * and no Recording output. This is useful if actors within 099 * workflow will be re-executed. 100 * @see RecordPlayer#setExecuteGraphicalActors 101 */ 102 public RecordPlayer(Queryable queryable) { 103 this(queryable, null); 104 } 105 106 /** Create a new RecordPlayer using a Queryable as the source, 107 * and Recording for the output. 108 */ 109 public RecordPlayer(Queryable queryable, Recording recording) { 110 _queryable = queryable; 111 _recording = recording; 112 } 113 114 /** Close any effigies created while replaying workflows. */ 115 public static void closeAllEffigies() throws IllegalActionException, NameDuplicationException { 116 synchronized(_effigies) { 117 for(Effigy effigy : _effigies.values()) { 118 effigy.closeTableaux(); 119 effigy.setContainer(null); 120 } 121 } 122 } 123 124 /** Re-execute an actor. 125 * @param className the class name of the actor to re-execute. 126 */ 127 public void executeActor(String className) { 128 _executeActorNames.add(className); 129 } 130 131 /** Get the queryable. */ 132 public Queryable getQueryable() { 133 return _queryable; 134 } 135 136 /** Play a specific execution. */ 137 public void play(Integer execId) throws QueryException, RecordingException { 138 139 CompositeActor workflow; 140 try { 141 workflow = (CompositeActor) _queryable.getWorkflowForExecution(execId); 142 } catch (Exception e) { 143 throw new RecordingException("Error parsing workflow.", e); 144 } 145 146 play(execId, workflow); 147 } 148 149 /** Play a specific execution for a workflow. */ 150 public void play(Integer execId, final CompositeActor workflow) throws QueryException, RecordingException { 151 152 // port events must be played if executing actors. 153 if(!_playPortEvents && 154 (!_executeActorNames.isEmpty() || !_executeActorPackages.isEmpty())) { 155 throw new QueryException("Must play port events if executing actors."); 156 } 157 158 _workflow = workflow; 159 160 KeplerLSID execLSID = _queryable.getExecutionLSIDForExecution(execId); 161 if(execLSID == null) { 162 throw new RecordingException("Could not get execution LSID."); 163 } 164 165 // NOTE: if the workflow has never been executed, we 166 // create receivers in all the actors so that if the 167 // output Recording is using a PortConnector, then 168 // PortConnector.sendIdToConnections() will find all 169 // the connected ports. 170 171 Manager manager = workflow.getManager(); 172 if(manager == null) { 173 try { 174 manager = new Manager(workflow.workspace(), "_Manager"); 175 workflow.setManager(manager); 176 } catch (IllegalActionException e) { 177 throw new QueryException("Error setting manager.", e); 178 } 179 } 180 if(manager.getPreinitializeVersion() != workflow.workspace().getVersion()) { 181 182 // when creating receivers, an exception can sometimes occur due 183 // to not being able to uniquely infer the width of relations. 184 // to avoid this, create a top level parameter _defaultInferredWidthTo1 185 // with value to true. 186 // see ptolemy.actor.RelationWidthInference 187 188 Attribute defaultWidth = workflow.getAttribute("_defaultInferredWidthTo1"); 189 if(defaultWidth == null) { 190 try { 191 Parameter defaultWidthParameter = new Parameter(workflow, "_defaultInferredWidthTo1"); 192 defaultWidthParameter.setToken("true"); 193 defaultWidthParameter.setPersistent(false); 194 } catch(IllegalActionException | NameDuplicationException e) { 195 throw new QueryException("Error creating parameter to set default relation width.", e); 196 } 197 } 198 199 List<Actor> actorList = workflow.entityList(Actor.class); 200 while(!actorList.isEmpty()) { 201 Actor actor = actorList.remove(0); 202 // create receivers in atomic actors, and opaque composite actors. 203 if(actor instanceof AtomicActor || 204 (actor instanceof CompositeActor && ((CompositeActor)actor).isOpaque())) { 205 try { 206 //System.out.println("creating receivers for " + actor); 207 actor.createReceivers(); 208 } catch (IllegalActionException e) { 209 throw new QueryException("Error creating receivers for " + 210 actor.getFullName(), e); 211 } 212 } 213 // if the actor is a composite actor, add any contained 214 // actors so we create their receivers. 215 if(actor instanceof CompositeActor) { 216 actorList.addAll(((CompositeActor)actor).entityList(Actor.class)); 217 } 218 } 219 } 220 221 Date[] workflowExecutionTimestamps = null; 222 223 List<ProvenanceRecorder> recorders = _workflow.attributeList(ProvenanceRecorder.class); 224 if(recorders.isEmpty()) { 225 Map<String,String> defaults = ProvenanceRecorder.getDefaultsMap(); 226 try { 227 _recorder = new ProvenanceRecorder(_workflow, defaults.get("Provenance Recorder Name")); 228 } catch (IllegalActionException | NameDuplicationException e) { 229 throw new QueryException("Error creating default provenance recorder in workflow.", e); 230 } 231 } else { 232 _recorder = recorders.get(0); 233 } 234 235 /* 236 try { 237 _recorder.recordingType.setToken(ProvenanceRecorder.DEFAULT_REC_TYPE_IGNORE); 238 } catch (IllegalActionException e) { 239 throw new RecordingException("Error turning off provenance recording.", e); 240 } 241 */ 242 243 if(_recording != null) { 244 245 // NOTE: must add the recording to the recorder since the 246 // recorder access the recorder in _recorderContainer. 247 _recorder.addPiggyback(_recording); 248 249 // TODO set the user and machine names, tags, etc. 250 251 if(_recording.regContents()) { 252 253 // specification start 254 _recording.specificationStart(); 255 256 // reg contents 257 _recordContainerContents(_workflow); 258 259 // specification stop 260 _recording.specificationStop(); 261 } 262 263 // TODO play evolutions 264 265 workflowExecutionTimestamps = _queryable.getTimestampsForExecution(execId); 266 267 // execution start 268 if(workflowExecutionTimestamps == null || workflowExecutionTimestamps.length != 2 || 269 workflowExecutionTimestamps[0] == null) { 270 System.err.println("WARNING: could not query start/stop timestamps for execution."); 271 _recording.executionStart(execLSID); 272 } else { 273 _recording.executionStart(execLSID, workflowExecutionTimestamps[0]); 274 } 275 } 276 277 boolean createdEffigies = false; 278 279 if(!_executeActorNames.isEmpty() || !_executeActorPackages.isEmpty()) { 280 281 if(_recorder != null) { 282 workflow.removeInitializable(_recorder); 283 } 284 285 _createEffigyIfNecessary(execId); 286 createdEffigies = true; 287 288 try { 289 workflow.preinitialize(); 290 } catch (IllegalActionException e) { 291 System.err.println("Error preinitializing the workflow: " + e.getMessage()); 292 } 293 294 try { 295 workflow.initialize(); 296 } catch (IllegalActionException e) { 297 System.err.println("Error initializing the workflow: " + e.getMessage()); 298 } 299 } 300 301 try { 302 // replay the execution events (actor fires and port read/writes) 303 _replayExecution(execId, execLSID); 304 305 if(!_executeActorNames.isEmpty() || !_executeActorPackages.isEmpty()) { 306 307 // call wrapup without a timeout of 5 seconds since 308 // NondeterministicMerge can block in wrapup. 309 ExecutorService service = Executors.newSingleThreadExecutor(); 310 try { 311 service.submit(new Runnable() { 312 @Override 313 public void run() { 314 try { 315 workflow.wrapup(); 316 } catch (IllegalActionException e) { 317 System.err.println("Error wrapping-up the workflow: " + e.getMessage()); 318 } 319 } 320 }).get(5000, TimeUnit.MILLISECONDS); 321 } catch (InterruptedException | ExecutionException | TimeoutException e) { 322 System.err.println("WARNING: Timeout wrapping up replay."); 323 } finally { 324 service.shutdownNow(); 325 } 326 } 327 328 if(_recording != null) { 329 // execution stop 330 if(workflowExecutionTimestamps == null || workflowExecutionTimestamps[1] == null) { 331 _recording.executionStop(execLSID); 332 } else { 333 _recording.executionStop(execLSID, workflowExecutionTimestamps[1]); 334 } 335 336 // see if there was an error. 337 if(_queryable.isErrorForExecution(execLSID)) { 338 339 String errorStr = _queryable.getErrorForExecution(execLSID); 340 341 // TODO need to get the error source and throwable 342 Throwable throwable = new Throwable(errorStr); 343 _recording.executionError(_workflow, throwable, execLSID); 344 } 345 } 346 } finally { 347 // if we are replaying actors, set any created effigies to 348 // be not modified. this prevents a 'Save Changes?' dialog from 349 // appearing when closing windows created by replayed actors. 350 if(createdEffigies) { 351 final Effigy effigy = _effigies.get(execId); 352 if(effigy != null) { 353 // NOTE: perform this in the event thread in case there 354 // are effigies about to be created (new windows). 355 SwingUtilities.invokeLater(new Runnable() { 356 @Override 357 public void run() { 358 _setEffigiesToNotModified(effigy); 359 } 360 }); 361 } 362 } 363 } 364 } 365 366 /** Execute graphical actors in workflow. */ 367 public void setExecuteGraphicalActors(boolean execute) { 368 369 // TODO find a list of graphical actors 370 371 if(execute) { 372 _executeActorPackages.add("ptolemy.actor.lib.gui"); 373 _executeActorPackages.add("org.kepler.actor.gui"); 374 _executeActorNames.add("pl.psnc.kepler.common.actor.MultipleTabDisplay"); 375 } else { 376 _executeActorPackages.remove("ptolemy.actor.lib.gui"); 377 _executeActorPackages.remove("org.kepler.actor.gui"); 378 _executeActorNames.remove("pl.psnc.kepler.common.actor.MultipleTabDisplay"); 379 } 380 } 381 382 /** Set if port events are to be played. */ 383 public void setPlayPortEvents(boolean playPortEvents) { 384 _playPortEvents = playPortEvents; 385 } 386 387 /////////////////////////////////////////////////////////////////// 388 //// private methods //// 389 390 /** Create an effigy for a workflow execution if one does not already exist. */ 391 private void _createEffigyIfNecessary(Integer execId) { 392 393 synchronized(_effigies) { 394 395 PtolemyEffigy effigy = _effigies.remove(execId); 396 if(effigy != null) { 397 //System.out.println("found existing cached effigy for run " + execId + "; closing open tableaux"); 398 // close existing windows opened. 399 effigy.closeTableaux(); 400 effigy.setModel(null); 401 try { 402 effigy.setContainer(null); 403 //System.out.println("removing effigy " + effigy.getFullName()); 404 } catch (IllegalActionException | NameDuplicationException e) { 405 System.err.println("Error removing effigy: " + e.getMessage()); 406 } 407 } 408 409 // see if the Configuration has an effigy for the workflow. 410 effigy = (PtolemyEffigy) Configuration.findEffigy(_workflow); 411 if(effigy != null) { 412 // close existing windows opened. 413 effigy.closeTableaux(); 414 effigy.setModel(null); 415 try { 416 effigy.setContainer(null); 417 //System.out.println("removing effigy " + effigy.getFullName()); 418 } catch (IllegalActionException | NameDuplicationException e) { 419 System.err.println("Error removing effigy: " + e.getMessage()); 420 } 421 } 422 423 // make sure there is at least one Configuration. there are none when 424 // running headless. 425 if(!Configuration.configurations().isEmpty()) { 426 ptolemy.actor.gui.Configuration configuration = 427 (ptolemy.actor.gui.Configuration) Configuration.configurations().iterator().next(); 428 429 try { 430 ModelDirectory directory = configuration.getDirectory(); 431 //System.out.println("Creating effigy for " + 432 //_workflow.getName() + " in " + directory.getFullName()); 433 434 effigy = new PtolemyEffigy(directory, 435 directory.uniqueName(_workflow.getName())); 436 //System.out.println("created effigy " + effigy.getFullName()); 437 effigy.setModel(_workflow); 438 effigy.identifier.setExpression(effigy.getName()); 439 } catch(IllegalActionException | NameDuplicationException e) { 440 System.err.println("Error creating effigy: " + e.getMessage()); 441 return; 442 } 443 } 444 445 if(effigy != null) { 446 _effigies.put(execId, effigy); 447 } 448 } 449 } 450 451 /** Output the model contents. 452 * TODO copied from ProvenanceRecorder 453 */ 454 private void _recordContainerContents(NamedObj namedObj) 455 throws RecordingException 456 { 457 // whether to recurse based on Recording register method. 458 boolean recurse = true; 459 460 if(namedObj instanceof Actor) 461 { 462 recurse = _recording.regActor((Actor)namedObj); 463 } 464 else if(namedObj instanceof Director) 465 { 466 recurse = _recording.regDirector((Director)namedObj); 467 } 468 else if(namedObj instanceof TypedIOPort) 469 { 470 TypedIOPort port = (TypedIOPort)namedObj; 471 _recording.regPort(port); 472 473 // do not record information about attributes contained by the port 474 recurse = false; 475 } 476 else if(namedObj instanceof IORelation) 477 { 478 IORelation rel = (IORelation)namedObj; 479 recurse = _recording.regRelation(rel); 480 } 481 else if(namedObj != _recorder && 482 // NOTE: do not register a connected port parameter as a parameter 483 // since during execution, reading a token by the port generates 484 // both port and value change events. value change events are 485 // treated as a workflow evolution, which can be expensive. 486 (!(namedObj instanceof PortParameter) || 487 ((PortParameter)namedObj).getPort().numberOfSources() == 0)) 488 { 489 recurse = _recording.regParameter(namedObj); 490 } 491 else // namedObj == _recorder 492 { 493 recurse = false; 494 } 495 496 if(recurse) 497 { 498 // recursively register contents of the current namedObj 499 List<NamedObj> contentsList = new LinkedList<NamedObj>(); 500 501 boolean isPLOpaque = false; 502 503 if(namedObj instanceof CompositeEntity) 504 { 505 CompositeEntity composite = (CompositeEntity)namedObj; 506 contentsList.addAll(composite.entityList()); 507 508 // NOTE: we add the relations last since for each one, 509 // regLink() is called and both ends of the link must be 510 // registered first. 511 contentsList.addAll(composite.relationList()); 512 513 // check to see if named obj contains an instance of 514 // provenance listener 515 List<?> provs = namedObj.attributeList(ProvenanceRecorder.class); 516 if(provs != null && provs.size() > 0 && namedObj != namedObj.toplevel()) 517 { 518 isPLOpaque = true; 519 } 520 else if((namedObj instanceof CompositeActor) && 521 !ProvenanceRecorder.containsSupportedDirector((CompositeActor)namedObj)) 522 { 523 isPLOpaque = true; 524 } 525 526 } 527 528 // add the ports to the beginning 529 if(namedObj instanceof Entity) 530 { 531 contentsList.addAll(0, ((Entity<?>)namedObj).portList()); 532 } 533 534 // add the attributes to the beginning 535 contentsList.addAll(0, namedObj.attributeList()); 536 537 //System.out.println("is pl opaque: " + isPLOpaque); 538 539 for(NamedObj containedNamedObj : contentsList) 540 { 541 if(isPLOpaque && 542 // do not record contents of these types of objects: 543 ((containedNamedObj instanceof Actor) || 544 (containedNamedObj instanceof Director) || 545 (containedNamedObj instanceof IORelation) || 546 (containedNamedObj instanceof TextAttribute) || 547 (containedNamedObj instanceof ProvenanceRecorder) || 548 (containedNamedObj instanceof 549 KeplerDocumentationAttribute))) 550 { 551 //_Debug("not recursing down " + 552 //containedNamedObj.getFullName()); 553 } 554 else 555 { 556 _recordContainerContents(containedNamedObj); 557 } 558 } 559 560 // now that all the ports and relations have been registered 561 // (if namedObj is a CompositeEntity), it is safe to register 562 // the links (since each end point must already registered). 563 564 if(!isPLOpaque && (namedObj instanceof CompositeEntity)) 565 { 566 // iterate over the contained relations 567 List<?> relationList = ((CompositeEntity)namedObj).relationList(); 568 Iterator<?> relationIter = relationList.iterator(); 569 while(relationIter.hasNext()) 570 { 571 Relation curRelation = (Relation)relationIter.next(); 572 573 ///if(!adding) 574 //{ 575 //containedName = oldName + "." + 576 //curRelation.getName(); 577 //} 578 _recordLinksInRelation(curRelation); 579 } 580 } 581 } 582 } 583 584 /** Record all the links within a relation. 585 * TODO copied from ProvenanceRecorder 586 */ 587 private void _recordLinksInRelation(Relation relation) 588 throws RecordingException 589 { 590 // iterate over the object linked in the current relation 591 List<?> linkedObjList = relation.linkedObjectsList(); 592 Iterator<?> linkedObjsIter = linkedObjList.iterator(); 593 while(linkedObjsIter.hasNext()) 594 { 595 NamedObj linkedObj = (NamedObj)linkedObjsIter.next(); 596 597 // see if current relation is linked to a port 598 if(linkedObj instanceof TypedIOPort) 599 { 600 _recording.regLink(linkedObj, relation); 601 } 602 // otherwise the link is between two relations. 603 // we do not want to register this link twice, 604 // e.g., a <--> b and b <--> a 605 // so only perform registration if the current 606 // relation's name lexigraphically precedes the 607 // linked relation's name. 608 else if(relation.getName().compareTo( 609 linkedObj.getName()) < 0) 610 { 611 _recording.regLink(relation, linkedObj); 612 } 613 } 614 } 615 616 /** Replay a workflow execution. */ 617 private void _replayExecution(int execId, KeplerLSID execLSID) throws QueryException, RecordingException { 618 619 List<Event> events = new ArrayList<Event>(); 620 621 long startNano = System.nanoTime(); 622 623 // collect the provenance events to replay. this consists of 624 // firing start, firing stop, port read, and port write events. 625 626 System.out.print("Replaying..collecting:.."); 627 628 // collect the firings 629 int eventCount = 0; 630 List<Integer> firings = _queryable.getActorFirings(execId); 631 for(Integer firingId : firings) { 632 633 Date[] timestamps = _queryable.getTimestampsForActorFiring(firingId); 634 if(timestamps == null || timestamps.length != 2 || 635 timestamps[0] == null || timestamps[1] == null) { 636 System.err.println("WARNING: missing timestamps for firing: " + firingId); 637 continue; 638 } 639 640 // add the firing start 641 Event event = new Event(EventType.FiringStart, timestamps[0]); 642 event.firingId = firingId; 643 events.add(event); 644 645 eventCount++; 646 if(eventCount % 10000 == 0) { 647 System.out.print(eventCount + ".."); 648 } 649 650 // add the firing stop 651 event = new Event(EventType.FiringStop, timestamps[1]); 652 event.firingId = firingId; 653 events.add(event); 654 655 eventCount++; 656 if(eventCount % 10000 == 0) { 657 System.out.print(eventCount + ".."); 658 } 659 660 } 661 662 // collect the port events 663 if(_playPortEvents) { 664 List<Integer> tokens = _queryable.getTokensForExecution(execId, false); 665 for(Integer tokenId : tokens) { 666 667 // add the port write for this token 668 Date writeTimestamp = _queryable.getTimestampForTokenWrite(tokenId); 669 if(writeTimestamp == null) { 670 System.err.println("WARNING: missing timestamp for token write: " + tokenId); 671 continue; 672 } 673 674 Event event = new Event(EventType.PortWrite, writeTimestamp); 675 event.portEventId = tokenId; 676 677 List<Integer> writeFirings = _queryable.getActorFiringForToken(tokenId, false); 678 event.firingId = writeFirings.get(0); 679 680 events.add(event); 681 682 eventCount++; 683 if(eventCount % 10000 == 0) { 684 System.out.print(eventCount + ".."); 685 } 686 687 // add any port reads for this token 688 Date[] timestamps = _queryable.getTimestampsForTokenRead(tokenId); 689 if(timestamps != null && timestamps.length > 0) { 690 List<Integer> readFirings = _queryable.getActorFiringForToken(tokenId, true); 691 if(timestamps.length != readFirings.size()) { 692 System.err.println("WARNING: number of timestamps for token read " + 693 "does not match number of firings for token read."); 694 continue; 695 } 696 Integer[] readFiringArray = readFirings.toArray( 697 new Integer[readFirings.size()]); 698 Arrays.sort(readFiringArray); 699 700 int i = 0; 701 for(Date readTimestamp : timestamps) { 702 event = new Event(EventType.PortRead, readTimestamp); 703 event.portEventId = tokenId; 704 event.firingId = readFiringArray[i]; 705 i++; 706 events.add(event); 707 708 eventCount++; 709 if(eventCount % 10000 == 0) { 710 System.out.print(eventCount + ".."); 711 } 712 } 713 } 714 } 715 } 716 717 // sort the events. 718 // we perform 3 sorts to get the events in order that: 719 // 720 // 1. the events are sorted chronologically (primary key) 721 // 722 // 2. events can occur at the same time, so sort on 723 // firing id (secondary key). 724 // 725 // 3. events can occur at the same time and with the 726 // same firing id, e.g., a port write and firing stop, 727 // so sort on event type order (tertiary key). 728 // 729 // the 3 sorts are performed sorting the least-significant key 730 // first: tertiary key (event type), secondary key (firing id), 731 // and primary key (event timestamp). 732 733 System.out.print("sorting.."); 734 735 if(_playPortEvents) { 736 Collections.sort(events, new Comparator<Event>() { 737 @Override 738 public int compare(Event o1, Event o2) { 739 return o1.type.compareTo(o2.type); 740 } 741 }); 742 } 743 744 //System.out.print("sort2.."); 745 746 Collections.sort(events, new Comparator<Event>() { 747 @Override 748 public int compare(Event o1, Event o2) { 749 return o1.firingId.compareTo(o2.firingId); 750 } 751 }); 752 753 //System.out.print("sort3.."); 754 755 Collections.sort(events, new Comparator<Event>() { 756 @Override 757 public int compare(Event o1, Event o2) { 758 return o1.timestamp.compareTo(o2.timestamp); 759 } 760 }); 761 762 System.out.print("processing " + events.size() + "events.."); 763 764 /* 765 for(Event event : events) { 766 System.out.println(event); 767 } 768 */ 769 770 // replay the events 771 772 Map<Integer, Firing> firingMap = new HashMap<Integer, Firing>(); 773 eventCount = 0; 774 for(Event event : events) { 775 776 eventCount++; 777 if(eventCount % 1000 == 0) { 778 System.out.print(eventCount + ".."); 779 } 780 781 switch(event.type) { 782 case FiringStart: 783 Firing firing = new Firing(event.firingId, event.timestamp); 784 firingMap.put(event.firingId, firing); 785 break; 786 case FiringStop: 787 firing = firingMap.remove(event.firingId); 788 firing.stop(event.timestamp); 789 break; 790 case PortRead: 791 firing = firingMap.get(event.firingId); 792 if(firing == null) { 793 System.out.println("null firing " + event.firingId + 794 " for pe " + event.timestamp); 795 } 796 firing.replayPortEvent(event.portEventId, true, event.timestamp); 797 break; 798 case PortWrite: 799 firing = firingMap.get(event.firingId); 800 /*if(firing == null) { 801 System.out.println("null firing " + event.firingId + 802 " for pe " + event.timestamp); 803 }*/ 804 firing.replayPortEvent(event.portEventId, false, event.timestamp); 805 break; 806 default: 807 System.err.println("WARNING: unknown type of event: " + event.type); 808 break; 809 } } 810 811 if(!firingMap.isEmpty()) { 812 System.err.println("WARNING: Firings not stopped: " + firingMap.size()); 813 } 814 815 System.out.print("done."); 816 double elapsed = (System.nanoTime() - startNano) / 1000000; 817 if(elapsed >= 5000) { 818 System.out.print(" elapsed time = " + elapsed + " ms"); 819 } 820 System.out.println(); 821 } 822 823 /** Set an effigy and any contained effigies to be not modified. */ 824 private void _setEffigiesToNotModified(Effigy effigy) { 825 //System.out.println("setting not modified for : " + effigy.getFullName()); 826 effigy.setModified(false); 827 for(Effigy containedEffigy : effigy.entityList(Effigy.class)) { 828 _setEffigiesToNotModified(containedEffigy); 829 } 830 } 831 832 /** A provenance event to replay. */ 833 private static class Event { 834 835 /** Create a new event with a type and timestamp. */ 836 public Event(EventType type, Date timestamp) { 837 this.type = type; 838 this.timestamp = timestamp; 839 } 840 841 /** Get a string representation of the event. */ 842 @Override 843 public String toString() { 844 return timestamp.toString() + " " + type.toString() + " f=" + firingId; 845 } 846 847 /** The type of event. */ 848 public EventType type; 849 850 /** The firing id. If the event is a port read or write, 851 * this value is the firing id in which the port event 852 * occurs. 853 */ 854 public Integer firingId; 855 856 /** The port event id. If the event is a firing start or 857 * stop, this is null. 858 */ 859 public Integer portEventId; 860 861 /** The timestamp of the event. */ 862 public Date timestamp; 863 } 864 865 /** A class that represents an actor firing. */ 866 private class Firing { 867 868 public Firing(Integer firingId, Date startTimestamp) throws QueryException, RecordingException { 869 870 _firingId = firingId; 871 872 String actorName = _queryable.getActorName(firingId); 873 //System.out.println(firingId + " " + actorName); 874 875 if(actorName == null) { 876 throw new QueryException( 877 "WARNING: Could not find actor name for firing " + 878 firingId); 879 } 880 881 // remove the leading "." 882 actorName = actorName.substring(1); 883 884 // start firing 885 _actor = (Actor) _workflow.getEntity(actorName); 886 if(_actor == null) { 887 System.err.println("WARNING: could not find actor " + actorName + 888 " in workflow."); 889 return; 890 } 891 892 //System.out.println(" " + startTimestamp + 893 //" start firing " + firingId + 894 //": " + _actor.getFullName()); 895 896 _director = _actor.getDirector(); 897 if(_recording != null) { 898 FiringEvent firingEvent = new FiringEvent(_director, 899 _actor, 900 FiringEvent.BEFORE_ITERATE); 901 _recording.actorFire(firingEvent, startTimestamp); 902 } 903 904 if(_executeActorNames.contains(_actor.getClass().getName()) || 905 _executeActorPackages.contains(_actor.getClass().getPackage().getName())) { 906 _executeActor = true; 907 } 908 909 if(_actor.getExecutiveDirector() instanceof PNDirector) { 910 _directorIsPN = true; 911 } 912 } 913 914 /** Replay a port event and stop the actor firing if there are 915 * no more port events that occurred during this firing. 916 * @param tokenId the port event id to replay 917 * @param read If true, the port event is a read, otherwise 918 * port event is a write. 919 * @return Returns true if there are no more port events 920 * for this firing. Returns false if there are more port events. 921 */ 922 public void replayPortEvent(Integer tokenId, boolean read, Date timestamp) 923 throws RecordingException, QueryException { 924 925 Token token = _queryable.getToken(tokenId); 926 927 if(token == null) { 928 System.err.println("WARNING: could not get token " + 929 (read ? "read." : "written.")); 930 return; 931 } 932 933 String portName = null; 934 if(read) { 935 portName = _queryable.getInputRoleForTokenAndFireId(tokenId, _firingId); 936 } else { 937 portName = _queryable.getOutputRoleToken(tokenId); 938 } 939 940 if(portName == null) { 941 System.err.println("WARNING: could not get port for token " + 942 (read ? "read " : "written ") + " " + tokenId); 943 return; 944 } 945 946 Integer channel = _queryable.getChannelForToken(tokenId, read, _firingId); 947 if(channel == null) { 948 System.err.println("WARNING: could not get channel for token " + 949 (read ? "read." : "written.")); 950 channel = 0; 951 } 952 953 // portName is the fully-qualified name; get just the 954 // last part 955 portName = portName.substring(portName.lastIndexOf('.') + 1); 956 IOPort port = (IOPort) ((Entity<?>) _actor).getPort(portName); 957 958 //System.out.println(" " + timestamp + 959 //(read ? " read " : " write ") + 960 //"firing " + _firingId); 961 962 List<IOPortEventListener> portListeners = port.getIOPortEventListeners(); 963 if(_recording != null || !portListeners.isEmpty()) { 964 IOPortEvent event = new IOPortEvent(port, 965 read ? IOPortEvent.GET_BEGIN : IOPortEvent.SEND_BEGIN, 966 channel, 967 true, 968 read ? null : token); 969 970 if(_recording != null) { 971 _recording.portEvent(event, timestamp); 972 } 973 974 for(IOPortEventListener listener: portListeners) { 975 try { 976 listener.portEvent(event); 977 } catch (IllegalActionException e) { 978 System.err.println("Port event error: " + e.getMessage()); 979 } 980 } 981 982 event = new IOPortEvent(port, 983 read ? IOPortEvent.GET_END : IOPortEvent.SEND_END, 984 channel, 985 true, 986 token); 987 988 if(_recording != null) { 989 _recording.portEvent(event, timestamp); 990 } 991 992 for(IOPortEventListener listener: portListeners) { 993 try { 994 listener.portEvent(event); 995 } catch (IllegalActionException e) { 996 System.err.println("Port event error: " + e.getMessage()); 997 } 998 } 999 1000 } 1001 1002 if(_executeActor && read) { 1003 1004 Receiver[][] receivers = port.getReceivers(); 1005 if(receivers[channel] == null) { 1006 System.err.println("missing receiver for channel " + channel + " in " + port.getFullName()); 1007 } else { 1008 try { 1009 //System.out.println("putting token inside " + port.getFullName()); 1010 receivers[channel][0].put(token); 1011 } catch (NoRoomException | IllegalActionException e) { 1012 System.err.println("Error putting token in " + port.getFullName() + ": " + e.getMessage()); 1013 } 1014 } 1015 1016 } 1017 } 1018 1019 public void stop(Date stopTimestamp) throws RecordingException { 1020 1021 if(_executeActor) { 1022 1023 // if the director is PN, set a time out before calling 1024 // iterate, since the last iteration will block waiting 1025 // for input tokens that will never arrive. 1026 if(_directorIsPN) { 1027 ExecutorService service = Executors.newSingleThreadExecutor(); 1028 try { 1029 service.submit(new Runnable() { 1030 @Override 1031 public void run() { 1032 try { 1033 //System.out.println("executing actor " + _actor); 1034 _actor.iterate(1); 1035 } catch (IllegalActionException e) { 1036 System.err.println("Error iterating " + _actor.getFullName() + ": " + e.getMessage()); 1037 } } 1038 }).get(_ACTOR_ITERATE_TIMEOUT, TimeUnit.MILLISECONDS); 1039 } catch (InterruptedException | ExecutionException | TimeoutException e) { 1040 // do nothing since this always occurs during the last iteration 1041 //System.err.println("WARNING: Timeout iterating actor."); 1042 } finally { 1043 service.shutdownNow(); 1044 } 1045 } else { 1046 try { 1047 //System.out.println("executing actor " + _actor); 1048 _actor.iterate(1); 1049 } catch (IllegalActionException e) { 1050 System.err.println("Error iterating " + _actor.getFullName() + ": " + e.getMessage()); 1051 } 1052 } 1053 } 1054 1055 //System.out.println(" " + stopTimestamp + 1056 //" stop firing " + _firingId + 1057 //": " + _actor.getFullName()); 1058 1059 if(_recording != null) { 1060 FiringEvent firingEvent = new FiringEvent(_director, 1061 _actor, FiringEvent.AFTER_ITERATE); 1062 _recording.actorFire(firingEvent, stopTimestamp); 1063 } 1064 } 1065 1066 /** Print the actor name. */ 1067 @Override 1068 public String toString() { 1069 return _actor.getName();// + 1070 //" r=" + _readTokens.size() + 1071 //" w=" + _writeTokens.size(); 1072 } 1073 1074 /** The firing id of this firing. */ 1075 private Integer _firingId; 1076 1077 /** The firing actor. */ 1078 private Actor _actor; 1079 1080 /** The director of the firing actor. */ 1081 private Director _director; 1082 1083 /** If true, execute this actor and place any 1084 * tokens this actor reads into the input port receivers. 1085 */ 1086 private boolean _executeActor = false; 1087 1088 /** If true, executive director of actor is PN. */ 1089 private boolean _directorIsPN = false; 1090 } 1091 1092 /////////////////////////////////////////////////////////////////// 1093 //// private variables //// 1094 1095 /** The input queryable containing the provenance. */ 1096 private Queryable _queryable; 1097 1098 /** The output recording. */ 1099 private Recording _recording; 1100 1101 /** The workflow read from provenance. */ 1102 private CompositeActor _workflow; 1103 1104 /** The provenance recorder in the workflow. */ 1105 private ProvenanceRecorder _recorder; 1106 1107 /** Type of replay event. NOTE: the declaration order determines 1108 * how the events are sorted: events occurring at the same time 1109 * with the same firing id are sorted based on the type declaration 1110 * order. Events at the same time and with the same firing id 1111 * (same actor) must have firing start before any port events before 1112 * firing stop. By convention port reads occur before port writes, 1113 * but this is not strictly necessary. 1114 */ 1115 private enum EventType { FiringStart, PortRead, PortWrite, FiringStop }; 1116 1117 /** A set of actor class names to re-execute. */ 1118 private final Set<String> _executeActorNames = new HashSet<String>(); 1119 1120 /** A set of actor package names to re-execute. */ 1121 private final Set<String> _executeActorPackages = new HashSet<String>(); 1122 1123 /** A mapping of execution id to effigy. */ 1124 private static final Map<Integer,PtolemyEffigy> _effigies = 1125 new HashMap<Integer,PtolemyEffigy>(); 1126 1127 /** If true, port events are played. */ 1128 private boolean _playPortEvents = true; 1129 1130 /** Timeout (in ms) to wait for an actor to iterate. */ 1131 private static final int _ACTOR_ITERATE_TIMEOUT = 2000; 1132}