001/* A Spark Cross stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $' 
008 * '$Revision: 34532 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035
036import org.kepler.ddp.actor.pattern.stub.CrossInput;
037
038import ptolemy.data.Token;
039import ptolemy.kernel.util.IllegalActionException;
040import scala.Tuple2;
041
042/** A Spark stub to run a Kepler workflow in a map after the cartesian operator.
043 * 
044 *  @author Daniel Crawl
045 *  @version $Id: KeplerCrossStub.java 34532 2016-10-17 19:14:15Z crawl $
046 *  
047 */
048public class KeplerCrossStub extends
049    KeplerPairFlatMapFunction<Iterator<Tuple2<Tuple2<Object,Object>,Tuple2<Object,Object>>>, Object, Object> {
050
051    public KeplerCrossStub() {
052        super("CrossInput", "CrossOutput");
053    }
054
055    /** Transfer the data from Spark to Kepler, execute the workflow,
056     *  and then transfer the data from Kepler to Spark.
057     */
058    @Override
059    public Iterator<Tuple2<Object, Object>> call(Iterator<Tuple2<Tuple2<Object,Object>,Tuple2<Object,Object>>> iterator)
060            throws Exception {
061
062        final List<Tuple2<Object, Object>> retval = new LinkedList<Tuple2<Object, Object>>();
063        
064        // this method can be called with no input, so only proceed if
065        // input is present.
066        if(!iterator.hasNext()) {
067            retval.iterator();
068        }
069        
070        _initialize();
071        
072        CrossInput crossInputActor = (CrossInput) _sourceActor;
073        
074
075        while(iterator.hasNext()) {
076            
077            Tuple2<Tuple2<Object,Object>,Tuple2<Object,Object>> inputTuple = iterator.next();
078            
079            // set the inputs
080            Token key1Token = null;
081            Token value1Token = null;
082            Token key2Token = null;
083            Token value2Token = null;
084            
085            // see if there's only one field
086            if(inputTuple._1._2 == null) {
087                // since there's only one field, assume it is the value
088                value1Token = StubUtilities.convertToToken(inputTuple._1._1);
089            } else {
090                key1Token = StubUtilities.convertToToken(inputTuple._1._1);
091                value1Token = StubUtilities.convertToToken(inputTuple._1._2);   
092            }
093            
094            // see if there's only one field
095            if(inputTuple._2._2 == null) {
096                // since there's only one field, assume it is the value
097                value2Token = StubUtilities.convertToToken(inputTuple._2._1);
098            } else {
099                key2Token = StubUtilities.convertToToken(inputTuple._2._1);
100                value2Token = StubUtilities.convertToToken(inputTuple._2._2);   
101            }
102    
103            crossInputActor.setInput(key1Token, value1Token, key2Token, value2Token);
104            
105            if(_runWorkflowLifecyclePerInput) {
106                _manager.execute();
107            }
108    
109            // read the output from the output actor
110            try {
111                final List<Token> tokenList = _sinkActor.getOutput();
112                if(tokenList != null) {
113                    StubUtilities.convertTokenToTupleList(tokenList, retval);
114                } else if(_keplerManagerException != null) {
115                    throw _keplerManagerException;
116                }
117            } catch (IllegalActionException e) {
118                throw new RuntimeException("Error getting output for " + _sinkActor.getName() + ".", e);
119            }        
120        }
121        
122        _cleanup();
123        
124        return retval.iterator();
125
126    }
127}