001/* An operator for token sources.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-11-05 00:27:41 +0000 (Thu, 05 Nov 2015) $' 
008 * '$Revision: 34216 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.operator;
031
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.kepler.spark.stub.StubUtilities;

import ptolemy.data.ArrayToken;
import ptolemy.data.Token;
import ptolemy.kernel.util.IllegalActionException;
import scala.Tuple2;
042
043/** An data source that reads Ptolemy tokens.
044 * 
045 *  @author Daniel Crawl
046 *  @version $Id: TokenSource.java 34216 2015-11-05 00:27:41Z crawl $
047 */
048public class TokenSource extends DataSource {
049
050    /** Create a new TokenSource.
051     *  @param token the input token
052     *  @param name the name of the operator.
053     */
054    public TokenSource(ArrayToken token, String name) throws IllegalActionException {
055        super(null, name);
056        
057        _tokenArray = token;
058    }
059
060    /** Execute the operator. */
061    @Override
062    public JavaPairRDD<Object, ?> execute() throws IllegalActionException {
063        // convert the array token into a list of tuples.
064        List<Tuple2<Object,Object>> tuples = new LinkedList<Tuple2<Object,Object>>();
065        for(Token token : _tokenArray.arrayValue()) {
066            Object object = StubUtilities.convertTokenToObject(token);
067            tuples.add(new Tuple2<Object,Object>(object,null));
068        }
069        if(_numInstances < 1) {
070            return _context.parallelizePairs(tuples);
071        } else {
072            return _context.parallelizePairs(tuples, _numInstances);
073        }
074    }
075
076    /** The input token. */
077    private ArrayToken _tokenArray;
078}