001/* A PACT Reduce stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.util.ArrayList;
033import java.util.Iterator;
034import java.util.List;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.kepler.ddp.Utilities;
039import org.kepler.ddp.actor.pattern.stub.ReduceInput;
040import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
041import org.kepler.stratosphere.type.TypeUtilities;
042
043import eu.stratosphere.api.java.record.functions.ReduceFunction;
044import eu.stratosphere.configuration.Configuration;
045import eu.stratosphere.types.Key;
046import eu.stratosphere.types.Record;
047import eu.stratosphere.types.Value;
048import eu.stratosphere.util.Collector;
049import ptolemy.actor.CompositeActor;
050import ptolemy.actor.ExecutionListener;
051import ptolemy.actor.Manager;
052import ptolemy.data.ArrayToken;
053import ptolemy.data.Token;
054import ptolemy.kernel.util.IllegalActionException;
055
056/** A PACT Reduce stub that runs a Kepler workflow. The workflow is loaded in
057 *  configure(), and executed each time reduce() is called.
058 * 
059 *  @author Daniel Crawl
060 *  @version $Id: KeplerReduceStub.java 33628 2015-08-24 22:42:20Z crawl $
061 */
062public class KeplerReduceStub extends ReduceFunction implements ExecutionListener {
063    
064    /** Free resources. */
065    @Override
066    public void close() throws Exception
067    {
068        super.close();
069        if(!_runWorkflowLifecyclePerInput) {
070            _sourceActor.finish();
071            _manager.waitForCompletion();
072        }
073        _manager.removeExecutionListener(this);
074        _manager = null;
075        _sinkActor = null;
076        _sourceActor = null;
077        ptolemy.data.expr.CachedMethod.clear();
078    }
079    
080    /** Configure the stub by loading the workflow and setting the PACT data
081     *  types from the workflow stub actors.
082     */
083    @Override
084    public void open(Configuration parameters) throws Exception {
085          
086        //System.out.println("opening reduce stub " + name);
087        
088        StubUtilities.initializePtolemy();
089                
090        // initialize kepler to run the workflow in the stub
091        if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
092                StubUtilities.initializeKepler(parameters);
093        }
094
095        final CompositeActor model = Utilities.getModel(
096                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
097                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
098                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
099                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
100                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
101                                
102        _sourceActor = (ReduceInput) model.getEntity("ReduceInput");
103        if(_sourceActor == null) {
104            throw new RuntimeException("Could not find ReduceInput actor in model.");
105        }
106        
107        _sinkActor = (StubSinkActor) model.getEntity("ReduceOutput");
108        if(_sinkActor == null) {
109            throw new RuntimeException("Could not find ReduceOutput actor in model.");
110        }
111                   
112        ik = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
113        iv = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");
114
115        LOG.info("reduce types: ik = " + ik + " iv = " + iv);
116        
117        _runWorkflowLifecyclePerInput = parameters.getBoolean(
118                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
119        
120        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
121                _runWorkflowLifecyclePerInput);
122        
123        _stubThread = Thread.currentThread();
124
125        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
126                _runWorkflowLifecyclePerInput,
127                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
128
129    }
130    
131    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
132     *  and then transfer the data from Kepler to Stratosphere.
133     */
134    @Override
135    public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
136
137        // place the input in the input actor
138        Token keyToken = null;
139        ArrayToken valuesToken = null;
140        
141        final List<Token> inputTokens = new ArrayList<Token>();
142        while(records.hasNext()) {
143            final Record record = records.next();
144            
145            if(keyToken == null) {
146                keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, ik);
147            }
148                
149            inputTokens.add(StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, iv));
150        }
151        try {
152            valuesToken = new ArrayToken(inputTokens.toArray(new Token[inputTokens.size()]));
153        } catch (IllegalActionException e) {
154            throw new RuntimeException("Error creating array token.", e);
155        }
156
157        
158        //System.out.println(name + " setInput start");
159        _sourceActor.setInput(keyToken, valuesToken);
160        //System.out.println(name + " setInput end");
161        
162        if(_runWorkflowLifecyclePerInput) {
163            _manager.execute();
164        }
165
166        // read the output from the output actor
167        List<Token> tokenList = null;
168        try {
169            tokenList = _sinkActor.getOutput();
170        } catch (IllegalActionException e) {
171            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
172        }
173
174        if(tokenList != null) {
175            StubUtilities.convertTokenToCollector(tokenList, out);
176        } else if(_keplerManagerException != null) {
177            throw _keplerManagerException;
178        }
179    }
180
181    /** Report workflow execution errors to the log. */
182    @Override
183    public void executionError(Manager manager, Throwable throwable) {
184
185        if(throwable instanceof Exception) {
186            _keplerManagerException = (Exception) throwable;
187        } else {
188            _keplerManagerException = new Exception(throwable);
189        }
190        _stubThread.interrupt();
191        
192    }
193
194    /** Do nothing. */
195    @Override
196    public void executionFinished(Manager manager) {
197        
198    }
199
200    /** Do nothing. */
201    @Override
202    public void managerStateChanged(Manager manager) {
203        
204    }
205
206    /** A source actor used for reading data from Stratosphere and writing
207     *  it the Kepler workflow.
208     */
209    private ReduceInput _sourceActor;
210
211    /** A sink actor used for reading data from the workflow and writing
212     *  it to Stratosphere.
213     */
214    private StubSinkActor _sinkActor;
215
216    /** Logging. */
217    private static final Log LOG = LogFactory.getLog(KeplerReduceStub.class);
218    
219    /** Manager to execute the workflow. */
220    private Manager _manager;
221
222    /**
223     * Input key type.
224     */
225    private Class<? extends Key> ik;
226
227    /**
228     * Input value type.
229     */
230    private Class<? extends Value> iv;
231    
232    /** If true, the entire workflow lifecycle is executed for each input. */
233    private boolean _runWorkflowLifecyclePerInput = false;
234
235    /** If the workflow is running in a separate thread
236     *  (_runWorkflowLifecyclePerInput = false) and an exception
237     *  is thrown, this field is set to that exception.
238     */
239    private Exception _keplerManagerException;
240    
241    /** The thread executing the stub (not the workflow). */
242    private Thread _stubThread;
243
244}