/*
 * Copyright (c) 2016-2017 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $'
 * '$Revision: 34658 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.spark.mllib;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.kepler.spark.actor.SparkSQLActor;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.ObjectType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.SingletonAttribute;


/**
 * @author Jiaxin Li
 *
 * VectorAssemble.
 * Wraps Spark's VectorAssembler to create a vector column and append it to
 * the incoming DataFrame. The user must specify the columns to assemble
 * into vectors.
 */
public class VectorAssemble extends SparkSQLActor {

    public VectorAssemble(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);

        inData = new TypedIOPort(this, "inData", true, false);
        inData.setTypeEquals(new ObjectType(Dataset.class));
        new SingletonAttribute(inData, "_showName");

        outData = new TypedIOPort(this, "outData", false, true);
        outData.setTypeEquals(new ObjectType(Dataset.class));
        new SingletonAttribute(outData, "_showName");

        colsAssembled = new StringParameter(this, "colsAssembled");

        vecColName = new StringParameter(this, "vecColName");
        vecColName.setToken("in_vector");

        convertToDouble = new Parameter(this, "convertToDouble");
        convertToDouble.setTypeEquals(BaseType.BOOLEAN);
        convertToDouble.setToken(BooleanToken.TRUE);

    }


    /*
     * Assembles the specified columns of the input Dataset into a single
     * vector column (named by vecColName) and outputs the resulting Dataset.
     */
    @Override
    public void fire() throws IllegalActionException {

        super.fire();

        // Read the input data frame.
        Dataset<Row> df =
            (Dataset<Row>)((ObjectToken)inData.get(0)).getValue();

        // Get the names of the columns to assemble and put them in the
        // colNames array; these are used when building the vector column.
        String[] colNames =
            colsAssembled.stringValue().trim().split("\\s*,\\s*");

        // Optionally convert the selected columns to DoubleType.
        // NOTE: this eliminates type incompatibilities, since the input
        // data fields may contain different numeric types.
        if(((BooleanToken)convertToDouble.getToken()).booleanValue())
            for(String col : colNames)
                df = df.withColumn(col, df.col(col).cast("Double"));

        // Append a vector column to the end of the DataFrame, containing a
        // VectorUDT built from the specified columns of each row.
        VectorAssembler assembler = new VectorAssembler()
            .setInputCols(colNames)
            .setOutputCol(vecColName.stringValue().trim());
        df = assembler.transform(df);

        // Output the assembled dataframe.
        outData.broadcast(new ObjectToken(df, Dataset.class));
    }


    /** Input DataFrame. */
    public TypedIOPort inData;

    /** Output DataFrame with the assembled vector column appended. */
    public TypedIOPort outData;

    /** Columns to assemble into feature vectors (comma-separated). */
    public StringParameter colsAssembled;

    /** Name of the output column containing the assembled vectors. */
    public StringParameter vecColName;

    /** Option to convert all data columns to type double. */
    public Parameter convertToDouble;

}
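
/*
 * Example (illustrative sketch, not part of the actor): the transformation
 * performed in fire() is equivalent to the following direct use of Spark's
 * VectorAssembler. The column names "temperature" and "humidity" and the
 * Dataset "df" are hypothetical, standing in for a colsAssembled value of
 * "temperature, humidity" and the incoming DataFrame.
 *
 *   VectorAssembler assembler = new VectorAssembler()
 *       .setInputCols(new String[] {"temperature", "humidity"})
 *       .setOutputCol("in_vector");
 *   Dataset<Row> withVector = assembler.transform(df);
 *
 * The appended "in_vector" column holds a vector built from the listed
 * columns of each row; all other columns pass through unchanged.
 */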