001/* Create an RDD of Vectors from a text file.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-25 19:23:47 +0000 (Fri, 25 Sep 2015) $' 
008 * '$Revision: 33972 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.mllib;
031
032import java.io.File;
033import java.util.regex.Pattern;
034
035import org.apache.spark.api.java.JavaRDD;
036import org.apache.spark.api.java.function.Function;
037import org.apache.spark.mllib.linalg.Vector;
038import org.apache.spark.mllib.linalg.Vectors;
039import org.kepler.spark.actor.SparkBaseActor;
040
041import ptolemy.actor.TypedIOPort;
042import ptolemy.actor.parameters.FilePortParameter;
043import ptolemy.data.ObjectToken;
044import ptolemy.data.StringToken;
045import ptolemy.data.type.BaseType;
046import ptolemy.data.type.ObjectType;
047import ptolemy.kernel.CompositeEntity;
048import ptolemy.kernel.util.IllegalActionException;
049import ptolemy.kernel.util.NameDuplicationException;
050import ptolemy.kernel.util.SingletonAttribute;
051
052/** Create an RDD of Vectors from a text file or from an RDD of Strings.
053 * 
054 *  @author Daniel Crawl
055 *  @version $Id: CreateVectorRDD.java 33972 2015-09-25 19:23:47Z crawl $
056 */
057public class CreateVectorRDD extends SparkBaseActor {
058
059    public CreateVectorRDD(CompositeEntity container, String name)
060            throws IllegalActionException, NameDuplicationException {
061        super(container, name);
062        
063        in = new FilePortParameter(this, "in");
064        in.setTypeEquals(BaseType.STRING);
065        new SingletonAttribute(in.getPort(), "_showName");
066        
067        inRDD = new TypedIOPort(this, "inRDD", true, false);
068        inRDD.setTypeEquals(new ObjectType(JavaRDD.class));
069        new SingletonAttribute(inRDD, "_showName");
070
071        out = new TypedIOPort(this, "out", false, true);
072        out.setTypeEquals(new ObjectType(JavaRDD.class));
073
074    }
075
076    @Override
077    public void preinitialize() throws IllegalActionException {
078        
079        super.preinitialize();
080        
081        if (inRDD.numberOfSources() > 0 && (in.getPort().numberOfSources() > 0 ||
082            !((StringToken)in.getToken()).stringValue().trim().isEmpty())) {
083            throw new IllegalActionException(this,
084               "Actor can have either filename or RDD<String> as input, not both.");
085        }
086    }
087    
088    @Override
089    public void fire() throws IllegalActionException {
090        
091        super.fire();
092        
093        // Process file input
094        if (inRDD.numberOfSources() == 0) {
095                in.update();
096                File file = in.asFile();
097        
098                if(!file.exists()) {
099                        throw new IllegalActionException(this, "Input file does not exist: " + file);
100                }
101        
102                JavaRDD<String> lines = _context.textFile(file.getAbsolutePath());
103            JavaRDD<Vector> points = lines.map(new ParsePoint());
104            out.broadcast(new ObjectToken(points, JavaRDD.class));
105            
106        // Process RDD<String> input
107        } else {
108            JavaRDD<String> lines = (JavaRDD<String>) ((ObjectToken)inRDD.get(0)).getValue();
109            JavaRDD<Vector> points = lines.map(new ParsePoint());
110            out.broadcast(new ObjectToken(points, JavaRDD.class));
111        }
112                
113    }
114        
115    /** The input file name. */
116    public FilePortParameter in;
117    
118    /** The input RDD of Strings. */
119    public TypedIOPort inRDD;
120    
121    /** An RDD of Vectors. */
122    public TypedIOPort out;
123        
124    private static class ParsePoint implements Function<String, Vector> {
125        private static final Pattern SPACE = Pattern.compile("\\s+");
126
127        @Override
128        public Vector call(String line) {
129            String[] tok = SPACE.split(line);
130            double[] point = new double[tok.length];
131            for (int i = 0; i < tok.length; ++i) {
132                point[i] = Double.parseDouble(tok[i]);
133            }
134            return Vectors.dense(point);
135        }
136    }    
137}