001/* A PACT Reduce stub that runs a Kepler workflow. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 008 * '$Revision: 33628 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.stratosphere.stub; 031 032import java.util.ArrayList; 033import java.util.Iterator; 034import java.util.List; 035 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.kepler.ddp.Utilities; 039import org.kepler.ddp.actor.pattern.stub.ReduceInput; 040import org.kepler.ddp.actor.pattern.stub.StubSinkActor; 041import org.kepler.stratosphere.type.TypeUtilities; 042 043import eu.stratosphere.api.java.record.functions.ReduceFunction; 044import eu.stratosphere.configuration.Configuration; 045import eu.stratosphere.types.Key; 046import eu.stratosphere.types.Record; 047import eu.stratosphere.types.Value; 048import eu.stratosphere.util.Collector; 049import ptolemy.actor.CompositeActor; 050import ptolemy.actor.ExecutionListener; 051import ptolemy.actor.Manager; 052import ptolemy.data.ArrayToken; 053import ptolemy.data.Token; 054import ptolemy.kernel.util.IllegalActionException; 055 056/** A PACT Reduce stub that runs a Kepler workflow. The workflow is loaded in 057 * configure(), and executed each time reduce() is called. 058 * 059 * @author Daniel Crawl 060 * @version $Id: KeplerReduceStub.java 33628 2015-08-24 22:42:20Z crawl $ 061 */ 062public class KeplerReduceStub extends ReduceFunction implements ExecutionListener { 063 064 /** Free resources. */ 065 @Override 066 public void close() throws Exception 067 { 068 super.close(); 069 if(!_runWorkflowLifecyclePerInput) { 070 _sourceActor.finish(); 071 _manager.waitForCompletion(); 072 } 073 _manager.removeExecutionListener(this); 074 _manager = null; 075 _sinkActor = null; 076 _sourceActor = null; 077 ptolemy.data.expr.CachedMethod.clear(); 078 } 079 080 /** Configure the stub by loading the workflow and setting the PACT data 081 * types from the workflow stub actors. 082 */ 083 @Override 084 public void open(Configuration parameters) throws Exception { 085 086 //System.out.println("opening reduce stub " + name); 087 088 StubUtilities.initializePtolemy(); 089 090 // initialize kepler to run the workflow in the stub 091 if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) { 092 StubUtilities.initializeKepler(parameters); 093 } 094 095 final CompositeActor model = Utilities.getModel( 096 parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null), 097 parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null), 098 parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null), 099 parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false), 100 parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, "")); 101 102 _sourceActor = (ReduceInput) model.getEntity("ReduceInput"); 103 if(_sourceActor == null) { 104 throw new RuntimeException("Could not find ReduceInput actor in model."); 105 } 106 107 _sinkActor = (StubSinkActor) model.getEntity("ReduceOutput"); 108 if(_sinkActor == null) { 109 throw new RuntimeException("Could not find ReduceOutput actor in model."); 110 } 111 112 ik = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key"); 113 iv = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value"); 114 115 LOG.info("reduce types: ik = " + ik + " iv = " + iv); 116 117 _runWorkflowLifecyclePerInput = parameters.getBoolean( 118 Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false); 119 120 _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model, 121 _runWorkflowLifecyclePerInput); 122 123 _stubThread = Thread.currentThread(); 124 125 _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor, 126 _runWorkflowLifecyclePerInput, 127 parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false)); 128 129 } 130 131 /** Transfer the data from Stratosphere to Kepler, execute the workflow, 132 * and then transfer the data from Kepler to Stratosphere. 133 */ 134 @Override 135 public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { 136 137 // place the input in the input actor 138 Token keyToken = null; 139 ArrayToken valuesToken = null; 140 141 final List<Token> inputTokens = new ArrayList<Token>(); 142 while(records.hasNext()) { 143 final Record record = records.next(); 144 145 if(keyToken == null) { 146 keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, ik); 147 } 148 149 inputTokens.add(StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, iv)); 150 } 151 try { 152 valuesToken = new ArrayToken(inputTokens.toArray(new Token[inputTokens.size()])); 153 } catch (IllegalActionException e) { 154 throw new RuntimeException("Error creating array token.", e); 155 } 156 157 158 //System.out.println(name + " setInput start"); 159 _sourceActor.setInput(keyToken, valuesToken); 160 //System.out.println(name + " setInput end"); 161 162 if(_runWorkflowLifecyclePerInput) { 163 _manager.execute(); 164 } 165 166 // read the output from the output actor 167 List<Token> tokenList = null; 168 try { 169 tokenList = _sinkActor.getOutput(); 170 } catch (IllegalActionException e) { 171 throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e); 172 } 173 174 if(tokenList != null) { 175 StubUtilities.convertTokenToCollector(tokenList, out); 176 } else if(_keplerManagerException != null) { 177 throw _keplerManagerException; 178 } 179 } 180 181 /** Report workflow execution errors to the log. */ 182 @Override 183 public void executionError(Manager manager, Throwable throwable) { 184 185 if(throwable instanceof Exception) { 186 _keplerManagerException = (Exception) throwable; 187 } else { 188 _keplerManagerException = new Exception(throwable); 189 } 190 _stubThread.interrupt(); 191 192 } 193 194 /** Do nothing. */ 195 @Override 196 public void executionFinished(Manager manager) { 197 198 } 199 200 /** Do nothing. */ 201 @Override 202 public void managerStateChanged(Manager manager) { 203 204 } 205 206 /** A source actor used for reading data from Stratosphere and writing 207 * it the Kepler workflow. 208 */ 209 private ReduceInput _sourceActor; 210 211 /** A sink actor used for reading data from the workflow and writing 212 * it to Stratosphere. 213 */ 214 private StubSinkActor _sinkActor; 215 216 /** Logging. */ 217 private static final Log LOG = LogFactory.getLog(KeplerReduceStub.class); 218 219 /** Manager to execute the workflow. */ 220 private Manager _manager; 221 222 /** 223 * Input key type. 224 */ 225 private Class<? extends Key> ik; 226 227 /** 228 * Input value type. 229 */ 230 private Class<? extends Value> iv; 231 232 /** If true, the entire workflow lifecycle is executed for each input. */ 233 private boolean _runWorkflowLifecyclePerInput = false; 234 235 /** If the workflow is running in a separate thread 236 * (_runWorkflowLifecyclePerInput = false) and an exception 237 * is thrown, this field is set to that exception. 238 */ 239 private Exception _keplerManagerException; 240 241 /** The thread executing the stub (not the workflow). */ 242 private Thread _stubThread; 243 244}