001/* A base class for operators in Spark jobs. 002 * 003 * Copyright (c) 2014 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2014-07-03 17:22:34 +0000 (Thu, 03 Jul 2014) $' 008 * '$Revision: 32815 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.spark.operator; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.spark.api.java.JavaPairRDD; 034import org.kepler.ddp.Utilities; 035import org.kepler.spark.stub.PairFlatMapFunctionBase; 036 037import ptolemy.kernel.util.IllegalActionException; 038 039/** A base class for operators in a Spark job. Derived classes 040 * need to provide implementations of execute(). 041 * 042 * @author Daniel Crawl 043 * @version $Id: Operator.java 32815 2014-07-03 17:22:34Z crawl $ 044 */ 045public abstract class Operator { 046 047 /** Create a new Operator. 048 * @param numInputs the number of input operators 049 * @param stub the function to execute 050 * @param name the name of the operator 051 */ 052 public Operator(int numInputs, Object stub, String name) { 053 _inputs = new Operator[numInputs]; 054 _stub = stub; 055 _name = name; 056 _configuration = new Configuration(); 057 } 058 059 /** Get the number of input operators. */ 060 public int numInputs() { 061 return _inputs.length; 062 } 063 064 /** Get the configuration parameters. */ 065 public Configuration getParameters() { 066 return _configuration; 067 } 068 069 /** Set the (first) input operator. */ 070 public void setInput(Operator output) { 071 setInput(0, output); 072 } 073 074 /** Set the input operator at an index. */ 075 public void setInput(int index, Operator output) { 076 if(index < 0) { 077 throw new IllegalArgumentException("Index must be greater than or equal to 0."); 078 } 079 080 if(index > numInputs()) { 081 throw new IllegalArgumentException("Index must be less than " + numInputs()); 082 } 083 084 _inputs[index] = output; 085 } 086 087 /** Set the number of parallel instances to execute. */ 088 public void setDegreeOfParallelism(int numInstances) { 089 _numInstances = numInstances; 090 _configuration.setInt(Utilities.CONFIGURATION_KEPLER_PARA_PARALLEL, numInstances); 091 } 092 093 /** Execute the operator. */ 094 public abstract JavaPairRDD<Object, ?> execute() throws IllegalActionException; 095 096 /** Read the inputs and execute the operator to generate the output. */ 097 protected JavaPairRDD<Object, ?> _getOutput() throws Exception { 098 099 // see if we already have output data 100 if(_outputData == null) { 101 102 // read all the inputs 103 _inputData = new JavaPairRDD[numInputs()]; 104 for(int i = 0; i < numInputs(); i++) { 105 106 Operator input = _inputs[i]; 107 if(input == null) { 108 throw new Exception("Input " + i + " was not set for " + _name); 109 } 110 111 // get the output from upstream operators 112 _inputData[i] = input._getOutput(); 113 } 114 115 // see if the stub needs the configuration to be serialized 116 if(_stub instanceof PairFlatMapFunctionBase) { 117 ((PairFlatMapFunctionBase<?,?,?>)_stub).setConfiguration(_configuration); 118 } 119 120 // call the operator to get our output 121 _outputData = execute(); 122 123 } 124 125 return _outputData; 126 } 127 128 /////////////////////////////////////////////////////////////////// 129 //// protected fields ////// 130 131 /** The number of parallel instances for this operator. */ 132 protected int _numInstances = 1; 133 134 /** Configuration parameters. */ 135 protected Configuration _configuration; 136 137 /** The input data to this operator. */ 138 protected JavaPairRDD<Object, ?>[] _inputData; 139 140 /** The function to execute. */ 141 protected Object _stub; 142 143 /////////////////////////////////////////////////////////////////// 144 //// private fields ////// 145 146 /** Input operator(s) to this operator. */ 147 private Operator[] _inputs; 148 149 /** The output of this operator. */ 150 private JavaPairRDD<Object, ?> _outputData; 151 152 /** The name of the operator. */ 153 private String _name; 154}