/* KAR entry handler to import/export PROV.
 *
 * Copyright (c) 2015 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2017-08-24 23:01:35 +0000 (Thu, 24 Aug 2017) $'
 * '$Revision: 34624 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.kar.handlers;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.jar.Attributes;

import org.apache.commons.io.FilenameUtils;
import org.kepler.configuration.ConfigurationManager;
import org.kepler.kar.KAREntry;
import org.kepler.kar.KAREntryHandler;
import org.kepler.kar.KAREntryHandlerFactory;
import org.kepler.kar.KARFile;
import org.kepler.objectmanager.ObjectManager;
import org.kepler.objectmanager.cache.CacheObject;
import org.kepler.objectmanager.lsid.KeplerLSID;
import org.kepler.objectmanager.lsid.LSIDGenerator;
import org.kepler.provenance.ProvenanceRecorder;
import org.kepler.provenance.QueryException;
import org.kepler.provenance.Queryable;
import org.kepler.provenance.RecordPlayer;
import org.kepler.provenance.Recording;
import org.kepler.provenance.RecordingException;
import org.kepler.provenance.manager.TransferComponent;
import org.kepler.provenance.prov.ProvQuery;
import org.kepler.provenance.prov.ProvRecording;
import org.kepler.util.WorkflowRun;
import org.openprovenance.prov.interop.InteropFramework.ProvFormat;

import ptolemy.actor.gui.TableauFrame;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.NamedObj;
import ptolemy.util.MessageHandler;

/** A KAR entry handler that imports/exports workflow provenance
 *  in PROV format.
 *
 *  @author Daniel Crawl
 *  @version $Id: ProvKAREntryHandler.java 34624 2017-08-24 23:01:35Z crawl $
 */
public class ProvKAREntryHandler implements KAREntryHandler {

    /** Construct a new ProvKAREntryHandler.
     *
     *  The default export format is read from the
     *  "provenance.karProvType" property of the provenance module
     *  configuration. If the property is missing, empty, or does not
     *  name a known PROV format, a warning is printed and JSON is used.
     */
    public ProvKAREntryHandler() {

        // get the default export format
        String formatStr = null;
        try {
            formatStr = ConfigurationManager.getInstance().getProperty(
                    ConfigurationManager.getModule("provenance"),
                    "provenance.karProvType").getValue();
        } catch(Exception e) {
            System.err.println("WARNING: missing karProvType in provenance configuration.");
        }

        ProvFormat exportFormat = ProvFormat.JSON;
        if(formatStr == null || formatStr.trim().isEmpty()) {
            System.err.println("WARNING: karProvType is empty.");
        } else {
            ProvFormat configured = _parseFormat(formatStr);
            if(configured == null) {
                System.err.println("WARNING: unknown PROV format in karProvType: " +
                        formatStr);
            } else {
                exportFormat = configured;
            }
        }
        _exportFormat = exportFormat;
    }

    /** Returns null since this is only used for 1.0 KARs. */
    @Override
    public String getTypeName() {
        return null;
    }

    /** Check whether this handler handles the given KAR entry type.
     *  @param typeName the KAR entry type name; may be null.
     *  @return true if typeName is the PROV provenance entry type.
     */
    @Override
    public boolean handlesType(String typeName) {
        // constant first so that a null typeName yields false, not an NPE
        return _ENTRY_TYPE.equals(typeName);
    }

    /** Does nothing. */
    @Override
    public void initialize() {
    }

    /** Does nothing.
     *  @return always null.
     */
    @Override
    public CacheObject cache(KARFile karFile, KAREntry entry) throws Exception {
        return null;
    }

