001/* PROV output
002
003Copyright (c) 2015 The Regents of the University of California.
004All rights reserved.
005Permission is hereby granted, without written agreement and without
006license or royalty fees, to use, copy, modify, and distribute this
007software and its documentation for any purpose, provided that the above
008copyright notice and the following two paragraphs appear in all copies
009of this software.
010
011IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015SUCH DAMAGE.
016
017THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022ENHANCEMENTS, OR MODIFICATIONS.
023
024*/
025
026package org.kepler.provenance.prov;
027
028import java.io.ByteArrayOutputStream;
029import java.io.FileNotFoundException;
030import java.io.FileOutputStream;
031import java.io.IOException;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Date;
036import java.util.HashMap;
037import java.util.LinkedList;
038import java.util.List;
039import java.util.Map;
040import java.util.concurrent.atomic.AtomicInteger;
041
042import org.kepler.objectmanager.lsid.KeplerLSID;
043import org.kepler.provenance.FireState;
044import org.kepler.provenance.PortConnector;
045import org.kepler.provenance.RecordingException;
046import org.kepler.provenance.RecordingParameters;
047import org.kepler.provenance.SimpleFiringRecording;
048import org.openprovenance.prov.interop.InteropFramework;
049import org.openprovenance.prov.interop.InteropFramework.ProvFormat;
050import org.openprovenance.prov.model.Activity;
051import org.openprovenance.prov.model.Agent;
052import org.openprovenance.prov.model.Entity;
053import org.openprovenance.prov.model.HasOther;
054import org.openprovenance.prov.model.Name;
055import org.openprovenance.prov.model.Namespace;
056import org.openprovenance.prov.model.Other;
057import org.openprovenance.prov.model.ProvFactory;
058import org.openprovenance.prov.model.QualifiedName;
059import org.openprovenance.prov.model.Role;
060import org.openprovenance.prov.model.StatementOrBundle;
061import org.openprovenance.prov.model.Used;
062import org.openprovenance.prov.model.WasAssociatedWith;
063import org.openprovenance.prov.model.WasGeneratedBy;
064import org.openprovenance.prov.xml.Document;
065
066import ptolemy.actor.Actor;
067import ptolemy.actor.FiringEvent;
068import ptolemy.actor.IOPort;
069import ptolemy.actor.IOPortEvent;
070import ptolemy.actor.TypedIOPort;
071import ptolemy.data.StringToken;
072import ptolemy.data.Token;
073import ptolemy.data.expr.FileParameter;
074import ptolemy.data.expr.Parameter;
075import ptolemy.kernel.util.Attribute;
076import ptolemy.kernel.util.IllegalActionException;
077import ptolemy.kernel.util.NameDuplicationException;
078import ptolemy.kernel.util.NamedObj;
079
080/** A Recording for PROV output.
081 *
082 * @author Daniel Crawl
083 * @version $Id: ProvRecording.java 33636 2015-08-24 22:49:48Z crawl $
084 */
085    
086public class ProvRecording extends SimpleFiringRecording<Activity>
087{
088
089    /** Construct a new ProvRecording. */
090    public ProvRecording() throws RecordingException
091    {
092        super();
093        
094        _params = null;
095
096        _portConnector = new PortConnector<Entity>();
097
098        _tokenToEntityMap = new HashMap<Token,Entity>();
099
100        _namedObjToRoleMap = new HashMap<NamedObj,Role>();
101        
102        _statementsOrBundles = new LinkedList<StatementOrBundle>();
103
104        _factory = InteropFramework.newXMLProvFactory();
105        _name = _factory.getName();
106        
107        _activityCounter = new AtomicInteger(0);
108        _entityCounter = new AtomicInteger(0);
109        
110        _createNamespace();    
111    }
112
113    /** React to a parameter change. */
114    @Override
115    public void attributeChanged(Attribute attribute)
116        throws IllegalActionException
117    {
118        
119        String name = attribute.getName();
120
121        if(name.equals(ProvRecordingParameters._addTimestampsStr)) {
122            _addTimestamps = _params.getAddTimestampsValue();
123        } else if(name.equals(ProvRecordingParameters._namespaceStr)) {
124            _namespaceStr = _params.getStringValue(name);
125            _createNamespace();
126        } else if(name.equals(ProvRecordingParameters._nsPrefixStr)) {
127            _namespacePrefixStr = _params.getStringValue(name);
128            _createNamespace();
129        } else if(name.equals(ProvRecordingParameters._outputTypeStr)) {
130            String outputTypeStr = _params.getStringValue(name);
131            ProvFormat outputType = ProvFormat.valueOf(outputTypeStr);
132            if(outputType == null) {
133                throw new IllegalActionException(_recorderContainer,
134                        "Unsupported PROV serialization type: " + outputTypeStr);
135            }
136            _outputType = outputType;
137        } else {
138            super.attributeChanged(attribute); 
139        }
140
141    }
142        
143    /** Register a port or portparameter.  */
144    @Override
145    public boolean regPort(TypedIOPort port) throws RecordingException
146    {
147        boolean retval = super.regPort(port);
148
149        _portConnector.createConnections(port);
150
151        return retval;
152    }
153
154    @Override
155    public void executionStart(KeplerLSID executionLSID, Date timestamp) throws RecordingException {
156    
157        // add Agent for user
158        _agent = _factory.newAgent(_qn(QN_USER),
159                System.getProperty("user.name"));
160        _statementsOrBundles.add(_agent);
161        
162        _wfEntity = _factory.newEntity(_qn(QN_WORKFLOW)); 
163        _wfEntity.setValue(_factory.newValue(_recorderContainer.exportMoML()));
164        _statementsOrBundles.add(_wfEntity);
165        
166        Other other = _factory.newOther(_qn(QN_EXECUTION_LSID),
167                executionLSID.toString(), _name.XSD_STRING);
168        _factory.addAttribute(_wfEntity, other);
169        
170        other = _factory.newOther(_qn(QN_ACTOR_NAME),
171                _recorderContainer.getFullName(), _name.XSD_STRING);
172        _factory.addAttribute(_wfEntity, other);
173
174        if(_addTimestamps) {
175            other = _factory.newOther(_qn(QN_EXECUTION_START_DATETIME),
176                    _factory.newTime(timestamp), _name.XSD_DATETIME);
177            _factory.addAttribute(_wfEntity, other);
178        }
179
180    }
181    
182    /** Record the stopping of workflow execution. */
183    @Override
184    public void executionStop(KeplerLSID executionLSID, Date timestamp) throws RecordingException
185    {
186        
187        if(_addTimestamps) {
188            Other other = _factory.newOther(_qn(QN_EXECUTION_END_DATETIME),
189                    _factory.newTime(timestamp), _name.XSD_DATETIME);
190            _factory.addAttribute(_wfEntity, other);
191        }
192        
193        // TODO stop all currently firing actors     
194        
195        // output serialization        
196        try {
197            Document document = new Document();
198            document.setNamespace(_namespace);        
199            document.getStatementOrBundle().addAll(_statementsOrBundles);
200    
201            InteropFramework framework = new InteropFramework();
202            
203            OutputStream outputStream = null;
204            try {
205                outputStream = _getOutput();
206                if(outputStream != null) {
207                    framework.writeDocument(outputStream, _outputType, document);
208                }
209            } finally {
210                if(_closeOutput) {
211                    try {
212                        outputStream.close();
213                    } catch (IOException e) {
214                        throw new RecordingException("Error closing output.", e);
215                    }
216                }
217            }
218            
219            if(_debugWriter != null) {
220                try(ByteArrayOutputStream byteStream = new ByteArrayOutputStream();) {
221                    framework.writeDocument(byteStream, _outputType, document);
222                    _debugWriter.write(byteStream.toString());
223                } catch (IOException e) {
224                    throw new RecordingException("Error writing to debug writer.", e);
225                }
226                
227            }
228            
229            
230        } catch(IllegalActionException e) {
231            throw new RecordingException("Error writing output.", e);
232        } finally {
233            _reset();
234        }
235        
236    }
237
238    /** Record an actor firing. */
239    @Override
240    public void actorFire(FiringEvent event, Date timestamp) throws RecordingException
241    {
242        Actor actor = event.getActor();
243        FiringEvent.FiringEventType curEventType = event.getType();
244        FireState<Activity> fireState = _fireStateTable.get(actor);
245        
246        if(fireState == null)
247        {
248            throw new RecordingException(
249                "Received actor fire event for unregistered actor: " +
250                actor.getFullName());
251        }
252
253        synchronized(fireState)
254        {
255            // get the last type of firing start
256            FiringEvent.FiringEventType lastStartType =
257                fireState.getLastStartFireType();
258
259            // see if current firing is new iteration:
260            // NOTE: PN does not report iterate firings so the iteration
261            // may begin with prefire if the last type of firing was not
262            // iterate.
263            if(curEventType == FiringEvent.BEFORE_ITERATE ||
264                (curEventType == FiringEvent.BEFORE_PREFIRE &&
265                lastStartType != FiringEvent.BEFORE_ITERATE))
266            {
267                
268                // create a new process for the new firing
269                final int count = _activityCounter.incrementAndGet();
270                Activity activity = _factory.newActivity(_qn("a" + count));
271                _statementsOrBundles.add(activity);
272
273                Other other = _factory.newOther(_qn(QN_ACTOR_CLASS),
274                        ((NamedObj) actor).getClassName(), _name.XSD_STRING);
275                _factory.addAttribute(activity, other);
276                
277                other = _factory.newOther(_qn(QN_FIRING),
278                        fireState.getNumberOfFirings(), _name.XSD_INT);
279                _factory.addAttribute(activity, other);
280                
281                other = _factory.newOther(_qn(QN_ACTOR_NAME),
282                        actor.getFullName(), _name.XSD_STRING);
283                _factory.addAttribute(activity, other);
284                
285                if(_addTimestamps) {
286                    activity.setStartTime(_factory.newTime(timestamp));
287                }
288                                
289                // start the fire in the fire state.
290                fireState.fireStart(curEventType, activity);
291
292                WasAssociatedWith waw = _factory.newWasAssociatedWith(
293                        _qn("waw" + count), activity.getId(), _agent.getId());
294                waw.setPlan(_wfEntity.getId());
295                _statementsOrBundles.add(waw);
296                
297                // record the parameters used.
298
299                
300                /** TODO
301                List<?> attributeList = ((NamedObj)actor).attributeList();
302                for(int i = 0; i < attributeList.size(); i++)
303                {
304                    Attribute attribute = (Attribute)attributeList.get(i);
305                    _createUsedForParameter(attribute, activity);
306                }       
307                **/         
308
309            }
310            // see if current firing is end of iteration:
311            else if(curEventType == FiringEvent.AFTER_ITERATE ||
312                (curEventType == FiringEvent.AFTER_POSTFIRE &&
313                lastStartType == FiringEvent.BEFORE_PREFIRE))
314            {
315                //_debug("end firing: " + type + " for " + actor.getFullName());
316
317                // NOTE: if the type is a AFTER_POSTFIRE and last start
318                // type is BEFORE_PREFIRE, we are running in PN.
319                // in this case, tell the fireState to stop firing using
320                // AFTER_PREFIRE instead of AFTER_POSTFIRE, since we never
321                // told the fireState about the BEFORE_POSTFIRE.
322                Activity activity;
323                if(curEventType == FiringEvent.AFTER_POSTFIRE)
324                {
325                    activity = fireState.fireStop(FiringEvent.AFTER_PREFIRE);
326                }
327                else
328                {
329                    activity = fireState.fireStop(curEventType);
330                }
331                
332                if(_addTimestamps) {
333                    activity.setEndTime(_factory.newTime(timestamp));
334                }
335
336            }
337        }
338    }
339    
340    /** Record a port event. */
341    @Override
342    public void portEvent(IOPortEvent event, Date timestamp) throws RecordingException
343    {
344        TypedIOPort port = (TypedIOPort)event.getPort();
345        Actor actor = (Actor)port.getContainer();
346        FireState<Activity> fireState = _fireStateTable.get(actor);
347        
348        synchronized(fireState)
349        {
350            int eventType = event.getEventType();
351            boolean recordEvent = false;
352            boolean isRead = true;
353
354            if(eventType == IOPortEvent.SEND_BEGIN)
355            {
356                recordEvent = true;
357                isRead = false;
358            }   
359            else if(eventType == IOPortEvent.GET_END)
360            {
361                recordEvent = true;
362            }
363
364            if(recordEvent)
365            {
366                Activity activity = fireState.getCurFireId();
367
368                if(event.getVectorLength() == IOPortEvent.SINGLETOKEN)
369                {
370                    _recordPortEvent(port, activity, isRead, event.getChannel(), event.getToken(), timestamp);
371                }
372                else
373                {
374                    Token[] tokenArray = event.getTokenArray();
375                    for(int i = 0; i < tokenArray.length; i++)
376                    {
377                        _recordPortEvent(port, activity, isRead, event.getChannel(), tokenArray[i], timestamp);
378                    }
379                }
380            }
381        } 
382    }
383
384    /** Add Parameters for ProvenanceListener. */
385    @Override
386    public RecordingParameters generateParameters(NamedObj no)
387        throws IllegalActionException, NameDuplicationException
388    {
389        _params = new ProvRecordingParameters(no);
390        return _params;
391    }
392    
393    /** Set the output. The stream is written and closed after
394     *  the workflow execution stops.
395     */
396    public void setOutput(OutputStream stream) {
397        _outputStream = stream;
398    }
399    
400    /** Set the output type. */
401    public void setOutputType(ProvFormat type) {
402        _outputType = type;
403    }
404    
405    ////////////////////////////////////////////////////////////////////////
406    //// protected methods                                              ////
407
408    /** Record a port read or write. */
409    protected void _recordPortEvent(IOPort port, Activity activity,
410        boolean isRead, int channel, Token token, Date timestamp)
411        throws RecordingException
412    {
413    
414        Role role = _getRole(port);
415
416        HasOther hasOther;
417        
418        if(isRead)
419        {
420            Entity entity = _portConnector.getNextId(port, channel);
421            hasOther = _createNewUsed(entity, activity, role, timestamp);
422        }
423        else
424        {
425            Entity entity = _getEntity(token, true);
426
427            WasGeneratedBy wasGeneratedBy =
428                    _factory.newWasGeneratedBy(entity, port.getFullName(), activity);
429            _statementsOrBundles.add(wasGeneratedBy);
430            hasOther = wasGeneratedBy;
431            
432            if(_addTimestamps) {
433                wasGeneratedBy.setTime(_factory.newTime(timestamp));
434            }
435
436            _portConnector.sendIdToConnections(port, channel, entity);
437        }
438        
439        // set the channel
440        Other other = _factory.newOther(_qn(QN_CHANNEL), channel, _name.XSD_INT);
441        _factory.addAttribute(hasOther, other);
442    }
443
444    
445    ////////////////////////////////////////////////////////////////////////
446    //// package protected fields                                       ////
447    
    // Local names that _qn() combines with the configured namespace
    // prefix to form the qualified names used in the PROV document.

    /** Local name of the workflow entity. */
    final static String QN_WORKFLOW = "workflow";
    /** Local name of the user agent. */
    final static String QN_USER = "user";
    /** Attribute holding the execution LSID on the workflow entity. */
    final static String QN_EXECUTION_LSID = "executionLSID";
    /** Attribute holding the class name of the actor of an activity. */
    final static String QN_ACTOR_CLASS = "actorClass";
    /** Attribute holding the firing number of an activity. */
    final static String QN_FIRING = "firing";
    /** Attribute holding a full name: the actor's on an activity, the
     *  recorder container's on the workflow entity. */
    final static String QN_ACTOR_NAME = "actorName";
    /** Attribute holding the class name of the token of an entity. */
    final static String QN_TOKEN_CLASS = "tokenClass";
    /** Attribute holding the execution stop time on the workflow entity. */
    final static String QN_EXECUTION_END_DATETIME = "executionStopTime";
    /** Attribute holding the execution start time on the workflow entity. */
    final static String QN_EXECUTION_START_DATETIME = "executionStartTime";
    /** Attribute holding the channel of a port read or write. */
    final static String QN_CHANNEL = "channel";
458    
459    ////////////////////////////////////////////////////////////////////////
460    //// private methods                                                ////
461
462    /** Get an existing or create a new Entity for a token. */
463    private Entity _getEntity(Token token, boolean isNewToken)
464    {
465        Entity retval = null;
466        
467        if(!isNewToken)
468        {
469            retval = _tokenToEntityMap.get(token);
470        }
471
472        if(retval == null)
473        {
474            retval = _factory.newEntity(_qn("e" + String.valueOf(_entityCounter.incrementAndGet())));
475
476            Other other = _factory.newOther(_qn(QN_TOKEN_CLASS),
477                    token.getClass().getName(), _name.XSD_STRING);
478            _factory.addAttribute(retval, other);
479                        
480            _statementsOrBundles.add(retval);       
481            
482            // if it's a string token, use stringValue so we don't
483            // get the quotes.
484            if(token instanceof StringToken)
485            {
486                retval.setValue(_factory.newValue(((StringToken)token).stringValue()));
487            }
488            else
489            {
490                retval.setValue(_factory.newValue(token.toString()));
491            }
492
493            _tokenToEntityMap.put(token, retval);
494        }
495
496        return retval;
497    }
498
499    private void _createNamespace() {   
500        _namespace = new Namespace();
501        _namespace.addKnownNamespaces();
502        _namespace.register(_namespacePrefixStr, _namespaceStr);
503    }
504
505    private Used _createNewUsed(Entity entity, Activity activity, Role role, Date timestamp)
506    {
507        Used used = _factory.newUsed(activity.getId(), entity.getId());
508        _factory.addRole(used, role);
509        if(_addTimestamps) {
510            used.setTime(_factory.newTime(timestamp));
511        }
512        _statementsOrBundles.add(used); 
513        return used;
514    }
515
516    /*
517    private void _createUsedForParameter(Attribute attribute, Activity activity, Date timestamp)
518        throws RecordingException
519    {
520        if( (attribute instanceof Parameter) &&
521           !(attribute instanceof ptolemy.data.expr.ExpertParameter) &&
522           !(attribute instanceof ptolemy.actor.gui.WindowPropertiesAttribute) &&
523           !(attribute instanceof ptolemy.actor.gui.SizeAttribute) &&
524           !(attribute instanceof org.kepler.sms.SemanticType))
525        {
526            //_debug(attribute); 
527            Parameter parameter = (Parameter)attribute;
528            Token token = null;
529
530            try
531            {
532                token = parameter.getToken();
533            }
534            catch(IllegalActionException e)
535            {
536                throw new RecordingException("Error reading parameter token: ", e);
537            }
538            
539            if(token != null)
540            {
541                Role role = _getRole(attribute);
542                Entity entity = _getEntity(token, false);
543                _createNewUsed(entity, activity, role, timestamp);
544            }
545        }
546    }
547    */
548
549    /** Get a role for a NamedObj. */
550    private Role _getRole(NamedObj namedObj)
551    {
552        Role retval = null;
553        
554        if((retval = _namedObjToRoleMap.get(namedObj)) == null)
555        {
556            retval = _factory.newRole(namedObj.getFullName(), _name.XSD_STRING);
557            _namedObjToRoleMap.put(namedObj, retval);
558        }
559
560        return retval;
561    }
562
563    /** Get the output stream. Returns null if the name is "NULL". */
564    private OutputStream _getOutput() throws IllegalActionException
565    {
566        if(_outputStream != null) {
567            return _outputStream;
568        } else {
569            _closeOutput = false;
570            
571            FileParameter fp = _params.getFileParameter();
572            String name = ((StringToken)fp.getToken()).stringValue();
573    
574            // see if we should write to stdout or stderr
575            if(name.equals("System.out")) {
576                return System.out;
577            } else if(name.equals("System.err")) {
578                return System.err;
579            } else if(name.equals("NULL")) {
580                return null;
581            } else if(name.startsWith("file:")) {
582              // remove possible "file:" prefix
583              name = name.substring("file:".length());
584            }
585    
586            _closeOutput = true;
587            
588            try {
589                return new FileOutputStream(name, false);
590            } catch(FileNotFoundException e) {
591                throw new IllegalActionException("Could not find file " +
592                    name + " : " + e.getMessage());
593            }
594        }
595    }
596        
597    /** Create a qualified name. */
598    private QualifiedName _qn(String name) {
599        return _namespace.qualifiedName(_namespacePrefixStr, name, _factory);
600    }
601
602    /** Reset all the objects. */
603    private void _reset() {
604        _fireStateTable.clear();
605        _namedObjToRoleMap.clear();
606        _portConnector.clear();
607        _statementsOrBundles.clear();
608        _tokenToEntityMap.clear();
609        _activityCounter.set(0);
610        _entityCounter.set(0);
611    }
612    
613    ////////////////////////////////////////////////////////////////////////
614    //// private classes                                                ////
615   
616    private static class ProvRecordingParameters extends RecordingParameters
617    {
618        ProvRecordingParameters(NamedObj no) throws IllegalActionException, NameDuplicationException
619        {
620            super(no);
621            addFileParameter(_filenameStr, "System.out");
622            addStringParameter(_namespaceStr, "http://kelper-project.org");
623            addStringParameter(_nsPrefixStr, "kepler");
624            addBooleanParameter(_addTimestampsStr, true);
625            
626            addStringParameter(_outputTypeStr, "XML");
627            List<String> formats = new ArrayList<String>();
628            for(ProvFormat format : ProvFormat.values()) {
629                formats.add(format.toString());
630            }
631            String[] array = formats.toArray(new String[formats.size()]);
632            Arrays.sort(array);
633            for(String name : array) {
634                addStringParameterChoice(_outputTypeStr, name);
635            }
636        }
637
638        public boolean getAddTimestampsValue() throws IllegalActionException {
639            return getBooleanValue(_addTimestampsStr);
640        }
641        
642        /** Replace a Parameter. */
643        @Override
644        public void replaceParameter(String name, Parameter parameter)
645            throws IllegalActionException
646        {
647            // if replacing the filename, close the file.
648            if(name.equals(_filenameStr))
649            {
650                FileParameter p = (FileParameter)_params.get(_filenameStr);
651                p.close();
652            }
653
654            super.replaceParameter(name, parameter);
655        }
656
657        /** Get the output FileParameter. */
658        FileParameter getFileParameter() throws IllegalActionException
659        {
660            return (FileParameter)_params.get(_filenameStr);
661        }
662
663        private static final String _filenameStr = "filename";
664        private static final String _namespaceStr = "namespace";
665        private static final String _nsPrefixStr = "nsPrefix";
666        private static final String _outputTypeStr = "outputType";
667        private static final String _addTimestampsStr = "addTimestamps";
668        
669    }
670    
671    ////////////////////////////////////////////////////////////////////////
672    //// private variables                                              ////
673
    /** Factory used to create all PROV objects; obtained from
     *  InteropFramework.newXMLProvFactory() in the constructor. */
    private ProvFactory _factory;

    /** Name table obtained from the factory; supplies the XSD type
     *  names used for attribute values. */
    private Name _name;
    
    /** Statements recorded during the current execution; copied into
     *  the document when execution stops and then cleared. */
    private List<StatementOrBundle> _statementsOrBundles;
        
    /** Passes entities from a port that wrote a token to the connected
     *  ports that later read it. */
    private PortConnector<Entity> _portConnector;

    /** The parameters of this recording; null until
     *  generateParameters() has been called. */
    private ProvRecordingParameters _params;

    /** Cache of the role created for each port. */
    private Map<NamedObj, Role> _namedObjToRoleMap;

    /** Counter used to build entity identifiers ("e" + count). */
    private AtomicInteger _entityCounter;
    /** Counter used to build activity ("a" + count) and association
     *  ("waw" + count) identifiers. */
    private AtomicInteger _activityCounter;

    /** The entity created for each token. */
    private Map<Token,Entity> _tokenToEntityMap;
    
    /** The namespace of the document; rebuilt whenever the prefix or
     *  URI parameter changes. */
    private Namespace _namespace;
    
    /** The namespace prefix used for all qualified names. */
    private String _namespacePrefixStr = "kepler";
    /** The namespace URI registered for the prefix. */
    private String _namespaceStr = "http://kepler-project.org";

    /** The serialization format of the output. */
    private ProvFormat _outputType = ProvFormat.XML;
    
    /** The agent created for the user in executionStart(). */
    private Agent _agent;
    /** The entity created for the workflow in executionStart(). */
    private Entity _wfEntity;
    
    /** If true, add timestamps to events. */
    private boolean _addTimestamps = true;
    
    /** If true, close the output after writing. */
    private boolean _closeOutput;
    
    /** The output stream to write to. */
    private OutputStream _outputStream;
709}