001/* OPM XML output
002
003Copyright (c) 2009-2010 The Regents of the University of California.
004All rights reserved.
005Permission is hereby granted, without written agreement and without
006license or royalty fees, to use, copy, modify, and distribute this
007software and its documentation for any purpose, provided that the above
008copyright notice and the following two paragraphs appear in all copies
009of this software.
010
011IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015SUCH DAMAGE.
016
017THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022ENHANCEMENTS, OR MODIFICATIONS.
023
024*/
025
026package org.kepler.provenance.opm;
027
028import java.io.FileNotFoundException;
029import java.io.PrintStream;
030import java.io.StringWriter;
031import java.util.GregorianCalendar;
032import java.util.HashMap;
033import java.util.LinkedHashMap;
034import java.util.List;
035import java.util.Map;
036
037import javax.xml.bind.JAXBException;
038import javax.xml.datatype.DatatypeConfigurationException;
039import javax.xml.datatype.DatatypeFactory;
040import javax.xml.datatype.XMLGregorianCalendar;
041
042import org.kepler.provenance.FireState;
043import org.kepler.provenance.PortConnector;
044import org.kepler.provenance.Recording;
045import org.kepler.provenance.RecordingException;
046import org.kepler.provenance.RecordingParameters;
047import org.openprovenance.model.Artifact;
048import org.openprovenance.model.ArtifactId;
049import org.openprovenance.model.Artifacts;
050import org.openprovenance.model.CausalDependencies;
051import org.openprovenance.model.OPMFactory;
052import org.openprovenance.model.OPMGraph;
053import org.openprovenance.model.OPMSerialiser;
054import org.openprovenance.model.OTime;
055import org.openprovenance.model.ObjectFactory;
056import org.openprovenance.model.Process;
057import org.openprovenance.model.ProcessId;
058import org.openprovenance.model.Processes;
059import org.openprovenance.model.Role;
060import org.openprovenance.model.Used;
061import org.openprovenance.model.WasGeneratedBy;
062
063import ptolemy.actor.Actor;
064import ptolemy.actor.CompositeActor;
065import ptolemy.actor.FiringEvent;
066import ptolemy.actor.FiringsRecordable;
067import ptolemy.actor.IOPort;
068import ptolemy.actor.IOPortEvent;
069import ptolemy.actor.TypedIOPort;
070import ptolemy.data.StringToken;
071import ptolemy.data.Token;
072import ptolemy.data.expr.FileParameter;
073import ptolemy.data.expr.Parameter;
074import ptolemy.kernel.util.Attribute;
075import ptolemy.kernel.util.IllegalActionException;
076import ptolemy.kernel.util.NameDuplicationException;
077import ptolemy.kernel.util.NamedObj;
078
079/** A Recording for Open Provenance Model XML.
080 *
081 * @author Daniel Crawl
082 * @version $Id: OpenProvenanceModelXML.java 33496 2015-06-22 20:24:20Z crawl $
083 * @deprecated This recording output is no longer used. Instead, use ProvRecording.
084 */
085@Deprecated    
086public class OpenProvenanceModelXML extends Recording
087{
088
089    /** Construct a new OpenProvenanceModelXML. */
090    public OpenProvenanceModelXML() throws RecordingException
091    {
092        _params = null;
093
094        _portConnector = new PortConnector<ArtifactId>();
095
096        _fireStateTable = new LinkedHashMap<Actor, FireState<ProcessId>>();
097
098        _tokenToArtifactIdMap = new HashMap<Token,ArtifactId>();
099        _artifactCounter = 0;
100        _processCounter = 0;
101
102        _compositePortToProcessMap = new HashMap<CompositeMapKey,ProcessId>();
103        _portFiringCounter = 0;
104
105        _namedObjToRoleMap = new HashMap<NamedObj,Role>();
106
107        _objectFactory = new ObjectFactory();
108        _opmFactory = OPMFactory.getFactory();
109   
110        try
111        {
112            _opmSerialiser = new OPMSerialiser();
113        }
114        catch(JAXBException e)
115        {
116            throw new RecordingException("Unable to create new serializer: ", e);
117        }
118        
119        try
120        {
121            _dataTypeFactory = DatatypeFactory.newInstance();
122        }
123        catch(DatatypeConfigurationException e)
124        {
125            throw new RecordingException("Error trying to get new DataTypeFactory: ", e); 
126        }
127
128        Artifacts artifacts = _objectFactory.createArtifacts();
129        _artifactList = artifacts.getArtifact();
130
131        Processes processes = _objectFactory.createProcesses();
132        _processList = processes.getProcess();
133        
134        CausalDependencies depends = _objectFactory.createCausalDependencies();
135        _dependList = depends.getUsedOrWasGeneratedByOrWasTriggeredBy();
136        
137        _graph = _objectFactory.createOPMGraph();
138        _graph.setArtifacts(artifacts);
139        _graph.setProcesses(processes);
140        _graph.setCausalDependencies(depends);
141       
142        _needWorkflowContents(true);
143    }
144
145    /** React to a parameter change. */
146    public void attributeChanged(Attribute attribute)
147        throws IllegalActionException
148    {
149        
150        String name = attribute.getName();
151
152        if(name.equals(OPMRecordingParameters._filenameStr))
153        {
154            _resetWriter(); 
155        }
156        else
157        {
158            super.attributeChanged(attribute); 
159        }
160
161    }
162
163    /** Register an actor. */
164    public boolean regActor(Actor actor) throws RecordingException
165    {
166        if(actor instanceof FiringsRecordable)
167        {
168            FireState<ProcessId> fireState = new FireState<ProcessId>(actor, -1);
169            _fireStateTable.put(actor, fireState);
170        }
171
172        return true;
173    }
174
175    /** Register a port or portparameter.  */
176    public boolean regPort(TypedIOPort port) throws RecordingException
177    {
178        boolean retval = super.regPort(port);
179
180        _portConnector.createConnections(port);
181
182        return retval;
183    }
184
185    /** Record the starting of workflow execution. */
186    public void executionStart() throws RecordingException
187    {
188        _artifactList.clear();
189        _processList.clear();
190        _dependList.clear();
191        _namedObjToRoleMap.clear();
192        _artifactCounter = 0;
193        _processCounter = 0;
194        _tokenToArtifactIdMap.clear();
195        _compositePortToProcessMap.clear();
196        _portFiringCounter = 0;
197    }
198
199    /** Record the stopping of workflow execution. */
200    public void executionStop() throws RecordingException
201    {
202        // output serialization
203
204        StringWriter writer = new StringWriter();
205
206        try
207        {
208            String string = _opmSerialiser.serialiseOPMGraph(writer, _graph, true);
209
210            if(_textWriter != null)
211            {
212                _textWriter.print(string);
213            }
214        }
215        catch(JAXBException e)
216        {
217            throw new RecordingException("Error serializing graph: ", e);
218        }
219    }
220
221    /** Record an actor firing. */
222    public void actorFire(FiringEvent event) throws RecordingException
223    {
224        Actor actor = event.getActor();
225        FiringEvent.FiringEventType curEventType = event.getType();
226        FireState<ProcessId> fireState = _fireStateTable.get(actor);
227        
228        if(fireState == null)
229        {
230            throw new RecordingException(
231                "Received actor fire event for unregistered actor: " +
232                actor.getFullName());
233        }
234
235        synchronized(fireState)
236        {
237            // get the last type of firing start
238            FiringEvent.FiringEventType lastStartType =
239                fireState.getLastStartFireType();
240
241            // see if current firing is new iteration:
242            // NOTE: PN does not report iterate firings so the iteration
243            // may begin with prefire if the last type of firing was not
244            // iterate.
245            if(curEventType == FiringEvent.BEFORE_ITERATE ||
246                (curEventType == FiringEvent.BEFORE_PREFIRE &&
247                lastStartType != FiringEvent.BEFORE_ITERATE))
248            {
249                
250                // create a new process for the new firing
251                org.openprovenance.model.Process process = _objectFactory.createProcess();
252                _processList.add(process);
253               
254                process.setId("_p" + String.valueOf(_processCounter));
255                _processCounter++;
256
257                // set the process name to be the actor name and number of
258                // times it has fired.
259                String actorName = ((NamedObj)actor).getFullName();
260                int firings = fireState.getNumberOfFirings();
261                process.setValue(actorName + " fire " + firings);
262                ProcessId processId = _opmFactory.newProcessId(process);
263
264                // start the fire in the fire state.
265                fireState.fireStart(curEventType, processId);
266
267                //_debug("adding process id " + actorName + " fire " + firings);
268                
269                // record the parameters used.
270
271                List<?> attributeList = ((NamedObj)actor).attributeList();
272                for(int i = 0; i < attributeList.size(); i++)
273                {
274                    Attribute attribute = (Attribute)attributeList.get(i);
275                    _createUsedForParameter(attribute, processId);
276                }
277                
278
279            }
280            // see if current firing is end of iteration:
281            else if(curEventType == FiringEvent.AFTER_ITERATE ||
282                (curEventType == FiringEvent.AFTER_POSTFIRE &&
283                lastStartType == FiringEvent.BEFORE_PREFIRE))
284            {
285                //_debug("end firing: " + type + " for " + actor.getFullName());
286
287                // NOTE: if the type is a AFTER_POSTFIRE and last start
288                // type is BEFORE_PREFIRE, we are running in PN.
289                // in this case, tell the fireState to stop firing using
290                // AFTER_PREFIRE instead of AFTER_POSTFIRE, since we never
291                // told the fireState about the BEFORE_POSTFIRE.
292                if(curEventType == FiringEvent.AFTER_POSTFIRE)
293                {
294                    fireState.fireStop(FiringEvent.AFTER_PREFIRE);
295                }
296                else
297                {
298                    fireState.fireStop(curEventType);
299                }
300            }
301        }
302    }
303    
304    /** Record a port event. */
305    public void portEvent(IOPortEvent event) throws RecordingException
306    {
307        TypedIOPort port = (TypedIOPort)event.getPort();
308        Actor actor = (Actor)port.getContainer();
309        FireState<ProcessId> fireState = _fireStateTable.get(actor);
310        
311        synchronized(fireState)
312        {
313            int eventType = event.getEventType();
314            boolean recordEvent = false;
315            boolean isRead = true;
316
317            if(eventType == IOPortEvent.SEND_BEGIN)
318            {
319                recordEvent = true;
320                isRead = false;
321            }   
322            else if(eventType == IOPortEvent.GET_END)
323            {
324                recordEvent = true;
325            }
326
327            if(recordEvent)
328            {
329                ProcessId processId = fireState.getCurFireId();
330
331                if(actor instanceof CompositeActor)
332                {
333                    if(processId == null)
334                    {
335                        System.out.println("pid is null, actor = " + actor.getFullName());
336                    }
337
338                    CompositeMapKey key = new CompositeMapKey(processId, port);
339
340                    processId = _compositePortToProcessMap.get(key);
341                    if(processId == null)
342                    {
343                        // create a new process for the new firing
344                        org.openprovenance.model.Process process = _objectFactory.createProcess();
345                        _processList.add(process);
346                        
347                        process.setId("_p" + String.valueOf(_processCounter));
348                        _processCounter++;
349                        
350                        String name = port.getFullName();
351                        int firings = _portFiringCounter++;
352                        process.setValue(name + " fire " + firings);
353                        processId = _opmFactory.newProcessId(process);
354 
355                        _compositePortToProcessMap.put(key, processId);
356                    }
357                }
358
359                if(event.getVectorLength() == IOPortEvent.SINGLETOKEN)
360                {
361                    _recordPortEvent(port, processId, isRead, event.getChannel(), event.getToken());
362                }
363                else
364                {
365                    Token[] tokenArray = event.getTokenArray();
366                    for(int i = 0; i < tokenArray.length; i++)
367                    {
368                        _recordPortEvent(port, processId, isRead, event.getChannel(), tokenArray[i]);
369                    }
370                }
371            }
372        } 
373    }
374
375    /** Add Parameters for ProvenanceListener. */
376    public RecordingParameters generateParameters(NamedObj no)
377        throws IllegalActionException, NameDuplicationException
378    {
379        _params = new OPMRecordingParameters(no);
380        return _params;
381    }
382
383    ////////////////////////////////////////////////////////////////////////
384    //// protected methods                                              ////
385
386    /** Record a port read or write. */
387    protected void _recordPortEvent(IOPort port, ProcessId processId,
388        boolean isRead, int channel, Token token)
389        throws RecordingException
390    {
391    
392        Role role = _getRole(port);
393
394        if(isRead)
395        {
396            ArtifactId artifactId = _portConnector.getNextId(port, channel);
397            _createNewUsed(artifactId, processId, role);
398        }
399        else
400        {
401            ArtifactId artifactId = _getArtifactId(token, true);
402
403            WasGeneratedBy wasGeneratedBy = _objectFactory.createWasGeneratedBy();
404            wasGeneratedBy.setCause(processId);
405            wasGeneratedBy.setEffect(artifactId);
406            wasGeneratedBy.setTime(_getCurrentOTime());
407            wasGeneratedBy.setRole(role);
408            _dependList.add(wasGeneratedBy);
409
410            _portConnector.sendIdToConnections(port, channel, artifactId);
411        }
412    }
413
414    ////////////////////////////////////////////////////////////////////////
415    //// private methods                                                ////
416
417    /** Close the writer unless it is stdout or stderr. */
418    private void _closeWriter()
419    {
420        if(_textWriter != null)
421        {
422            // don't close stdout or stderr
423            if(!_outputName.equals("System.out") &&
424                !_outputName.equals("System.err"))
425            {
426                _textWriter.close();
427            }
428            else
429            {
430                _textWriter.flush();
431            }
432        }
433    }
434
435    /** Get an existing or create a new ArtifactId for a token. */
436    private ArtifactId _getArtifactId(Token token, boolean isNewToken)
437    {
438        ArtifactId retval = null;
439        
440        if(!isNewToken)
441        {
442            retval = _tokenToArtifactIdMap.get(token);
443        }
444
445        if(retval == null)
446        {
447            Artifact artifact = _objectFactory.createArtifact();
448            _artifactList.add(artifact);
449        
450            artifact.setId("_a" + String.valueOf(_artifactCounter));
451            _artifactCounter++;
452            
453            // if it's a string token, use stringValue so we don't
454            // get the quotes.
455            if(token instanceof StringToken)
456            {
457                artifact.setValue(((StringToken)token).stringValue());
458            }
459            else
460            {
461                artifact.setValue(token.toString());
462            }
463
464            retval = _opmFactory.newArtifactId(artifact);
465
466            _tokenToArtifactIdMap.put(token, retval);
467        }
468
469        return retval;
470    }
471
472    private void _createNewUsed(ArtifactId artifactId, ProcessId processId, Role role)
473    {
474        Used used = _objectFactory.createUsed();
475        _dependList.add(used);
476        
477        used.setCause(artifactId);
478        used.setEffect(processId);
479        used.setTime(_getCurrentOTime());
480        used.setRole(role);
481    }
482
483    private void _createUsedForParameter(Attribute attribute, ProcessId processId)
484        throws RecordingException
485    {
486        if( (attribute instanceof Parameter) &&
487           !(attribute instanceof ptolemy.data.expr.ExpertParameter) &&
488           !(attribute instanceof ptolemy.actor.gui.WindowPropertiesAttribute) &&
489           !(attribute instanceof ptolemy.actor.gui.SizeAttribute) &&
490           !(attribute instanceof org.kepler.sms.SemanticType))
491        {
492            //_debug(attribute); 
493            Parameter parameter = (Parameter)attribute;
494            Token token = null;
495
496            try
497            {
498                token = parameter.getToken();
499            }
500            catch(IllegalActionException e)
501            {
502                throw new RecordingException("Error reading parameter token: ", e);
503            }
504            
505            if(token != null)
506            {
507                Role role = _getRole(attribute);
508                ArtifactId artifactId = _getArtifactId(token, false);
509                _createNewUsed(artifactId, processId, role);
510            }
511        }
512    }
513
514    /** Get a new OTime representing the current date and time. */
515    private OTime _getCurrentOTime()// throws Exception
516    {
517        OTime retval = _objectFactory.createOTime();
518        //retval.setClockId("clock id");
519        XMLGregorianCalendar cal = _dataTypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
520        retval.setNoEarlierThan(cal);
521        retval.setNoLaterThan(cal);
522        retval.setClockId("_c1");
523        return retval;
524    }
525
526    /** Get a role for a NamedObj. */
527    private Role _getRole(NamedObj namedObj)
528    {
529        Role retval = null;
530        
531        if((retval = _namedObjToRoleMap.get(namedObj)) == null)
532        {
533            retval = _objectFactory.createRole();
534            _namedObjToRoleMap.put(namedObj, retval);
535            retval.setValue(namedObj.getName());
536        }
537
538        return retval;
539    }
540
541    /** Reset the output writer. */
542    private void _resetWriter() throws IllegalActionException
543    {
544        FileParameter fp = _params.getFileParameter();
545        String name = ((StringToken)fp.getToken()).stringValue();
546
547        // see if the output name has changed or was never set
548        if(_outputName == null || !name.equals(_outputName))
549        {
550            _closeWriter();
551
552            // see if we should write to stdout
553            if(name.equals("System.out"))
554            {
555                _textWriter = System.out;
556            }
557            // see if we should write to stderr
558            else if(name.equals("System.err"))
559            {
560                _textWriter = System.err;
561            }
562            else if(name.equals("NULL"))
563            {
564                _textWriter = null;
565            }
566            else
567            {
568                // remove possible "file:" prefix
569                if(name.startsWith("file:"))
570                {
571                    name = name.substring("file:".length());
572                }
573
574                try
575                {
576                    _textWriter = new PrintStream(name);
577                }
578                catch(FileNotFoundException e)
579                {
580                    throw new IllegalActionException("Could not find file " +
581                        name + " : " + e.getMessage());
582                }
583            }
584
585            // save the name
586            _outputName = name;
587        }
588    }
589
590    ////////////////////////////////////////////////////////////////////////
591    //// private classes                                                ////
592   
593    private static class OPMRecordingParameters extends RecordingParameters
594    {
595        OPMRecordingParameters(NamedObj no) throws IllegalActionException, NameDuplicationException
596        {
597            super(no);
598            addFileParameter(_filenameStr, "System.out");
599        }
600
601        /** Replace a Parameter. */
602        public void replaceParameter(String name, Parameter parameter)
603            throws IllegalActionException
604        {
605            // if replacing the filename, close the file.
606            if(name.equals(_filenameStr))
607            {
608                FileParameter p = (FileParameter)_params.get(_filenameStr);
609                p.close();
610            }
611
612            super.replaceParameter(name, parameter);
613        }
614
615        /** Get the output FileParameter. */
616        FileParameter getFileParameter() throws IllegalActionException
617        {
618            return (FileParameter)_params.get(_filenameStr);
619        }
620
621        private static final String _filenameStr = "Filename";
622    }
623
624    private static class CompositeMapKey
625    {
626        public CompositeMapKey(ProcessId processId, IOPort port)
627        {
628            this.processId = processId;
629            this.port = port;
630        }
631
632        public int hashCode()
633        {
634            return processId.hashCode() + port.hashCode();
635        }
636
637        public boolean equals(Object obj)
638        {
639            if(!(obj instanceof CompositeMapKey))
640            {
641                return false;
642            }
643            else
644            {
645                CompositeMapKey other = (CompositeMapKey)obj;
646                return processId.equals(other.processId) &&
647                    port.equals(other.port);
648            }
649        }
650
651        public ProcessId processId;
652        public IOPort port;
653    }
654
655    /*
656    private static class TokenInfo
657    {
658        public TokenInfo(ArtifactId artifactId)
659        {
660            _artifactId = artifactId;
661        }
662
663        public ArtifactId getArtifactId()
664        {
665            return _artifactId;
666        }
667
668        private ArtifactId _artifactId;
669    }
670    */
671
672    ////////////////////////////////////////////////////////////////////////
673    //// private variables                                              ////
674
    /** Factory used to create the OPM model objects (graph, artifacts,
     *  processes, dependencies, roles and times). */
    private ObjectFactory _objectFactory;

    /** Factory used to create ArtifactId and ProcessId references. */
    private OPMFactory _opmFactory;

    /** Factory used to create the XML calendar values for timestamps. */
    private DatatypeFactory _dataTypeFactory;

    /** The graph that is serialised when execution stops. */
    private OPMGraph _graph;

    /** The artifact list obtained from the graph's artifacts container. */
    private List<Artifact> _artifactList;

    /** The process list obtained from the graph's processes container.
     *  Process here is org.openprovenance.model.Process. */
    private List<Process> _processList;

    /** The dependency list obtained from the graph's causal
     *  dependencies container; holds Used and WasGeneratedBy objects. */
    private List<Object> _dependList;
        
    /** Serialiser used to turn the graph into a string. */
    private OPMSerialiser _opmSerialiser;
    
    //private Map<Actor,Process>

    /** A table to map actor to its firing state object. */
    private Map<Actor, FireState<ProcessId>> _fireStateTable = null;

    /** Receives the artifact id of each written token and supplies the
     *  next artifact id when a port and channel are read. */
    private PortConnector<ArtifactId> _portConnector;

    /** The recording parameters; null until generateParameters() is
     *  called. */
    private OPMRecordingParameters _params;

    /** Output writer. Null when no output is written. */
    private PrintStream _textWriter = null;

    /** Output name the writer was last set up for; "System.out" and
     *  "System.err" denote the standard streams. */
    private String _outputName = null;

    /** Roles already created, keyed by the port or attribute they
     *  were created for. */
    private Map<NamedObj, Role> _namedObjToRoleMap;

    /** Number used to build the id of the next artifact ("_a" + n). */
    private int _artifactCounter;

    /** Number used to build the id of the next process ("_p" + n). */
    private int _processCounter;

    /** The artifact id most recently created for each token. */
    private Map<Token,ArtifactId> _tokenToArtifactIdMap;

    /** The process created for a composite actor's port within one
     *  firing of the composite. */
    private Map<CompositeMapKey,ProcessId> _compositePortToProcessMap;

    /** Running count used to name the processes created for composite
     *  actor ports. */
    private int _portFiringCounter;
711}