/* An operator for reduce.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-11-05 00:27:41 +0000 (Thu, 05 Nov 2015) $'
 * '$Revision: 34216 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.operator;

import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

/** An operator for reduce.
 *
 *  <p>The operator runs in one of two modes, chosen by the constructor
 *  used to create it:</p>
 *  <ul>
 *  <li>with a {@code Function2} stub, the first input is combined with
 *      {@code reduceByKey}, so the set of keys is unchanged;</li>
 *  <li>with a {@code PairFlatMapFunction} stub, the first input is grouped
 *      with {@code groupByKey} and the stub is applied with
 *      {@code mapPartitionsToPair}, so the stub may emit new keys.</li>
 *  </ul>
 *
 *  @author Daniel Crawl
 *  @version $Id: ReduceOperator.java 34216 2015-11-05 00:27:41Z crawl $
 */
public class ReduceOperator extends Operator {

    /** Create a new ReduceOperator that will not generate new keys.
     *  @param stub the stub class to run in the reduce operator
     *  @param name the operator name
     */
    public ReduceOperator(Function2<?,?,?> stub, String name) {
        // NOTE(review): the leading 1 is presumably the number of inputs
        // of this operator -- confirm against Operator's constructor.
        super(1, stub, name);
        _newKeys = false;
    }

    /** Create a new ReduceOperator that can generate new keys.
     *  @param stub the stub class to run in the reduce operator
     *  @param name the operator name
     */
    public ReduceOperator(PairFlatMapFunction<Iterator<Tuple2<Object, Iterable<Object>>>, Object, Object> stub, String name) {
        // NOTE(review): the leading 1 is presumably the number of inputs
        // of this operator -- confirm against Operator's constructor.
        super(1, stub, name);
        _newKeys = true;
    }

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /** Execute the operator on the first input.
     *
     *  <p>If the stub may generate new keys, the input is grouped by key
     *  and the stub is applied with {@code mapPartitionsToPair}; otherwise
     *  the stub is applied with {@code reduceByKey}. In both modes, when
     *  {@code _numInstances} is at least 1 it is passed to Spark as the
     *  partition count; when it is less than 1 the overload without a
     *  partition count is used.</p>
     *
     *  @return the pair RDD produced by applying the stub to the first input
     */
    // The casts to the raw function types are unchecked. Each branch casts
    // to the type accepted by the constructor that set _newKeys.
    // NOTE(review): this assumes _stub is not replaced after construction --
    // confirm against Operator.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public JavaPairRDD<Object, ?> execute() {
        if(_newKeys) {
            // The stub may emit arbitrary keys: hand it whole partitions
            // of (key, values) groups.
            if(_numInstances < 1) {
                return _inputData[0].groupByKey()
                        .mapPartitionsToPair((PairFlatMapFunction)_stub);
            } else {
                return _inputData[0].groupByKey(_numInstances)
                        .mapPartitionsToPair((PairFlatMapFunction)_stub);
            }
        } else if(_numInstances < 1) {
            return _inputData[0].reduceByKey((Function2)_stub);
        } else {
            return _inputData[0].reduceByKey((Function2)_stub, _numInstances);
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private fields                    ////

    /** If true, the stub is a reduce function that may generate
     *  a new set of keys. If false, the reduce function never
     *  will generate new keys. Set once by the constructor.
     */
    private final boolean _newKeys;

}