    /** Open a KAR entry for a PROVProvenance and replay every execution
     *  it contains into the default provenance recording.
     *
     *  The serialization format is taken from the entry's format
     *  attribute, then from the entry's file name extension, and finally
     *  defaults to JSON.
     *
     *  @param karFile the KAR file containing the entry.
     *  @param entry the entry to import.
     *  @param tableauFrame not used.
     *  @return always true.
     *  @throws Exception if the entry cannot be read or parsed.
     */
    @Override
    public boolean open(KARFile karFile, KAREntry entry,
            TableauFrame tableauFrame) throws Exception {

        // get the default recording
        ProvenanceRecorder recorder = new ProvenanceRecorder();
        Recording recording = recorder.getRecording();

        // get the format of the serialization in the KAR entry
        // from the format attribute.
        ProvFormat importFormat = null;
        Attributes attributes = entry.getAttributes();
        if(attributes != null) {
            importFormat = _parseFormat(attributes.getValue(_FORMAT_NAME));
        }

        // if we couldn't get the format from the format attribute,
        // try the file name extension. save() writes the extension in
        // lower case, so the lookup must be case-insensitive.
        if(importFormat == null) {
            importFormat = _parseFormat(
                    FilenameUtils.getExtension(entry.getName()));
        }

        // if we still couldn't get the format, try the default.
        if(importFormat == null) {
            System.err.println("WARNING: unknown format for " + entry.getName() +
                    " Trying JSON.");
            importFormat = ProvFormat.JSON;
        }

        try(InputStream input = karFile.getInputStream(entry);
                ProvQuery query = new ProvQuery(input, importFormat)) {

            RecordPlayer player = new RecordPlayer(query, recording);

            // load all of the executions in the serialization
            for(Integer execId : query.getExecutions()) {

                KeplerLSID execLSID = query.getExecutionLSIDForExecution(execId);

                System.out.println("Importing " + entry.getName() + " run " +
                        execId + " run lsid " + execLSID);

                try {
                    player.play(execId);
                } catch(QueryException | RecordingException e) {
                    // report and continue with the remaining executions
                    MessageHandler.error("Error importing provenance.", e);
                }
            }
        }
        return true;
    }

