/*
 * Copyright (c) 2016-2017 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-06 19:09:58 +0000 (Tue, 06 Feb 2018) $'
 * '$Revision: 34656 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
027 * 028 */ 029 030package org.kepler.spark.mllib; 031 032import org.apache.spark.sql.Column; 033import org.apache.spark.sql.Dataset; 034import org.apache.spark.sql.Row; 035import org.kepler.spark.actor.SparkSQLActor; 036 037import ptolemy.actor.TypedIOPort; 038import ptolemy.data.ObjectToken; 039import ptolemy.data.expr.StringParameter; 040import ptolemy.data.type.ObjectType; 041import ptolemy.kernel.CompositeEntity; 042import ptolemy.kernel.util.IllegalActionException; 043import ptolemy.kernel.util.NameDuplicationException; 044import ptolemy.kernel.util.SingletonAttribute; 045 046/** 047 * @author Dylan Uys, Jiaxin Li 048 * 049 * This actor converts the values in specified columns from one unit 050 * of measurement to another. Accomplished by performing element wise 051 * multiplication on the specified columns by a factor determined by what input 052 * units and output units are selected (currently only supports meters/second 053 * and miles/hour) 054 */ 055public class ConvertColumns extends SparkSQLActor { 056 057 public ConvertColumns(CompositeEntity container, String name) 058 throws IllegalActionException, NameDuplicationException { 059 super(container, name); 060 061 // initialize ports and parameters 062 inData = new TypedIOPort(this, "inData", true, false); 063 inData.setTypeEquals(new ObjectType(Dataset.class)); 064 new SingletonAttribute(inData, "_showName"); 065 066 outData = new TypedIOPort(this, "outData", false, true); 067 outData.setTypeEquals(new ObjectType(Dataset.class)); 068 new SingletonAttribute(outData, "_showName"); 069 070 first = new TypedIOPort(this, "first", false, true); 071 first.setTypeEquals(new ObjectType(Row.class)); 072 new SingletonAttribute(first, "_showName"); 073 074 columnNames = new StringParameter(this, "columnNames"); 075 columnNames.setToken("avg_wind_speed,max_wind_speed"); 076 077 inUnits = new StringParameter(this, "inUnits"); 078 inUnits.addChoice("miles per hour"); 079 inUnits.addChoice("meters per second"); 080 
inUnits.setToken("meters per second"); 081 082 outUnits = new StringParameter(this, "outUnits"); 083 outUnits.addChoice("miles per hour"); 084 outUnits.addChoice("meters per second"); 085 outUnits.setToken("miles per hour"); 086 087 } 088 089 090 @Override 091 public void preinitialize() throws IllegalActionException { 092 super.preinitialize(); 093 094 // Determine the conversion factor by getting the inUnits and outUnits 095 // values from the string parameters 096 _convertFactor = getConvertFactor(inUnits.stringValue(), 097 outUnits.stringValue()); 098 } 099 100 101 @Override 102 public void fire() throws IllegalActionException { 103 super.fire(); 104 105 // Read data 106 Dataset<Row> df= (Dataset<Row>)((ObjectToken)inData.get(0)).getValue(); 107 108 // Get names of columns to convert 109 String colNames = columnNames.stringValue(); 110 111 for (String colName: colNames.split(",")) { 112 // element wise multiplication 113 Column convertedCol = 114 df.col(colName.trim()).multiply(_convertFactor); 115 116 // newly converted one 117 df = df.withColumn(colName.trim(), convertedCol); 118 } 119 120 // Broadcast results 121 outData.broadcast(new ObjectToken(df, Dataset.class)); 122 123 if (first.numberOfSinks() > 0) 124 first.broadcast(new ObjectToken(df.first(), Row.class)); 125 126 } 127 128 129 // Use to determine the factor by which to multiply the incoming value 130 private double getConvertFactor(String inUnits, String outUnits) { 131 if (inUnits.equalsIgnoreCase("meters per second")) { 132 if (outUnits.equalsIgnoreCase("miles per hour")) { 133 return 2.23694; 134 } 135 } else if (inUnits.equalsIgnoreCase("miles per hour")) { 136 if (outUnits.equalsIgnoreCase("meters per second")) { 137 return 0.44704; 138 } 139 } 140 return 1.0; 141 } 142 143 144 /** The input DataFrame */ 145 public TypedIOPort inData; 146 147 /** The output DataFrame */ 148 public TypedIOPort outData; 149 150 /** First row of dataframe for debug purposes */ 151 public TypedIOPort first; 152 
153 /** incoming units of value to convert */ 154 public StringParameter inUnits; 155 156 /** outgoing units of value to convert */ 157 public StringParameter outUnits; 158 159 /** outgoing units of value to convert */ 160 public StringParameter columnNames; 161 162 /** value determined by getConvertFactor() from incoming/outgoing units */ 163 private double _convertFactor; 164 165 166} 167 168