001/* 
002 * Copyright (c) 2015 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-06-04 22:39:55 +0000 (Thu, 04 Jun 2015) $' 
007 * '$Revision: 33462 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.spark.mllib;
030
031import java.util.regex.Pattern;
032
033import org.apache.spark.api.java.JavaRDD;
034import org.apache.spark.api.java.function.Function;
035import org.apache.spark.mllib.linalg.Vector;
036import org.apache.spark.mllib.linalg.Vectors;
037
038import ptolemy.actor.TypedAtomicActor;
039import ptolemy.actor.TypedIOPort;
040import ptolemy.actor.parameters.PortParameter;
041import ptolemy.data.ArrayToken;
042import ptolemy.data.IntToken;
043import ptolemy.data.ObjectToken;
044import ptolemy.data.StringToken;
045import ptolemy.data.Token;
046import ptolemy.data.type.ArrayType;
047import ptolemy.data.type.BaseType;
048import ptolemy.data.type.ObjectType;
049import ptolemy.kernel.CompositeEntity;
050import ptolemy.kernel.util.IllegalActionException;
051import ptolemy.kernel.util.NameDuplicationException;
052import ptolemy.kernel.util.SingletonAttribute;
053
054/** Takes a JavaRDD<String> dataset, an array of column indices and a
055 *  separator string. It extracts specified columns from the passed
056 *  dataset and creates an RDD<Vector> of them.
057 * 
058 *  @author Tahereh Masoumi
059 *  @version $Id: ExtractColumns.java 33462 2015-06-04 22:39:55Z crawl $
060 *  
061 */
062public class ExtractColumns extends TypedAtomicActor {
063
064    public ExtractColumns(CompositeEntity container, String name)
065            throws IllegalActionException, NameDuplicationException {
066        super(container, name);
067        
068        in = new TypedIOPort(this, "in", true, false);
069        in.setTypeEquals(new ObjectType(JavaRDD.class));
070        new SingletonAttribute(in, "_showName");
071
072        ColumnIndices = new TypedIOPort(this, "ColumnIndices", true, false);
073        ColumnIndices.setTypeAtLeast(ArrayType.ARRAY_BOTTOM);
074        new SingletonAttribute(ColumnIndices, "_showName");
075
076        sep = new PortParameter(this,"sep");
077        sep.setTypeEquals(BaseType.STRING);
078        sep.setStringMode(true);
079        new SingletonAttribute(sep.getPort(), "_showName");
080
081        out = new TypedIOPort(this, "out", false, true);
082        out.setTypeEquals(new ObjectType(JavaRDD.class));
083        new SingletonAttribute(out, "_showName");
084
085        test =  new TypedIOPort(this, "test", false, true);
086        test.setTypeEquals(new ObjectType(Vector.class));
087        new SingletonAttribute(test, "_showName");
088
089    }
090
091    @Override
092    public void fire() throws IllegalActionException {
093        
094        super.fire();
095        sep.update();
096        String separator = ((StringToken)sep.getToken()).stringValue();
097        final JavaRDD<String> lines = (JavaRDD<String>) ((ObjectToken)in.get(0)).getValue();
098        final Token[] idxs;       
099        
100        //ColumnIndices.update();
101        if (ColumnIndices.hasToken(0)) {
102                
103            ArrayToken token = (ArrayToken) ColumnIndices.get(0);
104            idxs = token.arrayValue();
105            JavaRDD<Vector> points = lines.map(new ParsePoint(idxs,separator));
106            out.broadcast(new ObjectToken(points, JavaRDD.class));
107            test.broadcast(new ObjectToken(points.first(),Vector.class));
108        
109        } else {
110            
111            JavaRDD<Vector> points = lines.map(new ParsePoint(separator));
112            out.broadcast(new ObjectToken(points, JavaRDD.class));
113            test.broadcast(new ObjectToken(points.first(),Vector.class));
114        }
115    }
116    
117    /** The input JavaRDD <String> dataset. */
118    public TypedIOPort in;
119    
120    /** An RDD of Vectors. */
121    public TypedIOPort out;
122    
123    /** An Array of column indices.*/
124    public TypedIOPort ColumnIndices;
125  
126    /** string column separator.*/
127    public PortParameter sep;
128    
129    /** Outputs the first row of the new dataset only for test purpose*/
130    public TypedIOPort test;
131    
132    
133    private static class ParsePoint implements Function<String, Vector> {
134        
135        private static Token[] idxs;
136        
137        private static Pattern SPACE;
138        
139        public ParsePoint(Token[] _idxs,String separator) {
140                idxs = _idxs;
141                SPACE = Pattern.compile("\\"+separator+"+");
142        }
143        
144        public ParsePoint(String separator){
145                SPACE = Pattern.compile(separator);     
146        }
147
148        @Override
149        public Vector call(String line) {
150            String[] tok = SPACE.split(line);
151            
152            double[] point;
153            
154            /** if no column index is passed it returns the vector of the whole dataset. */
155            if(idxs.equals(null)){
156                    point = new double[tok.length];
157                    for (int i = 0; i < tok.length; ++i) {
158                        point[i] = Double.parseDouble(tok[i]);
159                        
160                    }
161            } else {
162
163                point = new double[idxs.length];
164                
165                for (int i = 0; i < idxs.length; ++i) {
166                        point[i] = Double.parseDouble(tok[((IntToken)idxs[i]).intValue()]);
167                }
168            }
169            
170            return Vectors.dense(point);
171        }
172    }
173}