001/* A base class for Hadoop stubs that execute Kepler workflows.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * Permission is hereby granted, without written agreement and without
007 * license or royalty fees, to use, copy, modify, and distribute this
008 * software and its documentation for any purpose, provided that the above
009 * copyright notice and the following two paragraphs appear in all copies
010 * of this software.
011 *
012 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
013 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
014 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
015 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
016 * SUCH DAMAGE.
017 *
018 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
019 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
020 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
021 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
022 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
023 * ENHANCEMENTS, OR MODIFICATIONS.
024 *
025 */
026 package org.kepler.hadoop.execution;
027
028import java.io.IOException;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.log4j.Logger;
032import org.kepler.ddp.Utilities;
033import org.kepler.ddp.actor.pattern.stub.StubSinkActor;
034import org.kepler.ddp.actor.pattern.stub.StubSourceActor;
035
036import ptolemy.actor.CompositeActor;
037import ptolemy.actor.ExecutionListener;
038import ptolemy.actor.Manager;
039
040/** A base class for Hadoop stubs that execute Kepler workflows.
041 * 
042 *  @author Daniel Crawl
043 *  @version $Id: KeplerAppBase.java 32785 2014-06-25 18:36:29Z crawl $
044 * 
045 */
046public class KeplerAppBase implements ExecutionListener {
047
048    /** Create a new KeplerAppBase.
049     *  @param hadoopConf the hadoop configuration object.
050     *  @param sourceActorName the name of the source actor in the workflow.
051     *  @param sinkActorName the name of the sink actor in the workflow.
052     */
053    protected KeplerAppBase(Configuration hadoopConf, String sourceActorName, String sinkActorName) {
054
055        _model = Utilities.getModel(
056                hadoopConf.get(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null),
057                hadoopConf.get(Utilities.CONFIGURATION_KEPLER_MODEL, null),
058                hadoopConf.get(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null),
059                hadoopConf.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false),
060                hadoopConf.get(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, ""));
061
062        _runWorkflowLifecyclePerInput = 
063                hadoopConf.getBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false);
064        
065        _runWorkflowLifecyclePerInput = 
066                Utilities.checkDirectorIterations(_model, _runWorkflowLifecyclePerInput);
067        
068        _sourceActor = (StubSourceActor) _model.getEntity(sourceActorName);
069        _sinkActor = (StubSinkActor)_model.getEntity(sinkActorName);
070        
071        _stubThread = Thread.currentThread();
072        
073        try {
074            _manager = Utilities.createManagerForModel(_model, this, _sourceActor, _sinkActor,
075                    _runWorkflowLifecyclePerInput,
076                    hadoopConf.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false));
077        } catch (Exception ex) {
078            ex.printStackTrace();
079        }
080
081    }
082
083    /** Perform cleanup after all inputs are processed. */
084    public void cleanup() throws InterruptedException
085    {
086        if(!_runWorkflowLifecyclePerInput) {
087            _sourceActor.finish();
088            _manager.waitForCompletion();
089        }
090        _manager.removeExecutionListener(this);
091        _manager = null;
092        _sinkActor = null;
093        _sourceActor = null;
094        ptolemy.data.expr.CachedMethod.clear();
095    }
096
097    @Override
098    public void executionError(Manager manager, Throwable throwable) {
099        
100        if(throwable instanceof IOException) {
101            _keplerManagerException = (IOException) throwable;
102        } else {
103            _keplerManagerException = new IOException(throwable);
104        }
105        _stubThread.interrupt();
106    }
107    
108    @Override
109    public void executionFinished(Manager manager) {
110        // TODO Auto-generated method stub
111        
112    }
113    
114    @Override
115    public void managerStateChanged(Manager manager) {
116        // TODO Auto-generated method stub
117        
118    }
119
120    /** The workflow to execute. */
121    protected CompositeActor _model;
122    
123    /** Manager to execute the workflow. */
124    protected Manager _manager;
125    
126    /** If true, the entire workflow lifecycle is executed for each input. */
127    protected boolean _runWorkflowLifecyclePerInput = false;
128    
129    /** The actor writing input to the workflow. */
130    protected StubSourceActor _sourceActor;
131    
132    /** The actor reading output from the workflow. */
133    protected StubSinkActor _sinkActor;    
134    
135    /** If the workflow is running in a separate thread
136     *  (_runWorkflowLifecyclePerInput = false) and an exception
137     *  is thrown, this field is set to that exception.
138     */
139    protected IOException _keplerManagerException;
140    
141    /** The thread executing the stub (not the workflow). */
142    private Thread _stubThread;
143    
144    private final static Logger _logger = Logger.getLogger(KeplerApp4Map.class.getName());
145
146}