001/* A Spark stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-05-20 19:00:56 +0000 (Wed, 20 May 2015) $' 
008 * '$Revision: 33420 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.kepler.ddp.Utilities;
035import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
036import org.kepler.ddp.actor.pattern.stub.StubSourceActor;
037
038import ptolemy.actor.CompositeActor;
039import ptolemy.actor.ExecutionListener;
040import ptolemy.actor.Manager;
041
042/** A Spark stub to run a Kepler workflow in a map. Derived classes
043 *  need to implement the call() method.
044 * 
045 *  @author Daniel Crawl
046 *  @version $Id: KeplerPairFlatMapFunction.java 33420 2015-05-20 19:00:56Z crawl $
047 *  
048 */
049public abstract class KeplerPairFlatMapFunction<T,K,V>
050    extends PairFlatMapFunctionBase<T, K, V> implements ExecutionListener {
051    
052    public KeplerPairFlatMapFunction(String sourceActorName, String sinkActorName) {
053        _sourceActorName = sourceActorName;
054        _sinkActorName = sinkActorName;
055    }
056
057    /** Free resources. */
058    protected void _cleanup() throws Exception
059    {
060        if(!_runWorkflowLifecyclePerInput) {
061            _sourceActor.finish();
062            _manager.waitForCompletion();
063        }
064        _manager.removeExecutionListener(this);
065        _manager = null;
066        _sinkActor = null;
067        _sourceActor = null;
068        ptolemy.data.expr.CachedMethod.clear();
069        
070        if(_keplerManagerException != null) {
071            throw _keplerManagerException;
072        }
073
074    }
075
076    /** Configure the stub by loading the workflow and initialize the
077     *  class fields.
078     */
079    protected void _initialize() throws Exception {
080        
081        StubUtilities.initializePtolemy();
082                        
083        // initialize kepler to run the workflow in the stub
084        if(!_parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false)) {
085            StubUtilities.initializeKepler(_parameters);
086        }
087        
088        final CompositeActor model = Utilities.getModel(
089                _parameters.get(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
090                _parameters.get(Utilities.CONFIGURATION_KEPLER_MODEL, null),
091                _parameters.get(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
092                _parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
093                _parameters.get(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
094                                
095        _sourceActor = (StubSourceActor) model.getEntity(_sourceActorName);
096        if(_sourceActor == null) {
097            throw new RuntimeException("Could not find " + _sourceActorName + " actor in model.");
098        }
099        
100        _sinkActor = (StubSinkActor) model.getEntity(_sinkActorName);
101        if(_sinkActor == null) {
102            throw new RuntimeException("Could not find " + _sinkActorName + " actor in model.");
103        }
104                
105        _runWorkflowLifecyclePerInput = _parameters.getBoolean(
106                Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
107        
108        _runWorkflowLifecyclePerInput = Utilities.checkDirectorIterations(model,
109                _runWorkflowLifecyclePerInput);
110        
111        _stubThread = Thread.currentThread();
112        
113        _manager = Utilities.createManagerForModel(model, this, _sourceActor, _sinkActor,
114                _runWorkflowLifecyclePerInput,
115                _parameters.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
116        
117    }
118    
119   
120    /** Report workflow execution errors to the log. */
121    @Override
122    public void executionError(Manager manager, Throwable throwable) {
123        
124        if(throwable instanceof Exception) {
125            _keplerManagerException = (Exception) throwable;
126        } else {
127            _keplerManagerException = new Exception(throwable);
128        }
129        _stubThread.interrupt();
130        
131    }
132
133    /** Do nothing. */
134    @Override
135    public void executionFinished(Manager manager) {
136        
137        //LOG.info("map execution finished");
138
139    }
140    
141    /** Do nothing. */
142    @Override
143    public void managerStateChanged(Manager manager) {
144
145        //LOG.info("map manager state changed: " + manager.getState());
146
147    }
148    
149    /** A source actor used for reading data from Spark and writing
150     *  it the Kepler workflow.
151     */
152    protected StubSourceActor _sourceActor;
153    
154    /** A sink actor used for reading data from the workflow and writing
155     *  it to Spark.
156     */
157    protected StubSinkActor _sinkActor;
158
159
160    /** Manager to execute the workflow. */
161    protected Manager _manager;
162    
163    /** If true, the entire workflow lifecycle is executed for each input. */
164    protected boolean _runWorkflowLifecyclePerInput = false;
165    
166    /** Logging. */
167    private static final Log LOG = LogFactory.getLog(KeplerPairFlatMapFunction.class);
168
169    private String _sourceActorName;
170    
171    private String _sinkActorName;
172
173    /** If the workflow is running in a separate thread
174     *  (_runWorkflowLifecyclePerInput = false) and an exception
175     *  is thrown, this field is set to that exception.
176     */
177    protected Exception _keplerManagerException;
178    
179    /** The thread executing the stub (not the workflow). */
180    private Thread _stubThread;
181    
182}