001/* A base class for operators in Spark jobs.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2014-07-03 17:22:34 +0000 (Thu, 03 Jul 2014) $' 
008 * '$Revision: 32815 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.operator;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.spark.api.java.JavaPairRDD;
034import org.kepler.ddp.Utilities;
035import org.kepler.spark.stub.PairFlatMapFunctionBase;
036
037import ptolemy.kernel.util.IllegalActionException;
038
039/** A base class for operators in a Spark job. Derived classes
040 *  need to provide implementations of execute().
041 * 
042 *  @author Daniel Crawl
043 *  @version $Id: Operator.java 32815 2014-07-03 17:22:34Z crawl $
044 */
045public abstract class Operator {
046
047    /** Create a new Operator.
048     *  @param numInputs the number of input operators
049     *  @param stub the function to execute
050     *  @param name the name of the operator
051     */
052    public Operator(int numInputs, Object stub, String name) {
053        _inputs = new Operator[numInputs];
054        _stub = stub;
055        _name = name;
056        _configuration = new Configuration();
057    }
058    
059    /** Get the number of input operators. */
060    public int numInputs() {
061        return _inputs.length;
062    }
063
064    /** Get the configuration parameters. */
065    public Configuration getParameters() {
066        return _configuration;
067    }
068
069    /** Set the (first) input operator. */
070    public void setInput(Operator output) {
071        setInput(0, output);
072    }
073
074    /** Set the input operator at an index. */
075    public void setInput(int index, Operator output) {
076        if(index < 0) {
077            throw new IllegalArgumentException("Index must be greater than or equal to 0.");
078        }
079        
080        if(index > numInputs()) {
081            throw new IllegalArgumentException("Index must be less than " + numInputs());
082        }
083        
084        _inputs[index] = output;
085    }
086
087    /** Set the number of parallel instances to execute. */
088    public void setDegreeOfParallelism(int numInstances) {
089        _numInstances = numInstances;
090        _configuration.setInt(Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL, numInstances);
091    }
092
093    /** Execute the operator. */
094    public abstract JavaPairRDD<Object, ?> execute() throws IllegalActionException;
095    
096    /** Read the inputs and execute the operator to generate the output. */
097    protected JavaPairRDD<Object, ?> _getOutput() throws Exception {
098           
099        // see if we already have output data
100        if(_outputData == null) {
101
102            // read all the inputs
103            _inputData = new JavaPairRDD[numInputs()];
104            for(int i = 0; i < numInputs(); i++) {
105                
106                Operator input = _inputs[i];
107                if(input == null) {
108                    throw new Exception("Input " + i + " was not set for " + _name);
109                }
110                
111                // get the output from upstream operators
112                _inputData[i] = input._getOutput();
113            }
114            
115            // see if the stub needs the configuration to be serialized
116            if(_stub instanceof PairFlatMapFunctionBase) {
117                ((PairFlatMapFunctionBase<?,?,?>)_stub).setConfiguration(_configuration);
118            }
119
120            // call the operator to get our output
121            _outputData = execute();
122
123        }
124        
125        return _outputData;        
126    }
127        
128    ///////////////////////////////////////////////////////////////////
129    ////                      protected fields                   //////
130
131    /** The number of parallel instances for this operator. */
132    protected int _numInstances = 1;
133    
134    /** Configuration parameters. */
135    protected Configuration _configuration;
136    
137    /** The input data to this operator. */
138    protected JavaPairRDD<Object, ?>[] _inputData;
139    
140    /** The function to execute. */
141    protected Object _stub;
142    
143    ///////////////////////////////////////////////////////////////////
144    ////                      private fields                     //////
145
146    /** Input operator(s) to this operator. */
147    private Operator[] _inputs;
148
149    /** The output of this operator. */
150    private JavaPairRDD<Object, ?> _outputData;
151
152    /** The name of the operator. */
153    private String _name;
154}