    /** Serialize workflow executions to KAR entries.
     *
     *  Each LSID that resolves to a TransferComponent contributes that
     *  component's execution LSIDs and selects the export format for
     *  this call; every other LSID is treated as an execution LSID.
     *  LSIDs that are not executions in the provenance store are skipped.
     *
     *  @param lsids the LSIDs to save.
     *  @param karLsid not used.
     *  @param tableauFrame not used.
     *  @return a table of the new KAR entries and their contents; empty
     *   (never null) if saving is disabled or no provenance query is
     *   available.
     *  @throws Exception if the provenance store cannot be queried.
     */
    @Override
    public Hashtable<KAREntry, InputStream> save(Vector<KeplerLSID> lsids,
            KeplerLSID karLsid, TableauFrame tableauFrame) throws Exception {

        Hashtable<KAREntry, InputStream> entries = new Hashtable<KAREntry, InputStream>();

        // if we are not saving, return empty entries table.
        if(!_save.get()) {
            System.out.println("NOT SAVING");
            return entries;
        }

        // get the default query object.
        try(Queryable query = ProvenanceRecorder.getDefaultQueryable(null)) {

            if(query == null) {
                System.err.println("WARNING: unable to get default provenance query.");
                return entries;
            }

            ObjectManager objectManager = ObjectManager.getInstance();

            // Kept local so that a format chosen by a TransferComponent
            // does not leak into later save() calls on this handler.
            ProvFormat exportFormat = _exportFormat;

            List<KeplerLSID> execLSIDs = new LinkedList<KeplerLSID>();

            for(KeplerLSID lsid : lsids) {

                NamedObj namedObj = objectManager.getObjectRevision(lsid);
                if(namedObj instanceof TransferComponent) {
                    TransferComponent transfer = (TransferComponent) namedObj;
                    execLSIDs.addAll(transfer.getExecutionLSIDs());

                    String formatStr = transfer.getFormat();
                    ProvFormat requested = _parseFormat(formatStr);
                    if(requested == null) {
                        System.err.println("WARNING: unknown PROV format in transfer component: " +
                                formatStr);
                    } else {
                        exportFormat = requested;
                    }
                } else {
                    execLSIDs.add(lsid);
                }
            }

            // counter used to build distinct names for unnamed workflows
            int unknownCount = 0;

            for(KeplerLSID execLSID : execLSIDs) {

                // see if the LSID is a valid execution LSID stored in provenance
                Integer execId = query.getExecutionForExecutionLSID(execLSID);
                if(execId == null) {
                    continue;
                }

                String name;
                NamedObj namedObj = objectManager.getObjectRevision(execLSID);
                if(namedObj instanceof WorkflowRun) {
                    name = ((WorkflowRun)namedObj).getWorkflowName();
                } else {
                    name = query.getWorkflowNameForExecution(execLSID);
                    if(name == null) {
                        name = "unknown" + unknownCount++;
                    }
                }

                System.out.println("Exporting " + name + " run " + execId +
                        " run lsid " + execLSID);

                // export the provenance to PROV
                ProvRecording recording = new ProvRecording();
                recording.setOutputType(exportFormat);
                RecordPlayer player = new RecordPlayer(query, recording);

                try(ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {

                    recording.setOutput(outputStream);

                    try {
                        player.play(execId);
                    } catch(QueryException | RecordingException e) {
                        // report and skip this execution
                        MessageHandler.error("Error exporting provenance.", e);
                        continue;
                    }

                    byte[] bytes = outputStream.toByteArray();

                    // create the KAR entry
                    KAREntry entry = new KAREntry(name + "." + execId + ".PROV." +
                            exportFormat.toString().toLowerCase(Locale.ROOT));
                    entry.setLSID(LSIDGenerator.getInstance().getNewLSID());
                    entry.setType(_ENTRY_TYPE);
                    entry.addLsidDependency(execLSID);
                    entry.addAttribute(_FORMAT_NAME, exportFormat.toString());

                    entries.put(entry, new ByteArrayInputStream(bytes));
                }
            }
            return entries;
        }
    }

    /** Set if save() should write PROV to the KAR.
     *  @param save true if save() should write PROV to the KAR.
     *  @return The value in effect before this call.
     */
    public static boolean setSave(boolean save) {
        return _save.getAndSet(save);
    }

    /** A factory that creates a ProvKAREntryHandler object. */
    public static class Factory extends KAREntryHandlerFactory {

        public Factory(NamedObj container, String name)
                throws IllegalActionException, NameDuplicationException {
            super(container, name);
        }

        @Override
        public KAREntryHandler createKAREntryHandler() {
            return new ProvKAREntryHandler();
        }
    }

    /** Convert a format name to a ProvFormat, ignoring case and
     *  surrounding whitespace.
     *  @param formatStr the format name; may be null or empty.
     *  @return the matching format, or null if formatStr is null, empty,
     *   or does not name a known format.
     */
    private static ProvFormat _parseFormat(String formatStr) {
        if(formatStr == null) {
            return null;
        }
        // Locale.ROOT avoids locale-specific case mappings
        // (e.g. Turkish dotted I turning "trig" into "TRİG").
        String name = formatStr.trim().toUpperCase(Locale.ROOT);
        if(name.isEmpty()) {
            return null;
        }
        try {
            return ProvFormat.valueOf(name);
        } catch(IllegalArgumentException e) {
            // Assuming ProvFormat is a Java enum, valueOf() throws for a
            // name that is not one of its constants; report "unknown" instead.
            return null;
        }
    }

    /** The type of KAREntry. */
    private static final String _ENTRY_TYPE = "PROVProvenance";

    /** The name of the format attribute. */
    private static final String _FORMAT_NAME = "Format";

    /** Default PROV format used when exporting. */
    private final ProvFormat _exportFormat;

    /** If true, save() writes to the KAR. Otherwise, save() does nothing. */
    private final static AtomicBoolean _save = new AtomicBoolean(true);
}