001/**
002 *  '$Author: barseghian $'
003 *  '$Date: 2011-06-21 22:42:31 +0000 (Tue, 21 Jun 2011) $'
004 *  '$Revision: 27756 $'
005 *
006 *  For Details:
007 *  http://www.kepler-project.org
008 *
009 *  Copyright (c) 2009-2010 The Regents of the
010 *  University of California. All rights reserved. Permission is hereby granted,
011 *  without written agreement and without license or royalty fees, to use, copy,
012 *  modify, and distribute this software and its documentation for any purpose,
013 *  provided that the above copyright notice and the following two paragraphs
014 *  appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF
015 *  CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL,
016 *  OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
017 *  DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE
018 *  POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY
019 *  DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
020 *  WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
021 *  SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
022 *  CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
023 *  ENHANCEMENTS, OR MODIFICATIONS.
024 */
025
026package org.kepler.kar.handlers;
027
028import java.io.BufferedReader;
029import java.io.ByteArrayInputStream;
030import java.io.IOException;
031import java.io.InputStream;
032import java.io.InputStreamReader;
033import java.util.Hashtable;
034import java.util.Iterator;
035import java.util.List;
036import java.util.Vector;
037import java.util.zip.ZipEntry;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041import org.kepler.kar.KAREntry;
042import org.kepler.kar.KAREntryHandler;
043import org.kepler.kar.KAREntryHandlerFactory;
044import org.kepler.kar.KARFile;
045import org.kepler.moml.NamedObjId;
046import org.kepler.objectmanager.ObjectManager;
047import org.kepler.objectmanager.cache.CacheManager;
048import org.kepler.objectmanager.cache.CacheObject;
049import org.kepler.objectmanager.cache.CacheObjectInterface;
050import org.kepler.objectmanager.cache.WorkflowRunCacheObject;
051import org.kepler.objectmanager.lsid.KeplerLSID;
052import org.kepler.provenance.ProvenanceRecorder;
053import org.kepler.provenance.Recording;
054import org.kepler.provenance.sql.SQLRecordingV8;
055import org.kepler.util.FileUtil;
056import org.kepler.util.WorkflowRun;
057import org.kepler.workflowrunmanager.WorkflowRunManager;
058import org.kepler.workflowrunmanager.WorkflowRunManagerManager;
059
060import ptolemy.actor.gui.TableauFrame;
061import ptolemy.kernel.ComponentEntity;
062import ptolemy.kernel.util.Attribute;
063import ptolemy.kernel.util.IllegalActionException;
064import ptolemy.kernel.util.NameDuplicationException;
065import ptolemy.kernel.util.NamedObj;
066import ptolemy.moml.MoMLParser;
067
068/**
069 * @author Aaron
070 * 
071 */
072public class WorkflowRunEntryHandler implements KAREntryHandler {
073
074        private static final Log log = LogFactory
075                        .getLog(WorkflowRunEntryHandler.class.getName());
076        private static final boolean isDebugging = log.isDebugEnabled();
077
078        /*
079         * (non-Javadoc)
080         * 
081         * @see org.kepler.kar.KAREntryHandler#cache(org.kepler.kar.KARFile,
082         * org.kepler.kar.KAREntry)
083         */
084        public CacheObject cache(KARFile karFile, KAREntry entry) throws Exception {
085
086                InputStream runStream = karFile.getInputStream(entry);
087                //String runString = convertStreamToString(runStream);
088                String runString = FileUtil.convertStreamToString(runStream);
089                
090                MoMLParser parser = new MoMLParser();
091                NamedObj no = parser.parse(runString);
092                if (no instanceof WorkflowRun) {
093                        WorkflowRun run = (WorkflowRun) no;
094                        KeplerLSID lsid = entry.getLSID();
095                        String name = run.getName() + "." + run.getExecId() + WorkflowRun.FILENAME_EXTENSION;
096                        if (isDebugging) log.debug(name);
097                        WorkflowRunCacheObject runCacheObj = new WorkflowRunCacheObject(name, lsid, run);
098                        return runCacheObj;
099                }
100                
101                return null;
102        }
103
104        /*
105         * (non-Javadoc)
106         * 
107         * @see org.kepler.kar.KAREntryHandler#getTypeName()
108         */
109        public String getTypeName() {
110                return WorkflowRun.class.getName();
111        }
112
113        /*
114         * (non-Javadoc)
115         * 
116         * @see org.kepler.kar.KAREntryHandler#handlesType(java.lang.String)
117         */
118        public boolean handlesType(String typeName) {
119                if (typeName.equals(getTypeName())) {
120                        return true;
121                }
122                return false;
123        }
124
125        /*
126         * (non-Javadoc)
127         * 
128         * @see org.kepler.kar.KAREntryHandler#initialize()
129         */
130        public void initialize() {
131                // TODO Auto-generated method stub
132
133        }
134
135        /*
136         * (non-Javadoc)
137         * 
138         * @see org.kepler.kar.KAREntryHandler#open(org.kepler.kar.KARFile,
139         * org.kepler.kar.KAREntry)
140         */
141        public boolean open(KARFile karFile, KAREntry entry, TableauFrame tableauFrame) throws Exception {
142
143                WorkflowRun run = null;
144                
145                CacheManager cm = CacheManager.getInstance();
146                CacheObjectInterface coi = cm.getObject(entry.getLSID());
147                if (coi instanceof WorkflowRunCacheObject) {
148                        WorkflowRunCacheObject wrco = (WorkflowRunCacheObject) coi;
149                        Object o = wrco.getObject();
150                        if (o instanceof WorkflowRun) {
151                                run = (WorkflowRun) o;
152                        }
153                }
154                if (run == null) {
155                        log.debug("Unable to get WorkflowRun from CacheManager, getting it from KAR...");
156                        
157                        // as a backup we'll just parse it again from the KAR
158                        InputStream wfRunStream = karFile.getInputStream(entry);
159                        //String wfRunString = convertStreamToString(wfRunStream);
160                        String wfRunString = FileUtil.convertStreamToString(wfRunStream);
161                        MoMLParser parser = new MoMLParser();
162                        NamedObj no;
163                        try {
164                                no = parser.parse(wfRunString);
165                                run = (WorkflowRun) no;
166                        } catch (Exception e) {
167                                e.printStackTrace();
168                        }
169                }
170
171                // FIXME attempting to insert the run husk into the prov store found in
172                // the workflow assumes too much
173                // (e.g. will break when you're not using the default prov store)
174                Attribute recorderAttribute = null;
175
176                //OM lsid for this run is different from run's execution lsid 
177                //KeplerLSID runLSID = ObjectManager.getIdFor(run);
178                KeplerLSID runLSID = run.getExecLSID();
179                KeplerLSID associatedWorkflowLSID = run.getWorkflowLSID();
180                                
181                //try to get workflow out of provenance
182                WorkflowRunManagerManager wrmm = WorkflowRunManagerManager.getInstance();
183                WorkflowRunManager workflowRunManager = wrmm.getWRM(tableauFrame);
184                NamedObj workflow = workflowRunManager.getAssociatedWorkflowForWorkflowRun(runLSID);
185                
186                //unable to get workflow from provenance, get it from KAR
187                if (workflow == null) {
188                        MoMLParser parser = new MoMLParser();
189                        karFile.getManifest();
190                        String workflowEntryName = karFile
191                                        .getEntryName(associatedWorkflowLSID);
192                        ZipEntry workflowEntry = karFile.getEntry(workflowEntryName);
193                        InputStream is = karFile.getInputStream(workflowEntry);
194                        workflow = parser.parse(null, null, is);
195
196                }
197
198                try {
199                        recorderAttribute = ProvenanceRecorder
200                                        .getDefaultProvenanceRecorder(workflow);
201                } catch (Exception e) {
202                        System.out.println("WorkflowRun open Caught exception e:"
203                                        + e.getMessage());
204                        System.out
205                                        .println("unable to get recorderAttribute from workflow, just returning false for now"); // TODO
206                        return false;
207                }
208
209                ProvenanceRecorder pr = (ProvenanceRecorder) recorderAttribute;
210                Recording recording = pr.getRecording();
211
212                // add any piggybacks that may have been added after this
213                // ProvenanceRecorder was originally created
214                List<Recording> futuristicPiggyBacks = ProvenanceRecorder
215                                .getFuturisticPiggybacks(recording);
216                Iterator<Recording> i = futuristicPiggyBacks.iterator();
217                while (i.hasNext()) {
218                        pr.addPiggyback(i.next());
219                }
220
221                // TODO see about making more general
222                if (recording instanceof SQLRecordingV8){
223                        boolean result = ((SQLRecordingV8) recording).insertHuskRun(karFile,
224                                run);
225
226                        if (result) {
227                                pr.executionImported(run.getExecLSID());
228                        }
229                }
230
231                // KARFile wants to know if this item was dealt with properly,
232                // not if it was imported. so don't return 'result' variable here.
233                return true;
234        }
235
236        /*
237         * (non-Javadoc)
238         * 
239         * @see org.kepler.kar.KAREntryHandler#save(java.util.Vector,
240         * org.kepler.objectmanager.lsid.KeplerLSID)
241         */
242        public Hashtable<KAREntry, InputStream> save(Vector<KeplerLSID> lsids,
243                        KeplerLSID karLsid, TableauFrame tableauFrame) throws Exception {
244                
245                Hashtable<KAREntry, InputStream> items = new Hashtable<KAREntry, InputStream>();
246
247                ObjectManager oman = ObjectManager.getInstance();
248
249                Vector<WorkflowRun> runsToAddToKAR = new Vector<WorkflowRun>();
250                
251                WorkflowRunManagerManager wrmm = WorkflowRunManagerManager.getInstance();
252                WorkflowRunManager workflowRunManager = wrmm.getWRM(tableauFrame);
253                
254                // loop through the LSIDs that were given to us on this pass
255                for (KeplerLSID lsid : lsids) {
256                        
257                        NamedObj no = oman.getObjectRevision(lsid);
258                        if (no != null) {
259                                
260                                // This entry handler only cares about Workflows since
261                                // WorkflowRuns are dependent on Workflows
262                                // in other words: pass me a Workflow (i.e. ComponentEntity)
263                                //   and I'm going to return the WorkflowRuns that are
264                                //   associated with that Workflow (if the user selected them)
265                                if (no instanceof ComponentEntity) {
266                                        KeplerLSID ceLSID = NamedObjId.getIdFor(no);
267                                        Vector<WorkflowRun> associatedRunsForWorkflow = 
268                                                        workflowRunManager.getWorkflowRunsForWorkflow(ceLSID);
269                                        
270                                        for (WorkflowRun wr : associatedRunsForWorkflow) {
271                                                KeplerLSID wrLSID = NamedObjId.getIdFor(wr);
272                                                if (workflowRunManager.getSelectedRunsForSave().contains(wrLSID)) {
273                                                        runsToAddToKAR.add(wr);
274                                                }
275                                        }
276                                }
277                        }
278                }
279                
280                if (runsToAddToKAR == null || runsToAddToKAR.size() == 0) {
281                        return null;
282                }
283                
284                if (runsToAddToKAR.size() > 0) {
285                        for (WorkflowRun wr : runsToAddToKAR) {
286
287                                // if you change the name here, change it also in WorkflowRun.putRunInCache
288                                KAREntry entry = new KAREntry(wr.getName() + "." + wr.getExecId()
289                                                + WorkflowRun.FILENAME_EXTENSION);
290
291                                NamedObjId noi = NamedObjId.getIdAttributeFor(wr);
292                                entry.setLSID(noi.getId());
293                                String objType = wr.getClass().getName();
294                                entry.setType(objType);
295                                entry.addLsidDependency(wr.getWorkflowLSID());
296                                
297                                String theMoml = wr.exportMoML();
298                                byte[] momlBytes = theMoml.getBytes();
299                                ByteArrayInputStream bais = new ByteArrayInputStream(momlBytes);
300                                items.put(entry, bais);
301
302                                //putInObjectManager(run);
303
304                        }
305                }
306                return items;
307        }
308
309
310        /**
311         * @deprecated. once FileUtil.convertStreamToString verified as substitute,
312         * erase this method. 
313         * @param is
314         * @return
315         */
316        public static String convertStreamToString(InputStream is) 
317        {
318                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
319                StringBuilder sb = new StringBuilder();
320
321                String line = null;
322                try 
323                {
324                        while ((line = reader.readLine()) != null) 
325                        {
326                                sb.append(line + "\n");
327                        }
328                } 
329                catch (IOException e) 
330                {
331                        e.printStackTrace();
332                } 
333                finally 
334                {
335                        try 
336                        {
337                                is.close();
338                        } 
339                        catch (IOException e) 
340                        {
341                                e.printStackTrace();
342                        }
343                }
344
345                return sb.toString();
346        }
347
348        /**
349         * Put run in ObjectManager, removing any previous version.
350         * 
351         * @param run
352         */
353        public static void putInObjectManager(WorkflowRun run) {
354                ObjectManager om = ObjectManager.getInstance();
355                try {
356                        NamedObjId noi = NamedObjId.getIdAttributeFor(run);
357                        if (noi != null) {
358                                om.removeObject(noi.getId());
359                                
360                                //TODO double check the order of these two calls:
361                                NamedObjId.assignIdTo(run, noi.getId());
362                                om.addNamedObj(run);
363                        } else {
364                                om.addNamedObj(run);
365                        }
366                } catch (Exception e) {
367                        // TODO Auto-generated catch block
368                        e.printStackTrace();
369                }
370        }
371
372        /**
373         * A factory that creates a KAREntryHandler object.
374         */
375        public static class Factory extends KAREntryHandlerFactory {
376                /**
377                 * Create a factory with the given name and container.
378                 * 
379                 *@param container
380                 *            The container.
381                 *@param name
382                 *            The name of the entity.
383                 *@exception IllegalActionException
384                 *                If the container is incompatible with this attribute.
385                 *@exception NameDuplicationException
386                 *                If the name coincides with an attribute already in the
387                 *                container.
388                 */
389                public Factory(NamedObj container, String name)
390                                throws IllegalActionException, NameDuplicationException {
391                        super(container, name);
392                }
393
394                /**
395                 * .
396                 * 
397                 */
398                public KAREntryHandler createKAREntryHandler() {
399                        if (isDebugging) 
400                                log.debug("createKAREntryHandler for WorkflowRun");
401                        WorkflowRunEntryHandler wfRunEH = new WorkflowRunEntryHandler();
402                        return wfRunEH;
403                }
404        }
405
406}