/* A PACT CoGroup stub that runs a Kepler workflow.
 * 
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
 * '$Revision: 33628 $'
 * 
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
030package org.kepler.stratosphere.stub;
031
032import java.util.ArrayList;
033import java.util.Iterator;
034import java.util.List;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.kepler.ddp.Utilities;
039import org.kepler.ddp.actor.pattern.stub.CoGroupInput;
040import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
041import org.kepler.stratosphere.type.TypeUtilities;
042
043import eu.stratosphere.api.java.record.functions.CoGroupFunction;
044import eu.stratosphere.configuration.Configuration;
045import eu.stratosphere.types.Key;
046import eu.stratosphere.types.Record;
047import eu.stratosphere.types.Value;
048import eu.stratosphere.util.Collector;
049import ptolemy.actor.CompositeActor;
050import ptolemy.actor.ExecutionListener;
051import ptolemy.actor.Manager;
052import ptolemy.data.ArrayToken;
053import ptolemy.data.Token;
054import ptolemy.data.type.ArrayType;
055import ptolemy.data.type.RecordType;
056import ptolemy.kernel.util.IllegalActionException;
057
058/** A PACT CoGroup stub that runs a Kepler workflow. The workflow is loaded in
059 *  configure(), and executed each time coGroup() is called.
060 * 
061 *  @author Daniel Crawl
062 *  @version $Id: KeplerCoGroupStub.java 33628 2015-08-24 22:42:20Z crawl $
063 */
064public class KeplerCoGroupStub extends CoGroupFunction implements ExecutionListener {
065
066    /** Free resources. */
067    @Override
068    public void close() throws Exception
069    {
070        super.close();
071        //LOG.info("stopping CoGroup stub");
072        if(!_runWorkflowLifecyclePerInput) {
073            _sourceActor.finish();
074            _manager.waitForCompletion();
075        }
076        _manager.removeExecutionListener(this);
077        _manager = null;
078        _sinkActor = null;
079        _sourceActor = null;
080        //LOG.info("stopped CoGroup stub");
081    }
082
083    /** Configure the stub by loading the workflow and setting the PACT data
084     *  types from the workflow stub actors.
085     */
086    @Override
087    public void open(Configuration parameters) throws Exception {
088                
089        StubUtilities.initializePtolemy();
090                
091        // initialize kepler to run the workflow in the stub
092        if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
093                StubUtilities.initializeKepler(parameters);
094        }
095
096        final CompositeActor model = Utilities.getModel(
097                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
098                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
099                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
100                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
101                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
102                                
103        _sourceActor = (CoGroupInput) model.getEntity("CoGroupInput");
104        if(_sourceActor == null) {
105            throw new RuntimeException("Could not find CoGroupInput actor in model.");
106        }
107        
108        _sinkActor = (StubSinkActor) model.getEntity("CoGroupOutput");
109        if(_sinkActor == null) {
110            throw new RuntimeException("Could not find CoGroupOutput actor in model.");
111        }
112        
113                        
114        firstIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
115        firstIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");
116        secondIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in2, "key");
117        secondIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in2, "value");
118
119        //LOG.info("cogroup types: ik1 = " + firstIK + " iv1 = " + firstIV +
120            //" ik2 = " + secondIK + " iv2 = " + secondIV);
121                
122        _runWorkflowLifecyclePerInput = parameters.getBoolean(
123                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
124        
125        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
126                _runWorkflowLifecyclePerInput);
127
128        _stubThread = Thread.currentThread();
129
130        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
131                _runWorkflowLifecyclePerInput,
132                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
133        
134        //LOG.info("started CoGroup stub");
135
136    }
137
138    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
139     *  and then transfer the data from Kepler to Stratosphere.
140     */
141    @Override
142    public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception {
143        
144        //LOG.info("k = " + key);
145        //LOG.info("v1 = " + values1);
146        //LOG.info("v2 = " + values2);
147
148        Token keyToken = null;
149        
150        // see if values1 is empty
151        Token values1Token;
152        if(!records1.hasNext()) {
153            values1Token = new ArrayToken(((RecordType)((ArrayType)_sourceActor.in.getType()).getElementType()).get("value"));
154        } else {
155            final List<Token> tokenList = new ArrayList<Token>();
156            while(records1.hasNext()) {
157                final Record record = records1.next();
158                
159                if(keyToken == null) {
160                    keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, firstIK);
161                }
162                    
163                tokenList.add(StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, firstIV));
164            }
165            try {
166                values1Token = new ArrayToken(tokenList.toArray(new Token[tokenList.size()]));
167            } catch (IllegalActionException e) {
168                throw new RuntimeException("Error creating array token.", e);
169            }
170
171        }
172
173        // see if values2 is empty
174        Token values2Token;
175        if(!records2.hasNext()) {
176            values2Token = new ArrayToken(((RecordType)((ArrayType)_sourceActor.in2.getType()).getElementType()).get("value"));
177        } else {
178            final List<Token> tokenList = new ArrayList<Token>();
179            while(records2.hasNext()) {
180                final Record record = records2.next();
181                
182                if(keyToken == null) {
183                    keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, secondIK);
184                }
185                    
186                tokenList.add(StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, secondIV));
187            }
188            try {
189                values2Token = new ArrayToken(tokenList.toArray(new Token[tokenList.size()]));
190            } catch (IllegalActionException e) {
191                throw new RuntimeException("Error creating array token.", e);
192            }
193        }
194
195        
196        // set the inputs
197        _sourceActor.setInput(keyToken, values1Token, values2Token);
198        
199        if(_runWorkflowLifecyclePerInput) {
200            _manager.execute();
201        }
202
203        // read the output from the output actor
204        List<Token> tokenList = null;
205        try {
206            tokenList = _sinkActor.getOutput();
207        } catch (IllegalActionException e) {
208            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
209        }
210
211        if(tokenList != null) {
212            StubUtilities.convertTokenToCollector(tokenList, out);
213        } else if(_keplerManagerException != null) {
214            throw _keplerManagerException;
215        }
216
217    }
218
219    /** Report workflow execution errors to the log. */
220    @Override
221    public void executionError(Manager manager, Throwable throwable) {
222        
223        if(throwable instanceof Exception) {
224            _keplerManagerException = (Exception) throwable;
225        } else {
226            _keplerManagerException = new Exception(throwable);
227        }
228        _stubThread.interrupt();
229        
230    }
231
232    /** Do nothing. */
233    @Override
234    public void executionFinished(Manager manager) {
235        
236        //LOG.info("cogroup execution finished");
237
238    }
239
240    /** Do nothing. */
241    @Override
242    public void managerStateChanged(Manager manager) {
243
244        //LOG.info("cogroup manager state changed: " + manager.getState());
245       
246    }
247
248    /** A source actor used for reading data from Stratosphere and writing
249     *  it the Kepler workflow.
250     */
251    private CoGroupInput _sourceActor;
252
253    /** A sink actor used for reading data from the workflow and writing
254     *  it to Stratosphere.
255     */
256    private StubSinkActor _sinkActor;
257
258    /** Logging. */
259    private static final Log LOG = LogFactory.getLog(KeplerCoGroupStub.class);
260
261    /** Manager to execute the workflow. */
262    private Manager _manager;
263
264    /**
265     * First input key type.
266     */
267    private Class<? extends Key> firstIK;
268
269    /**
270     * First input value type.
271     */
272    private Class<? extends Value> firstIV;
273
274    /**
275     * Second input key type.
276     */
277    private Class<? extends Key> secondIK;
278
279    /**
280     * Second input value type.
281     */
282    private Class<? extends Value> secondIV;
283    
284    /** If true, the entire workflow lifecycle is executed for each input. */
285    private boolean _runWorkflowLifecyclePerInput = false;
286
287    /** If the workflow is running in a separate thread
288     *  (_runWorkflowLifecyclePerInput = false) and an exception
289     *  is thrown, this field is set to that exception.
290     */
291    private Exception _keplerManagerException;
292    
293    /** The thread executing the stub (not the workflow). */
294    private Thread _stubThread;
295
296}