001/* A base class for Hadoop stubs that execute Kepler workflows. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * Permission is hereby granted, without written agreement and without 007 * license or royalty fees, to use, copy, modify, and distribute this 008 * software and its documentation for any purpose, provided that the above 009 * copyright notice and the following two paragraphs appear in all copies 010 * of this software. 011 * 012 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 013 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 014 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 015 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 016 * SUCH DAMAGE. 017 * 018 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 019 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 020 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 021 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 022 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 023 * ENHANCEMENTS, OR MODIFICATIONS. 024 * 025 */ 026 package org.kepler.hadoop.execution; 027 028import java.io.IOException; 029 030import org.apache.hadoop.conf.Configuration; 031import org.apache.log4j.Logger; 032import org.kepler.ddp.Utilities; 033import org.kepler.ddp.actor.pattern.stub.StubSinkActor; 034import org.kepler.ddp.actor.pattern.stub.StubSourceActor; 035 036import ptolemy.actor.CompositeActor; 037import ptolemy.actor.ExecutionListener; 038import ptolemy.actor.Manager; 039 040/** A base class for Hadoop stubs that execute Kepler workflows. 041 * 042 * @author Daniel Crawl 043 * @version $Id: KeplerAppBase.java 32785 2014-06-25 18:36:29Z crawl $ 044 * 045 */ 046public class KeplerAppBase implements ExecutionListener { 047 048 /** Create a new KeplerAppBase. 049 * @param hadoopConf the hadoop configuration object. 050 * @param sourceActorName the name of the source actor in the workflow. 051 * @param sinkActorName the name of the sink actor in the workflow. 052 */ 053 protected KeplerAppBase(Configuration hadoopConf, String sourceActorName, String sinkActorName) { 054 055 _model = Utilities.getModel( 056 hadoopConf.get(Utilities.CONFIGURATION_KEPLER_MODEL_NAME, null), 057 hadoopConf.get(Utilities.CONFIGURATION_KEPLER_MODEL, null), 058 hadoopConf.get(Utilities.CONFIGURATION_KEPLER_MODEL_PATH, null), 059 hadoopConf.getBoolean(Utilities.CONFIGURATION_KEPLER_SAME_JVM, false), 060 hadoopConf.get(Utilities.CONFIGURATION_KEPLER_REDIRECT_DIR, "")); 061 062 _runWorkflowLifecyclePerInput = 063 hadoopConf.getBoolean(Utilities.CONFIGURATION_KEPLER_RUN_WORKFLOW_LIFECYCLE_PER_INPUT, false); 064 065 _runWorkflowLifecyclePerInput = 066 Utilities.checkDirectorIterations(_model, _runWorkflowLifecyclePerInput); 067 068 _sourceActor = (StubSourceActor) _model.getEntity(sourceActorName); 069 _sinkActor = (StubSinkActor)_model.getEntity(sinkActorName); 070 071 _stubThread = Thread.currentThread(); 072 073 try { 074 _manager = Utilities.createManagerForModel(_model, this, _sourceActor, _sinkActor, 075 _runWorkflowLifecyclePerInput, 076 hadoopConf.getBoolean(Utilities.CONFIGURATION_KEPLER_PRINT_EXE_INFO, false)); 077 } catch (Exception ex) { 078 ex.printStackTrace(); 079 } 080 081 } 082 083 /** Perform cleanup after all inputs are processed. */ 084 public void cleanup() throws InterruptedException 085 { 086 if(!_runWorkflowLifecyclePerInput) { 087 _sourceActor.finish(); 088 _manager.waitForCompletion(); 089 } 090 _manager.removeExecutionListener(this); 091 _manager = null; 092 _sinkActor = null; 093 _sourceActor = null; 094 ptolemy.data.expr.CachedMethod.clear(); 095 } 096 097 @Override 098 public void executionError(Manager manager, Throwable throwable) { 099 100 if(throwable instanceof IOException) { 101 _keplerManagerException = (IOException) throwable; 102 } else { 103 _keplerManagerException = new IOException(throwable); 104 } 105 _stubThread.interrupt(); 106 } 107 108 @Override 109 public void executionFinished(Manager manager) { 110 // TODO Auto-generated method stub 111 112 } 113 114 @Override 115 public void managerStateChanged(Manager manager) { 116 // TODO Auto-generated method stub 117 118 } 119 120 /** The workflow to execute. */ 121 protected CompositeActor _model; 122 123 /** Manager to execute the workflow. */ 124 protected Manager _manager; 125 126 /** If true, the entire workflow lifecycle is executed for each input. */ 127 protected boolean _runWorkflowLifecyclePerInput = false; 128 129 /** The actor writing input to the workflow. */ 130 protected StubSourceActor _sourceActor; 131 132 /** The actor reading output from the workflow. */ 133 protected StubSinkActor _sinkActor; 134 135 /** If the workflow is running in a separate thread 136 * (_runWorkflowLifecyclePerInput = false) and an exception 137 * is thrown, this field is set to that exception. 138 */ 139 protected IOException _keplerManagerException; 140 141 /** The thread executing the stub (not the workflow). */ 142 private Thread _stubThread; 143 144 private final static Logger _logger = Logger.getLogger(KeplerApp4Map.class.getName()); 145 146}