001/* A Spark Map stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $' 
008 * '$Revision: 34532 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035
036import org.kepler.ddp.actor.pattern.stub.MapInput;
037
038import ptolemy.data.Token;
039import ptolemy.kernel.util.IllegalActionException;
040import scala.Tuple2;
041
042/** A Spark Map stub that runs a Kepler workflow.
043 * 
044 *  @author Daniel Crawl
045 *  @version $Id: KeplerMapStub.java 34532 2016-10-17 19:14:15Z crawl $
046 *  
047 */
048public class KeplerMapStub extends KeplerPairFlatMapFunction<Iterator<Tuple2<Object,Object>>,Object,Object> {
049
050    public KeplerMapStub() {
051        super("MapInput", "MapOutput");
052    }
053    
054    /** Transfer the data from Spark to Kepler, execute the workflow,
055     *  and then transfer the data from Kepler to Spark.
056     */
057    @Override
058    public Iterator<Tuple2<Object, Object>> call(Iterator<Tuple2<Object,Object>> iterator)
059            throws Exception {    
060
061        final List<Tuple2<Object, Object>> retval = new LinkedList<Tuple2<Object, Object>>();
062        
063        // this method can be called with no input, so only proceed if
064        // input is present.
065        if(!iterator.hasNext()) {
066            return retval.iterator();
067        }
068
069        _initialize();
070        
071        MapInput mapSourceActor = (MapInput) _sourceActor;
072        
073        while(iterator.hasNext()) {
074            
075            Tuple2<?,?> inputTuple = iterator.next();
076            
077            // place the input in the input actor
078            
079            Token keyToken = null;
080            Token valueToken = null;
081            
082            // see if there's only one field
083            if(inputTuple._2 == null) {
084                // since there's only one field, assume it is the value
085                valueToken = StubUtilities.convertToToken(inputTuple._1);
086            } else {
087                keyToken = StubUtilities.convertToToken(inputTuple._1);
088                valueToken = StubUtilities.convertToToken(inputTuple._2);
089            }
090            
091            mapSourceActor.setInput(keyToken, valueToken);
092            
093            //LOG.info("map stub: key = " + keyToken);
094            //LOG.info("map stub: value = " + valueToken);
095            
096            if(_runWorkflowLifecyclePerInput) {
097                _manager.execute();
098            }
099            
100            // read the output from the output actor
101            
102            try {
103                final List<Token> tokenList = _sinkActor.getOutput();
104                if(tokenList != null) {
105                    StubUtilities.convertTokenToTupleList(tokenList, retval);
106                } else if(_keplerManagerException != null) {
107                    throw _keplerManagerException;
108                }
109            } catch (IllegalActionException e) {
110                throw new RuntimeException("Error getting output for " + _sinkActor.getName() + ".", e);
111            }
112            
113        }
114        
115        _cleanup();
116        
117        return retval.iterator();
118    }
119}