001/* A Spark CoGroup stub that runs a Kepler workflow.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2016-10-17 19:14:15 +0000 (Mon, 17 Oct 2016) $' 
008 * '$Revision: 34532 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.stub;
031
032import java.util.ArrayList;
033import java.util.Iterator;
034import java.util.LinkedList;
035import java.util.List;
036
037import org.kepler.ddp.actor.pattern.stub.CoGroupInput;
038
039import ptolemy.data.ArrayToken;
040import ptolemy.data.Token;
041import ptolemy.data.type.ArrayType;
042import ptolemy.data.type.RecordType;
043import ptolemy.kernel.util.IllegalActionException;
044import scala.Tuple2;
045
046/** A Spark stub to run a Kepler workflow in a map after the cogroup operator.
047 * 
048 *  @author Daniel Crawl
049 *  @version $Id: KeplerCoGroupStub.java 34532 2016-10-17 19:14:15Z crawl $
050 *  
051 */
052public class KeplerCoGroupStub
053    extends KeplerPairFlatMapFunction<Iterator<Tuple2<Object,Tuple2<Iterable<Object>,Iterable<Object>>>>,Object,Object> {
054        
055    public KeplerCoGroupStub() {
056        super("CoGroupInput", "CoGroupOutput");
057    }
058
059    @Override
060    public Iterator<Tuple2<Object, Object>> call(
061            Iterator<Tuple2<Object, Tuple2<Iterable<Object>, Iterable<Object>>>> iterator) throws Exception {
062
063        _initialize();
064        
065        CoGroupInput coGroupInputActor = (CoGroupInput) _sourceActor;
066        final List<Tuple2<Object, Object>> retval = new LinkedList<Tuple2<Object, Object>>();
067
068        while(iterator.hasNext()) {
069            
070            Tuple2<Object, Tuple2<Iterable<Object>, Iterable<Object>>> input = iterator.next();
071
072            Token keyToken = StubUtilities.convertToToken(input._1);
073            
074            // see if values1 is empty
075            Token values1Token;
076            Iterator<?> values1Iterator = input._2._1.iterator();
077            if(!values1Iterator.hasNext()) {
078                values1Token = new ArrayToken(((RecordType)((ArrayType)coGroupInputActor.in.getType()).getElementType()).get("value"));
079            } else {
080                final List<Token> tokenList = new ArrayList<Token>();
081                while(values1Iterator.hasNext()) {
082                    tokenList.add(StubUtilities.convertToToken(values1Iterator.next()));
083                }
084                try {
085                    values1Token = new ArrayToken(tokenList.toArray(new Token[tokenList.size()]));
086                } catch (IllegalActionException e) {
087                    throw new RuntimeException("Error creating array token.", e);
088                }
089    
090            }
091    
092            // see if values2 is empty
093            Token values2Token;
094            Iterator<?> values2Iterator = input._2._2.iterator();
095            if(!values2Iterator.hasNext()) {
096                values2Token = new ArrayToken(((RecordType)((ArrayType)coGroupInputActor.in2.getType()).getElementType()).get("value"));
097            } else {
098                final List<Token> tokenList = new ArrayList<Token>();
099                while(values2Iterator.hasNext()) {                                    
100                    tokenList.add(StubUtilities.convertToToken(values2Iterator.next()));
101                }
102                try {
103                    values2Token = new ArrayToken(tokenList.toArray(new Token[tokenList.size()]));
104                } catch (IllegalActionException e) {
105                    throw new RuntimeException("Error creating array token.", e);
106                }
107            }
108            
109            // set the inputs
110            coGroupInputActor.setInput(keyToken, values1Token, values2Token);
111            
112            if(_runWorkflowLifecyclePerInput) {
113                _manager.execute();
114            }
115    
116            // read the output from the output actor
117            try {
118                final List<Token> tokenList = _sinkActor.getOutput();
119                if(tokenList != null) {
120                    StubUtilities.convertTokenToTupleList(tokenList, retval);
121                } else if(_keplerManagerException != null) {
122                    throw _keplerManagerException;
123                }
124            } catch (IllegalActionException e) {
125                throw new RuntimeException("Error getting output for " + _sinkActor.getName() + ".", e);
126            }   
127        }
128        
129        _cleanup();
130        
131        return retval.iterator();
132    }
133}