001/** 002 * '$Author: barseghian $' 003 * '$Date: 2011-06-21 22:42:31 +0000 (Tue, 21 Jun 2011) $' 004 * '$Revision: 27756 $' 005 * 006 * For Details: 007 * http://www.kepler-project.org 008 * 009 * Copyright (c) 2009-2010 The Regents of the 010 * University of California. All rights reserved. Permission is hereby granted, 011 * without written agreement and without license or royalty fees, to use, copy, 012 * modify, and distribute this software and its documentation for any purpose, 013 * provided that the above copyright notice and the following two paragraphs 014 * appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF 015 * CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, 016 * OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS 017 * DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE 018 * POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY 019 * DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 020 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE 021 * SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 022 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 023 * ENHANCEMENTS, OR MODIFICATIONS. 024 */ 025 026package org.kepler.kar.handlers; 027 028import java.io.BufferedReader; 029import java.io.ByteArrayInputStream; 030import java.io.IOException; 031import java.io.InputStream; 032import java.io.InputStreamReader; 033import java.util.Hashtable; 034import java.util.Iterator; 035import java.util.List; 036import java.util.Vector; 037import java.util.zip.ZipEntry; 038 039import org.apache.commons.logging.Log; 040import org.apache.commons.logging.LogFactory; 041import org.kepler.kar.KAREntry; 042import org.kepler.kar.KAREntryHandler; 043import org.kepler.kar.KAREntryHandlerFactory; 044import org.kepler.kar.KARFile; 045import org.kepler.moml.NamedObjId; 046import org.kepler.objectmanager.ObjectManager; 047import org.kepler.objectmanager.cache.CacheManager; 048import org.kepler.objectmanager.cache.CacheObject; 049import org.kepler.objectmanager.cache.CacheObjectInterface; 050import org.kepler.objectmanager.cache.WorkflowRunCacheObject; 051import org.kepler.objectmanager.lsid.KeplerLSID; 052import org.kepler.provenance.ProvenanceRecorder; 053import org.kepler.provenance.Recording; 054import org.kepler.provenance.sql.SQLRecordingV8; 055import org.kepler.util.FileUtil; 056import org.kepler.util.WorkflowRun; 057import org.kepler.workflowrunmanager.WorkflowRunManager; 058import org.kepler.workflowrunmanager.WorkflowRunManagerManager; 059 060import ptolemy.actor.gui.TableauFrame; 061import ptolemy.kernel.ComponentEntity; 062import ptolemy.kernel.util.Attribute; 063import ptolemy.kernel.util.IllegalActionException; 064import ptolemy.kernel.util.NameDuplicationException; 065import ptolemy.kernel.util.NamedObj; 066import ptolemy.moml.MoMLParser; 067 068/** 069 * @author Aaron 070 * 071 */ 072public class WorkflowRunEntryHandler implements KAREntryHandler { 073 074 private static final Log log = LogFactory 075 .getLog(WorkflowRunEntryHandler.class.getName()); 076 private static final boolean isDebugging = log.isDebugEnabled(); 077 078 /* 079 * (non-Javadoc) 080 * 081 * @see org.kepler.kar.KAREntryHandler#cache(org.kepler.kar.KARFile, 082 * org.kepler.kar.KAREntry) 083 */ 084 public CacheObject cache(KARFile karFile, KAREntry entry) throws Exception { 085 086 InputStream runStream = karFile.getInputStream(entry); 087 //String runString = convertStreamToString(runStream); 088 String runString = FileUtil.convertStreamToString(runStream); 089 090 MoMLParser parser = new MoMLParser(); 091 NamedObj no = parser.parse(runString); 092 if (no instanceof WorkflowRun) { 093 WorkflowRun run = (WorkflowRun) no; 094 KeplerLSID lsid = entry.getLSID(); 095 String name = run.getName() + "." + run.getExecId() + WorkflowRun.FILENAME_EXTENSION; 096 if (isDebugging) log.debug(name); 097 WorkflowRunCacheObject runCacheObj = new WorkflowRunCacheObject(name, lsid, run); 098 return runCacheObj; 099 } 100 101 return null; 102 } 103 104 /* 105 * (non-Javadoc) 106 * 107 * @see org.kepler.kar.KAREntryHandler#getTypeName() 108 */ 109 public String getTypeName() { 110 return WorkflowRun.class.getName(); 111 } 112 113 /* 114 * (non-Javadoc) 115 * 116 * @see org.kepler.kar.KAREntryHandler#handlesType(java.lang.String) 117 */ 118 public boolean handlesType(String typeName) { 119 if (typeName.equals(getTypeName())) { 120 return true; 121 } 122 return false; 123 } 124 125 /* 126 * (non-Javadoc) 127 * 128 * @see org.kepler.kar.KAREntryHandler#initialize() 129 */ 130 public void initialize() { 131 // TODO Auto-generated method stub 132 133 } 134 135 /* 136 * (non-Javadoc) 137 * 138 * @see org.kepler.kar.KAREntryHandler#open(org.kepler.kar.KARFile, 139 * org.kepler.kar.KAREntry) 140 */ 141 public boolean open(KARFile karFile, KAREntry entry, TableauFrame tableauFrame) throws Exception { 142 143 WorkflowRun run = null; 144 145 CacheManager cm = CacheManager.getInstance(); 146 CacheObjectInterface coi = cm.getObject(entry.getLSID()); 147 if (coi instanceof WorkflowRunCacheObject) { 148 WorkflowRunCacheObject wrco = (WorkflowRunCacheObject) coi; 149 Object o = wrco.getObject(); 150 if (o instanceof WorkflowRun) { 151 run = (WorkflowRun) o; 152 } 153 } 154 if (run == null) { 155 log.debug("Unable to get WorkflowRun from CacheManager, getting it from KAR..."); 156 157 // as a backup we'll just parse it again from the KAR 158 InputStream wfRunStream = karFile.getInputStream(entry); 159 //String wfRunString = convertStreamToString(wfRunStream); 160 String wfRunString = FileUtil.convertStreamToString(wfRunStream); 161 MoMLParser parser = new MoMLParser(); 162 NamedObj no; 163 try { 164 no = parser.parse(wfRunString); 165 run = (WorkflowRun) no; 166 } catch (Exception e) { 167 e.printStackTrace(); 168 } 169 } 170 171 // FIXME attempting to insert the run husk into the prov store found in 172 // the workflow assumes too much 173 // (e.g. will break when you're not using the default prov store) 174 Attribute recorderAttribute = null; 175 176 //OM lsid for this run is different from run's execution lsid 177 //KeplerLSID runLSID = ObjectManager.getIdFor(run); 178 KeplerLSID runLSID = run.getExecLSID(); 179 KeplerLSID associatedWorkflowLSID = run.getWorkflowLSID(); 180 181 //try to get workflow out of provenance 182 WorkflowRunManagerManager wrmm = WorkflowRunManagerManager.getInstance(); 183 WorkflowRunManager workflowRunManager = wrmm.getWRM(tableauFrame); 184 NamedObj workflow = workflowRunManager.getAssociatedWorkflowForWorkflowRun(runLSID); 185 186 //unable to get workflow from provenance, get it from KAR 187 if (workflow == null) { 188 MoMLParser parser = new MoMLParser(); 189 karFile.getManifest(); 190 String workflowEntryName = karFile 191 .getEntryName(associatedWorkflowLSID); 192 ZipEntry workflowEntry = karFile.getEntry(workflowEntryName); 193 InputStream is = karFile.getInputStream(workflowEntry); 194 workflow = parser.parse(null, null, is); 195 196 } 197 198 try { 199 recorderAttribute = ProvenanceRecorder 200 .getDefaultProvenanceRecorder(workflow); 201 } catch (Exception e) { 202 System.out.println("WorkflowRun open Caught exception e:" 203 + e.getMessage()); 204 System.out 205 .println("unable to get recorderAttribute from workflow, just returning false for now"); // TODO 206 return false; 207 } 208 209 ProvenanceRecorder pr = (ProvenanceRecorder) recorderAttribute; 210 Recording recording = pr.getRecording(); 211 212 // add any piggybacks that may have been added after this 213 // ProvenanceRecorder was originally created 214 List<Recording> futuristicPiggyBacks = ProvenanceRecorder 215 .getFuturisticPiggybacks(recording); 216 Iterator<Recording> i = futuristicPiggyBacks.iterator(); 217 while (i.hasNext()) { 218 pr.addPiggyback(i.next()); 219 } 220 221 // TODO see about making more general 222 if (recording instanceof SQLRecordingV8){ 223 boolean result = ((SQLRecordingV8) recording).insertHuskRun(karFile, 224 run); 225 226 if (result) { 227 pr.executionImported(run.getExecLSID()); 228 } 229 } 230 231 // KARFile wants to know if this item was dealt with properly, 232 // not if it was imported. so don't return 'result' variable here. 233 return true; 234 } 235 236 /* 237 * (non-Javadoc) 238 * 239 * @see org.kepler.kar.KAREntryHandler#save(java.util.Vector, 240 * org.kepler.objectmanager.lsid.KeplerLSID) 241 */ 242 public Hashtable<KAREntry, InputStream> save(Vector<KeplerLSID> lsids, 243 KeplerLSID karLsid, TableauFrame tableauFrame) throws Exception { 244 245 Hashtable<KAREntry, InputStream> items = new Hashtable<KAREntry, InputStream>(); 246 247 ObjectManager oman = ObjectManager.getInstance(); 248 249 Vector<WorkflowRun> runsToAddToKAR = new Vector<WorkflowRun>(); 250 251 WorkflowRunManagerManager wrmm = WorkflowRunManagerManager.getInstance(); 252 WorkflowRunManager workflowRunManager = wrmm.getWRM(tableauFrame); 253 254 // loop through the LSIDs that were given to us on this pass 255 for (KeplerLSID lsid : lsids) { 256 257 NamedObj no = oman.getObjectRevision(lsid); 258 if (no != null) { 259 260 // This entry handler only cares about Workflows since 261 // WorkflowRuns are dependent on Workflows 262 // in other words: pass me a Workflow (i.e. ComponentEntity) 263 // and I'm going to return the WorkflowRuns that are 264 // associated with that Workflow (if the user selected them) 265 if (no instanceof ComponentEntity) { 266 KeplerLSID ceLSID = NamedObjId.getIdFor(no); 267 Vector<WorkflowRun> associatedRunsForWorkflow = 268 workflowRunManager.getWorkflowRunsForWorkflow(ceLSID); 269 270 for (WorkflowRun wr : associatedRunsForWorkflow) { 271 KeplerLSID wrLSID = NamedObjId.getIdFor(wr); 272 if (workflowRunManager.getSelectedRunsForSave().contains(wrLSID)) { 273 runsToAddToKAR.add(wr); 274 } 275 } 276 } 277 } 278 } 279 280 if (runsToAddToKAR == null || runsToAddToKAR.size() == 0) { 281 return null; 282 } 283 284 if (runsToAddToKAR.size() > 0) { 285 for (WorkflowRun wr : runsToAddToKAR) { 286 287 // if you change the name here, change it also in WorkflowRun.putRunInCache 288 KAREntry entry = new KAREntry(wr.getName() + "." + wr.getExecId() 289 + WorkflowRun.FILENAME_EXTENSION); 290 291 NamedObjId noi = NamedObjId.getIdAttributeFor(wr); 292 entry.setLSID(noi.getId()); 293 String objType = wr.getClass().getName(); 294 entry.setType(objType); 295 entry.addLsidDependency(wr.getWorkflowLSID()); 296 297 String theMoml = wr.exportMoML(); 298 byte[] momlBytes = theMoml.getBytes(); 299 ByteArrayInputStream bais = new ByteArrayInputStream(momlBytes); 300 items.put(entry, bais); 301 302 //putInObjectManager(run); 303 304 } 305 } 306 return items; 307 } 308 309 310 /** 311 * @deprecated. once FileUtil.convertStreamToString verified as substitute, 312 * erase this method. 313 * @param is 314 * @return 315 */ 316 public static String convertStreamToString(InputStream is) 317 { 318 BufferedReader reader = new BufferedReader(new InputStreamReader(is)); 319 StringBuilder sb = new StringBuilder(); 320 321 String line = null; 322 try 323 { 324 while ((line = reader.readLine()) != null) 325 { 326 sb.append(line + "\n"); 327 } 328 } 329 catch (IOException e) 330 { 331 e.printStackTrace(); 332 } 333 finally 334 { 335 try 336 { 337 is.close(); 338 } 339 catch (IOException e) 340 { 341 e.printStackTrace(); 342 } 343 } 344 345 return sb.toString(); 346 } 347 348 /** 349 * Put run in ObjectManager, removing any previous version. 350 * 351 * @param run 352 */ 353 public static void putInObjectManager(WorkflowRun run) { 354 ObjectManager om = ObjectManager.getInstance(); 355 try { 356 NamedObjId noi = NamedObjId.getIdAttributeFor(run); 357 if (noi != null) { 358 om.removeObject(noi.getId()); 359 360 //TODO double check the order of these two calls: 361 NamedObjId.assignIdTo(run, noi.getId()); 362 om.addNamedObj(run); 363 } else { 364 om.addNamedObj(run); 365 } 366 } catch (Exception e) { 367 // TODO Auto-generated catch block 368 e.printStackTrace(); 369 } 370 } 371 372 /** 373 * A factory that creates a KAREntryHandler object. 374 */ 375 public static class Factory extends KAREntryHandlerFactory { 376 /** 377 * Create a factory with the given name and container. 378 * 379 *@param container 380 * The container. 381 *@param name 382 * The name of the entity. 383 *@exception IllegalActionException 384 * If the container is incompatible with this attribute. 385 *@exception NameDuplicationException 386 * If the name coincides with an attribute already in the 387 * container. 388 */ 389 public Factory(NamedObj container, String name) 390 throws IllegalActionException, NameDuplicationException { 391 super(container, name); 392 } 393 394 /** 395 * . 396 * 397 */ 398 public KAREntryHandler createKAREntryHandler() { 399 if (isDebugging) 400 log.debug("createKAREntryHandler for WorkflowRun"); 401 WorkflowRunEntryHandler wfRunEH = new WorkflowRunEntryHandler(); 402 return wfRunEH; 403 } 404 } 405 406}