/* A PACT Cross stub that runs a Kepler workflow.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $'
 * '$Revision: 33628 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.stratosphere.stub;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.ddp.Utilities;
import org.kepler.ddp.actor.pattern.stub.CrossInput;
import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
import org.kepler.stratosphere.type.TypeUtilities;

import eu.stratosphere.api.java.record.functions.CrossFunction;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import ptolemy.actor.CompositeActor;
import ptolemy.actor.ExecutionListener;
import ptolemy.actor.Manager;
import ptolemy.data.Token;
import ptolemy.kernel.util.IllegalActionException;

/** A PACT Cross stub that runs a Kepler workflow. The workflow is loaded in
 *  open(). Each call to cross() hands one pair of records to the workflow's
 *  CrossInput actor and reads the results from its CrossOutput actor. If the
 *  "run workflow lifecycle per input" option is in effect, the workflow is
 *  executed via the Manager on every cross() call.
 *
 *  @author Daniel Crawl
 *  @version $Id: KeplerCrossStub.java 33628 2015-08-24 22:42:20Z crawl $
 */
public class KeplerCrossStub extends CrossFunction implements ExecutionListener {

    /** Free resources. When the workflow is not run once per input, the
     *  source actor is finished and the manager is waited on before the
     *  references are released. This method tolerates fields that were never
     *  set or were already cleared, so it may be called more than once.
     *
     *  @throws Exception If the superclass close, finishing the source actor,
     *   or waiting for the manager fails.
     */
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (!_runWorkflowLifecyclePerInput) {
                if (_sourceActor != null) {
                    _sourceActor.finish();
                }
                if (_manager != null) {
                    _manager.waitForCompletion();
                }
            }
        } finally {
            // Always detach from the manager and drop the references, even
            // if shutting down the workflow failed.
            if (_manager != null) {
                _manager.removeExecutionListener(this);
            }
            _manager = null;
            _sinkActor = null;
            _sourceActor = null;
        }
    }

    /** Configure the stub by loading the workflow and setting the PACT data
     *  types from the workflow stub actors.
     *
     *  @param parameters The stub configuration; it supplies the model
     *   name/contents/path and the execution options read below.
     *  @throws Exception If the model cannot be loaded or set up. A
     *   RuntimeException is thrown if the model does not contain entities
     *   named "CrossInput" and "CrossOutput".
     */
    @Override
    public void open(Configuration parameters) throws Exception {

        StubUtilities.initializePtolemy();

        // Read once; used both to decide on Kepler initialization and when
        // loading the model.
        final boolean sameJVM = parameters.getBoolean(
                Utilities.CONFIGURATION_KEPLER_SAME_JVM, false);

        // initialize kepler to run the workflow in the stub
        if (!sameJVM) {
            StubUtilities.initializeKepler(parameters);
        }

        final CompositeActor model = Utilities.getModel(
                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
                sameJVM,
                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));

        _sourceActor = (CrossInput) model.getEntity("CrossInput");
        if (_sourceActor == null) {
            throw new RuntimeException("Could not find CrossInput actor in model.");
        }

        _sinkActor = (StubSinkActor) model.getEntity("CrossOutput");
        if (_sinkActor == null) {
            throw new RuntimeException("Could not find CrossOutput actor in model.");
        }

        // The PACT key/value classes are derived from the "key" and "value"
        // fields of the two input ports of the source actor.
        firstIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
        firstIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");
        secondIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in2, "key");
        secondIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in2, "value");

        LOG.info("cross types: ik1 = " + firstIK + " iv1 = " + firstIV +
                " ik2 = " + secondIK + " iv2 = " + secondIV);

        _runWorkflowLifecyclePerInput = parameters.getBoolean(
                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);

        // The configured value may be overridden based on the model's director.
        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
                _runWorkflowLifecyclePerInput);

        // Remember the calling thread before the manager is created so that
        // executionError() can interrupt it.
        _stubThread = Thread.currentThread();

        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
                _runWorkflowLifecyclePerInput,
                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));

    }

    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
     *  and then transfer the data from Kepler to Stratosphere.
     *
     *  <p>A record with exactly one field is treated as a value with no key
     *  (the key token passed to the workflow is null); otherwise the key and
     *  value are read from TypeUtilities.KEY_FIELD and
     *  TypeUtilities.VALUE_FIELD.</p>
     *
     *  @param record1 The record from the first input.
     *  @param record2 The record from the second input.
     *  @param out The collector that receives the workflow output.
     *  @throws Exception If the workflow output cannot be read, or if no
     *   output was produced and a workflow execution error was recorded by
     *   executionError().
     */
    @Override
    public void cross(Record record1, Record record2, Collector<Record> out) throws Exception {

        // set the inputs
        Token key1Token = null;
        Token value1Token = null;
        Token key2Token = null;
        Token value2Token = null;

        // see if there's only one field
        if (record1.getNumFields() == 1) {
            // since there's only one field, assume it is the value
            value1Token = StubUtilities.convertPactDataToToken(record1, 0, firstIV);
        } else {
            key1Token = StubUtilities.convertPactDataToToken(record1,
                    TypeUtilities.KEY_FIELD, firstIK);
            value1Token = StubUtilities.convertPactDataToToken(record1,
                    TypeUtilities.VALUE_FIELD, firstIV);
        }

        // see if there's only one field
        if (record2.getNumFields() == 1) {
            // since there's only one field, assume it is the value
            value2Token = StubUtilities.convertPactDataToToken(record2, 0, secondIV);
        } else {
            key2Token = StubUtilities.convertPactDataToToken(record2,
                    TypeUtilities.KEY_FIELD, secondIK);
            value2Token = StubUtilities.convertPactDataToToken(record2,
                    TypeUtilities.VALUE_FIELD, secondIV);
        }

        _sourceActor.setInput(key1Token, value1Token, key2Token, value2Token);

        if (_runWorkflowLifecyclePerInput) {
            _manager.execute();
        }

        // read the output from the output actor
        List<Token> tokenList = null;
        try {
            tokenList = _sinkActor.getOutput();
        } catch (IllegalActionException e) {
            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
        }

        if (tokenList != null) {
            StubUtilities.convertTokenToCollector(tokenList, out);
        } else if (_keplerManagerException != null) {
            // No output was produced and the workflow reported an error:
            // surface that error to the caller.
            throw _keplerManagerException;
        }

    }

    /** Record a workflow execution error and interrupt the stub thread.
     *  The throwable is stored (wrapped in an Exception if it is not one
     *  already) so that cross() can rethrow it.
     *
     *  @param manager The manager reporting the error.
     *  @param throwable The error that occurred.
     */
    @Override
    public void executionError(Manager manager, Throwable throwable) {

        if (throwable instanceof Exception) {
            _keplerManagerException = (Exception) throwable;
        } else {
            _keplerManagerException = new Exception(throwable);
        }
        _stubThread.interrupt();

    }

    /** Do nothing. */
    @Override
    public void executionFinished(Manager manager) {
    }

    /** Do nothing. */
    @Override
    public void managerStateChanged(Manager manager) {
    }

    /** A source actor used for reading data from Stratosphere and writing
     *  it to the Kepler workflow.
     */
    private CrossInput _sourceActor;

    /** A sink actor used for reading data from the workflow and writing
     *  it to Stratosphere.
     */
    private StubSinkActor _sinkActor;

    /** Logging. */
    private static final Log LOG = LogFactory.getLog(KeplerCrossStub.class);

    /** Manager to execute the workflow. */
    private Manager _manager;

    /**
     * First input key type.
     */
    private Class<? extends Key> firstIK;

    /**
     * First input value type.
     */
    private Class<? extends Value> firstIV;

    /**
     * Second input key type.
     */
    private Class<? extends Key> secondIK;

    /**
     * Second input value type.
     */
    private Class<? extends Value> secondIV;

    /** If true, the entire workflow lifecycle is executed for each input. */
    private boolean _runWorkflowLifecyclePerInput = false;

    /** If the workflow is running in a separate thread
     *  (_runWorkflowLifecyclePerInput = false) and an exception
     *  is thrown, this field is set to that exception. Declared volatile
     *  because it is written in executionError() and read in cross(),
     *  which may run on different threads.
     */
    private volatile Exception _keplerManagerException;

    /** The thread executing the stub (not the workflow). */
    private Thread _stubThread;

}