001/* 002 * Copyright (c) 2007-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2018-02-07 18:20:17 +0000 (Wed, 07 Feb 2018) $' 007 * '$Revision: 34660 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 
027 * 028 */ 029 030package org.kepler.provenance; 031 032import java.io.File; 033import java.io.FileInputStream; 034import java.io.FileNotFoundException; 035import java.io.FileOutputStream; 036import java.io.IOException; 037import java.io.InputStream; 038import java.io.OutputStream; 039import java.io.Writer; 040import java.lang.ref.WeakReference; 041import java.util.Collections; 042import java.util.Date; 043import java.util.HashMap; 044import java.util.HashSet; 045import java.util.Hashtable; 046import java.util.Iterator; 047import java.util.LinkedHashSet; 048import java.util.LinkedList; 049import java.util.List; 050import java.util.Map; 051import java.util.Properties; 052import java.util.Set; 053import java.util.WeakHashMap; 054 055import org.apache.commons.logging.Log; 056import org.apache.commons.logging.LogFactory; 057import org.kepler.configuration.ConfigurationManager; 058import org.kepler.configuration.ConfigurationProperty; 059import org.kepler.configuration.ConfigurationUtilities; 060import org.kepler.objectmanager.lsid.KeplerLSID; 061import org.kepler.objectmanager.lsid.LSIDGenerator; 062import org.kepler.provenance.sql.SQLRecordingParameters; 063import org.kepler.sms.SemanticType; 064import org.kepler.ssh.SshEvent; 065import org.kepler.ssh.SshEventListener; 066import org.kepler.ssh.SshEventRegistry; 067import org.kepler.tagging.TagEvent; 068import org.kepler.tagging.TagEventListener; 069import org.kepler.util.WorkflowRenameListener; 070import org.kepler.util.WorkflowRenameManager; 071import org.kepler.util.WorkflowRun; 072 073import ptolemy.actor.Actor; 074import ptolemy.actor.ActorFiringListener; 075import ptolemy.actor.CompositeActor; 076import ptolemy.actor.Director; 077import ptolemy.actor.Executable; 078import ptolemy.actor.ExecutionListener; 079import ptolemy.actor.FiringEvent; 080import ptolemy.actor.FiringsRecordable; 081import ptolemy.actor.IOPort; 082import ptolemy.actor.IOPortEvent; 083import ptolemy.actor.IOPortEventListener; 084import 
ptolemy.actor.IORelation; 085import ptolemy.actor.Initializable; 086import ptolemy.actor.Manager; 087import ptolemy.actor.TypedIOPort; 088import ptolemy.actor.gui.ColorAttribute; 089import ptolemy.actor.gui.Effigy; 090import ptolemy.actor.gui.SizeAttribute; 091import ptolemy.actor.gui.WindowPropertiesAttribute; 092import ptolemy.actor.lib.hoc.CaseDirector; 093import ptolemy.actor.lib.hoc.IterateOverArray; 094import ptolemy.actor.lib.hoc.MultiInstanceComposite; 095import ptolemy.actor.parameters.PortParameter; 096import ptolemy.actor.parameters.SharedParameter; 097import ptolemy.data.BooleanToken; 098import ptolemy.data.StringToken; 099import ptolemy.data.Token; 100import ptolemy.data.expr.ExpertParameter; 101import ptolemy.data.expr.Parameter; 102import ptolemy.data.expr.StringParameter; 103import ptolemy.data.expr.Variable; 104import ptolemy.data.type.BaseType; 105import ptolemy.domains.ddf.kernel.DDFDirector; 106import ptolemy.domains.de.kernel.DEDirector; 107import ptolemy.domains.pn.kernel.PNDirector; 108import ptolemy.domains.sdf.kernel.SDFDirector; 109import ptolemy.kernel.CompositeEntity; 110import ptolemy.kernel.Entity; 111import ptolemy.kernel.Relation; 112import ptolemy.kernel.util.Attribute; 113import ptolemy.kernel.util.ChangeListener; 114import ptolemy.kernel.util.ChangeRequest; 115import ptolemy.kernel.util.DebugEvent; 116import ptolemy.kernel.util.DebugListener; 117import ptolemy.kernel.util.Debuggable; 118import ptolemy.kernel.util.IllegalActionException; 119import ptolemy.kernel.util.KernelException; 120import ptolemy.kernel.util.NameDuplicationException; 121import ptolemy.kernel.util.Nameable; 122import ptolemy.kernel.util.NamedObj; 123import ptolemy.kernel.util.Settable; 124import ptolemy.kernel.util.ValueListener; 125import ptolemy.kernel.util.Workspace; 126import ptolemy.moml.MoMLChangeRequest; 127import ptolemy.util.MessageHandler; 128import ptolemy.vergil.basic.KeplerDocumentationAttribute; 129import 
ptolemy.vergil.kernel.attributes.TextAttribute; 130 131/** A listener that records workflow provenance information. It implements 132 * several Kepler and Ptolemy listener interfaces in order to receive 133 * events for workflow specification, and execution. Provenance data 134 * is stored using a particular <code>Recording</code> implementation. 135 * 136 * @author Daniel Crawl 137 * @version $Id: ProvenanceRecorder.java 34660 2018-02-07 18:20:17Z crawl $ 138 * @see Recording 139 * 140 */ 141public class ProvenanceRecorder extends Attribute implements ExecutionListener, 142 ChangeListener, IOPortEventListener, ActorFiringListener, DebugListener, 143 ValueListener, SshEventListener, Initializable, WorkflowRenameListener, 144 TagEventListener 145{ 146 /** Construct a provenance recorder in the default workspace using the 147 * TOP Recorder Name specified in the configuration file. 148 * NOTE: to use the provenance recorder for recording, you must set the 149 * container. 150 * 151 * NOTE: this constructor can be used for certain recording events that 152 * don't refer to the container. 153 */ 154 public ProvenanceRecorder() 155 throws IllegalActionException, NameDuplicationException 156 { 157 super(); 158 _setDefaultName(); 159 _init(); 160 } 161 162 /** 163 * Construct a provenance recorder in the default workspace using 164 * non-default ConfigurationProperty. 165 * 166 * @param configProperty 167 * @throws IllegalActionException 168 * @throws NameDuplicationException 169 */ 170 public ProvenanceRecorder(ConfigurationProperty configProperty) 171 throws IllegalActionException, NameDuplicationException 172 { 173 super(); 174 _settingsProperty = configProperty; 175 _setDefaultName(); 176 _init(); 177 } 178 179 /** Construct a provenance recorder using the specified container. 180 * The name is set from the TOP Recorder Name specified in the 181 * configuration file. 182 * @param container the container of the provenance recorder. 
183 */ 184 public ProvenanceRecorder(CompositeEntity container) 185 throws IllegalActionException, NameDuplicationException 186 { 187 this(container, ""); 188 _setDefaultName(); 189 } 190 191 /** Construct a provenance recorder in the workspace with an empty name. 192 * The listener is added to the list of objects in the workspace. 193 * Increment the version number of the workspace. 194 * 195 * @param workspace The workspace for this object. 196 */ 197 public ProvenanceRecorder(Workspace workspace) 198 throws IllegalActionException, NameDuplicationException 199 { 200 super(workspace); 201 _init(); 202 } 203 204 /** Construct a provenance recorder in the given container with the 205 * given name. The container argument must not be null, or a 206 * NullPointerException will be thrown. If the name argument is null, 207 * then the name is set to the empty string. Increment the version 208 * number of the workspace. 209 * 210 * @param container Container of the director. 211 * @param name Name of this listener. 212 * @exception IllegalActionException If the listener is not compatible 213 * with the specified container. May be thrown in a derived class. 214 * @exception NameDuplicationException If the container is not a 215 * CompositeActor and the name collides with an entity in the container. 216 */ 217 public ProvenanceRecorder(CompositeEntity container, String name) 218 throws IllegalActionException, NameDuplicationException 219 { 220 super(container, name); 221 _init(); 222 } 223 224 /////////////////////////////////////////////////////////////////// 225 //// parameters //// 226 227 /** If true, delay recording specification as long as possible. */ 228 public Parameter delayRecordingSpecification; 229 230 /** Which Recording to use. */ 231 public StringParameter recordingType = null; 232 233 /** Record system data on ssh'd machines. */ 234 //public Parameter recordSshData = null; 235 236 /** Record token values. 
*/ 237 public Parameter recordTokens = null; 238 239 /** Local hostname. */ 240 public StringParameter hostname; 241 242 /** Username. */ 243 public StringParameter username; 244 245 /** String used when disabling provenance recording */ 246 public final static String DEFAULT_REC_TYPE_IGNORE = "Ignore"; 247 248 /** The LSID of the containing workflow. */ 249 public StringParameter containerWorkflowLSID; 250 251 /** The name of the containing workflow. */ 252 public StringParameter containerWorkflowName; 253 254 /////////////////////////////////////////////////////////////////// 255 //// public methods //// 256 257 /** React to attribute change (of ProvenanceRecorder only). 258 * 259 * @param attribute The attribute that changed. 260 * @exception IllegalActionException If the function is not recognized. 261 */ 262 @Override 263 public void attributeChanged(Attribute attribute) 264 throws IllegalActionException 265 { 266 //System.out.println("att changed: " + attribute.getFullName()); 267 268 if(attribute == recordingType) 269 { 270 Recording oldRecording = _provStore; 271 272 _changeRecordingType(); 273 274 // see if this is the first time the recording type was set 275 if(oldRecording == null && _provStore != null) 276 { 277 // for all of the unknown attributes that were changed 278 // earlier that belong to the recording, tell the recording 279 // that they have changed. 
280 for(Attribute possibleAtt : _possibleRecordingAttributes) 281 { 282 //if(_provStoreParams.containsParameter(possibleAtt)) 283 //{ 284 _provStore.attributeChanged(possibleAtt); 285 //} 286 } 287 _possibleRecordingAttributes.clear(); 288 } 289 } 290 else if(attribute == recordTokens) 291 { 292 Token token = recordTokens.getToken(); 293 _recordTokensVal = ((BooleanToken)token).booleanValue(); 294 } 295 else if(attribute == delayRecordingSpecification) 296 { 297 Token token = delayRecordingSpecification.getToken(); 298 _delayRecordingSpecificationVal = 299 ((BooleanToken)token).booleanValue(); 300 } 301 else if(attribute == containerWorkflowLSID) 302 { 303 String lsidStr = containerWorkflowLSID.stringValue(); 304 if(lsidStr.length() > 0 && _provStore != null) 305 { 306 try 307 { 308 KeplerLSID lsid = new KeplerLSID(lsidStr); 309 _provStore.setContainerLSID(lsid); 310 } 311 catch(Exception e) 312 { 313 throw new IllegalActionException(this, e, "Error creating KeplerLSID"); 314 } 315 } 316 } 317 else if(attribute == containerWorkflowName) 318 { 319 String name = containerWorkflowName.stringValue(); 320 if(_provStore != null) 321 { 322 _provStore.setContainerName(name); 323 } 324 } 325 /* not implemented 326 else if(attribute == recordSshData) 327 { 328 Token token = recordSshData.getToken(); 329 _recordSshDataVal = ((BooleanToken)token).booleanValue(); 330 331 if(_recordSshDataVal) 332 { 333 throw new IllegalActionException(this, 334 "Recording ssh system data not implemented."); 335 } 336 } 337 */ 338 // see if attribute is from recording type 339 /* 340 else if(_provStoreParams != null && 341 _provStoreParams.containsParameter(attribute)) 342 { 343 _provStore.attributeChanged(attribute); 344 } 345 */ 346 else 347 { 348 super.attributeChanged(attribute); 349 350 // the attribute may belong to the recording; store so 351 // that when the recording type is set, we can notify it 352 // of any of its changed values. 
353 if(_provStore == null) 354 { 355 _possibleRecordingAttributes.add(attribute); 356 } 357 else 358 { 359 //System.out.println("sending possible attribute: " + 360 //attribute.getName()); 361 _provStore.attributeChanged(attribute); 362 } 363 } 364 } 365 366 /** Clone the ProvenanceRecorder into a specified workspace. */ 367 @Override 368 public Object clone(Workspace workspace) throws CloneNotSupportedException { 369 370 // do not clone the recording 371 Recording provStore = _provStore; 372 _provStore = null; 373 ProvenanceRecorder newObject = (ProvenanceRecorder) super.clone(workspace); 374 _provStore = provStore; 375 376 try { 377 newObject._init(); 378 } catch (IllegalActionException | NameDuplicationException e) { 379 throw new CloneNotSupportedException( 380 "Error initializing ProvenanceRecorder: " + e.getMessage()); 381 } 382 return newObject; 383 } 384 385 /** Change the container. If previously in a container, stop listening. 386 * If put into a new container, start listening to that container. 387 */ 388 @Override 389 public void setContainer(NamedObj container) 390 throws IllegalActionException, NameDuplicationException 391 { 392 393 NamedObj oldContainer = getContainer(); 394 395 // see if the name is set 396 if(getName().length() == 0) 397 { 398 // NOTE: the model may already contain an object with an empty 399 // name, so set the name to the default value. 400 // the KAR in http://bugzilla.ecoinformatics.org/show_bug.cgi?id=5197 401 // contains objects with empty names. 
402 403 _setDefaultName(); 404 } 405 406 super.setContainer(container); 407 408 if(container == null) 409 { 410 //_Debug("set container to null"); 411 } 412 else 413 { 414 //_Debug("set container to " + container.getFullName() + " hash " + container.hashCode()); 415 //_Debug(" my hash = " + hashCode()); 416 } 417 418 // see if we changed containers 419 if(container != oldContainer) 420 { 421 // if we had a previous container, stop listening to that one 422 if(oldContainer != null && (oldContainer instanceof CompositeActor)) 423 { 424 ((CompositeActor)oldContainer).removeInitializable(this); 425 426 // stop listening to all events. 427 _stopListening(); 428 429 // see if we were removed from model 430 if(container == null) 431 { 432 _removedFromModel = true; 433 434 Manager manager = 435 ((CompositeActor)oldContainer).getManager(); 436 if(manager != null) 437 { 438 manager.removeExecutionListener(this); 439 } 440 _addedExecutionListener = false; 441 442 disconnect(); 443 } 444 } 445 else // we were added to model 446 { 447 SshEventRegistry.instance.addListener(this); 448 WorkflowRenameManager wrm = WorkflowRenameManager.getInstance(); 449 wrm.addRenameListener(this); 450 } 451 452 // see if we were not removed from model 453 if(container != null) 454 { 455 if(container instanceof CompositeActor) 456 { 457 ((CompositeActor)container).addInitializable(this); 458 // NOTE: we do not call _addChangeEventSender() since 459 // changeEventSenders is null and we cannot create 460 // it since we are being called from our parent's 461 // constructor. 462 container.addChangeListener(this); 463 } 464 465 _haveExecuted = false; 466 } 467 } 468 469 // notify the recording if present. 470 if(_provStore != null) 471 { 472 _provStore.setContainer(this); 473 474 // notify any piggybacks 475 for(Recording recording : _piggybacks) 476 { 477 recording.setContainer(this); 478 } 479 } 480 } 481 482 /** Perform cleanup. 
*/ 483 public void disconnect() throws IllegalActionException 484 { 485 SshEventRegistry.instance.removeListener(this); 486 487 if(_provStore != null) 488 { 489 try 490 { 491 _provStore.disconnect(); 492 493 // notify any piggybacks 494 for(Recording recording : _piggybacks) 495 { 496 recording.disconnect(); 497 } 498 } 499 catch(RecordingException e) 500 { 501 throw new IllegalActionException(this, e, 502 "Error disconnecting recording."); 503 } 504 } 505 506 WorkflowRenameManager wrm = WorkflowRenameManager.getInstance(); 507 wrm.removeRenameListener(this); 508 } 509 510 /** Perform cleanup for all ProvenanceRecorder instances. */ 511 public static void disconnectAll() throws IllegalActionException 512 { 513 for(WeakReference<ProvenanceRecorder> reference : _recorders) 514 { 515 ProvenanceRecorder recorder = reference.get(); 516 if(recorder != null) 517 { 518 recorder.disconnect(); 519 } 520 } 521 } 522 523 /** Return true if provenance output is turned on. */ 524 public boolean isEnabled() throws IllegalActionException 525 { 526 return _isEnabled; 527 } 528 529 /** Add a ProvenanceEnabledListener to be notified when provenance 530 * is enabled/disabled. 
531 */ 532 public void addEnabledListener(ProvenanceEnabledListener listener) 533 { 534 boolean contain = false; 535 for (int i = 0; i < _enabledListeners.size(); i++) { 536 WeakReference<ProvenanceEnabledListener> reference = _enabledListeners.get(i); 537 ProvenanceEnabledListener _listener = reference.get(); 538 if(_listener != null && _listener == listener) { 539 contain = true; 540 break; 541 }else if (listener == null ){ 542 _enabledListeners.remove(i); 543 i--; 544 } 545 546 } 547 if(!contain) 548 { 549 _enabledListeners.add(new WeakReference<ProvenanceEnabledListener>(listener)); 550 } 551 listener.toggle(_isEnabled); 552 } 553 554 /** Remove a ProvenanceEnabledListener from the set of listeners.*/ 555 public void removeEnabledListener(ProvenanceEnabledListener listener) 556 { 557 for (int i = 0; i < _enabledListeners.size(); i++) { 558 WeakReference<ProvenanceEnabledListener> reference = _enabledListeners.get(i); 559 ProvenanceEnabledListener _listener = reference.get(); 560 if(_listener != null && _listener == listener) { 561 _enabledListeners.remove(i); 562 i--; 563 }else if (listener == null ){ 564 _enabledListeners.remove(i); 565 i--; 566 } 567 568 } 569 } 570 571 /////////////////////////////////////////////////////////////////// 572 //// Initializable interface //// 573 574 /** This function is called when the manager is about to run the model 575 * and allows us to register with the manager before the run happens. 576 * We would do this in our constructor but the manager is not created 577 * until the user starts executing the workflow for the first time. 
578 */ 579 @Override 580 public void preinitialize() throws IllegalActionException 581 { 582 //_Debug("preinitialize()"); 583 584 585 if(_initializables != null) 586 { 587 for(Initializable initializable : _initializables) 588 { 589 initializable.preinitialize(); 590 } 591 } 592 593 if(checkForSupportedDirector()) 594 { 595 _amExecuting = true; 596 597 CompositeActor topLevel = (CompositeActor)getContainer(); 598 Manager manager = topLevel.getManager(); 599 600 if(!_addedExecutionListener) 601 { 602 manager.addExecutionListener(this); 603 _addedExecutionListener = true; 604 } 605 606 // if requested, record contents of workflow 607 _recordWorkflowContents(); 608 609 // verify all composites containing provenance recorders are opaque 610 for(CompositeActor composite : _provOpaqueSet) 611 { 612 if(!composite.isOpaque()) 613 { 614 String msgStr = "Composite " + composite.getFullName() + 615 " is transparent\n(does not have a director), and " + 616 "contains an instance of the Provenance Recorder.\n" + 617 "The Provenance Recorder can only be placed in " + 618 "opaque composite actors."; 619 620 _debugWrite(msgStr); 621 622 throw new IllegalActionException(this, msgStr); 623 } 624 } 625 626 _haveExecuted = true; 627 628 if(_provStore == null) 629 { 630 throw new IllegalActionException(this, 631 "Invalid recording type: " + recordingType.stringValue()); 632 } 633 else 634 { 635 636 // process any delayed events 637 while(_delayedEventList.size() > 0) 638 { 639 DelayedEvent event = _delayedEventList.remove(0); 640 if(event.isAdd()) 641 { 642 tagAdded((TagEvent)event.getEvent()); 643 } 644 else 645 { 646 tagRemoved((TagEvent)event.getEvent()); 647 } 648 } 649 650 // generate LSID for this execution 651 try 652 { 653 _executionLSID = LSIDGenerator.getInstance().getNewLSID(); 654 } 655 catch(Exception e) 656 { 657 throw new IllegalActionException(this, e, "Error generating new LSID for " + 658 "workflow execution."); 659 } 660 661 // set the execution identifier in the 
manager 662 manager.setExecutionIdentifier(_executionLSID); 663 664 try 665 { 666 // record start of new workflow execution 667 Date timestamp = new Date(); 668 _provStore.executionStart(_executionLSID, timestamp); 669 670 // notify any piggybacks 671 for(Recording recording : _piggybacks) 672 { 673 recording.executionStart(_executionLSID, timestamp); 674 } 675 } 676 catch(RecordingException e) 677 { 678 throw new IllegalActionException(this, e, "Error recording execution start."); 679 } 680 } 681 } 682 } 683 684 /** Invoke the initialize methods and check for additional 685 * MultiInstanceComposite actors created after preinitialize(). 686 * of objects that have been added using addInitializable(). 687 * @exception IllegalActionException If one of the added objects 688 * throws it. 689 */ 690 @Override 691 public void initialize() throws IllegalActionException 692 { 693 // initialize our Initializables 694 if(_initializables != null) 695 { 696 for(Initializable initializable : _initializables) 697 { 698 initializable.initialize(); 699 } 700 } 701 702 CompositeEntity container = (CompositeEntity)getContainer(); 703 704 // look for additional multi instance composite clones 705 706 for(String masterName : _multiInstanceCompositeMasterNames) 707 { 708 int i = 1; 709 // XXX identifying clones by a specific name is fragile. 710 String cloneName = masterName + "_" + i; 711 NamedObj mic = container.getEntity(cloneName); 712 while(mic != null) 713 { 714 if(! 
_multiInstanceCompositeClones.contains(mic)) 715 { 716 // register the new multi instance composite 717 718 try 719 { 720 _provStore.evolutionStart(); 721 722 for(Recording recording : _piggybacks) 723 { 724 recording.evolutionStart(); 725 } 726 727 _recordContainerContents(mic); 728 729 // register its relations 730 List<IORelation> relationList = ((Entity<?>)mic).linkedRelationList(); 731 for(IORelation relation : relationList) 732 { 733 _recordContainerContents(relation); 734 735 // register the the links 736 _recordLinksInRelation(relation); 737 } 738 739 _provStore.evolutionStop(); 740 741 for(Recording recording : _piggybacks) 742 { 743 recording.evolutionStop(); 744 } 745 746 } 747 catch(RecordingException e) 748 { 749 throw new IllegalActionException(this, e, "Error registering new MultiInstanceComposites"); 750 } 751 } 752 753 i++; 754 cloneName = masterName + "_" + i; 755 mic = container.getEntity(cloneName); 756 } 757 } 758 759 // clear the multi instance composite sets so that they are empty 760 // at the beginning of each run. 761 _multiInstanceCompositeClones.clear(); 762 _multiInstanceCompositeMasterNames.clear(); 763 } 764 765 /** Perform clean up. This method is executed when the manager is 766 * finishing model execution. 767 */ 768 @Override 769 public void wrapup() throws IllegalActionException 770 { 771 //_Debug("wrapup()"); 772 773 if(_amExecuting) 774 { 775 try 776 { 777 Date timestamp = new Date(); 778 _provStore.executionStop(_executionLSID, timestamp); 779 780 // notify any piggybacks 781 for(Recording recording : _piggybacks) 782 { 783 recording.executionStop(_executionLSID, timestamp); 784 } 785 786 _executionLSID = null; 787 } 788 catch(RecordingException e) 789 { 790 throw new IllegalActionException(this, e, "Error recording execution stop."); 791 } 792 793 _amExecuting = false; 794 } 795 796 // tell initializables to wrapup. 
797 // NOTE: we do this after recording execution stop since the 798 // initializables wrapup may rely on workflow execution to 799 // have finished. 800 if(_initializables != null) 801 { 802 for(Initializable initializable : _initializables) 803 { 804 initializable.wrapup(); 805 } 806 } 807 } 808 809 /** Add the specified object to the list of objects whose 810 * preinitialize(), intialize(), and wrapup() 811 * methods should be invoked upon invocation of the corresponding 812 * methods of this object. 813 * @param initializable The object whose methods should be invoked. 814 * @see #removeInitializable(Initializable) 815 * @see ptolemy.actor.CompositeActor#addPiggyback(Executable) 816 */ 817 @Override 818 public void addInitializable(Initializable initializable) 819 { 820 if (_initializables == null) 821 { 822 _initializables = new LinkedList<Initializable>(); 823 } 824 _initializables.add(initializable); 825 } 826 827 /** Remove the specified object from the list of objects whose 828 * preinitialize(), intialize(), and wrapup() 829 * methods should be invoked upon invocation of the corresponding 830 * methods of this object. If the specified object is not 831 * on the list, do nothing. 832 * @param initializable The object whose methods should no longer be 833 * invoked. 834 * @see #addInitializable(Initializable) 835 * @see ptolemy.actor.CompositeActor#removePiggyback(Executable) 836 */ 837 @Override 838 public void removeInitializable(Initializable initializable) 839 { 840 if(_initializables != null) 841 { 842 _initializables.remove(initializable); 843 if(_initializables.size() == 0) 844 { 845 _initializables = null; 846 } 847 } 848 } 849 850 /////////////////////////////////////////////////////////////////// 851 //// ChangeListener interface //// 852 853 /** React to a change request has been successfully executed. 854 * 855 * @param change The change that has been executed, or null if 856 * the change was not done via a ChangeRequest. 
857 */ 858 //TODO: NEED TO ADD LISTENERS TO NEWLY ADDED ACTORS AND PORTS!!! 859 @Override 860 public void changeExecuted(ChangeRequest change) 861 { 862 863 // check to make sure we were not removed from the model 864 // NOTE: we stop listening for change events when removed 865 // from model, but sometimes an event can be queued before 866 // we stop listening. we need to ignore them since in 867 // setContainer() we told the recording to disconnect. 868 if(! _removedFromModel) 869 { 870 synchronized(this) 871 { 872 boolean recordContents = false; 873 874 //_Debug("Model change: " + change.getDescription()); 875 //_Debug("Change made by: " + change.getSource()); 876 877 if(change instanceof MoMLChangeRequest) 878 { 879 //_Debug("Model change: " + change.getDescription()); 880 //_Debug("Change made by: " + change.getSource()); 881 882 MoMLChangeRequest momlChange = (MoMLChangeRequest)change; 883 NamedObj context = momlChange.getContext(); 884 885 //_Debug("context = " + context.getFullName()); 886 887 // ignore changes made us 888 if(context != this) 889 { 890 recordContents = true; 891 } 892 } 893 894 // record the contents of the model unless we are executing 895 // or are delaying recording specification. NOTE: if we ignore 896 // the latter, this causes the model to be recorded for every 897 // moml change request, which is very slow for large models. 
898 // see http://bugzilla.ecoinformatics.org/show_bug.cgi?id=4764 899 if(recordContents && !_amExecuting && !_delayRecordingSpecificationVal) 900 { 901 try 902 { 903 if(checkForSupportedDirector()) 904 { 905 try 906 { 907 //_Debug("Model change: " + change.getDescription()); 908 //_Debug("Change made by: " + change.getSource()); 909 _recordWorkflowContents(); 910 } 911 catch(IllegalActionException e) 912 { 913 _debugWrite("ERROR trying to record workflow " + 914 "contents: " + e.getMessage()); 915 } 916 } 917 } 918 catch (IllegalActionException e) 919 { 920 MessageHandler.error("Error checking for supported director.", e); 921 } 922 } 923 } 924 } 925 } 926 927 /** React to a change request has resulted in an exception. 928 * 929 * @param change The change that was attempted or null if 930 * the change was not done via a ChangeRequest. 931 * @param exception The exception that resulted. 932 */ 933 @Override 934 public void changeFailed(ChangeRequest change, Exception exception) 935 { 936 System.out.println("Change failed: " + change.getDescription()); 937 System.out.println("Change made by: " + change.getSource()); 938 System.out.println("Exception: " + exception.getMessage()); 939 } 940 941 /////////////////////////////////////////////////////////////////// 942 //// ExecutionListener interface //// 943 944 /** Report an execution failure. This method will be called 945 * when an exception or error is caught by a manager. 946 * Exceptions are reported this way when the run() or startRun() 947 * methods of the manager are used to perform the execution. 948 * If instead the execute() method is used, then exceptions are 949 * not caught, and are instead just passed up to the caller of 950 * the execute() method. Those exceptions are not reported 951 * here (unless, of course, the caller of the execute() method does 952 * so). 953 * 954 * @param manager The manager controlling the execution. 955 * @param throwable The throwable to report. 
956 */ 957 @Override 958 public void executionError(Manager manager, Throwable throwable) 959 { 960 if(_provStore != null) 961 { 962 // get the execution lsid 963 KeplerLSID executionLSID = (KeplerLSID)manager.getExecutionIdentifier(throwable); 964 965 if(executionLSID == null) 966 { 967 System.out.println("ERROR: no LSID for throwable: " + throwable.getMessage()); 968 throwable.printStackTrace(); 969 } 970 else 971 { 972 // see if there's a source 973 Nameable source = null; 974 if(throwable instanceof KernelException) 975 { 976 source = ((KernelException)throwable).getNameable1(); 977 } 978 979 try 980 { 981 _provStore.executionError(source, throwable, executionLSID); 982 983 // notify any piggybacks 984 for(Recording recording : _piggybacks) 985 { 986 recording.executionError(source, throwable, executionLSID); 987 } 988 } 989 catch(RecordingException e) 990 { 991 _debugWrite("Error recording execution error: " + 992 e.getMessage()); 993 } 994 } 995 } 996 } 997 998 /** Report that the current execution has finished and the wrapup 999 * sequence has completed normally. 1000 * 1001 * @param manager The manager controlling the execution. 1002 */ 1003 @Override 1004 public void executionFinished(Manager manager) 1005 { 1006 } 1007 1008 /** Report that an execution has been imported */ 1009 public void executionImported(KeplerLSID execLSID){ 1010 // notify any piggybacks 1011 for(Recording recording : _piggybacks) 1012 { 1013 recording.executionImported(execLSID); 1014 } 1015 } 1016 1017 /** Report that the manager has changed state. 1018 * 1019 * @param manager The manager controlling the execution. 1020 */ 1021 @Override 1022 public void managerStateChanged(Manager manager) 1023 { 1024 //Manager.State state = manager.getState(); 1025 //_Debug("managerStateChanged " + state.getDescription()); 1026 } 1027 1028 /////////////////////////////////////////////////////////////////// 1029 //// IOPortEventListener interface //// 1030 1031 /** React to a port read or write. 
1032 * 1033 * @param event The event representing the token(s). 1034 */ 1035 @Override 1036 public void portEvent(IOPortEvent event) throws IllegalActionException 1037 { 1038 if(_recordTokensVal) 1039 { 1040 try 1041 { 1042 boolean ignore = false; 1043 IOPort port = event.getPort(); 1044 int type = event.getEventType(); 1045 1046 NamedObj container = (NamedObj)port.getContainer(); 1047 1048 // see if the port's container is our container 1049 if(container == getContainer()) 1050 { 1051 // ignore events outside our container 1052 if((port.isInput() && 1053 (type == IOPortEvent.GET_BEGIN || 1054 type == IOPortEvent.GET_END)) || 1055 (port.isOutput() && 1056 (type == IOPortEvent.SEND_BEGIN || 1057 type == IOPortEvent.SEND_END))) 1058 { 1059 ignore = true; 1060 } 1061 } 1062 // see if the port's container has a ProvenanceRecorder 1063 else if((container instanceof CompositeActor) && 1064 _provOpaqueSet.contains((CompositeActor)container)) 1065 { 1066 // ignore events inside composite actor 1067 if((port.isInput() && 1068 (type == IOPortEvent.SEND_BEGIN || 1069 type == IOPortEvent.SEND_END)) || 1070 (port.isOutput() && 1071 (type == IOPortEvent.GET_BEGIN || 1072 type == IOPortEvent.GET_END))) 1073 { 1074 ignore = true; 1075 } 1076 } 1077 1078 if(!ignore) 1079 { 1080 _provStore.portEvent(event); 1081 1082 // notify any piggybacks 1083 for(Recording recording : _piggybacks) 1084 { 1085 recording.portEvent(event); 1086 } 1087 } 1088 1089 /* 1090 IOPort srcPort = event.getPort(); 1091 int srcType = event.getEventType(); 1092 int newType; 1093 IOPort newPort = null; 1094 1095 if(!srcPort.isOpaque() && srcType == IOPortEvent.GET_END) 1096 { 1097 newType = IOPortEvent.SEND; 1098 newPort = srcPort; 1099 } 1100 else 1101 { 1102 newType = IOPortEvent.GET_END; 1103 } 1104 1105 // send an event for each connected port in table 1106 List<IOPort> connPorts = _portConnTable.get(srcPort); 1107 if(connPorts != null) 1108 { 1109 for(IOPort destPort : connPorts) 1110 { 1111 IOPortEvent 
newEvent = null; 1112 1113 if(newType == IOPortEvent.GET_END) 1114 { 1115 newPort = destPort; 1116 1117 // do not make a fake receive event for opaque ports. 1118 if(destPort.isOpaque()) 1119 { 1120 continue; 1121 } 1122 } 1123 1124 //_Debug("resending " + destPort); 1125 1126 if(event.getVectorLength() == IOPortEvent.SINGLETOKEN) 1127 { 1128 newEvent = new IOPortEvent(newPort, newType, 1129 event.getChannel(), event.getOutsideFlag(), 1130 event.getToken()); 1131 } 1132 else 1133 { 1134 newEvent = new IOPortEvent(newPort, newType, 1135 event.getChannel(), event.getOutsideFlag(), 1136 event.getTokenArray(), event.getVectorLength()); 1137 } 1138 1139 _Debug("FAKE"); 1140 portEvent(newEvent); 1141 } 1142 } 1143 */ 1144 } 1145 catch(RecordingException e) 1146 { 1147 // this is not necessarily an error since it may 1148 // be expected (e.g., port condition). 1149 //_debugWrite("ERROR: RecordingException: " + e.getMessage()); 1150 1151 Throwable cause = e.getCause(); 1152 if(cause != null && (cause instanceof IllegalActionException)) 1153 { 1154 throw (IllegalActionException)cause; 1155 } 1156 } 1157 } 1158 } 1159 1160 /////////////////////////////////////////////////////////////////// 1161 //// ActorFiringListener interface //// 1162 1163 /** React to an actor firing. 1164 * 1165 * @param event The event representing the token(s) sent. 1166 */ 1167 @Override 1168 public void firingEvent(FiringEvent event) 1169 { 1170 Actor actor = event.getActor(); 1171 1172 // see if this actor has been registered. 1173 if(! 
_firingEventSenders.contains(actor)) 1174 { 1175 /* 1176 System.out.println("firing from unregistered actor: " + 1177 actor.getFullName()); 1178 1179 new Exception().printStackTrace(System.out); 1180 */ 1181 1182 try 1183 { 1184 _provStore.evolutionStart(); 1185 1186 for(Recording recording : _piggybacks) 1187 { 1188 recording.evolutionStart(); 1189 } 1190 1191 _recordContainerContents((NamedObj)actor); 1192 1193 _provStore.evolutionStop(); 1194 1195 for(Recording recording : _piggybacks) 1196 { 1197 recording.evolutionStop(); 1198 } 1199 } 1200 catch(RecordingException e) 1201 { 1202 System.out.println("Error recording container contents: " + 1203 e.getMessage()); 1204 } 1205 } 1206 1207 try 1208 { 1209 _provStore.actorFire(event); 1210 1211 // notify any piggybacks 1212 for(Recording recording : _piggybacks) 1213 { 1214 recording.actorFire(event); 1215 } 1216 } 1217 catch(RecordingException e) 1218 { 1219 _debugWrite("Error recording actor firing: " + e.getMessage()); 1220 } 1221 } 1222 1223 /////////////////////////////////////////////////////////////////// 1224 //// DebugListener interface //// 1225 1226 /** React to a generic debug event. Actors and directors may use 1227 * this interface to inform listeners of firing events. 
1228 */ 1229 @Override 1230 public void event(DebugEvent event) 1231 { 1232 if(event instanceof FiringEvent) 1233 { 1234 firingEvent((FiringEvent)event); 1235 } 1236 else if(event instanceof ProvenanceEvent) 1237 { 1238 try 1239 { 1240 _provStore.customProvEvent((ProvenanceEvent)event); 1241 1242 // notify any piggybacks 1243 for(Recording recording : _piggybacks) 1244 { 1245 recording.customProvEvent((ProvenanceEvent)event); 1246 } 1247 } 1248 catch(RecordingException e) 1249 { 1250 System.out.println("ERROR: RecordingException: " + 1251 e.getMessage()); 1252 } 1253 } 1254 else if(event instanceof IOPortEvent) 1255 { 1256 try 1257 { 1258 portEvent((IOPortEvent)event); 1259 } 1260 catch (IllegalActionException e) 1261 { 1262 System.out.println("ERROR: port event: " + e.getMessage()); 1263 } 1264 } 1265 else if(event instanceof IOPortRefillEvent) 1266 { 1267 try 1268 { 1269 _provStore.refillPortEvent((IOPortRefillEvent)event); 1270 1271 for(Recording recording : _piggybacks) 1272 { 1273 recording.refillPortEvent((IOPortRefillEvent)event); 1274 } 1275 } 1276 catch(RecordingException e) 1277 { 1278 System.out.println("ERROR: RecordingException: " + 1279 e.getMessage()); 1280 } 1281 } 1282 } 1283 1284 /** Add a recording type. */ 1285 public void addRecordingType(String typeName, String typeImpl) 1286 throws IllegalActionException 1287 { 1288 _recordingTypesMap.put(typeName, typeImpl); 1289 _populateChoices(); 1290 } 1291 1292 /** Remove a recording type. */ 1293 public void removeRecordingType(String typeName) 1294 throws IllegalActionException 1295 { 1296 if(_recordingTypesMap.remove(typeName) == null) 1297 { 1298 throw new IllegalActionException(this, 1299 "No such Recording type to remove: " + typeName); 1300 } 1301 _populateChoices(); 1302 } 1303 1304 /** Load the default recording types. 
*/ 1305 public void loadRecordingTypes() throws IllegalActionException 1306 { 1307 ConfigurationProperty typesProp = 1308 ConfigurationManager.getInstance().getProperty( 1309 ConfigurationManager.getModule("provenance"), "provenance.recordingTypes"); 1310 _recordingTypesMap = ConfigurationUtilities.getPairsMap(typesProp); 1311 1312 // make sure all keys and values are not empty. 1313 for(Map.Entry<String, String> entry : _recordingTypesMap.entrySet()) 1314 { 1315 String key = entry.getKey(); 1316 String value = entry.getValue(); 1317 if(key == null || key.trim().length() == 0) 1318 { 1319 throw new IllegalActionException(this, 1320 "Missing key in recording types."); 1321 } 1322 else if(value == null || value.trim().length() == 0) 1323 { 1324 throw new IllegalActionException(this, 1325 "Missing value in recording types."); 1326 } 1327 } 1328 _populateChoices(); 1329 } 1330 1331 /** Load the recording types from a specific file. */ 1332 public void loadRecordingTypes(File file) throws IllegalActionException 1333 { 1334 _recordingTypesMap = new HashMap<String,String>(); 1335 1336 Properties propFile = new Properties(); 1337 InputStream is = null; 1338 // attempt to load the properties file. 
1339 try 1340 { 1341 is = new FileInputStream(file); 1342 propFile.load(is); 1343 } 1344 catch(FileNotFoundException e) 1345 { 1346 throw new IllegalActionException(this, "Cannot load recording " + 1347 "types from " + file + ": " + e.getMessage()); 1348 } 1349 catch(IOException e) 1350 { 1351 throw new IllegalActionException(this, 1352 "Error reading " + file + ": " + e.getMessage()); 1353 } 1354 finally 1355 { 1356 if(is != null) 1357 { 1358 try 1359 { 1360 is.close(); 1361 } 1362 catch(IOException e) 1363 { 1364 System.out.println("Unable to close " + file + ": " + e.getMessage()); 1365 } 1366 } 1367 } 1368 1369 for(Map.Entry<Object, Object> entry : propFile.entrySet()) 1370 { 1371 _recordingTypesMap.put((String)entry.getKey(), 1372 (String)entry.getValue()); 1373 } 1374 1375 _populateChoices(); 1376 } 1377 1378 /** Save the recording types to a specific properties file. 1379 * 1380 * @param file the properties file path. 1381 * @param header optional header for properties file. 1382 */ 1383 public void saveRecordingTypes(String file, String header) 1384 throws IllegalActionException 1385 { 1386 Properties propFile = new Properties(); 1387 for(Map.Entry<String, String> entry : _recordingTypesMap.entrySet()) 1388 { 1389 propFile.setProperty(entry.getKey(), entry.getValue()); 1390 } 1391 1392 OutputStream out = null; 1393 try 1394 { 1395 out = new FileOutputStream(file); 1396 propFile.store(out, header); 1397 } 1398 catch(FileNotFoundException e) 1399 { 1400 throw new IllegalActionException(this, 1401 "FileNotFoundException when trying to write to " + file + 1402 ": " + e.getMessage()); 1403 } 1404 catch(IOException e) 1405 { 1406 throw new IllegalActionException(this, 1407 "IOException when trying to write to " + file + ": " + 1408 e.getMessage()); 1409 } 1410 finally 1411 { 1412 if(out != null) 1413 { 1414 try 1415 { 1416 out.close(); 1417 } 1418 catch(IOException e) 1419 { 1420 System.out.println("Unable to close " + file + ": " + e.getMessage()); 1421 } 1422 } 
1423 } 1424 } 1425 1426 /////////////////////////////////////////////////////////////////// 1427 //// ValueListener interface //// 1428 1429 /** React to a change in parameter value. */ 1430 @Override 1431 public void valueChanged(Settable settable) 1432 { 1433 try 1434 { 1435 _workspace.getWriteAccess(); 1436 1437 // ignore the settable if it's pending 1438 if(_pendingSettables.contains(settable)) 1439 { 1440 return; 1441 } 1442 else 1443 { 1444 _pendingSettables.add(settable); 1445 } 1446 1447 String name = _valueEventSenders.get(settable); 1448 1449 // if the names do not match, then the parameter was 1450 // removed from the model. 1451 if(name == null || 1452 ! name.equals(((NamedObj)settable).getFullName())) 1453 { 1454 // remove the cached last value. 1455 _paramValueTable.remove(settable); 1456 } 1457 else 1458 { 1459 // NOTE: calling getValueAsString() may generate 1460 // another valueChanged event, so we do that first 1461 // before retrieving the old value from the table. 1462 // 1463 // the following function call sequence can occur: 1464 // 1465 // valueChanged() -> getValueAsString() -> valueChanged() 1466 // A B C 1467 // 1468 // if we retrieve the old value in A before calling 1469 // B, then the update that C makes will not be found. 1470 // in this case, A will always find the old value 1471 // different from the current value. 1472 // 1473 // instead, we retrieve the old value in A after calling 1474 // B so that C has updated the table. 
1475 // 1476 1477 String newVal = settable.getValueAsString(); 1478 1479 String oldVal = _paramValueTable.get(settable); 1480 if(oldVal == null || !oldVal.equals(newVal)) 1481 { 1482 /* 1483 _Debug("changed value: " + 1484 ((NamedObj)settable).getFullName() + " to = " + newVal); 1485 */ 1486 1487 _paramValueTable.put(settable, newVal); 1488 1489 try 1490 { 1491 _provStore.evolutionStart(); 1492 _provStore.regParameter((NamedObj)settable); 1493 _provStore.evolutionStop(); 1494 1495 // notify any piggybacks 1496 for(Recording recording : _piggybacks) 1497 { 1498 recording.evolutionStart(); 1499 recording.regParameter((NamedObj)settable); 1500 recording.evolutionStop(); 1501 } 1502 1503 1504 } 1505 catch(RecordingException e) 1506 { 1507 _debugWrite("ERROR recording value change: " + 1508 e.getMessage()); 1509 } 1510 } 1511 1512 _pendingSettables.remove(settable); 1513 } 1514 } 1515 finally 1516 { 1517 // we didn't change the workspace, so call doneTemporaryWriting() 1518 // so that the workspace version is not incremented. 1519 _workspace.doneTemporaryWriting(); 1520 } 1521 } 1522 1523 /////////////////////////////////////////////////////////////////// 1524 //// SshEventListener interface //// 1525 1526 /** React to an ssh event. */ 1527 @Override 1528 public void sshEvent(SshEvent event) 1529 { 1530 // XXX no implementation yet. 1531 } 1532 1533 /////////////////////////////////////////////////////////////////// 1534 //// WorkflowRenameListener interface //// 1535 1536 /** A workflow was renamed. 
1537 * 1538 * @param namedObj the workflow 1539 * @param oldLSID the previous LSID 1540 * @param newLSID the new LSID 1541 * @param oldName the previous name 1542 * @param newName the new name 1543 * @see WorkflowRenameListener 1544 */ 1545 @Override 1546 public void renamedWorkflow(NamedObj namedObj, KeplerLSID oldLSID, 1547 KeplerLSID newLSID, String oldName, String newName) 1548 { 1549 // make sure the workflow is our container since we receive 1550 // rename events for all open workflows. 1551 if(namedObj == getContainer()) 1552 { 1553 if(_provStore != null) 1554 { 1555 try 1556 { 1557 _provStore.renamedWorkflow(namedObj, oldLSID, newLSID, oldName, 1558 newName); 1559 1560 // notify any piggybacks 1561 for (Recording recording : _piggybacks) 1562 { 1563 recording.renamedWorkflow(namedObj, oldLSID, newLSID, 1564 oldName, newName); 1565 } 1566 } 1567 catch (RecordingException e) 1568 { 1569 // TODO Auto-generated catch block 1570 e.printStackTrace(); 1571 } 1572 } 1573 } 1574 } 1575 1576 /////////////////////////////////////////////////////////////////// 1577 //// TagEventListener interface //// 1578 1579 @Override 1580 public void tagAdded(TagEvent event) 1581 { 1582 _delayRecordingSpecificationVal = false; 1583 try { 1584 NamedObj tagEventSource = event.getSource(); 1585 if (!(tagEventSource instanceof WorkflowRun)) { 1586 _recordWorkflowContents(); 1587 } 1588 } 1589 catch(IllegalActionException ex) { 1590 log.error("Failed to record workflow", ex); 1591 } 1592 1593 if(_haveExecuted || !_delayRecordingSpecificationVal) 1594 { 1595 try 1596 { 1597 _provStore.tagAdded(event); 1598 // notify any piggybacks 1599 for(Recording recording : _piggybacks) 1600 { 1601 recording.tagAdded(event); 1602 } 1603 } 1604 catch(RecordingException e) 1605 { 1606 System.out.println("Error adding tag: " + e.getMessage()); 1607 } 1608 } 1609 else 1610 { 1611 // queue the event 1612 DelayedEvent delayed = new DelayedEvent(event, true); 1613 _delayedEventList.add(delayed); 1614 } 1615 
} 1616 1617 @Override 1618 public void tagRemoved(TagEvent event) 1619 { 1620 _delayRecordingSpecificationVal = false; 1621 try { 1622 NamedObj tagEventSource = event.getSource(); 1623 if (!(tagEventSource instanceof WorkflowRun)) { 1624 _recordWorkflowContents(); 1625 } 1626 } 1627 catch(IllegalActionException ex) { 1628 log.error("Failed to record workflow", ex); 1629 } 1630 1631 if(_haveExecuted || !_delayRecordingSpecificationVal) 1632 { 1633 try 1634 { 1635 _provStore.tagRemoved(event); 1636 // notify any piggybacks 1637 for(Recording recording : _piggybacks) 1638 { 1639 recording.tagRemoved(event); 1640 } 1641 } 1642 catch(RecordingException e) 1643 { 1644 System.out.println("Error removing tag: " + e.getMessage()); 1645 } 1646 1647 } 1648 else 1649 { 1650 // queue the event 1651 DelayedEvent delayed = new DelayedEvent(event, false); 1652 _delayedEventList.add(delayed); 1653 } 1654 } 1655 1656 1657 /////////////////////////////////////////////////////////////////// 1658 1659 /** Set a writer for debugging output. */ 1660 public void setDebugWriter(Writer writer) 1661 { 1662 _debugWriter = writer; 1663 if(_provStore != null) 1664 { 1665 _provStore.setDebugWriter(writer); 1666 } 1667 } 1668 1669 /** Get the provenance recorder with for a NamedObj. The "default" 1670 * provenance recorder for a NamedObj is stored at the top level of 1671 * the hierarchy and is named using the TOP Recorder Name in the 1672 * configuration file. If no default recorder exists, returns null. 1673 */ 1674 public static ProvenanceRecorder getDefaultProvenanceRecorder(NamedObj namedObj) 1675 { 1676 Map<String,String> defaultsMap = getDefaultsMap(); 1677 String name = defaultsMap.get(_DEFAULT_NAME); 1678 1679 NamedObj top = namedObj.toplevel(); 1680 return (ProvenanceRecorder)top.getAttribute(name); 1681 } 1682 1683 /** Get a queryable from the default provenance recorder for a NamedObj. 
1684 * The "default" provenance recorder for a NamedObj is stored at the top 1685 * level of the hierarchy and is named using the TOP Recorder Name in 1686 * the configuration file. If no default recorder exists, returns null. 1687 * If the NamedObj is null, this returns the Queryable for the default 1688 * provenance recording. 1689 */ 1690 public static Queryable getDefaultQueryable(NamedObj namedObj) 1691 throws QueryException, RecordingException 1692 { 1693 ProvenanceRecorder recorder = null; 1694 1695 if(namedObj != null) { 1696 recorder = getDefaultProvenanceRecorder(namedObj); 1697 1698 if(recorder != null) 1699 { 1700 Recording recording = recorder.getRecording(); 1701 if(recording != null) 1702 { 1703 return recording.getQueryable(true); 1704 } 1705 } 1706 } 1707 1708 try { 1709 recorder = new ProvenanceRecorder(); 1710 } catch (IllegalActionException | NameDuplicationException e) { 1711 throw new RecordingException("Unable to create ProvenanceRecorder.", e); 1712 } 1713 return recorder.getRecording().getQueryable(false); 1714 } 1715 1716 /** Get current RecordingParameters. XXX how does this differ from 1717 * attributeList()? 1718 */ 1719 public RecordingParameters getRecordingParameters() 1720 { 1721 return _provStoreParams; 1722 } 1723 1724 /** Get the current recording. */ 1725 public Recording getRecording() 1726 { 1727 return _provStore; 1728 } 1729 1730 /** Set the persistence of parameters. 
*/ 1731 public void setPersistentParameters(boolean persistent) 1732 { 1733 username.setPersistent(persistent); 1734 hostname.setPersistent(persistent); 1735 containerWorkflowLSID.setPersistent(persistent); 1736 containerWorkflowName.setPersistent(persistent); 1737 delayRecordingSpecification.setPersistent(persistent); 1738 recordTokens.setPersistent(persistent); 1739 recordingType.setPersistent(persistent); 1740 1741 if(_provStoreParams != null) 1742 { 1743 _provStoreParams.setPersistent(persistent); 1744 } 1745 } 1746 1747 /** Add the specified recording to the list of recordings notified 1748 * for each provenance event. NOTE: the recordings are notified 1749 * <b>after</b> the internal Recording. 1750 * 1751 * @param recording the Recording to be notified 1752 */ 1753 public void addPiggyback(Recording recording) 1754 { 1755 if(!_piggybacks.contains(recording)) 1756 { 1757 _piggybacks.add(recording); 1758 recording.setContainer(this); 1759 } 1760 } 1761 1762 /** Remove the specified recording from the list of recordings 1763 * notified for each provenance event. 1764 * 1765 * @param recording the Recording to remove 1766 * @see ProvenanceRecorder#addPiggyback 1767 */ 1768 public void removePiggyback(Recording recording) 1769 { 1770 _piggybacks.remove(recording); 1771 recording.setContainer(null); 1772 } 1773 1774 /** Add the specified recording to the list of recordings that receive 1775 * provenance events from all ProvenanceRecorders created in the 1776 * future. NOTE: this does not include existing ProvenanceRecorders. 1777 * 1778 * @param recording the Recording to be notified 1779 */ 1780 public static void addPiggybackForFuture(Recording recording) 1781 { 1782 if(!_futurePiggybacks.containsKey(recording)) 1783 { 1784 _futurePiggybacks.put(recording, null); 1785 } 1786 } 1787 1788 /** Remove the specified recording from the list of recordings that 1789 * receive provenance events from all ProvenanceRecorders created 1790 * in the future. 
1791 * 1792 * @param recording the Recording to remove 1793 * @see ProvenanceRecorder#addPiggybackForFuture 1794 */ 1795 public static void removePiggybackForFuture(Recording recording) 1796 { 1797 _futurePiggybacks.remove(recording); 1798 } 1799 1800 /** Return list of recordings that receive provenance events from all 1801 * ProvenanceRecorders created in the future. 1802 * @param recording 1803 * @return _futurePiggybacks 1804 */ 1805 public static List<Recording> getFuturisticPiggybacks(Recording recording) 1806 { 1807 List<Recording> recordings = new LinkedList<Recording>(); 1808 Iterator<Recording> _futurePiggybacksKeyIt = _futurePiggybacks.keySet().iterator(); 1809 while (_futurePiggybacksKeyIt.hasNext()) 1810 { 1811 Recording record = _futurePiggybacksKeyIt.next(); 1812 if (record != null) 1813 recordings.add(record); 1814 } 1815 return recordings; 1816 } 1817 1818 /** Returns true if the composite actor contains a director supported 1819 * by the provenance recorder or no director. 1820 */ 1821 public static boolean containsSupportedDirector(CompositeActor composite) 1822 { 1823 boolean retval = false; 1824 1825 Director director = composite.getDirector(); 1826 if(director == null || 1827 (director instanceof SDFDirector) || 1828 (director instanceof DDFDirector) || 1829 (director instanceof PNDirector) || 1830 1831 // DE director sends actor firing events to listeners 1832 // of the director instead of listeners of the actors, 1833 // so provenance recording is not fully supported. 1834 //(director instanceof DEDirector) || 1835 1836 // IterateOverArray uses IterateDirector, which is 1837 // private 1838 (composite instanceof IterateOverArray) || 1839 (director instanceof CaseDirector)) 1840 { 1841 retval = true; 1842 } 1843 1844 return retval; 1845 } 1846 1847 /** Check if the director is supported. If it is not, then provenance 1848 * recording is turned off. 
1849 */ 1850 public boolean checkForSupportedDirector() throws IllegalActionException 1851 { 1852 boolean retval = containsSupportedDirector((CompositeActor)getContainer()); 1853 1854 if(!retval) 1855 { 1856 recordingType.setToken(ProvenanceRecorder.DEFAULT_REC_TYPE_IGNORE); 1857 } 1858 1859 return retval; 1860 } 1861 1862 /** Get the execution LSID. If the model is not running, 1863 * returns null. 1864 */ 1865 public KeplerLSID getExecutionLSID() 1866 { 1867 return _executionLSID; 1868 } 1869 1870 /** Add the default provenance recorder to a container if it does not 1871 * already contain a provenance recorder. 1872 * 1873 * @param container The container in which to add the default provenance 1874 * recorder. 1875 * @param listener Register this object to listen for provenance enabled 1876 * events. This parameter can be null. 1877 * @param effigy The effigy displaying the container. This parameter can 1878 * be null. 1879 * @return Returns true if the default provenance recorder is recording. 1880 * This may be false due to an unsupported director in container, or the 1881 * default recording type set to ignore. 
1882 * 1883 */ 1884 public static boolean addProvenanceRecorder(CompositeActor container, 1885 ProvenanceEnabledListener listener, Effigy effigy) 1886 throws IllegalActionException, NameDuplicationException 1887 { 1888 1889 Map<String,String> defaultsMap = getDefaultsMap(); 1890 String defaultName = defaultsMap.get(_DEFAULT_NAME); 1891 1892 ProvenanceRecorder defaultRecorder = null; 1893 List<?> recorderList = container.attributeList(ProvenanceRecorder.class); 1894 for(Object object : recorderList) 1895 { 1896 final ProvenanceRecorder recorder = (ProvenanceRecorder)object; 1897 1898 // see if this is the default 1899 if(recorder.getName().equals(defaultName)) 1900 { 1901 defaultRecorder = recorder; 1902 break; 1903 } 1904 } 1905 1906 // see if there were no other provenance recorders 1907 if(recorderList.isEmpty()) 1908 { 1909 // add the default provenance recorder 1910 defaultRecorder = new ProvenanceRecorder(container); 1911 1912 // set the workflow name to be the model name 1913 String wfNameStr = container.getName(); 1914 1915 Parameter wfNameParameter = 1916 (Parameter)defaultRecorder.getAttribute(SQLRecordingParameters.wfNameParamStr); 1917 1918 // see if it was found (not all recording types have this parameter) 1919 if(wfNameParameter != null) 1920 { 1921 wfNameParameter.setToken(wfNameStr); 1922 } 1923 1924 if(listener != null) 1925 { 1926 defaultRecorder.addEnabledListener(listener); 1927 } 1928 1929 // do not save in moml 1930 defaultRecorder.setPersistent(false); 1931 } 1932 1933 boolean enabled = false; 1934 1935 // see if we found the default recorder 1936 if(defaultRecorder != null) 1937 { 1938 // see if the director is supported 1939 if(!ProvenanceRecorder.containsSupportedDirector(container)) 1940 { 1941 // turn off recording 1942 defaultRecorder.recordingType.setToken(ProvenanceRecorder.DEFAULT_REC_TYPE_IGNORE); 1943 } 1944 else 1945 { 1946 // get the default recording type 1947 Map<String,String> map = ProvenanceRecorder.getDefaultsMap(); 1948 
String recordingType = map.get(_RECORDING_TYPE); 1949 1950 defaultRecorder.recordingType.setToken(recordingType); 1951 enabled = defaultRecorder.isEnabled(); 1952 } 1953 1954 // see if the default provenance recorder was stored in the moml 1955 if(defaultRecorder.isPersistent() && effigy != null) 1956 { 1957 System.out.println("removing PR from moml"); 1958 defaultRecorder.setPersistent(false); 1959 effigy.setModified(true); 1960 } 1961 1962 } 1963 1964 // causes new (blank) workflows to prompt for save 1965 // see: http://bugzilla.ecoinformatics.org/show_bug.cgi?id=4200 1966 //_effigy.setModified(true); 1967 1968 return enabled; 1969 } 1970 1971 /** Set the state serializer for recordings. */ 1972 public void setStateSerializer(StateSerializer serializer) 1973 { 1974 _stateSerializer = serializer; 1975 1976 if(_provStore != null) 1977 { 1978 _provStore.setStateSerializer(serializer); 1979 } 1980 1981 // notify any piggybacks 1982 for(Recording recording : _piggybacks) 1983 { 1984 recording.setStateSerializer(serializer); 1985 } 1986 } 1987 1988 /** Remove the state serializer. */ 1989 public void removeStateSerializer() 1990 { 1991 _stateSerializer = null; 1992 1993 if(_provStore != null) 1994 { 1995 _provStore.removeStateSerializer(); 1996 } 1997 1998 // notify any piggybacks 1999 for(Recording recording : _piggybacks) 2000 { 2001 recording.removeStateSerializer(); 2002 } 2003 } 2004 2005 /////////////////////////////////////////////////////////////////// 2006 //// private methods ////// 2007 2008 /** Initialize the provenance execution listener. 
*/ 2009 private synchronized void _init() 2010 throws IllegalActionException, NameDuplicationException 2011 { 2012 _attachText("_iconDescription", "<svg>\n" + 2013 "<rect x=\"-50\" y=\"-15\" " 2014 + "width=\"100\" height=\"30\" " 2015 + "style=\"fill:blue\"/>\n" + 2016 "</svg>\n"); 2017 2018 _addedExecutionListener = false; 2019 _amExecuting = false; 2020 _changeEventSenders = new HashSet<NamedObj>(); 2021 _curRecordingTypeStr = null; 2022 _debugEventSenders = new HashSet<Debuggable>(); 2023 _debugWriter = null; 2024 _delayedEventList = new LinkedList<DelayedEvent>(); 2025 _enabledListeners = new LinkedList<WeakReference<ProvenanceEnabledListener>>(); 2026 _executionLSID = null; 2027 _firingEventSenders = new HashSet<FiringsRecordable>(); 2028 _haveExecuted = false; 2029 _initializables = null; 2030 _isEnabled = true; 2031 _multiInstanceCompositeClones = new HashSet<MultiInstanceComposite>(); 2032 _multiInstanceCompositeMasterNames = new HashSet<String>(); 2033 _paramValueTable = new Hashtable<Settable,String>(); 2034 _pendingSettables = new HashSet<Settable>(); 2035 _piggybacks = new LinkedList<Recording>(_futurePiggybacks.keySet()); 2036 _plParams = new LinkedHashSet<String>(); 2037 _portEventSenders = new HashSet<IOPort>(); 2038 _possibleRecordingAttributes = new HashSet<Attribute>(); 2039 //_portConnTable = new Hashtable<IOPort,List<IOPort>>(); 2040 _provOpaqueSet = new HashSet<CompositeActor>(); 2041 _provStore = null; 2042 _provStoreParams = null; 2043 _recordingTypesMap = null; 2044 _removedFromModel = false; 2045 _settingsProperty = null; 2046 _stateSerializer = null; 2047 _valueEventSenders = new Hashtable<Settable,String>(); 2048 2049 2050 for(Recording recording : _piggybacks) { 2051 recording.setContainer(this); 2052 } 2053 2054 NamedObj container = getContainer(); 2055 if(container != null) 2056 { 2057 _changeEventSenders.add(container); 2058 } 2059 2060 Map<String,String> settingsMap = getSettingsMap(); 2061 2062 String defaultValue; 2063 2064 String 
name = "Record Token Values"; 2065 _plParams.add(name); 2066 if(recordTokens == null) { 2067 recordTokens = new Parameter(this, name); 2068 defaultValue = settingsMap.get(name); 2069 recordTokens.setToken(new BooleanToken(defaultValue)); 2070 recordTokens.setTypeEquals(BaseType.BOOLEAN); 2071 recordTokens.setVisibility(Settable.EXPERT); 2072 } 2073 2074 name = "User Name"; 2075 _plParams.add(name); 2076 if(username == null) { 2077 username = new StringParameter(this, name); 2078 username.setVisibility(Settable.EXPERT); 2079 } 2080 2081 name = "Machine Name"; 2082 _plParams.add(name); 2083 if(hostname == null) { 2084 hostname = new StringParameter(this, name); 2085 hostname.setVisibility(Settable.EXPERT); 2086 } 2087 2088 name = "containerWorkflowLSID"; 2089 _plParams.add(name); 2090 if(containerWorkflowLSID == null) { 2091 containerWorkflowLSID = new StringParameter(this, name); 2092 containerWorkflowLSID.setVisibility(Settable.EXPERT); 2093 } 2094 2095 name = "containerWorkflowName"; 2096 _plParams.add(name); 2097 if(containerWorkflowName == null) { 2098 containerWorkflowName = new StringParameter(this, name); 2099 containerWorkflowName.setVisibility(Settable.EXPERT); 2100 } 2101 2102 name = "Delay Recording Specification"; 2103 _plParams.add(name); 2104 if(delayRecordingSpecification == null) { 2105 delayRecordingSpecification = new Parameter(this, name, 2106 BooleanToken.getInstance(_delayRecordingSpecificationVal)); 2107 delayRecordingSpecification.setTypeEquals(BaseType.BOOLEAN); 2108 delayRecordingSpecification.setVisibility(Settable.EXPERT); 2109 } 2110 2111 // set the recording type. 2112 // NOTE: when the recording type changes, the new recording 2113 // is told about some of the parameters, such as user name 2114 // and machine name. These parameters therefore must be 2115 // created, so we change the recording type last. 
2116 // 2117 // NOTE: add the recording type to the prov recorders 2118 // parameters before we set the default recording type; 2119 // otherwise it will get removed. 2120 // 2121 name = _RECORDING_TYPE; 2122 _plParams.add(name); 2123 boolean isClone = false; 2124 if(recordingType == null) { 2125 recordingType = new StringParameter(this, name); 2126 isClone = true; 2127 } 2128 2129 // NOTE: we must load the recording types before we set the default 2130 // recording type. 2131 loadRecordingTypes(); 2132 2133 // NOTE: if this ProvenanceRecorder is a clone, then we do 2134 // not want to set the recording type to the default value. 2135 // instead, call _changeRecordingType() to instantiate the 2136 // provStore to the type specified by the original 2137 // ProvenanceRecorder's recording type. 2138 if(isClone) { 2139 defaultValue = settingsMap.get(name); 2140 recordingType.setToken(new StringToken(defaultValue)); 2141 } else { 2142 _changeRecordingType(); 2143 } 2144 2145 // set any remaining default settings. 2146 for(Map.Entry<String,String> entry : settingsMap.entrySet()) 2147 { 2148 name = entry.getKey(); 2149 if(!_plParams.contains(name)) 2150 { 2151 Parameter parameter = (Parameter)getAttribute(name); 2152 if(parameter != null) 2153 { 2154 parameter.setExpression(entry.getValue()); 2155 _provStore.attributeChanged(parameter); 2156 } 2157 } 2158 } 2159 2160 /* not implemented yet... 2161 recordSshData = new Parameter(this, "Record Ssh System Data", 2162 new BooleanToken(_recordSshDataVal)); 2163 recordSshData.setTypeEquals(BaseType.BOOLEAN); 2164 */ 2165 2166 _recorders.add(new WeakReference<ProvenanceRecorder>(this)); 2167 } 2168 2169 /** Add valid recording type choices to the recording types parameter 2170 * drop-down box. 
2171 */ 2172 private void _populateChoices() 2173 { 2174 recordingType.removeAllChoices(); 2175 2176 // add the types to the drop-down box 2177 for(String name : _recordingTypesMap.keySet()) 2178 { 2179 //_Debug("adding recording type choice " + (String)name); 2180 recordingType.addChoice(name); 2181 } 2182 2183 //Exception e = new Exception(); 2184 //e.printStackTrace(); 2185 } 2186 2187 /** Output the model contents */ 2188 private void _recordContainerContents(NamedObj namedObj) 2189 throws RecordingException 2190 { 2191 // whether to recurse based on Recording register method. 2192 boolean recurse = true; 2193 2194 if(namedObj instanceof Actor) 2195 { 2196 if(namedObj instanceof MultiInstanceComposite) 2197 { 2198 MultiInstanceComposite mic = (MultiInstanceComposite)namedObj; 2199 2200 // see if it's the master 2201 // XXX this seems fragile 2202 if(mic.instance.getExpression().equals("0")) 2203 { 2204 _multiInstanceCompositeMasterNames.add(mic.getName(getContainer())); 2205 } 2206 else 2207 { 2208 _multiInstanceCompositeClones.add(mic); 2209 } 2210 } 2211 2212 recurse = _provStore.regActor((Actor)namedObj); 2213 2214 // notify any piggybacks 2215 for(Recording recording : _piggybacks) 2216 { 2217 recording.regActor((Actor)namedObj); 2218 } 2219 2220 _addDebugEventSender(namedObj); 2221 } 2222 else if(namedObj instanceof Director) 2223 { 2224 recurse = _provStore.regDirector((Director)namedObj); 2225 2226 // notify any piggybacks 2227 for(Recording recording : _piggybacks) 2228 { 2229 recording.regDirector((Director)namedObj); 2230 } 2231 2232 _addDebugEventSender(namedObj); 2233 } 2234 else if(namedObj instanceof TypedIOPort) 2235 { 2236 TypedIOPort port = (TypedIOPort)namedObj; 2237 _provStore.regPort(port); 2238 2239 // do not record information about attributes contained by the port 2240 recurse = false; 2241 2242 // notify any piggybacks 2243 for(Recording recording : _piggybacks) 2244 { 2245 recording.regPort(port); 2246 } 2247 2248 // only listen to 
opaque ports; transparent ones do not 2249 // send events. 2250 if(port.isOpaque()) 2251 { 2252 _addIOPortEventSender(port); 2253 } 2254 2255 /* 2256 // if the port is an output port or transparent input, add all 2257 // outgoing connections to portConnTable. 2258 if(port.isOutput() || !port.isOpaque()) 2259 { 2260 List tablePorts = new LinkedList<IOPort>(); 2261 2262 List connPorts = null; 2263 if(port.isOutput()) 2264 { 2265 connPorts = port.connectedPortList(); 2266 } 2267 else 2268 { 2269 connPorts = port.insidePortList(); 2270 } 2271 2272 Iterator iter = connPorts.iterator(); 2273 while(iter.hasNext()) 2274 { 2275 IOPort conn = (IOPort)iter.next(); 2276 //if(!conn.isOpaque() || !port.isOpaque()) 2277 //{ 2278 tablePorts.add(conn); 2279 //} 2280 } 2281 if(tablePorts.size() > 0) 2282 { 2283 _Debug("table rcver " + port.getFullName() + " -> " + 2284 tablePorts); 2285 _portConnTable.put(port, tablePorts); 2286 } 2287 } 2288 */ 2289 } 2290 else if(namedObj instanceof IORelation) 2291 { 2292 IORelation rel = (IORelation)namedObj; 2293 recurse = _provStore.regRelation(rel); 2294 // notify any piggybacks 2295 for(Recording recording : _piggybacks) 2296 { 2297 recording.regRelation(rel); 2298 } 2299 } 2300 else if(namedObj != this && 2301 // NOTE: do not register a connected port parameter as a parameter 2302 // since during execution, reading a token by the port generates 2303 // both port and value change events. value change events are 2304 // treated as a workflow evolution, which can be expensive. 2305 (!(namedObj instanceof PortParameter) || 2306 ((PortParameter)namedObj).getPort().numberOfSources() == 0)) 2307 { 2308 recurse = _provStore.regParameter(namedObj); 2309 2310 // notify any piggybacks 2311 for(Recording recording : _piggybacks) 2312 { 2313 recording.regParameter(namedObj); 2314 } 2315 2316 // listen for changes in value for certain kinds 2317 // of settables. 
2318 if(namedObj instanceof Settable && 2319 !(namedObj instanceof ColorAttribute) && 2320 !(namedObj instanceof SizeAttribute) && 2321 !(namedObj instanceof WindowPropertiesAttribute) && 2322 !(namedObj instanceof SemanticType) && 2323 !(namedObj instanceof ExpertParameter) && 2324 !(namedObj instanceof SharedParameter)) 2325 { 2326 //_Debug("add value listener for " + namedObj.getFullName()); 2327 _updateValueEventSender((Settable)namedObj, true); 2328 2329 /* 2330 if(namedObj.getFullName(). 2331 equals(".dataturbine.DataTurbine._icon.text.text")) 2332 _Debug("FOUND dataturbine"); 2333 */ 2334 } 2335 } 2336 else // namedObj == this 2337 { 2338 // do not recurse into our parameters 2339 recurse = false; 2340 } 2341 2342 // see if this namedObj fires 2343 if(namedObj instanceof FiringsRecordable) 2344 { 2345 _addFiringEventSender((FiringsRecordable)namedObj); 2346 } 2347 2348 if(recurse) 2349 { 2350 // recursively register contents of the current namedObj 2351 List<NamedObj> contentsList = new LinkedList<NamedObj>(); 2352 2353 boolean isPLOpaque = false; 2354 2355 if(namedObj instanceof CompositeEntity) 2356 { 2357 CompositeEntity composite = (CompositeEntity)namedObj; 2358 contentsList.addAll(composite.entityList()); 2359 2360 // NOTE: we add the relations last since for each one, 2361 // regLink() is called and both ends of the link must be 2362 // registered first. 
2363 contentsList.addAll(composite.relationList()); 2364 2365 // check to see if named obj contains an instance of 2366 // provenance listener 2367 List<?> provs = namedObj.attributeList(ProvenanceRecorder.class); 2368 if(provs != null && provs.size() > 0 && namedObj != getContainer()) 2369 { 2370 isPLOpaque = true; 2371 } 2372 else if((namedObj instanceof CompositeActor) && 2373 !containsSupportedDirector((CompositeActor)namedObj)) 2374 { 2375 isPLOpaque = true; 2376 } 2377 2378 if(isPLOpaque) 2379 { 2380 _provOpaqueSet.add((CompositeActor)namedObj); 2381 } 2382 } 2383 2384 // add the ports to the beginning 2385 if(namedObj instanceof Entity) 2386 { 2387 contentsList.addAll(0, ((Entity<?>)namedObj).portList()); 2388 } 2389 2390 // add the attributes to the beginning 2391 contentsList.addAll(0, namedObj.attributeList()); 2392 2393 for(NamedObj containedNamedObj : contentsList) 2394 { 2395 if(isPLOpaque && 2396 // do not record contents of these types of objects: 2397 ((containedNamedObj instanceof Actor) || 2398 (containedNamedObj instanceof Director) || 2399 (containedNamedObj instanceof IORelation) || 2400 (containedNamedObj instanceof TextAttribute) || 2401 (containedNamedObj instanceof ProvenanceRecorder) || 2402 (containedNamedObj instanceof 2403 KeplerDocumentationAttribute))) 2404 { 2405 //_Debug("not recursing down " + 2406 //containedNamedObj.getFullName()); 2407 } 2408 else 2409 { 2410 _recordContainerContents(containedNamedObj); 2411 } 2412 } 2413 2414 // now that all the ports and relations have been registered 2415 // (if namedObj is a CompositeEntity), it is safe to register 2416 // the links (since each end point must already registered). 
2417 2418 if(!isPLOpaque && (namedObj instanceof CompositeEntity)) 2419 { 2420 // iterate over the contained relations 2421 List<?> relationList = ((CompositeEntity)namedObj).relationList(); 2422 Iterator<?> relationIter = relationList.iterator(); 2423 while(relationIter.hasNext()) 2424 { 2425 Relation curRelation = (Relation)relationIter.next(); 2426 2427 ///if(!adding) 2428 //{ 2429 //containedName = oldName + "." + 2430 //curRelation.getName(); 2431 //} 2432 _recordLinksInRelation(curRelation); 2433 } 2434 } 2435 } 2436 } 2437 2438 /** Record all the links within a relation. */ 2439 private void _recordLinksInRelation(Relation relation) 2440 throws RecordingException 2441 { 2442 // iterate over the object linked in the current relation 2443 List<?> linkedObjList = relation.linkedObjectsList(); 2444 Iterator<?> linkedObjsIter = linkedObjList.iterator(); 2445 while(linkedObjsIter.hasNext()) 2446 { 2447 NamedObj linkedObj = (NamedObj)linkedObjsIter.next(); 2448 2449 // see if current relation is linked to a port 2450 if(linkedObj instanceof TypedIOPort) 2451 { 2452 _provStore.regLink(linkedObj, relation); 2453 2454 // notify any piggybacks 2455 for(Recording recording : _piggybacks) 2456 { 2457 recording.regLink(linkedObj, relation); 2458 } 2459 } 2460 // otherwise the link is between two relations. 2461 // we do not want to register this link twice, 2462 // e.g., a <--> b and b <--> a 2463 // so only perform registration if the current 2464 // relation's name lexigraphically precedes the 2465 // linked relation's name. 2466 else if(relation.getName().compareTo( 2467 linkedObj.getName()) < 0) 2468 { 2469 _provStore.regLink(relation, linkedObj); 2470 2471 // notify any piggybacks 2472 for(Recording recording : _piggybacks) 2473 { 2474 recording.regLink(relation, linkedObj); 2475 } 2476 } 2477 } 2478 } 2479 2480 /** Change the Recording type. 
*/ 2481 private void _changeRecordingType() throws IllegalActionException 2482 { 2483 String typeStr = recordingType.getExpression(); 2484 2485 //System.out.println("chg rec start: " + _curRecordingTypeStr + ", ps = " + _provStore); 2486 2487 // see if the type is not null and is different than the 2488 // current type. 2489 if(!typeStr.trim().isEmpty() && 2490 (_curRecordingTypeStr == null || 2491 !typeStr.equals(_curRecordingTypeStr))) 2492 { 2493 //System.out.println("chg rec: " + _curRecordingTypeStr + " to " + typeStr); 2494 2495 // see if it's a valid type. 2496 if(!_recordingTypesMap.containsKey(typeStr)) 2497 { 2498 throw new IllegalActionException(this, 2499 "Unknown recording type: " + typeStr); 2500 } 2501 2502 // stop listening to all events 2503 // 2504 // NOTE: if we do not unregister from all entities, we may 2505 // send the new recording an event from an entity it does 2506 // not know about. (the new recording may ignore entities 2507 // not ignored by the old recording). 2508 _stopListening(); 2509 2510 // create a table of all the Parameters in our container. 2511 Hashtable<String, Parameter> curParamTable = 2512 new Hashtable<String, Parameter>(); 2513 Iterator<?> iter = attributeList().iterator(); 2514 while(iter.hasNext()) 2515 { 2516 NamedObj no = (NamedObj)iter.next(); 2517 if(no instanceof Parameter) 2518 { 2519 curParamTable.put(no.getName(), (Parameter)no); 2520 } 2521 } 2522 2523 String recordingClassStr = _recordingTypesMap.get(typeStr); 2524 2525 //_Debug("going to instantiate " + recordingClassStr); 2526 2527 Object newObj = null; 2528 try 2529 { 2530 // attempt to instantiate the new Recording type. 
2531 Class<?> rec = Class.forName(recordingClassStr); 2532 newObj = rec.newInstance(); 2533 } 2534 catch(ClassNotFoundException e) 2535 { 2536 throw new IllegalActionException(this, 2537 "Could not find recording class: " + recordingClassStr); 2538 } 2539 catch(InstantiationException e) 2540 { 2541 throw new IllegalActionException(this, 2542 "Error instantiating " + recordingClassStr + ": " + 2543 e.getMessage()); 2544 } 2545 catch(IllegalAccessException e) 2546 { 2547 throw new IllegalActionException(this, 2548 "IllegalActionException: " + e.getMessage()); 2549 } 2550 2551 if(!(newObj instanceof Recording)) 2552 { 2553 throw new IllegalActionException(this, 2554 "Class " + recordingClassStr + 2555 " does not subclass Recording."); 2556 } 2557 2558 // disconnect existing prov store 2559 if(_provStore != null) 2560 { 2561 try 2562 { 2563 _provStore.disconnect(); 2564 2565 // notify any piggybacks 2566 for(Recording recording : _piggybacks) 2567 { 2568 recording.disconnect(); 2569 } 2570 } 2571 catch(RecordingException e) 2572 { 2573 throw new IllegalActionException(this, e, 2574 "Error disconnecting previous recording."); 2575 } 2576 } 2577 _provStore = (Recording)newObj; 2578 _provStore.setContainer(this); 2579 2580 // notify any piggybacks 2581 for(Recording recording : _piggybacks) 2582 { 2583 recording.setContainer(this); 2584 } 2585 2586 try 2587 { 2588 // tell the new Recording to generate its parameters 2589 // and put them in a dummy container. 2590 NamedObj tmp = new NamedObj(workspace()); 2591 _provStoreParams = _provStore.generateParameters(tmp); 2592 2593 // remove all our current parameters that are not part of 2594 // the new Recording. 2595 2596 // NOTE: we iterate over a copy of the names instead 2597 // of directly using the names since some may be removed 2598 // from the table. 
2599 Set<String> currentParamNames = 2600 new HashSet<String>(curParamTable.keySet()); 2601 for(String str : currentParamNames) 2602 { 2603 if(!_provStoreParams.containsName(str) && 2604 !_plParams.contains(str)) 2605 { 2606 Parameter p = curParamTable.get(str); 2607 p.setContainer(null); 2608 curParamTable.remove(str); 2609 2610 //_Debug("rm param: " + str); 2611 } 2612 } 2613 2614 // finally add parameters from the new Recording 2615 // that are not in our container. 2616 for(String str : _provStoreParams.names()) 2617 { 2618 // if not in our container, add it 2619 if(!curParamTable.containsKey(str)) 2620 { 2621 //_Debug("add new parameter: " + str); 2622 _provStoreParams.replaceContainer(str, this); 2623 } 2624 else 2625 { 2626 // is in our container, so replace in 2627 // generated table. 2628 2629 //XXX type could be different, but type not 2630 // set when loading from moml. 2631 Parameter p = curParamTable.get(str); 2632 _provStoreParams.replaceParameter(str, p); 2633 2634 // NOTE: tell the recording that the attributed 2635 // changed so that it reads the new value. 2636 _provStore.attributeChanged(p); 2637 2638 //_Debug("replacing parameter: " + str); 2639 } 2640 } 2641 } 2642 catch(NameDuplicationException e) 2643 { 2644 throw new IllegalActionException(this, e, "Name duplication."); 2645 } 2646 2647 _curRecordingTypeStr = typeStr; 2648 2649 // see if we're enabled. 
2650 if (typeStr.equals(DEFAULT_REC_TYPE_IGNORE)) 2651 { 2652 _isEnabled = false; 2653 } 2654 else 2655 { 2656 _isEnabled = true; 2657 } 2658 2659 for(WeakReference<ProvenanceEnabledListener> listenerRef : _enabledListeners) 2660 { 2661 if (listenerRef != null) 2662 { 2663 final ProvenanceEnabledListener listener = listenerRef.get(); 2664 if(listener != null) 2665 { 2666 listener.toggle(_isEnabled); 2667 } 2668 } 2669 } 2670 2671 // tell recording about user name and machine 2672 _provStore.attributeChanged(username); 2673 _provStore.attributeChanged(hostname); 2674 2675 // tell recording about state serializer 2676 if(_stateSerializer != null) 2677 { 2678 _provStore.setStateSerializer(_stateSerializer); 2679 } 2680 2681 String lsidStr = containerWorkflowLSID.stringValue(); 2682 if(lsidStr.length() > 0) 2683 { 2684 try 2685 { 2686 _provStore.setContainerLSID(new KeplerLSID(lsidStr)); 2687 } 2688 catch(Exception e) 2689 { 2690 throw new IllegalActionException(this, e, "Error creating KeplerLSID"); 2691 } 2692 } 2693 _provStore.setContainerName(containerWorkflowName.stringValue()); 2694 2695 // start receiving change events from our container 2696 _addChangeEventSender(getContainer()); 2697 2698 // set debug writer 2699 if(_debugWriter != null) 2700 { 2701 _provStore.setDebugWriter(_debugWriter); 2702 } 2703 } 2704 } 2705 2706 /** Record the contents of the workflow. 
*/ 2707 private synchronized void _recordWorkflowContents() 2708 throws IllegalActionException 2709 { 2710 //_Debug("_recordWorkflowContents() regcontents = " + _provStore.regContents()); 2711 2712 try 2713 { 2714 if(_provStore != null && 2715 (_provStore.regContents()) || _delayRecordingSpecificationVal) 2716 { 2717 //_Debug("regContents true!"); 2718 2719 //_portConnTable.clear(); 2720 _provOpaqueSet.clear(); 2721 2722 // record workflow specification 2723 2724 _provStore.specificationStart(); 2725 2726 // notify any piggybacks 2727 for(Recording recording : _piggybacks) 2728 { 2729 recording.specificationStart(); 2730 } 2731 2732 _recordContainerContents(getContainer()); 2733 2734 _provStore.specificationStop(); 2735 2736 // notify any piggybacks 2737 for(Recording recording : _piggybacks) 2738 { 2739 recording.specificationStop(); 2740 } 2741 } 2742 } 2743 catch(RecordingException e) 2744 { 2745 //e.printStackTrace(); 2746 throw new IllegalActionException(this, e, "Error recording " + 2747 "workflow structure: " + e.getMessage()); 2748 } 2749 } 2750 2751 /** Set the name of the provenance recorder using the default name 2752 * in the configuration file. 2753 */ 2754 private void _setDefaultName() throws IllegalActionException, NameDuplicationException 2755 { 2756 Map<String,String> settingsMap = getSettingsMap(); 2757 String name = settingsMap.get(_DEFAULT_NAME); 2758 setName(name); 2759 } 2760 2761 2762 /** Return a Map of the settings, returns default settings if necessary */ 2763 public Map<String,String> getSettingsMap() 2764 { 2765 if (_settingsProperty == null) 2766 { 2767 return getDefaultsMap(); 2768 } 2769 else 2770 { 2771 return ConfigurationUtilities.getPairsMap(_settingsProperty); 2772 } 2773 } 2774 2775 /** Return a Map of the default settings. 
*/ 2776 public static Map<String,String> getDefaultsMap() 2777 { 2778 ConfigurationProperty defaultsProperty = 2779 ConfigurationManager.getInstance().getProperty( 2780 ConfigurationManager.getModule("provenance"), "provenance.defaultSettings"); 2781 return ConfigurationUtilities.getPairsMap(defaultsProperty); 2782 } 2783 2784 /** Listen to a Debuggable, and add to the debug event senders list. */ 2785 private void _addDebugEventSender(Debuggable debuggable) 2786 { 2787 debuggable.addDebugListener(this); 2788 _debugEventSenders.add(debuggable); 2789 } 2790 2791 /** Listen to a FiringsRecordable and add to firing event senders list. */ 2792 private void _addFiringEventSender(FiringsRecordable firable) 2793 { 2794 firable.addActorFiringListener(this); 2795 _firingEventSenders.add(firable); 2796 } 2797 2798 /** Listen to an IOPort and add to port event senders list. */ 2799 private void _addIOPortEventSender(IOPort port) 2800 { 2801 //_Debug("going to listen to " + port.getFullName()); 2802 port.addIOPortEventListener(this); 2803 _portEventSenders.add(port); 2804 } 2805 2806 /** Start or stop listening to a Settable for value change events. */ 2807 private void _updateValueEventSender(Settable settable, boolean add) 2808 { 2809 if(add) 2810 { 2811 settable.addValueListener(this); 2812 if(settable instanceof Variable) { 2813 ((Variable)settable).setValueListenerAsWeakDependency(this); 2814 } 2815 _valueEventSenders.put(settable, ((NamedObj)settable).getFullName()); 2816 } 2817 else 2818 { 2819 settable.removeValueListener(this); 2820 _valueEventSenders.remove(settable); 2821 } 2822 } 2823 2824 /** Listen to a NamedObj for change events. */ 2825 private void _addChangeEventSender(NamedObj no) 2826 { 2827 if(no != null) 2828 { 2829 no.addChangeListener(this); 2830 _changeEventSenders.add(no); 2831 } 2832 } 2833 2834 /** Stop listening to all events. */ 2835 private synchronized void _stopListening() 2836 { 2837 // value changes. 
2838 for(Settable settable : _valueEventSenders.keySet()) 2839 { 2840 settable.removeValueListener(this); 2841 } 2842 _valueEventSenders.clear(); 2843 2844 2845 // debug events 2846 for(Debuggable d : _debugEventSenders) 2847 { 2848 d.removeDebugListener(this); 2849 } 2850 _debugEventSenders.clear(); 2851 2852 // firing events 2853 for(FiringsRecordable fs : _firingEventSenders) 2854 { 2855 fs.removeActorFiringListener(this); 2856 } 2857 _firingEventSenders.clear(); 2858 2859 // port events 2860 for(IOPort port : _portEventSenders) 2861 { 2862 port.removeIOPortEventListener(this); 2863 } 2864 _portEventSenders.clear(); 2865 2866 // change events 2867 for(NamedObj sender : _changeEventSenders) 2868 { 2869 sender.removeChangeListener(this); 2870 } 2871 _changeEventSenders.clear(); 2872 } 2873 2874 /** Write to debug writer (if present) and stdout. */ 2875 private void _debugWrite(String message) 2876 { 2877 System.out.println(message); 2878 2879 if(_debugWriter != null) 2880 { 2881 try 2882 { 2883 _debugWriter.write(message + "\n"); 2884 } 2885 catch (IOException e) 2886 { 2887 System.out.println("ERROR writing to debug writer: " + 2888 e.getMessage()); 2889 } 2890 } 2891 } 2892 2893 /////////////////////////////////////////////////////////////////// 2894 //// private classes //// 2895 2896 /** A class to save a reference to a DebugEvent. */ 2897 private static class DelayedEvent 2898 { 2899 public DelayedEvent(DebugEvent event, boolean add) 2900 { 2901 _event = event; 2902 _add = add; 2903 } 2904 2905 public DebugEvent getEvent() 2906 { 2907 return _event; 2908 } 2909 2910 public boolean isAdd() 2911 { 2912 return _add; 2913 } 2914 2915 /** If true, event is an add, otherwise event is a remove. */ 2916 private boolean _add; 2917 2918 /** The event. */ 2919 private DebugEvent _event; 2920 } 2921 2922 /////////////////////////////////////////////////////////////////// 2923 //// private variables //// 2924 2925 /** Provenance storage. 
*/
    private Recording _provStore = null;

    /** A set of all senders of DebugEvents that we are listening to. */
    private Set<Debuggable> _debugEventSenders = null;

    /** A set of all senders of IOPortEvents that we are listening to. */
    private Set<IOPort> _portEventSenders = null;

    /** A set of all senders of FiringEvents that we are listening to. */
    private Set<FiringsRecordable> _firingEventSenders = null;

    /** A map of all senders of valueChanged(). When a parameter is
     *  deleted, we may receive a valueChange event and these should
     *  be ignored. The parameter or its container no longer has a
     *  container, so getFullName() returns a different string.
     *  We cache the full name, as it was when we started listening,
     *  in this map so we can tell when it has changed.
     */
    private Map<Settable,String> _valueEventSenders;

    /** A cache of all parameter values. */
    private Map<Settable,String> _paramValueTable = null;

    /** A set of all senders of ChangeEvents that we are listening to. */
    private Set<NamedObj> _changeEventSenders = null;

    /** The following hold Parameter values. */
    //private boolean _recordSshDataVal = false;
    private boolean _recordTokensVal = false;
    //private boolean _recordTransparentPortsVal = true;

    /** Value of delayRecordingSpecification parameter. */
    private boolean _delayRecordingSpecificationVal = true;

    /** Names of the parameters used by ProvenanceRecorder itself. These
     *  are kept when the parameters of a previous recording type are
     *  removed.
     */
    private Set<String> _plParams;

    /** Parameters used by the current recording object. */
    private RecordingParameters _provStoreParams = null;

    /** Currently selected recording type. */
    private String _curRecordingTypeStr = "<unknown>";

    /** Mapping of name to class name of valid recording types. */
    private Map<String,String> _recordingTypesMap;

    /** A table that maps a port to outgoing connected ports. */
    //private Hashtable<IOPort,List<IOPort>> _portConnTable = null;

    /** A set of composite actors whose contained actors, directors, and
     *  relations are not recorded: those that contain a
     *  ProvenanceRecorder instance, and composite actors for which
     *  containsSupportedDirector() is false.
     *  Our container is not added for containing a ProvenanceRecorder.
     */
    private Set<CompositeActor> _provOpaqueSet = null;

    /** True if we are listening to execution of manager. */
    private boolean _addedExecutionListener = false;

    /** List of objects whose (pre)initialize() and wrapup() methods
     *  should be slaved to these.
     */
    private transient List<Initializable> _initializables;

    /** A collection of attributes whose value has changed, but did
     *  not belong to us. They might belong to the recording type.
     *  We store them and tell the recording once it has been created.
     */
    private Set<Attribute> _possibleRecordingAttributes;

    /** If true, we have been removed from the model. */
    private boolean _removedFromModel = false;

    /** Used for debugging output. */
    private Writer _debugWriter;

    /** If true, the workflow has executed at least once. */
    private boolean _haveExecuted = false;

    /** If true, the workflow is executing. */
    private boolean _amExecuting = false;

    /** A list of recordings to be notified of provenance events. */
    private transient List<Recording> _piggybacks;

    /** A list of recordings to be notified of provenance events for all
     *  newly created ProvenanceRecorders. Only the key set of the
     *  WeakHashMap is useful.
     */
    private static transient WeakHashMap<Recording, Object> _futurePiggybacks =
        new WeakHashMap<Recording, Object>();

    /** Settables that we should ignore if additional valueChanged events are
     *  received for them.
     */
    private Set<Settable> _pendingSettables = new HashSet<Settable>();

    /** A list of events to process if we are delaying recording
     *  specification.
     */
    private List<DelayedEvent> _delayedEventList;

    /** The name of the key in the configuration file for the name of the
     *  default provenance recorder.
     */
    private static final String _DEFAULT_NAME = "Provenance Recorder Name";

    /** The name of the recording type parameter. */
    private static final String _RECORDING_TYPE = "Recording Type";

    /** The configuration property holding the settings; when null, the
     *  default settings are used by getSettingsMap().
     */
    private ConfigurationProperty _settingsProperty = null;

    /** KeplerLSID for current execution */
    private KeplerLSID _executionLSID = null;

    /** True if recording type is not set to IGNORE. */
    private boolean _isEnabled;

    /** A list of listeners for enabled events. */
    private List<WeakReference<ProvenanceEnabledListener>> _enabledListeners;

    /** A set containing the MultiInstanceComposite clones seen during the
     *  initial recording contents. During initialize(), any clones that
     *  are found but not contained in this set are assumed to have been
     *  created during preinitialize(), and registered.
     */
    private Set<MultiInstanceComposite> _multiInstanceCompositeClones =
        new HashSet<MultiInstanceComposite>();

    /** A set containing names of MultiInstanceComposite masters (non-clones).
     */
    private Set<String> _multiInstanceCompositeMasterNames =
        new HashSet<String>();

    /** A set containing weak references to all instances of
     *  ProvenanceRecorder.
     */
    private static Set<WeakReference<ProvenanceRecorder>> _recorders
        = Collections.synchronizedSet(new HashSet<WeakReference<ProvenanceRecorder>>());

    /** Serialize states for recordings. */
    private StateSerializer _stateSerializer;

    /** Logger for this class. */
    private static final Log log = LogFactory.getLog(ProvenanceRecorder.class.getName());
    //private static final boolean isDebugging = log.isDebugEnabled();
}