001/* A Spark Match/Join stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $' 
008 * '$Revision: 34532 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035
036import org.kepler.ddp.actor.pattern.stub.MatchInput;
037
038import ptolemy.data.Token;
039import ptolemy.kernel.util.IllegalActionException;
040import scala.Tuple2;
041
042/** A Spark stub to run a Kepler workflow in a map after the join/match operator.
043 * 
044 *  @author Daniel Crawl
045 *  @version $Id: KeplerMatchStub.java 34532 2016-10-17 19:14:15Z crawl $
046 *  
047 */
048public class KeplerMatchStub
049    extends KeplerPairFlatMapFunction<Iterator<Tuple2<Object,Tuple2<Object,Object>>>, Object, Object> {
050        
051    public KeplerMatchStub() {
052        super("MatchInput", "MatchOutput");
053    }
054
055    @Override
056    public Iterator<Tuple2<Object, Object>> call(
057            Iterator<Tuple2<Object, Tuple2<Object, Object>>> iterator) throws Exception {
058
059        final List<Tuple2<Object, Object>> retval = new LinkedList<Tuple2<Object, Object>>();
060        
061        // this method can be called with no input, so only proceed if
062        // input is present.
063        if(!iterator.hasNext()) {
064            return retval.iterator();
065        }
066
067        _initialize();
068
069        MatchInput matchInputActor = (MatchInput)_sourceActor;
070
071        while(iterator.hasNext()) {
072            
073            Tuple2<Object, Tuple2<Object, Object>> input = iterator.next();
074
075            // set the inputs
076            final Token keyToken = StubUtilities.convertToToken(input._1);
077            final Token value1Token = StubUtilities.convertToToken(input._2._1);
078            final Token value2Token = StubUtilities.convertToToken(input._2._2);
079            matchInputActor.setInput(keyToken, value1Token, value2Token);
080            
081            if(_runWorkflowLifecyclePerInput) {
082                _manager.execute();
083            }
084    
085            // read the output from the output actor
086            try {
087                final List<Token> tokenList = _sinkActor.getOutput();
088                if(tokenList != null) {
089                    StubUtilities.convertTokenToTupleList(tokenList, retval);
090                } else if(_keplerManagerException != null) {
091                    throw _keplerManagerException;
092                }
093            } catch (IllegalActionException e) {
094                throw new RuntimeException("Error getting output for " + _sinkActor.getName() + ".", e);
095            }        
096        }
097        
098        _cleanup();
099        
100        return retval.iterator();
101    }
102}