001/* 
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $' 
007 * '$Revision: 34658 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
032import org.apache.spark.ml.feature.VectorAssembler;
033import org.apache.spark.sql.Dataset;
034import org.apache.spark.sql.Row;
035import org.kepler.spark.actor.SparkSQLActor;
036
037import ptolemy.actor.TypedIOPort;
038import ptolemy.data.BooleanToken;
039import ptolemy.data.ObjectToken;
040import ptolemy.data.expr.Parameter;
041import ptolemy.data.expr.StringParameter;
042import ptolemy.data.type.BaseType;
043import ptolemy.data.type.ObjectType;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.IllegalActionException;
046import ptolemy.kernel.util.NameDuplicationException;
047import ptolemy.kernel.util.SingletonAttribute;
048
049
050/**
051 * @author Jiaxin Li
052 *
053 * VectorAssembler. 
054 * Used to create a vector column and append it to the incoming DataFrame. 
055 * The user must specify the columns to assemble into vectors. 
056 */
057public class VectorAssemble extends SparkSQLActor {
058
059    public VectorAssemble(CompositeEntity container, String name)
060        throws IllegalActionException, NameDuplicationException {
061        super(container, name);
062        
063        inData = new TypedIOPort(this, "inData", true, false);
064        inData.setTypeEquals(new ObjectType(Dataset.class));
065        new SingletonAttribute(inData, "_showName");
066        
067        outData = new TypedIOPort(this, "outData", false, true);
068        outData.setTypeEquals(new ObjectType(Dataset.class));
069        new SingletonAttribute(outData, "_showName");
070
071        colsAssembled = new StringParameter(this, "colsAssembled");
072
073        vecColName = new StringParameter(this, "vecColName");
074        vecColName.setToken("in_vector");
075
076        convertToDouble = new Parameter(this, "convertToDouble");
077        convertToDouble.setTypeEquals(BaseType.BOOLEAN);
078        convertToDouble.setToken(BooleanToken.TRUE);
079
080    }
081
082
083    /*
084     * This actor standardizes the values in a Dataset, and outputs a Dataset 
085     * containing a column 'scaledFeatures' containing a vector of the 
086     * standardize feature values.
087     */
088    @Override
089    public void fire() throws IllegalActionException {
090        
091        super.fire();
092        
093        // Read input data frame 
094        Dataset<Row> df = 
095            (Dataset<Row>)((ObjectToken)inData.get(0)).getValue();
096
097        // get the names of all data columns and put them in the colNames array
098        // these will be used when assembling the features column vector.
099        String[] colNames = 
100            colsAssembled.stringValue().trim().split("\\s*,\\s*");
101
102        // Optionally convert all data columns in inDf to DoubleType
103        // NOTE: this is to eliminate the type incompatibiilties as the input
104        //       data fields may contain different numeric types 
105        if(convertToDouble.getToken() == BooleanToken.TRUE)
106            for(String col : colNames)
107                df = df.withColumn(col, df.col(col).cast("Double"));
108
109        // Append a "features" column to the end of the DataFrame, containing a
110        // VectorUDT of all the specified columns in the row.
111        VectorAssembler assembler = new VectorAssembler()
112            .setInputCols(colNames)
113            .setOutputCol(vecColName.stringValue().trim());     
114        df = assembler.transform(df);
115        
116        // Output the standardized dataframe
117        outData.broadcast(new ObjectToken(df, Dataset.class));
118    }
119    
120
121    /** Input DataFrame. */
122    public TypedIOPort inData;
123    
124    /** Standardized data set as an RDD of Vectors. */
125    public TypedIOPort outData;
126    
127    /** Columns to assemble into feature vectors. */
128    public StringParameter colsAssembled;
129
130    /** Name of the output column containing standardized vectors */
131    public StringParameter vecColName;
132
133    /** Option to convert all data columns to type double */ 
134    public Parameter convertToDouble; 
135
136}