001/* A PACT Match stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-24 22:42:20 +0000 (Mon, 24 Aug 2015) $' 
008 * '$Revision: 33628 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.stratosphere.stub;
031
032import java.util.List;
033
034import org.apache.commons.logging.Log;
035import org.apache.commons.logging.LogFactory;
036import org.kepler.ddp.Utilities;
037import org.kepler.ddp.actor.pattern.stub.MatchInput;
038import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
039import org.kepler.stratosphere.type.TypeUtilities;
040
041import eu.stratosphere.api.java.record.functions.JoinFunction;
042import eu.stratosphere.configuration.Configuration;
043import eu.stratosphere.types.Key;
044import eu.stratosphere.types.Record;
045import eu.stratosphere.types.Value;
046import eu.stratosphere.util.Collector;
047import ptolemy.actor.CompositeActor;
048import ptolemy.actor.ExecutionListener;
049import ptolemy.actor.Manager;
050import ptolemy.data.Token;
051import ptolemy.kernel.util.IllegalActionException;
052
053/** A PACT Match stub that runs a Kepler workflow. The workflow is loaded in
054 *  configure(), and executed each time match() is called.
055 * 
056 *  @author Daniel Crawl
057 *  @version $Id: KeplerMatchStub.java 33628 2015-08-24 22:42:20Z crawl $
058 */
059public class KeplerMatchStub extends JoinFunction implements ExecutionListener {
060    
061    /** Free resources. */
062    @Override
063    public void close() throws Exception
064    {
065        super.close();
066        if(!_runWorkflowLifecyclePerInput) {
067            _sourceActor.finish();
068            _manager.waitForCompletion();
069        }
070        _manager.removeExecutionListener(this);
071        _manager = null;
072        _sinkActor = null;
073        _sourceActor = null;
074    }
075
076    /** Configure the stub by loading the workflow and setting the PACT data
077     *  types from the workflow stub actors.
078     */
079    @Override
080    public void open(Configuration parameters) throws Exception {
081                
082        StubUtilities.initializePtolemy();
083                
084        // initialize kepler to run the workflow in the stub
085        if(!parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
086                StubUtilities.initializeKepler(parameters);
087        }
088
089        final CompositeActor model = Utilities.getModel(
090                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
091                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL, null),
092                parameters.getString(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
093                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
094                parameters.getString(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
095                                
096        _sourceActor = (MatchInput) model.getEntity("MatchInput");
097        if(_sourceActor == null) {
098            throw new RuntimeException("Could not find MatchInput actor in model.");
099        }
100        
101        _sinkActor = (StubSinkActor) model.getEntity("MatchOutput");
102        if(_sinkActor == null) {
103            throw new RuntimeException("Could not find MatchOutput actor in model.");
104        }
105        
106        firstIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in, "key");
107        firstIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in, "value");
108        secondIK = TypeUtilities.getPactKeyTypeForFieldInKeyValuePort(_sourceActor.in2, "key");
109        secondIV = TypeUtilities.getPactValueTypeForFieldInKeyValuePort(_sourceActor.in2, "value");
110
111        LOG.info("match types: ik1 = " + firstIK + " iv1 = " + firstIV +
112            " ik2 = " + secondIK + " iv2 = " + secondIV);
113          
114        _runWorkflowLifecyclePerInput = parameters.getBoolean(
115                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
116        
117        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
118                _runWorkflowLifecyclePerInput);
119        
120        _stubThread = Thread.currentThread();
121
122        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
123                _runWorkflowLifecyclePerInput,
124                parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
125
126        //LOG.info("map leaving configure()");
127
128    }
129
130    /** Transfer the data from Stratosphere to Kepler, execute the workflow,
131     *  and then transfer the data from Kepler to Stratosphere.
132     */
133    @Override
134    public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
135        
136        //LOG.info("k = " + key);
137        //LOG.info("v1 = " + value1);
138        //LOG.info("v2 = " + value1);
139
140        // set the inputs
141        final Token keyToken = StubUtilities.convertPactDataToToken(value1,
142            TypeUtilities.KEY_FIELD, firstIK);
143        final Token value1Token = StubUtilities.convertPactDataToToken(value1,
144            TypeUtilities.VALUE_FIELD, firstIV);
145        final Token value2Token = StubUtilities.convertPactDataToToken(value2,
146            TypeUtilities.VALUE_FIELD, secondIV);
147        _sourceActor.setInput(keyToken, value1Token, value2Token);
148        
149        if(_runWorkflowLifecyclePerInput) {
150            _manager.execute();
151        }
152
153        // read the output from the output actor
154        List<Token> tokenList = null;
155        try {
156            tokenList = _sinkActor.getOutput();
157        } catch (IllegalActionException e) {
158            throw new Exception("Error getting output for " + _sinkActor.getName() + ".", e);
159        }
160
161        if(tokenList != null) {
162            StubUtilities.convertTokenToCollector(tokenList, out);
163        } else if(_keplerManagerException != null) {
164            throw _keplerManagerException;
165        }
166
167    }
168
169    /** Report workflow execution errors to the log. */
170    @Override
171    public void executionError(Manager manager, Throwable throwable) {
172        
173        if(throwable instanceof Exception) {
174            _keplerManagerException = (Exception) throwable;
175        } else {
176            _keplerManagerException = new Exception(throwable);
177        }
178        _stubThread.interrupt();
179        
180    }
181
182    /** Do nothing. */
183    @Override
184    public void executionFinished(Manager manager) {
185        
186        //LOG.info("match execution finished");
187
188    }
189
190    /** Do nothing. */
191    @Override
192    public void managerStateChanged(Manager manager) {
193
194        //LOG.info("match manager state changed: " + manager.getState());
195       
196    }
197   
198    /** A source actor used for reading data from Stratosphere and writing
199     *  it the Kepler workflow.
200     */
201    private MatchInput _sourceActor;
202
203    /** A sink actor used for reading data from the workflow and writing
204     *  it to Stratosphere.
205     */
206    private StubSinkActor _sinkActor;
207
208    /** Logging. */
209    private static final Log LOG = LogFactory.getLog(KeplerMatchStub.class);
210
211    /** Manager to execute the workflow. */
212    private Manager _manager;
213
214    /**
215     * First input key type.
216     */
217    private Class<? extends Key> firstIK;
218
219    /**
220     * First input value type.
221     */
222    private Class<? extends Value> firstIV;
223
224    /**
225     * Second input key type.
226     */
227    private Class<? extends Key> secondIK;
228
229    /**
230     * Second input value type.
231     */
232    private Class<? extends Value> secondIV;
233
234    /** If true, the entire workflow lifecycle is executed for each input. */
235    private boolean _runWorkflowLifecyclePerInput = false;
236
237    /** If the workflow is running in a separate thread
238     *  (_runWorkflowLifecyclePerInput = false) and an exception
239     *  is thrown, this field is set to that exception.
240     */
241    private Exception _keplerManagerException;
242    
243    /** The thread executing the stub (not the workflow). */
244    private Thread _stubThread;
245
246}