001/* A PACT Map stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.util.List;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.kepler.ddp.Utilities;
037import org.kepler.ddp.actor.pattern.stub.MapInput;
038import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
039import org.kepler.stratosphere.type.TypeUtilities;
040
041import eu.stratosphere.api.java.record.functions.MapFunction;
042import eu.stratosphere.configuration.Configuration;
043import eu.stratosphere.types.Key;
044import eu.stratosphere.types.Record;
045import eu.stratosphere.types.Value;
046import eu.stratosphere.util.Collector;
047import ptolemy.actor.CompositeActor;
048import ptolemy.actor.ExecutionListener;
049import ptolemy.actor.Manager;
050import ptolemy.data.Token;
051import ptolemy.kernel.util.IllegalActionException;
052
053/** A PACT Map stub that runs a Kepler workflow. The workflow is loaded in
054 *  configure(), and executed each time map() is called.
055 * 
056 *  @author Daniel Crawl
057 *  @version $Id: KeplerMapStub.java 33628 2015-08-24 22:42:20Z crawl $
058 */
059public class KeplerMapStub extends MapFunction implements ExecutionListener {
060
061    /** Free resources. */
062    @Override
063    public void close() throws Exception
064    {
065        super.close();
066        //LOG.info("stopping Map stub");
067        if(!_runWorkflowLifecyclePerInput) {
068            _sourceActor.finish();
069            _manager.waitForCompletion();
070        }
071        _manager.removeExecutionListener(this);
072        _manager = null;
073        _sinkActor = null;
074        _sourceActor = null;
075        ptolemy.data.expr.CachedMethod.clear();
076        //LOG.info("stopped Map stub");
077    }
078
079    /** Configure the stub by loading the workflow and setting the PACT data
080     *  types from the workflow stub actors.
081     */
082    @Override
083    public void open(Configuration parameters) throws Exception {
084        
085        StubUtilities.initializePtolemy();
086                
087        // initialize kepler to run the workflow in the stub
088        if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
089                StubUtilities.initializeKepler(parameters);
090        }
091        
092        final CompositeActor model = Utilities.getModel(
093                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
094                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
095                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
096                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
097                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
098                                
099        _sourceActor = (MapInput) model.getEntity("MapInput");
100        if(_sourceActor == null) {
101            throw new RuntimeException("Could not find MapInput actor in model.");
102        }
103        
104        _sinkActor = (StubSinkActor) model.getEntity("MapOutput");
105        if(_sinkActor == null) {
106            throw new RuntimeException("Could not find MapOutput actor in model.");
107        }
108        
109                
110        // set the pact data types
111        ik = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
112        iv = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");
113
114        //LOG.info("map types: ik = " + ik + " iv = " + iv);
115        
116        _runWorkflowLifecyclePerInput = parameters.getBoolean(
117                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
118        
119        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
120                _runWorkflowLifecyclePerInput);
121        
122        _stubThread = Thread.currentThread();
123
124        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
125                _runWorkflowLifecyclePerInput,
126                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
127                
128        //LOG.info("started Map stub");
129
130    }
131        
132    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
133     *  and then transfer the data from Kepler to Stratosphere.
134     */
135    @Override
136        public void map(Record record, Collector<Record> out) throws Exception {
137                
138        /*
139        LOG.info("in map, record = " + 
140                PactRecordUtil.stringify(record, new Class[] {ik, iv}));
141        */  
142                
143        //LOG.info("value length = " + value.toString().length());
144        
145        // place the input in the input actor
146        
147        Token keyToken = null;
148        Token valueToken = null;
149        
150        // see if there's only one field
151        if(record.getNumFields() == 1) {
152            // since there's only one field, assume it is the value
153            valueToken = StubUtilities.convertPactDataToToken(record, 0, iv);        
154        } else {
155            keyToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.KEY_FIELD, ik);
156            valueToken = StubUtilities.convertPactDataToToken(record, TypeUtilities.VALUE_FIELD, iv);
157        }
158        
159        _sourceActor.setInput(keyToken, valueToken);
160        
161        //LOG.info("map stub: key = " + keyToken);
162        //LOG.info("map stub: value = " + valueToken);
163        
164        if(_runWorkflowLifecyclePerInput) {
165            _manager.execute();
166        }
167        
168        // read the output from the output actor
169        List<Token> tokenList = null;
170        try {
171            tokenList = _sinkActor.getOutput();
172        } catch (IllegalActionException e) {
173            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
174        }
175
176        if(tokenList != null) {
177            StubUtilities.convertTokenToCollector(tokenList, out);
178        } else if(_keplerManagerException != null) {
179            throw _keplerManagerException;
180        }
181        
182        //LOG.info("  got collector = " + out);
183    }
184    
185    /** Report workflow execution errors to the log. */
186    @Override
187    public void executionError(Manager manager, Throwable throwable) {
188        
189        if(throwable instanceof Exception) {
190            _keplerManagerException = (Exception) throwable;
191        } else {
192            _keplerManagerException = new Exception(throwable);
193        }
194        _stubThread.interrupt();
195        
196    }
197
198    /** Do nothing. */
199    @Override
200    public void executionFinished(Manager manager) {
201        
202        //LOG.info("map execution finished");
203
204    }
205    
206    /** Do nothing. */
207    @Override
208    public void managerStateChanged(Manager manager) {
209
210        //LOG.info("map manager state changed: " + manager.getState());
211
212    }
213
214    /**
215     * Input key type.
216     */
217    private Class<? extends Key> ik;
218
219    /**
220     * Input value type.
221     */
222    private Class<? extends Value> iv;
223
224    /** A source actor used for reading data from Stratosphere and writing
225     *  it the Kepler workflow.
226     */
227    private MapInput _sourceActor;
228    
229    /** A sink actor used for reading data from the workflow and writing
230     *  it to Stratosphere.
231     */
232    private StubSinkActor _sinkActor;
233
234    /** Logging. */
235    private static final Log LOG = LogFactory.getLog(KeplerMapStub.class);
236
237    /** Manager to execute the workflow. */
238    private Manager _manager;
239    
240    /** If true, the entire workflow lifecycle is executed for each input. */
241    private boolean _runWorkflowLifecyclePerInput = false;
242    
243    /** If the workflow is running in a separate thread
244     *  (_runWorkflowLifecyclePerInput = false) and an exception
245     *  is thrown, this field is set to that exception.
246     */
247    private Exception _keplerManagerException;
248    
249    /** The thread executing the stub (not the workflow). */
250    private Thread _stubThread;
251
252}