/* A PACT Map stub that runs a Kepler workflow.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33628 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.stratosphere.stub;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.stub.MapInput;
import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
import org.kepler.stratosphere.type.TypeUtilities;

import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import ptolemy.actor.CompositeActor;
import ptolemy.actor.ExecutionListener;
import ptolemy.actor.Manager;
import ptolemy.data.Token;
import ptolemy.kernel.util.IllegalActionException;

/** A PACT Map stub that runs a Kepler workflow. The workflow is loaded in
 *  open(), and fed one input record each time map() is called.
 *
 *  @author Daniel Crawl
 *  @version $Id: KeplerMapStub.java 33628 2015-08-24 22:42:20Z crawl $
 */
public class KeplerMapStub extends MapFunction implements ExecutionListener {

    /** Free resources.
     *
     *  <p>If a manager was created in open() and the workflow is not run
     *  once per input, finish() is called on the source actor and this
     *  method waits for the manager to complete. The execution listener
     *  is then removed, the references to the manager and the stub actors
     *  are dropped, and the Ptolemy CachedMethod cache is cleared.</p>
     *
     *  <p>The cleanup runs even if stopping the workflow throws, and this
     *  method tolerates being called when open() did not complete (no
     *  manager was created).</p>
     *
     *  @throws Exception If the superclass close() fails or if stopping
     *   the workflow fails.
     */
    @Override
    public void close() throws Exception
    {
        super.close();
        try {
            // _manager is assigned last in open(), after both stub actors
            // were found, so a non-null manager implies a non-null
            // _sourceActor.
            if(_manager != null && !_runWorkflowLifecyclePerInput) {
                _sourceActor.finish();
                _manager.waitForCompletion();
            }
        } finally {
            // Always release references, even when shutting down the
            // workflow failed; otherwise this stub stays registered as a
            // listener on the manager.
            if(_manager != null) {
                _manager.removeExecutionListener(this);
            }
            _manager = null;
            _sinkActor = null;
            _sourceActor = null;
            ptolemy.data.expr.CachedMethod.clear();
        }
    }

    /** Configure the stub by loading the workflow and setting the PACT data
     *  types from the workflow stub actors.
     *
     *  @param parameters The stub configuration; the model name, model,
     *   model path, redirect directory and the execution flags are read
     *   from it.
     *  @throws Exception If the workflow cannot be loaded or prepared.
     *   A RuntimeException is thrown if the model contains no entity
     *   named "MapInput" or no entity named "MapOutput".
     */
    @Override
    public void open(Configuration parameters) throws Exception {

        StubUtilities.initializePtolemy();

        final boolean sameJVM = parameters.getBoolean(
                Utilities.CONFIGURATION_KEPLER_SAME_JVM, false);

        // initialize kepler to run the workflow in the stub
        if(!sameJVM) {
            StubUtilities.initializeKepler(parameters);
        }

        final CompositeActor model = Utilities.getModel(
                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
                sameJVM,
                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));

        _sourceActor = (MapInput) model.getEntity("MapInput");
        if(_sourceActor == null) {
            throw new RuntimeException("Could not find MapInput actor in model.");
        }

        _sinkActor = (StubSinkActor) model.getEntity("MapOutput");
        if(_sinkActor == null) {
            throw new RuntimeException("Could not find MapOutput actor in model.");
        }

        // set the pact data types
        ik = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
        iv = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");

        _runWorkflowLifecyclePerInput = parameters.getBoolean(
                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);

        // checkDirectorIterations() may override the configured value.
        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
                _runWorkflowLifecyclePerInput);

        // Remember the calling thread before the manager is created so that
        // executionError() can interrupt it.
        _stubThread = Thread.currentThread();

        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
                _runWorkflowLifecyclePerInput,
                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));

    }

    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
     *  and then transfer the data from Kepler to Stratosphere.
     *
     *  @param record The input record. A record with exactly one field is
     *   treated as a value without a key; otherwise the key and value are
     *   read from TypeUtilities.KEY_FIELD and TypeUtilities.VALUE_FIELD.
     *  @param out The collector that receives the workflow output.
     *  @throws Exception If reading the output from the sink actor fails,
     *   or if no output is available and a workflow execution error was
     *   reported to executionError().
     */
    @Override
    public void map(Record record, Collector<Record> out) throws Exception {

        // place the input in the input actor

        Token keyToken = null;
        Token valueToken = null;

        // see if there's only one field
        if(record.getNumFields() == 1) {
            // since there's only one field, assume it is the value
            valueToken = StubUtilities.convertPactDataToToken(record, 0, iv);
        } else {
            keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, ik);
            valueToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, iv);
        }

        _sourceActor.setInput(keyToken, valueToken);

        if(_runWorkflowLifecyclePerInput) {
            _manager.execute();
        }

        // read the output from the output actor
        List<Token> tokenList = null;
        try {
            tokenList = _sinkActor.getOutput();
        } catch (IllegalActionException e) {
            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
        }

        if(tokenList != null) {
            StubUtilities.convertTokenToCollector(tokenList, out);
        } else {
            // No output: surface a workflow error if one was recorded.
            // Read the volatile field once so the check and the throw see
            // the same value.
            final Exception workflowError = _keplerManagerException;
            if(workflowError != null) {
                throw workflowError;
            }
        }

    }

    /** Record a workflow execution error and interrupt the stub thread.
     *  The recorded exception is rethrown by map() when no output is
     *  available.
     *
     *  @param manager The manager that reported the error.
     *  @param throwable The error; it is wrapped in an Exception if it is
     *   not one already.
     */
    @Override
    public void executionError(Manager manager, Throwable throwable) {

        if(throwable instanceof Exception) {
            _keplerManagerException = (Exception) throwable;
        } else {
            _keplerManagerException = new Exception(throwable);
        }
        _stubThread.interrupt();

    }

    /** Do nothing. */
    @Override
    public void executionFinished(Manager manager) {
    }

    /** Do nothing. */
    @Override
    public void managerStateChanged(Manager manager) {
    }

    /**
     * Input key type.
     */
    private Class<? extends Key> ik;

    /**
     * Input value type.
     */
    private Class<? extends Value> iv;

    /** A source actor used for reading data from Stratosphere and writing
     *  it to the Kepler workflow.
     */
    private MapInput _sourceActor;

    /** A sink actor used for reading data from the workflow and writing
     *  it to Stratosphere.
     */
    private StubSinkActor _sinkActor;

    /** Logging. */
    private static final Log LOG = LogFactory.getLog(KeplerMapStub.class);

    /** Manager to execute the workflow. */
    private Manager _manager;

    /** If true, the entire workflow lifecycle is executed for each input. */
    private boolean _runWorkflowLifecyclePerInput = false;

    /** If the workflow is running in a separate thread
     *  (_runWorkflowLifecyclePerInput = false) and an exception
     *  is thrown, this field is set to that exception.
     *  Volatile because it is written in executionError() and read in
     *  map(), which may run on different threads.
     */
    private volatile Exception _keplerManagerException;

    /** The thread executing the stub (not the workflow). */
    private Thread _stubThread;

}