001/* A PACT CoGroup stub that runs a Kepler workflow. 002 * 003 * Copyright (c) 2011-2012 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 008 * '$Revision: 33628 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.stratosphere.stub; 031 032import java.util.ArrayList; 033import java.util.Iterator; 034import java.util.List; 035 036import org.apache.commons.logging.Log; 037import org.apache.commons.logging.LogFactory; 038import org.kepler.ddp.Utilities; 039import org.kepler.ddp.actor.pattern.stub.CoGroupInput; 040import org.kepler.ddp.actor.pattern.stub.StubSinkActor; 041import org.kepler.stratosphere.type.TypeUtilities; 042 043import eu.stratosphere.api.java.record.functions.CoGroupFunction; 044import eu.stratosphere.configuration.Configuration; 045import eu.stratosphere.types.Key; 046import eu.stratosphere.types.Record; 047import eu.stratosphere.types.Value; 048import eu.stratosphere.util.Collector; 049import ptolemy.actor.CompositeActor; 050import ptolemy.actor.ExecutionListener; 051import ptolemy.actor.Manager; 052import ptolemy.data.ArrayToken; 053import ptolemy.data.Token; 054import ptolemy.data.type.ArrayType; 055import ptolemy.data.type.RecordType; 056import ptolemy.kernel.util.IllegalActionException; 057 058/** A PACT CoGroup stub that runs a Kepler workflow. The workflow is loaded in 059 * configure(), and executed each time coGroup() is called. 060 * 061 * @author Daniel Crawl 062 * @version $Id: KeplerCoGroupStub.java 33628 2015-08-24 22:42:20Z crawl $ 063 */ 064public class KeplerCoGroupStub extends CoGroupFunction implements ExecutionListener { 065 066 /** Free resources. */ 067 @Override 068 public void close() throws Exception 069 { 070 super.close(); 071 //LOG.info("stopping CoGroup stub"); 072 if(!_runWorkflowLifecyclePerInput) { 073 _sourceActor.finish(); 074 _manager.waitForCompletion(); 075 } 076 _manager.removeExecutionListener(this); 077 _manager = null; 078 _sinkActor = null; 079 _sourceActor = null; 080 //LOG.info("stopped CoGroup stub"); 081 } 082 083 /** Configure the stub by loading the workflow and setting the PACT data 084 * types from the workflow stub actors. 085 */ 086 @Override 087 public void open(Configuration parameters) throws Exception { 088 089 StubUtilities.initializePtolemy(); 090 091 // initialize kepler to run the workflow in the stub 092 if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) { 093 StubUtilities.initializeKepler(parameters); 094 } 095 096 final CompositeActor model = Utilities.getModel( 097 parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null), 098 parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null), 099 parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null), 100 parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false), 101 parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, "")); 102 103 _sourceActor = (CoGroupInput) model.getEntity("CoGroupInput"); 104 if(_sourceActor == null) { 105 throw new RuntimeException("Could not find CoGroupInput actor in model."); 106 } 107 108 _sinkActor = (StubSinkActor) model.getEntity("CoGroupOutput"); 109 if(_sinkActor == null) { 110 throw new RuntimeException("Could not find CoGroupOutput actor in model."); 111 } 112 113 114 firstIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key"); 115 firstIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value"); 116 secondIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in2, "key"); 117 secondIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in2, "value"); 118 119 //LOG.info("cogroup types: ik1 = " + firstIK + " iv1 = " + firstIV + 120 //" ik2 = " + secondIK + " iv2 = " + secondIV); 121 122 _runWorkflowLifecyclePerInput = parameters.getBoolean( 123 Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false); 124 125 _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model, 126 _runWorkflowLifecyclePerInput); 127 128 _stubThread = Thread.currentThread(); 129 130 _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor, 131 _runWorkflowLifecyclePerInput, 132 parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false)); 133 134 //LOG.info("started CoGroup stub"); 135 136 } 137 138 /** Transfer the data from Stratosphere to Kepler, execute the workflow, 139 * and then transfer the data from Kepler to Stratosphere. 140 */ 141 @Override 142 public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception { 143 144 //LOG.info("k = " + key); 145 //LOG.info("v1 = " + values1); 146 //LOG.info("v2 = " + values2); 147 148 Token keyToken = null; 149 150 // see if values1 is empty 151 Token values1Token; 152 if(!records1.hasNext()) { 153 values1Token = new ArrayToken(((RecordType)((ArrayType)_sourceActor.in.getType()).getElementType()).get("value")); 154 } else { 155 final List<Token> tokenList = new ArrayList<Token>(); 156 while(records1.hasNext()) { 157 final Record record = records1.next(); 158 159 if(keyToken == null) { 160 keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, firstIK); 161 } 162 163 tokenList.add(StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, firstIV)); 164 } 165 try { 166 values1Token = new ArrayToken(tokenList.toArray(new Token[tokenList.size()])); 167 } catch (IllegalActionException e) { 168 throw new RuntimeException("Error creating array token.", e); 169 } 170 171 } 172 173 // see if values2 is empty 174 Token values2Token; 175 if(!records2.hasNext()) { 176 values2Token = new ArrayToken(((RecordType)((ArrayType)_sourceActor.in2.getType()).getElementType()).get("value")); 177 } else { 178 final List<Token> tokenList = new ArrayList<Token>(); 179 while(records2.hasNext()) { 180 final Record record = records2.next(); 181 182 if(keyToken == null) { 183 keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, secondIK); 184 } 185 186 tokenList.add(StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, secondIV)); 187 } 188 try { 189 values2Token = new ArrayToken(tokenList.toArray(new Token[tokenList.size()])); 190 } catch (IllegalActionException e) { 191 throw new RuntimeException("Error creating array token.", e); 192 } 193 } 194 195 196 // set the inputs 197 _sourceActor.setInput(keyToken, values1Token, values2Token); 198 199 if(_runWorkflowLifecyclePerInput) { 200 _manager.execute(); 201 } 202 203 // read the output from the output actor 204 List<Token> tokenList = null; 205 try { 206 tokenList = _sinkActor.getOutput(); 207 } catch (IllegalActionException e) { 208 throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e); 209 } 210 211 if(tokenList != null) { 212 StubUtilities.convertTokenToCollector(tokenList, out); 213 } else if(_keplerManagerException != null) { 214 throw _keplerManagerException; 215 } 216 217 } 218 219 /** Report workflow execution errors to the log. */ 220 @Override 221 public void executionError(Manager manager, Throwable throwable) { 222 223 if(throwable instanceof Exception) { 224 _keplerManagerException = (Exception) throwable; 225 } else { 226 _keplerManagerException = new Exception(throwable); 227 } 228 _stubThread.interrupt(); 229 230 } 231 232 /** Do nothing. */ 233 @Override 234 public void executionFinished(Manager manager) { 235 236 //LOG.info("cogroup execution finished"); 237 238 } 239 240 /** Do nothing. */ 241 @Override 242 public void managerStateChanged(Manager manager) { 243 244 //LOG.info("cogroup manager state changed: " + manager.getState()); 245 246 } 247 248 /** A source actor used for reading data from Stratosphere and writing 249 * it the Kepler workflow. 250 */ 251 private CoGroupInput _sourceActor; 252 253 /** A sink actor used for reading data from the workflow and writing 254 * it to Stratosphere. 255 */ 256 private StubSinkActor _sinkActor; 257 258 /** Logging. */ 259 private static final Log LOG = LogFactory.getLog(KeplerCoGroupStub.class); 260 261 /** Manager to execute the workflow. */ 262 private Manager _manager; 263 264 /** 265 * First input key type. 266 */ 267 private Class<? extends Key> firstIK; 268 269 /** 270 * First input value type. 271 */ 272 private Class<? extends Value> firstIV; 273 274 /** 275 * Second input key type. 276 */ 277 private Class<? extends Key> secondIK; 278 279 /** 280 * Second input value type. 281 */ 282 private Class<? extends Value> secondIV; 283 284 /** If true, the entire workflow lifecycle is executed for each input. */ 285 private boolean _runWorkflowLifecyclePerInput = false; 286 287 /** If the workflow is running in a separate thread 288 * (_runWorkflowLifecyclePerInput = false) and an exception 289 * is thrown, this field is set to that exception. 290 */ 291 private Exception _keplerManagerException; 292 293 /** The thread executing the stub (not the workflow). */ 294 private Thread _stubThread; 295 296}