001/* PROV output 002 003Copyright (c) 2015 The Regents of the University of California. 004All rights reserved. 005Permission is hereby granted, without written agreement and without 006license or royalty fees, to use, copy, modify, and distribute this 007software and its documentation for any purpose, provided that the above 008copyright notice and the following two paragraphs appear in all copies 009of this software. 010 011IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015SUCH DAMAGE. 016 017THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022ENHANCEMENTS, OR MODIFICATIONS. 023 024*/ 025 026package org.kepler.provenance.prov; 027 028import java.io.ByteArrayOutputStream; 029import java.io.FileNotFoundException; 030import java.io.FileOutputStream; 031import java.io.IOException; 032import java.io.OutputStream; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Date; 036import java.util.HashMap; 037import java.util.LinkedList; 038import java.util.List; 039import java.util.Map; 040import java.util.concurrent.atomic.AtomicInteger; 041 042import org.kepler.objectmanager.lsid.KeplerLSID; 043import org.kepler.provenance.FireState; 044import org.kepler.provenance.PortConnector; 045import org.kepler.provenance.RecordingException; 046import org.kepler.provenance.RecordingParameters; 047import org.kepler.provenance.SimpleFiringRecording; 048import org.openprovenance.prov.interop.InteropFramework; 049import org.openprovenance.prov.interop.InteropFramework.ProvFormat; 050import org.openprovenance.prov.model.Activity; 051import org.openprovenance.prov.model.Agent; 052import org.openprovenance.prov.model.Entity; 053import org.openprovenance.prov.model.HasOther; 054import org.openprovenance.prov.model.Name; 055import org.openprovenance.prov.model.Namespace; 056import org.openprovenance.prov.model.Other; 057import org.openprovenance.prov.model.ProvFactory; 058import org.openprovenance.prov.model.QualifiedName; 059import org.openprovenance.prov.model.Role; 060import org.openprovenance.prov.model.StatementOrBundle; 061import org.openprovenance.prov.model.Used; 062import org.openprovenance.prov.model.WasAssociatedWith; 063import org.openprovenance.prov.model.WasGeneratedBy; 064import org.openprovenance.prov.xml.Document; 065 066import ptolemy.actor.Actor; 067import ptolemy.actor.FiringEvent; 068import ptolemy.actor.IOPort; 069import ptolemy.actor.IOPortEvent; 070import ptolemy.actor.TypedIOPort; 071import ptolemy.data.StringToken; 072import ptolemy.data.Token; 073import ptolemy.data.expr.FileParameter; 074import ptolemy.data.expr.Parameter; 075import ptolemy.kernel.util.Attribute; 076import ptolemy.kernel.util.IllegalActionException; 077import ptolemy.kernel.util.NameDuplicationException; 078import ptolemy.kernel.util.NamedObj; 079 080/** A Recording for PROV output. 081 * 082 * @author Daniel Crawl 083 * @version $Id: ProvRecording.java 33636 2015-08-24 22:49:48Z crawl $ 084 */ 085 086public class ProvRecording extends SimpleFiringRecording<Activity> 087{ 088 089 /** Construct a new ProvRecording. */ 090 public ProvRecording() throws RecordingException 091 { 092 super(); 093 094 _params = null; 095 096 _portConnector = new PortConnector<Entity>(); 097 098 _tokenToEntityMap = new HashMap<Token,Entity>(); 099 100 _namedObjToRoleMap = new HashMap<NamedObj,Role>(); 101 102 _statementsOrBundles = new LinkedList<StatementOrBundle>(); 103 104 _factory = InteropFramework.newXMLProvFactory(); 105 _name = _factory.getName(); 106 107 _activityCounter = new AtomicInteger(0); 108 _entityCounter = new AtomicInteger(0); 109 110 _createNamespace(); 111 } 112 113 /** React to a parameter change. */ 114 @Override 115 public void attributeChanged(Attribute attribute) 116 throws IllegalActionException 117 { 118 119 String name = attribute.getName(); 120 121 if(name.equals(ProvRecordingParameters._addTimestampsStr)) { 122 _addTimestamps = _params.getAddTimestampsValue(); 123 } else if(name.equals(ProvRecordingParameters._namespaceStr)) { 124 _namespaceStr = _params.getStringValue(name); 125 _createNamespace(); 126 } else if(name.equals(ProvRecordingParameters._nsPrefixStr)) { 127 _namespacePrefixStr = _params.getStringValue(name); 128 _createNamespace(); 129 } else if(name.equals(ProvRecordingParameters._outputTypeStr)) { 130 String outputTypeStr = _params.getStringValue(name); 131 ProvFormat outputType = ProvFormat.valueOf(outputTypeStr); 132 if(outputType == null) { 133 throw new IllegalActionException(_recorderContainer, 134 "Unsupported PROV serialization type: " + outputTypeStr); 135 } 136 _outputType = outputType; 137 } else { 138 super.attributeChanged(attribute); 139 } 140 141 } 142 143 /** Register a port or portparameter. */ 144 @Override 145 public boolean regPort(TypedIOPort port) throws RecordingException 146 { 147 boolean retval = super.regPort(port); 148 149 _portConnector.createConnections(port); 150 151 return retval; 152 } 153 154 @Override 155 public void executionStart(KeplerLSID executionLSID, Date timestamp) throws RecordingException { 156 157 // add Agent for user 158 _agent = _factory.newAgent(_qn(QN_USER), 159 System.getProperty("user.name")); 160 _statementsOrBundles.add(_agent); 161 162 _wfEntity = _factory.newEntity(_qn(QN_WORKFLOW)); 163 _wfEntity.setValue(_factory.newValue(_recorderContainer.exportMoML())); 164 _statementsOrBundles.add(_wfEntity); 165 166 Other other = _factory.newOther(_qn(QN_EXECUTION_LSID), 167 executionLSID.toString(), _name.XSD_STRING); 168 _factory.addAttribute(_wfEntity, other); 169 170 other = _factory.newOther(_qn(QN_ACTOR_NAME), 171 _recorderContainer.getFullName(), _name.XSD_STRING); 172 _factory.addAttribute(_wfEntity, other); 173 174 if(_addTimestamps) { 175 other = _factory.newOther(_qn(QN_EXECUTION_START_DATETIME), 176 _factory.newTime(timestamp), _name.XSD_DATETIME); 177 _factory.addAttribute(_wfEntity, other); 178 } 179 180 } 181 182 /** Record the stopping of workflow execution. */ 183 @Override 184 public void executionStop(KeplerLSID executionLSID, Date timestamp) throws RecordingException 185 { 186 187 if(_addTimestamps) { 188 Other other = _factory.newOther(_qn(QN_EXECUTION_END_DATETIME), 189 _factory.newTime(timestamp), _name.XSD_DATETIME); 190 _factory.addAttribute(_wfEntity, other); 191 } 192 193 // TODO stop all currently firing actors 194 195 // output serialization 196 try { 197 Document document = new Document(); 198 document.setNamespace(_namespace); 199 document.getStatementOrBundle().addAll(_statementsOrBundles); 200 201 InteropFramework framework = new InteropFramework(); 202 203 OutputStream outputStream = null; 204 try { 205 outputStream = _getOutput(); 206 if(outputStream != null) { 207 framework.writeDocument(outputStream, _outputType, document); 208 } 209 } finally { 210 if(_closeOutput) { 211 try { 212 outputStream.close(); 213 } catch (IOException e) { 214 throw new RecordingException("Error closing output.", e); 215 } 216 } 217 } 218 219 if(_debugWriter != null) { 220 try(ByteArrayOutputStream byteStream = new ByteArrayOutputStream();) { 221 framework.writeDocument(byteStream, _outputType, document); 222 _debugWriter.write(byteStream.toString()); 223 } catch (IOException e) { 224 throw new RecordingException("Error writing to debug writer.", e); 225 } 226 227 } 228 229 230 } catch(IllegalActionException e) { 231 throw new RecordingException("Error writing output.", e); 232 } finally { 233 _reset(); 234 } 235 236 } 237 238 /** Record an actor firing. */ 239 @Override 240 public void actorFire(FiringEvent event, Date timestamp) throws RecordingException 241 { 242 Actor actor = event.getActor(); 243 FiringEvent.FiringEventType curEventType = event.getType(); 244 FireState<Activity> fireState = _fireStateTable.get(actor); 245 246 if(fireState == null) 247 { 248 throw new RecordingException( 249 "Received actor fire event for unregistered actor: " + 250 actor.getFullName()); 251 } 252 253 synchronized(fireState) 254 { 255 // get the last type of firing start 256 FiringEvent.FiringEventType lastStartType = 257 fireState.getLastStartFireType(); 258 259 // see if current firing is new iteration: 260 // NOTE: PN does not report iterate firings so the iteration 261 // may begin with prefire if the last type of firing was not 262 // iterate. 263 if(curEventType == FiringEvent.BEFORE_ITERATE || 264 (curEventType == FiringEvent.BEFORE_PREFIRE && 265 lastStartType != FiringEvent.BEFORE_ITERATE)) 266 { 267 268 // create a new process for the new firing 269 final int count = _activityCounter.incrementAndGet(); 270 Activity activity = _factory.newActivity(_qn("a" + count)); 271 _statementsOrBundles.add(activity); 272 273 Other other = _factory.newOther(_qn(QN_ACTOR_CLASS), 274 ((NamedObj) actor).getClassName(), _name.XSD_STRING); 275 _factory.addAttribute(activity, other); 276 277 other = _factory.newOther(_qn(QN_FIRING), 278 fireState.getNumberOfFirings(), _name.XSD_INT); 279 _factory.addAttribute(activity, other); 280 281 other = _factory.newOther(_qn(QN_ACTOR_NAME), 282 actor.getFullName(), _name.XSD_STRING); 283 _factory.addAttribute(activity, other); 284 285 if(_addTimestamps) { 286 activity.setStartTime(_factory.newTime(timestamp)); 287 } 288 289 // start the fire in the fire state. 290 fireState.fireStart(curEventType, activity); 291 292 WasAssociatedWith waw = _factory.newWasAssociatedWith( 293 _qn("waw" + count), activity.getId(), _agent.getId()); 294 waw.setPlan(_wfEntity.getId()); 295 _statementsOrBundles.add(waw); 296 297 // record the parameters used. 298 299 300 /** TODO 301 List<?> attributeList = ((NamedObj)actor).attributeList(); 302 for(int i = 0; i < attributeList.size(); i++) 303 { 304 Attribute attribute = (Attribute)attributeList.get(i); 305 _createUsedForParameter(attribute, activity); 306 } 307 **/ 308 309 } 310 // see if current firing is end of iteration: 311 else if(curEventType == FiringEvent.AFTER_ITERATE || 312 (curEventType == FiringEvent.AFTER_POSTFIRE && 313 lastStartType == FiringEvent.BEFORE_PREFIRE)) 314 { 315 //_debug("end firing: " + type + " for " + actor.getFullName()); 316 317 // NOTE: if the type is a AFTER_POSTFIRE and last start 318 // type is BEFORE_PREFIRE, we are running in PN. 319 // in this case, tell the fireState to stop firing using 320 // AFTER_PREFIRE instead of AFTER_POSTFIRE, since we never 321 // told the fireState about the BEFORE_POSTFIRE. 322 Activity activity; 323 if(curEventType == FiringEvent.AFTER_POSTFIRE) 324 { 325 activity = fireState.fireStop(FiringEvent.AFTER_PREFIRE); 326 } 327 else 328 { 329 activity = fireState.fireStop(curEventType); 330 } 331 332 if(_addTimestamps) { 333 activity.setEndTime(_factory.newTime(timestamp)); 334 } 335 336 } 337 } 338 } 339 340 /** Record a port event. */ 341 @Override 342 public void portEvent(IOPortEvent event, Date timestamp) throws RecordingException 343 { 344 TypedIOPort port = (TypedIOPort)event.getPort(); 345 Actor actor = (Actor)port.getContainer(); 346 FireState<Activity> fireState = _fireStateTable.get(actor); 347 348 synchronized(fireState) 349 { 350 int eventType = event.getEventType(); 351 boolean recordEvent = false; 352 boolean isRead = true; 353 354 if(eventType == IOPortEvent.SEND_BEGIN) 355 { 356 recordEvent = true; 357 isRead = false; 358 } 359 else if(eventType == IOPortEvent.GET_END) 360 { 361 recordEvent = true; 362 } 363 364 if(recordEvent) 365 { 366 Activity activity = fireState.getCurFireId(); 367 368 if(event.getVectorLength() == IOPortEvent.SINGLETOKEN) 369 { 370 _recordPortEvent(port, activity, isRead, event.getChannel(), event.getToken(), timestamp); 371 } 372 else 373 { 374 Token[] tokenArray = event.getTokenArray(); 375 for(int i = 0; i < tokenArray.length; i++) 376 { 377 _recordPortEvent(port, activity, isRead, event.getChannel(), tokenArray[i], timestamp); 378 } 379 } 380 } 381 } 382 } 383 384 /** Add Parameters for ProvenanceListener. */ 385 @Override 386 public RecordingParameters generateParameters(NamedObj no) 387 throws IllegalActionException, NameDuplicationException 388 { 389 _params = new ProvRecordingParameters(no); 390 return _params; 391 } 392 393 /** Set the output. The stream is written and closed after 394 * the workflow execution stops. 395 */ 396 public void setOutput(OutputStream stream) { 397 _outputStream = stream; 398 } 399 400 /** Set the output type. */ 401 public void setOutputType(ProvFormat type) { 402 _outputType = type; 403 } 404 405 //////////////////////////////////////////////////////////////////////// 406 //// protected methods //// 407 408 /** Record a port read or write. */ 409 protected void _recordPortEvent(IOPort port, Activity activity, 410 boolean isRead, int channel, Token token, Date timestamp) 411 throws RecordingException 412 { 413 414 Role role = _getRole(port); 415 416 HasOther hasOther; 417 418 if(isRead) 419 { 420 Entity entity = _portConnector.getNextId(port, channel); 421 hasOther = _createNewUsed(entity, activity, role, timestamp); 422 } 423 else 424 { 425 Entity entity = _getEntity(token, true); 426 427 WasGeneratedBy wasGeneratedBy = 428 _factory.newWasGeneratedBy(entity, port.getFullName(), activity); 429 _statementsOrBundles.add(wasGeneratedBy); 430 hasOther = wasGeneratedBy; 431 432 if(_addTimestamps) { 433 wasGeneratedBy.setTime(_factory.newTime(timestamp)); 434 } 435 436 _portConnector.sendIdToConnections(port, channel, entity); 437 } 438 439 // set the channel 440 Other other = _factory.newOther(_qn(QN_CHANNEL), channel, _name.XSD_INT); 441 _factory.addAttribute(hasOther, other); 442 } 443 444 445 //////////////////////////////////////////////////////////////////////// 446 //// package protected fields //// 447 448 final static String QN_WORKFLOW = "workflow"; 449 final static String QN_USER = "user"; 450 final static String QN_EXECUTION_LSID = "executionLSID"; 451 final static String QN_ACTOR_CLASS = "actorClass"; 452 final static String QN_FIRING = "firing"; 453 final static String QN_ACTOR_NAME = "actorName"; 454 final static String QN_TOKEN_CLASS = "tokenClass"; 455 final static String QN_EXECUTION_END_DATETIME = "executionStopTime"; 456 final static String QN_EXECUTION_START_DATETIME = "executionStartTime"; 457 final static String QN_CHANNEL = "channel"; 458 459 //////////////////////////////////////////////////////////////////////// 460 //// private methods //// 461 462 /** Get an existing or create a new Entity for a token. */ 463 private Entity _getEntity(Token token, boolean isNewToken) 464 { 465 Entity retval = null; 466 467 if(!isNewToken) 468 { 469 retval = _tokenToEntityMap.get(token); 470 } 471 472 if(retval == null) 473 { 474 retval = _factory.newEntity(_qn("e" + String.valueOf(_entityCounter.incrementAndGet()))); 475 476 Other other = _factory.newOther(_qn(QN_TOKEN_CLASS), 477 token.getClass().getName(), _name.XSD_STRING); 478 _factory.addAttribute(retval, other); 479 480 _statementsOrBundles.add(retval); 481 482 // if it's a string token, use stringValue so we don't 483 // get the quotes. 484 if(token instanceof StringToken) 485 { 486 retval.setValue(_factory.newValue(((StringToken)token).stringValue())); 487 } 488 else 489 { 490 retval.setValue(_factory.newValue(token.toString())); 491 } 492 493 _tokenToEntityMap.put(token, retval); 494 } 495 496 return retval; 497 } 498 499 private void _createNamespace() { 500 _namespace = new Namespace(); 501 _namespace.addKnownNamespaces(); 502 _namespace.register(_namespacePrefixStr, _namespaceStr); 503 } 504 505 private Used _createNewUsed(Entity entity, Activity activity, Role role, Date timestamp) 506 { 507 Used used = _factory.newUsed(activity.getId(), entity.getId()); 508 _factory.addRole(used, role); 509 if(_addTimestamps) { 510 used.setTime(_factory.newTime(timestamp)); 511 } 512 _statementsOrBundles.add(used); 513 return used; 514 } 515 516 /* 517 private void _createUsedForParameter(Attribute attribute, Activity activity, Date timestamp) 518 throws RecordingException 519 { 520 if( (attribute instanceof Parameter) && 521 !(attribute instanceof ptolemy.data.expr.ExpertParameter) && 522 !(attribute instanceof ptolemy.actor.gui.WindowPropertiesAttribute) && 523 !(attribute instanceof ptolemy.actor.gui.SizeAttribute) && 524 !(attribute instanceof org.kepler.sms.SemanticType)) 525 { 526 //_debug(attribute); 527 Parameter parameter = (Parameter)attribute; 528 Token token = null; 529 530 try 531 { 532 token = parameter.getToken(); 533 } 534 catch(IllegalActionException e) 535 { 536 throw new RecordingException("Error reading parameter token: ", e); 537 } 538 539 if(token != null) 540 { 541 Role role = _getRole(attribute); 542 Entity entity = _getEntity(token, false); 543 _createNewUsed(entity, activity, role, timestamp); 544 } 545 } 546 } 547 */ 548 549 /** Get a role for a NamedObj. */ 550 private Role _getRole(NamedObj namedObj) 551 { 552 Role retval = null; 553 554 if((retval = _namedObjToRoleMap.get(namedObj)) == null) 555 { 556 retval = _factory.newRole(namedObj.getFullName(), _name.XSD_STRING); 557 _namedObjToRoleMap.put(namedObj, retval); 558 } 559 560 return retval; 561 } 562 563 /** Get the output stream. Returns null if the name is "NULL". */ 564 private OutputStream _getOutput() throws IllegalActionException 565 { 566 if(_outputStream != null) { 567 return _outputStream; 568 } else { 569 _closeOutput = false; 570 571 FileParameter fp = _params.getFileParameter(); 572 String name = ((StringToken)fp.getToken()).stringValue(); 573 574 // see if we should write to stdout or stderr 575 if(name.equals("System.out")) { 576 return System.out; 577 } else if(name.equals("System.err")) { 578 return System.err; 579 } else if(name.equals("NULL")) { 580 return null; 581 } else if(name.startsWith("file:")) { 582 // remove possible "file:" prefix 583 name = name.substring("file:".length()); 584 } 585 586 _closeOutput = true; 587 588 try { 589 return new FileOutputStream(name, false); 590 } catch(FileNotFoundException e) { 591 throw new IllegalActionException("Could not find file " + 592 name + " : " + e.getMessage()); 593 } 594 } 595 } 596 597 /** Create a qualified name. */ 598 private QualifiedName _qn(String name) { 599 return _namespace.qualifiedName(_namespacePrefixStr, name, _factory); 600 } 601 602 /** Reset all the objects. */ 603 private void _reset() { 604 _fireStateTable.clear(); 605 _namedObjToRoleMap.clear(); 606 _portConnector.clear(); 607 _statementsOrBundles.clear(); 608 _tokenToEntityMap.clear(); 609 _activityCounter.set(0); 610 _entityCounter.set(0); 611 } 612 613 //////////////////////////////////////////////////////////////////////// 614 //// private classes //// 615 616 private static class ProvRecordingParameters extends RecordingParameters 617 { 618 ProvRecordingParameters(NamedObj no) throws IllegalActionException, NameDuplicationException 619 { 620 super(no); 621 addFileParameter(_filenameStr, "System.out"); 622 addStringParameter(_namespaceStr, "http://kelper-project.org"); 623 addStringParameter(_nsPrefixStr, "kepler"); 624 addBooleanParameter(_addTimestampsStr, true); 625 626 addStringParameter(_outputTypeStr, "XML"); 627 List<String> formats = new ArrayList<String>(); 628 for(ProvFormat format : ProvFormat.values()) { 629 formats.add(format.toString()); 630 } 631 String[] array = formats.toArray(new String[formats.size()]); 632 Arrays.sort(array); 633 for(String name : array) { 634 addStringParameterChoice(_outputTypeStr, name); 635 } 636 } 637 638 public boolean getAddTimestampsValue() throws IllegalActionException { 639 return getBooleanValue(_addTimestampsStr); 640 } 641 642 /** Replace a Parameter. */ 643 @Override 644 public void replaceParameter(String name, Parameter parameter) 645 throws IllegalActionException 646 { 647 // if replacing the filename, close the file. 648 if(name.equals(_filenameStr)) 649 { 650 FileParameter p = (FileParameter)_params.get(_filenameStr); 651 p.close(); 652 } 653 654 super.replaceParameter(name, parameter); 655 } 656 657 /** Get the output FileParameter. */ 658 FileParameter getFileParameter() throws IllegalActionException 659 { 660 return (FileParameter)_params.get(_filenameStr); 661 } 662 663 private static final String _filenameStr = "filename"; 664 private static final String _namespaceStr = "namespace"; 665 private static final String _nsPrefixStr = "nsPrefix"; 666 private static final String _outputTypeStr = "outputType"; 667 private static final String _addTimestampsStr = "addTimestamps"; 668 669 } 670 671 //////////////////////////////////////////////////////////////////////// 672 //// private variables //// 673 674 private ProvFactory _factory; 675 676 private Name _name; 677 678 private List<StatementOrBundle> _statementsOrBundles; 679 680 private PortConnector<Entity> _portConnector; 681 682 private ProvRecordingParameters _params; 683 684 private Map<NamedObj, Role> _namedObjToRoleMap; 685 686 private AtomicInteger _entityCounter; 687 private AtomicInteger _activityCounter; 688 689 private Map<Token,Entity> _tokenToEntityMap; 690 691 private Namespace _namespace; 692 693 private String _namespacePrefixStr = "kepler"; 694 private String _namespaceStr = "http://kepler-project.org"; 695 696 private ProvFormat _outputType = ProvFormat.XML; 697 698 private Agent _agent; 699 private Entity _wfEntity; 700 701 /** If true, add timestamps to events. */ 702 private boolean _addTimestamps = true; 703 704 /** If true, close the output after writing. */ 705 private boolean _closeOutput; 706 707 /** The output stream to write to. */ 708 private OutputStream _outputStream; 709}