001/* OPM XML output 002 003Copyright (c) 2009-2010 The Regents of the University of California. 004All rights reserved. 005Permission is hereby granted, without written agreement and without 006license or royalty fees, to use, copy, modify, and distribute this 007software and its documentation for any purpose, provided that the above 008copyright notice and the following two paragraphs appear in all copies 009of this software. 010 011IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015SUCH DAMAGE. 016 017THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022ENHANCEMENTS, OR MODIFICATIONS. 023 024*/ 025 026package org.kepler.provenance.opm; 027 028import java.io.FileNotFoundException; 029import java.io.PrintStream; 030import java.io.StringWriter; 031import java.util.GregorianCalendar; 032import java.util.HashMap; 033import java.util.LinkedHashMap; 034import java.util.List; 035import java.util.Map; 036 037import javax.xml.bind.JAXBException; 038import javax.xml.datatype.DatatypeConfigurationException; 039import javax.xml.datatype.DatatypeFactory; 040import javax.xml.datatype.XMLGregorianCalendar; 041 042import org.kepler.provenance.FireState; 043import org.kepler.provenance.PortConnector; 044import org.kepler.provenance.Recording; 045import org.kepler.provenance.RecordingException; 046import org.kepler.provenance.RecordingParameters; 047import org.openprovenance.model.Artifact; 048import org.openprovenance.model.ArtifactId; 049import org.openprovenance.model.Artifacts; 050import org.openprovenance.model.CausalDependencies; 051import org.openprovenance.model.OPMFactory; 052import org.openprovenance.model.OPMGraph; 053import org.openprovenance.model.OPMSerialiser; 054import org.openprovenance.model.OTime; 055import org.openprovenance.model.ObjectFactory; 056import org.openprovenance.model.Process; 057import org.openprovenance.model.ProcessId; 058import org.openprovenance.model.Processes; 059import org.openprovenance.model.Role; 060import org.openprovenance.model.Used; 061import org.openprovenance.model.WasGeneratedBy; 062 063import ptolemy.actor.Actor; 064import ptolemy.actor.CompositeActor; 065import ptolemy.actor.FiringEvent; 066import ptolemy.actor.FiringsRecordable; 067import ptolemy.actor.IOPort; 068import ptolemy.actor.IOPortEvent; 069import ptolemy.actor.TypedIOPort; 070import ptolemy.data.StringToken; 071import ptolemy.data.Token; 072import ptolemy.data.expr.FileParameter; 073import ptolemy.data.expr.Parameter; 074import ptolemy.kernel.util.Attribute; 075import ptolemy.kernel.util.IllegalActionException; 076import ptolemy.kernel.util.NameDuplicationException; 077import ptolemy.kernel.util.NamedObj; 078 079/** A Recording for Open Provenance Model XML. 080 * 081 * @author Daniel Crawl 082 * @version $Id: OpenProvenanceModelXML.java 33496 2015-06-22 20:24:20Z crawl $ 083 * @deprecated This recording output is no longer used. Instead, use ProvRecording. 084 */ 085@Deprecated 086public class OpenProvenanceModelXML extends Recording 087{ 088 089 /** Construct a new OpenProvenanceModelXML. */ 090 public OpenProvenanceModelXML() throws RecordingException 091 { 092 _params = null; 093 094 _portConnector = new PortConnector<ArtifactId>(); 095 096 _fireStateTable = new LinkedHashMap<Actor, FireState<ProcessId>>(); 097 098 _tokenToArtifactIdMap = new HashMap<Token,ArtifactId>(); 099 _artifactCounter = 0; 100 _processCounter = 0; 101 102 _compositePortToProcessMap = new HashMap<CompositeMapKey,ProcessId>(); 103 _portFiringCounter = 0; 104 105 _namedObjToRoleMap = new HashMap<NamedObj,Role>(); 106 107 _objectFactory = new ObjectFactory(); 108 _opmFactory = OPMFactory.getFactory(); 109 110 try 111 { 112 _opmSerialiser = new OPMSerialiser(); 113 } 114 catch(JAXBException e) 115 { 116 throw new RecordingException("Unable to create new serializer: ", e); 117 } 118 119 try 120 { 121 _dataTypeFactory = DatatypeFactory.newInstance(); 122 } 123 catch(DatatypeConfigurationException e) 124 { 125 throw new RecordingException("Error trying to get new DataTypeFactory: ", e); 126 } 127 128 Artifacts artifacts = _objectFactory.createArtifacts(); 129 _artifactList = artifacts.getArtifact(); 130 131 Processes processes = _objectFactory.createProcesses(); 132 _processList = processes.getProcess(); 133 134 CausalDependencies depends = _objectFactory.createCausalDependencies(); 135 _dependList = depends.getUsedOrWasGeneratedByOrWasTriggeredBy(); 136 137 _graph = _objectFactory.createOPMGraph(); 138 _graph.setArtifacts(artifacts); 139 _graph.setProcesses(processes); 140 _graph.setCausalDependencies(depends); 141 142 _needWorkflowContents(true); 143 } 144 145 /** React to a parameter change. */ 146 public void attributeChanged(Attribute attribute) 147 throws IllegalActionException 148 { 149 150 String name = attribute.getName(); 151 152 if(name.equals(OPMRecordingParameters._filenameStr)) 153 { 154 _resetWriter(); 155 } 156 else 157 { 158 super.attributeChanged(attribute); 159 } 160 161 } 162 163 /** Register an actor. */ 164 public boolean regActor(Actor actor) throws RecordingException 165 { 166 if(actor instanceof FiringsRecordable) 167 { 168 FireState<ProcessId> fireState = new FireState<ProcessId>(actor, -1); 169 _fireStateTable.put(actor, fireState); 170 } 171 172 return true; 173 } 174 175 /** Register a port or portparameter. */ 176 public boolean regPort(TypedIOPort port) throws RecordingException 177 { 178 boolean retval = super.regPort(port); 179 180 _portConnector.createConnections(port); 181 182 return retval; 183 } 184 185 /** Record the starting of workflow execution. */ 186 public void executionStart() throws RecordingException 187 { 188 _artifactList.clear(); 189 _processList.clear(); 190 _dependList.clear(); 191 _namedObjToRoleMap.clear(); 192 _artifactCounter = 0; 193 _processCounter = 0; 194 _tokenToArtifactIdMap.clear(); 195 _compositePortToProcessMap.clear(); 196 _portFiringCounter = 0; 197 } 198 199 /** Record the stopping of workflow execution. */ 200 public void executionStop() throws RecordingException 201 { 202 // output serialization 203 204 StringWriter writer = new StringWriter(); 205 206 try 207 { 208 String string = _opmSerialiser.serialiseOPMGraph(writer, _graph, true); 209 210 if(_textWriter != null) 211 { 212 _textWriter.print(string); 213 } 214 } 215 catch(JAXBException e) 216 { 217 throw new RecordingException("Error serializing graph: ", e); 218 } 219 } 220 221 /** Record an actor firing. */ 222 public void actorFire(FiringEvent event) throws RecordingException 223 { 224 Actor actor = event.getActor(); 225 FiringEvent.FiringEventType curEventType = event.getType(); 226 FireState<ProcessId> fireState = _fireStateTable.get(actor); 227 228 if(fireState == null) 229 { 230 throw new RecordingException( 231 "Received actor fire event for unregistered actor: " + 232 actor.getFullName()); 233 } 234 235 synchronized(fireState) 236 { 237 // get the last type of firing start 238 FiringEvent.FiringEventType lastStartType = 239 fireState.getLastStartFireType(); 240 241 // see if current firing is new iteration: 242 // NOTE: PN does not report iterate firings so the iteration 243 // may begin with prefire if the last type of firing was not 244 // iterate. 245 if(curEventType == FiringEvent.BEFORE_ITERATE || 246 (curEventType == FiringEvent.BEFORE_PREFIRE && 247 lastStartType != FiringEvent.BEFORE_ITERATE)) 248 { 249 250 // create a new process for the new firing 251 org.openprovenance.model.Process process = _objectFactory.createProcess(); 252 _processList.add(process); 253 254 process.setId("_p" + String.valueOf(_processCounter)); 255 _processCounter++; 256 257 // set the process name to be the actor name and number of 258 // times it has fired. 259 String actorName = ((NamedObj)actor).getFullName(); 260 int firings = fireState.getNumberOfFirings(); 261 process.setValue(actorName + " fire " + firings); 262 ProcessId processId = _opmFactory.newProcessId(process); 263 264 // start the fire in the fire state. 265 fireState.fireStart(curEventType, processId); 266 267 //_debug("adding process id " + actorName + " fire " + firings); 268 269 // record the parameters used. 270 271 List<?> attributeList = ((NamedObj)actor).attributeList(); 272 for(int i = 0; i < attributeList.size(); i++) 273 { 274 Attribute attribute = (Attribute)attributeList.get(i); 275 _createUsedForParameter(attribute, processId); 276 } 277 278 279 } 280 // see if current firing is end of iteration: 281 else if(curEventType == FiringEvent.AFTER_ITERATE || 282 (curEventType == FiringEvent.AFTER_POSTFIRE && 283 lastStartType == FiringEvent.BEFORE_PREFIRE)) 284 { 285 //_debug("end firing: " + type + " for " + actor.getFullName()); 286 287 // NOTE: if the type is a AFTER_POSTFIRE and last start 288 // type is BEFORE_PREFIRE, we are running in PN. 289 // in this case, tell the fireState to stop firing using 290 // AFTER_PREFIRE instead of AFTER_POSTFIRE, since we never 291 // told the fireState about the BEFORE_POSTFIRE. 292 if(curEventType == FiringEvent.AFTER_POSTFIRE) 293 { 294 fireState.fireStop(FiringEvent.AFTER_PREFIRE); 295 } 296 else 297 { 298 fireState.fireStop(curEventType); 299 } 300 } 301 } 302 } 303 304 /** Record a port event. */ 305 public void portEvent(IOPortEvent event) throws RecordingException 306 { 307 TypedIOPort port = (TypedIOPort)event.getPort(); 308 Actor actor = (Actor)port.getContainer(); 309 FireState<ProcessId> fireState = _fireStateTable.get(actor); 310 311 synchronized(fireState) 312 { 313 int eventType = event.getEventType(); 314 boolean recordEvent = false; 315 boolean isRead = true; 316 317 if(eventType == IOPortEvent.SEND_BEGIN) 318 { 319 recordEvent = true; 320 isRead = false; 321 } 322 else if(eventType == IOPortEvent.GET_END) 323 { 324 recordEvent = true; 325 } 326 327 if(recordEvent) 328 { 329 ProcessId processId = fireState.getCurFireId(); 330 331 if(actor instanceof CompositeActor) 332 { 333 if(processId == null) 334 { 335 System.out.println("pid is null, actor = " + actor.getFullName()); 336 } 337 338 CompositeMapKey key = new CompositeMapKey(processId, port); 339 340 processId = _compositePortToProcessMap.get(key); 341 if(processId == null) 342 { 343 // create a new process for the new firing 344 org.openprovenance.model.Process process = _objectFactory.createProcess(); 345 _processList.add(process); 346 347 process.setId("_p" + String.valueOf(_processCounter)); 348 _processCounter++; 349 350 String name = port.getFullName(); 351 int firings = _portFiringCounter++; 352 process.setValue(name + " fire " + firings); 353 processId = _opmFactory.newProcessId(process); 354 355 _compositePortToProcessMap.put(key, processId); 356 } 357 } 358 359 if(event.getVectorLength() == IOPortEvent.SINGLETOKEN) 360 { 361 _recordPortEvent(port, processId, isRead, event.getChannel(), event.getToken()); 362 } 363 else 364 { 365 Token[] tokenArray = event.getTokenArray(); 366 for(int i = 0; i < tokenArray.length; i++) 367 { 368 _recordPortEvent(port, processId, isRead, event.getChannel(), tokenArray[i]); 369 } 370 } 371 } 372 } 373 } 374 375 /** Add Parameters for ProvenanceListener. */ 376 public RecordingParameters generateParameters(NamedObj no) 377 throws IllegalActionException, NameDuplicationException 378 { 379 _params = new OPMRecordingParameters(no); 380 return _params; 381 } 382 383 //////////////////////////////////////////////////////////////////////// 384 //// protected methods //// 385 386 /** Record a port read or write. */ 387 protected void _recordPortEvent(IOPort port, ProcessId processId, 388 boolean isRead, int channel, Token token) 389 throws RecordingException 390 { 391 392 Role role = _getRole(port); 393 394 if(isRead) 395 { 396 ArtifactId artifactId = _portConnector.getNextId(port, channel); 397 _createNewUsed(artifactId, processId, role); 398 } 399 else 400 { 401 ArtifactId artifactId = _getArtifactId(token, true); 402 403 WasGeneratedBy wasGeneratedBy = _objectFactory.createWasGeneratedBy(); 404 wasGeneratedBy.setCause(processId); 405 wasGeneratedBy.setEffect(artifactId); 406 wasGeneratedBy.setTime(_getCurrentOTime()); 407 wasGeneratedBy.setRole(role); 408 _dependList.add(wasGeneratedBy); 409 410 _portConnector.sendIdToConnections(port, channel, artifactId); 411 } 412 } 413 414 //////////////////////////////////////////////////////////////////////// 415 //// private methods //// 416 417 /** Close the writer unless it is stdout or stderr. */ 418 private void _closeWriter() 419 { 420 if(_textWriter != null) 421 { 422 // don't close stdout or stderr 423 if(!_outputName.equals("System.out") && 424 !_outputName.equals("System.err")) 425 { 426 _textWriter.close(); 427 } 428 else 429 { 430 _textWriter.flush(); 431 } 432 } 433 } 434 435 /** Get an existing or create a new ArtifactId for a token. */ 436 private ArtifactId _getArtifactId(Token token, boolean isNewToken) 437 { 438 ArtifactId retval = null; 439 440 if(!isNewToken) 441 { 442 retval = _tokenToArtifactIdMap.get(token); 443 } 444 445 if(retval == null) 446 { 447 Artifact artifact = _objectFactory.createArtifact(); 448 _artifactList.add(artifact); 449 450 artifact.setId("_a" + String.valueOf(_artifactCounter)); 451 _artifactCounter++; 452 453 // if it's a string token, use stringValue so we don't 454 // get the quotes. 455 if(token instanceof StringToken) 456 { 457 artifact.setValue(((StringToken)token).stringValue()); 458 } 459 else 460 { 461 artifact.setValue(token.toString()); 462 } 463 464 retval = _opmFactory.newArtifactId(artifact); 465 466 _tokenToArtifactIdMap.put(token, retval); 467 } 468 469 return retval; 470 } 471 472 private void _createNewUsed(ArtifactId artifactId, ProcessId processId, Role role) 473 { 474 Used used = _objectFactory.createUsed(); 475 _dependList.add(used); 476 477 used.setCause(artifactId); 478 used.setEffect(processId); 479 used.setTime(_getCurrentOTime()); 480 used.setRole(role); 481 } 482 483 private void _createUsedForParameter(Attribute attribute, ProcessId processId) 484 throws RecordingException 485 { 486 if( (attribute instanceof Parameter) && 487 !(attribute instanceof ptolemy.data.expr.ExpertParameter) && 488 !(attribute instanceof ptolemy.actor.gui.WindowPropertiesAttribute) && 489 !(attribute instanceof ptolemy.actor.gui.SizeAttribute) && 490 !(attribute instanceof org.kepler.sms.SemanticType)) 491 { 492 //_debug(attribute); 493 Parameter parameter = (Parameter)attribute; 494 Token token = null; 495 496 try 497 { 498 token = parameter.getToken(); 499 } 500 catch(IllegalActionException e) 501 { 502 throw new RecordingException("Error reading parameter token: ", e); 503 } 504 505 if(token != null) 506 { 507 Role role = _getRole(attribute); 508 ArtifactId artifactId = _getArtifactId(token, false); 509 _createNewUsed(artifactId, processId, role); 510 } 511 } 512 } 513 514 /** Get a new OTime representing the current date and time. */ 515 private OTime _getCurrentOTime()// throws Exception 516 { 517 OTime retval = _objectFactory.createOTime(); 518 //retval.setClockId("clock id"); 519 XMLGregorianCalendar cal = _dataTypeFactory.newXMLGregorianCalendar(new GregorianCalendar()); 520 retval.setNoEarlierThan(cal); 521 retval.setNoLaterThan(cal); 522 retval.setClockId("_c1"); 523 return retval; 524 } 525 526 /** Get a role for a NamedObj. */ 527 private Role _getRole(NamedObj namedObj) 528 { 529 Role retval = null; 530 531 if((retval = _namedObjToRoleMap.get(namedObj)) == null) 532 { 533 retval = _objectFactory.createRole(); 534 _namedObjToRoleMap.put(namedObj, retval); 535 retval.setValue(namedObj.getName()); 536 } 537 538 return retval; 539 } 540 541 /** Reset the output writer. */ 542 private void _resetWriter() throws IllegalActionException 543 { 544 FileParameter fp = _params.getFileParameter(); 545 String name = ((StringToken)fp.getToken()).stringValue(); 546 547 // see if the output name has changed or was never set 548 if(_outputName == null || !name.equals(_outputName)) 549 { 550 _closeWriter(); 551 552 // see if we should write to stdout 553 if(name.equals("System.out")) 554 { 555 _textWriter = System.out; 556 } 557 // see if we should write to stderr 558 else if(name.equals("System.err")) 559 { 560 _textWriter = System.err; 561 } 562 else if(name.equals("NULL")) 563 { 564 _textWriter = null; 565 } 566 else 567 { 568 // remove possible "file:" prefix 569 if(name.startsWith("file:")) 570 { 571 name = name.substring("file:".length()); 572 } 573 574 try 575 { 576 _textWriter = new PrintStream(name); 577 } 578 catch(FileNotFoundException e) 579 { 580 throw new IllegalActionException("Could not find file " + 581 name + " : " + e.getMessage()); 582 } 583 } 584 585 // save the name 586 _outputName = name; 587 } 588 } 589 590 //////////////////////////////////////////////////////////////////////// 591 //// private classes //// 592 593 private static class OPMRecordingParameters extends RecordingParameters 594 { 595 OPMRecordingParameters(NamedObj no) throws IllegalActionException, NameDuplicationException 596 { 597 super(no); 598 addFileParameter(_filenameStr, "System.out"); 599 } 600 601 /** Replace a Parameter. */ 602 public void replaceParameter(String name, Parameter parameter) 603 throws IllegalActionException 604 { 605 // if replacing the filename, close the file. 606 if(name.equals(_filenameStr)) 607 { 608 FileParameter p = (FileParameter)_params.get(_filenameStr); 609 p.close(); 610 } 611 612 super.replaceParameter(name, parameter); 613 } 614 615 /** Get the output FileParameter. */ 616 FileParameter getFileParameter() throws IllegalActionException 617 { 618 return (FileParameter)_params.get(_filenameStr); 619 } 620 621 private static final String _filenameStr = "Filename"; 622 } 623 624 private static class CompositeMapKey 625 { 626 public CompositeMapKey(ProcessId processId, IOPort port) 627 { 628 this.processId = processId; 629 this.port = port; 630 } 631 632 public int hashCode() 633 { 634 return processId.hashCode() + port.hashCode(); 635 } 636 637 public boolean equals(Object obj) 638 { 639 if(!(obj instanceof CompositeMapKey)) 640 { 641 return false; 642 } 643 else 644 { 645 CompositeMapKey other = (CompositeMapKey)obj; 646 return processId.equals(other.processId) && 647 port.equals(other.port); 648 } 649 } 650 651 public ProcessId processId; 652 public IOPort port; 653 } 654 655 /* 656 private static class TokenInfo 657 { 658 public TokenInfo(ArtifactId artifactId) 659 { 660 _artifactId = artifactId; 661 } 662 663 public ArtifactId getArtifactId() 664 { 665 return _artifactId; 666 } 667 668 private ArtifactId _artifactId; 669 } 670 */ 671 672 //////////////////////////////////////////////////////////////////////// 673 //// private variables //// 674 675 private ObjectFactory _objectFactory; 676 private OPMFactory _opmFactory; 677 private DatatypeFactory _dataTypeFactory; 678 679 private OPMGraph _graph; 680 private List<Artifact> _artifactList; 681 private List<Process> _processList; 682 private List<Object> _dependList; 683 684 private OPMSerialiser _opmSerialiser; 685 686 //private Map<Actor,Process> 687 688 /** A table to map actor to its firing state object. */ 689 private Map<Actor, FireState<ProcessId>> _fireStateTable = null; 690 691 private PortConnector<ArtifactId> _portConnector; 692 693 private OPMRecordingParameters _params; 694 695 /** Output writer. */ 696 private PrintStream _textWriter = null; 697 698 /** Output name. */ 699 private String _outputName = null; 700 701 private Map<NamedObj, Role> _namedObjToRoleMap; 702 703 private int _artifactCounter; 704 private int _processCounter; 705 706 private Map<Token,ArtifactId> _tokenToArtifactIdMap; 707 708 private Map<CompositeMapKey,ProcessId> _compositePortToProcessMap; 709 710 private int _portFiringCounter; 711}