001/* A PACT Cross stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.util.List;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.kepler.ddp.Utilities;
037import org.kepler.ddp.actor.pattern.stub.CrossInput;
038import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
039import org.kepler.stratosphere.type.TypeUtilities;
040
041import eu.stratosphere.api.java.record.functions.CrossFunction;
042import eu.stratosphere.configuration.Configuration;
043import eu.stratosphere.types.Key;
044import eu.stratosphere.types.Record;
045import eu.stratosphere.types.Value;
046import eu.stratosphere.util.Collector;
047import ptolemy.actor.CompositeActor;
048import ptolemy.actor.ExecutionListener;
049import ptolemy.actor.Manager;
050import ptolemy.data.Token;
051import ptolemy.kernel.util.IllegalActionException;
052
053/** A PACT Cross stub that runs a Kepler workflow. The workflow is loaded in
054 *  configure(), and executed each time cross() is called.
055 * 
056 *  @author Daniel Crawl
057 *  @version $Id: KeplerCrossStub.java 33628 2015-08-24 22:42:20Z crawl $
058 */
059public class KeplerCrossStub extends CrossFunction implements ExecutionListener {
060
061    /** Free resources. */
062    @Override
063    public void close() throws Exception
064    {
065        super.close();
066        if(!_runWorkflowLifecyclePerInput) {
067            _sourceActor.finish();
068            _manager.waitForCompletion();
069        }
070        _manager.removeExecutionListener(this);
071        _manager = null;
072        _sinkActor = null;
073        _sourceActor = null;
074    }
075    
076    /** Configure the stub by loading the workflow and setting the PACT data
077     *  types from the workflow stub actors.
078     */
079    @Override
080    public void open(Configuration parameters) throws Exception {
081                
082        StubUtilities.initializePtolemy();
083        
084        // initialize kepler to run the workflow in the stub
085        if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
086                StubUtilities.initializeKepler(parameters);
087        }
088
089        final CompositeActor model = Utilities.getModel(
090                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
091                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
092                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
093                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
094                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
095                                
096        _sourceActor = (CrossInput) model.getEntity("CrossInput");
097        if(_sourceActor == null) {
098            throw new RuntimeException("Could not find CrossInput actor in model.");
099        }
100        
101        _sinkActor = (StubSinkActor) model.getEntity("CrossOutput");
102        if(_sinkActor == null) {
103            throw new RuntimeException("Could not find CrossOutput actor in model.");
104        }
105           
106        firstIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
107        firstIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");
108        secondIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in2, "key");
109        secondIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in2, "value");
110
111        LOG.info("cross types: ik1 = " + firstIK + " iv1 = " + firstIV +
112            " ik2 = " + secondIK + " iv2 = " + secondIV);
113               
114        _runWorkflowLifecyclePerInput = parameters.getBoolean(
115                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
116        
117        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
118                _runWorkflowLifecyclePerInput);
119
120        _stubThread = Thread.currentThread();
121
122        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
123                _runWorkflowLifecyclePerInput,
124                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
125
126        //LOG.info("map leaving configure()");
127
128    }
129
130    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
131     *  and then transfer the data from Kepler to Stratosphere.
132     */
133    @Override
134    public void cross(Record record1, Record record2, Collector<Record> out) throws Exception {
135
136        // set the inputs
137        Token key1Token = null;
138        Token value1Token = null;
139        Token key2Token = null;
140        Token value2Token = null;
141        
142        // see if there's only one field
143        if(record1.getNumFields() == 1) {
144            // since there's only one field, assume it is the value
145            value1Token = StubUtilities.convertPactDataToToken(record1, 0, firstIV);
146        } else {
147            key1Token = StubUtilities.convertPactDataToToken(record1,
148                    TypeUtilities.KEY_FIELD, firstIK);
149            value1Token = StubUtilities.convertPactDataToToken(record1,
150                    TypeUtilities.VALUE_FIELD, firstIV);            
151        }
152        
153        // see if there's only one field
154        if(record2.getNumFields() == 1) {
155            // since there's only one field, assume it is the value
156            value2Token = StubUtilities.convertPactDataToToken(record2, 0, secondIV);
157        } else {
158            key2Token = StubUtilities.convertPactDataToToken(record2,
159                    TypeUtilities.KEY_FIELD, secondIK);
160            value2Token = StubUtilities.convertPactDataToToken(record2,
161                    TypeUtilities.VALUE_FIELD, secondIV);            
162        }
163
164        _sourceActor.setInput(key1Token, value1Token, key2Token, value2Token);
165        
166        if(_runWorkflowLifecyclePerInput) {
167            _manager.execute();
168        }
169
170        // read the output from the output actor
171        List<Token> tokenList = null;
172        try {
173            tokenList = _sinkActor.getOutput();
174        } catch (IllegalActionException e) {
175            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
176        }
177
178        if(tokenList != null) {
179            StubUtilities.convertTokenToCollector(tokenList, out);
180        } else if(_keplerManagerException != null) {
181            throw _keplerManagerException;
182        }
183        
184    }
185    
186    /** Report workflow execution errors to the log. */
187    @Override
188    public void executionError(Manager manager, Throwable throwable) {
189        
190        if(throwable instanceof Exception) {
191            _keplerManagerException = (Exception) throwable;
192        } else {
193            _keplerManagerException = new Exception(throwable);
194        }
195        _stubThread.interrupt();
196        
197    }
198
199    /** Do nothing. */
200    @Override
201    public void executionFinished(Manager manager) {
202        
203        //LOG.info("map execution finished");
204
205    }
206
207    /** Do nothing. */
208    @Override
209    public void managerStateChanged(Manager manager) {
210
211        //LOG.info("map manager state changed: " + manager.getState());
212       
213    }
214    
215    /** A source actor used for reading data from Stratosphere and writing
216     *  it the Kepler workflow.
217     */
218    private CrossInput _sourceActor;
219
220    /** A sink actor used for reading data from the workflow and writing
221     *  it to Stratosphere.
222     */
223    private StubSinkActor _sinkActor;
224
225    /** Logging. */
226    private static final Log LOG = LogFactory.getLog(KeplerCrossStub.class);
227
228    /** Manager to execute the workflow. */
229    private Manager _manager;
230    
231    /**
232     * First input key type.
233     */
234    private Class<? extends Key> firstIK;
235
236    /**
237     * First input value type.
238     */
239    private Class<? extends Value> firstIV;
240
241    /**
242     * Second input key type.
243     */
244    private Class<? extends Key> secondIK;
245
246    /**
247     * Second input value type.
248     */
249    private Class<? extends Value> secondIV;
250    
251    /** If true, the entire workflow lifecycle is executed for each input. */
252    private boolean _runWorkflowLifecyclePerInput = false;
253
254    /** If the workflow is running in a separate thread
255     *  (_runWorkflowLifecyclePerInput = false) and an exception
256     *  is thrown, this field is set to that exception.
257     */
258    private Exception _keplerManagerException;
259    
260    /** The thread executing the stub (not the workflow). */
261    private Thread _stubThread;
262
263}