001/* A Spark Map stub that executes a ScriptEngine.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $' 
008 * '$Revision: 34532 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035
036import scala.Tuple2;
037
038/** A Spark Map stub that executes a ScriptEngine.
039 * 
040 *  @author Daniel Crawl
041 *  @version $Id: ScriptEngineMapStub.java 34532 2016-10-17 19:14:15Z crawl $
042 *  
043 */
044public class ScriptEngineMapStub extends ScriptEnginePairFlatMapFunction<Iterator<Tuple2<Object,Object>>,Object,Object> {
045   
046    /** Transfer the data from Spark to Kepler, execute the workflow,
047     *  and then transfer the data from Kepler to Spark.
048     */
049    @Override
050    public Iterator<Tuple2<Object, Object>> call(Iterator<Tuple2<Object,Object>> iterator)
051            throws Exception {    
052
053        _initialize();
054                
055        final List<Tuple2<Object, Object>> out = new LinkedList<Tuple2<Object, Object>>();
056        
057        while(iterator.hasNext()) {
058            
059            Tuple2<?,?> inputTuple = iterator.next();
060            
061            // see if there's only one field
062            if(inputTuple._2 == null) {
063                //System.out.println("input: " + value);
064                _invocable.invokeFunction("map",
065                        StubUtilities.convertToObject(inputTuple._1),
066                        _resultKeys,
067                        _resultValues);
068            } else {
069                _invocable.invokeFunction("map",
070                        StubUtilities.convertToObject(inputTuple._1),
071                        StubUtilities.convertToObject(inputTuple._2),
072                        _resultKeys,
073                        _resultValues);
074            }
075                    
076            //System.out.println("result keys: " + resultKeys);
077            //System.out.println("result values: " + resultValues);
078            
079            // convert results to collector
080            StubUtilities.convertListsToTupleList(_resultKeys, _resultValues, out);    
081            
082            // clear the lists since the results are no longer needed.
083            _resultKeys.clear();
084            _resultValues.clear();
085        }
086        
087        _cleanup();
088        
089        return out.iterator();
090    }
091}