001/* A Spark Reduce stub that executes a ScriptEngine.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $' 
008 * '$Revision: 34532 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035
036import scala.Tuple2;
037
038/** A Spark Reduce stub that executes a ScriptEngine.
039 * 
040 *  @author Daniel Crawl
041 *  @version $Id: ScriptEngineReduceStub.java 34532 2016-10-17 19:14:15Z crawl $
042 *  
043 */
044public class ScriptEngineReduceStub 
045    extends ScriptEnginePairFlatMapFunction<Iterator<Tuple2<Object, List<Object>>>, Object, Object> {
046
047    /** Transfer the data from Spark to Kepler, execute the workflow,
048     *  and then transfer the data from Kepler to Spark.
049     */
050    @Override
051    public Iterator<Tuple2<Object, Object>> call(Iterator<Tuple2<Object, List<Object>>> iterator) throws Exception {
052
053        _initialize();
054        
055        final List<Tuple2<Object, Object>> retval = new LinkedList<Tuple2<Object, Object>>();
056
057        while(iterator.hasNext()) {
058            
059            Tuple2<Object, List<Object>> input = iterator.next();
060                        
061            //System.out.println("input key: " + key + " values = " + values);
062            
063            _invocable.invokeFunction("reduce",
064                    input._1,
065                    input._2,
066                    _resultKeys,
067                    _resultValues);
068                    
069            //System.out.println("result keys: " + _resultKeys);
070            //System.out.println("result values: " + _resultValues);
071            
072            // convert results to collector
073            StubUtilities.convertListsToTupleList(_resultKeys, _resultValues, retval);        
074
075            // clear the lists since the results are no longer needed.
076            _resultKeys.clear();
077            _resultValues.clear();                    
078        }
079        
080        _cleanup();
081        
082        return retval.iterator();
083        
084    }
085}