001/* Create an RDD of LabeledPoints from a text file.
002 * 
003 * Copyright (c) 2015 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-08-31 18:04:42 +0000 (Mon, 31 Aug 2015) $' 
008 * '$Revision: 33837 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.mllib;
031
032import java.io.File;
033import java.util.regex.Pattern;
034
035import org.apache.spark.api.java.JavaRDD;
036import org.apache.spark.api.java.function.Function;
037import org.apache.spark.mllib.linalg.Vectors;
038import org.apache.spark.mllib.regression.LabeledPoint;
039import org.kepler.spark.actor.SparkBaseActor;
040
041import ptolemy.actor.TypedIOPort;
042import ptolemy.actor.parameters.FilePortParameter;
043import ptolemy.data.ObjectToken;
044import ptolemy.data.type.BaseType;
045import ptolemy.data.type.ObjectType;
046import ptolemy.kernel.CompositeEntity;
047import ptolemy.kernel.util.IllegalActionException;
048import ptolemy.kernel.util.NameDuplicationException;
049
050/** Create an RDD of LabeledPoints from a text file with space-delimited values.
051 * 
052 *  @author Mai H. Nguyen and Ankush Agrawal
053 *  @version $Id: CreateLabeledPoint.java 33837 2015-08-31 18:04:42Z crawl $
054 */
055public class CreateLabeledPoint extends SparkBaseActor {
056
057    public CreateLabeledPoint(CompositeEntity container, String name)
058            throws IllegalActionException, NameDuplicationException {
059        super(container, name);
060        
061        in = new FilePortParameter(this, "in");
062        in.setTypeEquals(BaseType.STRING);
063
064        out = new TypedIOPort(this, "out", false, true);
065        out.setTypeEquals(new ObjectType(JavaRDD.class));
066
067    }
068
069    @Override
070    public void fire() throws IllegalActionException {
071        
072        super.fire();
073
074        in.update();
075        File file = in.asFile();
076        
077        if(!file.exists()) {
078            throw new IllegalActionException(this, "Input file does not exist: " + file);
079        }
080        
081        JavaRDD<String> lines = _context.textFile(file.getAbsolutePath());
082        JavaRDD<LabeledPoint> points = lines.map(new ParsePoint());
083        out.broadcast(new ObjectToken(points, JavaRDD.class));
084    }
085        
086    /** Input file name. */
087    public FilePortParameter in;
088    
089    /** RDD of LabeledPoints. */
090    public TypedIOPort out;
091 
092    /** Class to parse a line of values to extract label and features. **/
093    private static class ParsePoint implements Function<String, LabeledPoint> {
094                private static final long serialVersionUID = 8331277031172797885L;
095        private static final Pattern SPACE = Pattern.compile(" ");
096
097                @Override
098        public LabeledPoint call(String line) {
099                        
100                        String[] values = SPACE.split(line);
101                        int      numValues = values.length;
102                        double   label;
103                        double[] features = new double[numValues-1];
104                        
105                        label = Double.parseDouble(values[0]);
106                        for (int i=0; i < (numValues-1); i++) {
107                                features[i] = Double.parseDouble(values[i+1]);
108                        }
109                                        
110                        LabeledPoint lp = new LabeledPoint(label,Vectors.dense(features));
111                        return lp;
112                                        
113        }
114    }    
115}