001/* 
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-06 19:09:58 +0000 (Tue, 06 Feb 2018) $' 
007 * '$Revision: 34656 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.kepler.spark.actor.SparkSQLActor;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.ObjectToken;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.ObjectType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.SingletonAttribute;
045
046/**
047 * @author Dylan Uys, Jiaxin Li
048 *
049 * This actor converts the values in specified columns from one unit
050 * of measurement to another. Accomplished by performing element wise 
051 * multiplication on the specified columns by a factor determined by what input
052 * units and output units are selected (currently only supports meters/second 
053 * and miles/hour) 
054 */
055public class ConvertColumns extends SparkSQLActor {
056
057    public ConvertColumns(CompositeEntity container, String name)
058        throws IllegalActionException, NameDuplicationException {
059        super(container, name);
060        
061        // initialize ports and parameters
062        inData = new TypedIOPort(this, "inData", true, false);
063        inData.setTypeEquals(new ObjectType(Dataset.class));
064        new SingletonAttribute(inData, "_showName");
065        
066        outData = new TypedIOPort(this, "outData", false, true);
067        outData.setTypeEquals(new ObjectType(Dataset.class));
068        new SingletonAttribute(outData, "_showName");
069        
070        first = new TypedIOPort(this, "first", false, true);
071        first.setTypeEquals(new ObjectType(Row.class));
072        new SingletonAttribute(first, "_showName");
073        
074        columnNames = new StringParameter(this, "columnNames");
075        columnNames.setToken("avg_wind_speed,max_wind_speed");
076        
077        inUnits = new StringParameter(this, "inUnits");
078        inUnits.addChoice("miles per hour");
079        inUnits.addChoice("meters per second");
080        inUnits.setToken("meters per second");
081        
082        outUnits = new StringParameter(this, "outUnits");
083        outUnits.addChoice("miles per hour");
084        outUnits.addChoice("meters per second");
085        outUnits.setToken("miles per hour");
086        
087    }
088    
089        
090    @Override
091    public void preinitialize() throws IllegalActionException {
092        super.preinitialize();
093        
094        // Determine the conversion factor by getting the inUnits and outUnits 
095        // values from the string parameters
096        _convertFactor = getConvertFactor(inUnits.stringValue(),
097                                          outUnits.stringValue());
098    }
099    
100    
101    @Override 
102    public void fire() throws IllegalActionException { 
103        super.fire();
104        
105        // Read data
106        Dataset<Row> df= (Dataset<Row>)((ObjectToken)inData.get(0)).getValue();
107
108        // Get names of columns to convert
109        String colNames = columnNames.stringValue();
110        
111        for (String colName: colNames.split(",")) {
112            // element wise multiplication
113            Column convertedCol =
114                df.col(colName.trim()).multiply(_convertFactor);
115
116            // newly converted one
117            df = df.withColumn(colName.trim(), convertedCol);
118        } 
119        
120        // Broadcast results
121        outData.broadcast(new ObjectToken(df, Dataset.class));
122
123        if (first.numberOfSinks() > 0)
124            first.broadcast(new ObjectToken(df.first(), Row.class));
125        
126    }
127
128        
129    // Use to determine the factor by which to multiply the incoming value 
130    private double getConvertFactor(String inUnits, String outUnits) {
131        if (inUnits.equalsIgnoreCase("meters per second")) {
132            if (outUnits.equalsIgnoreCase("miles per hour")) {
133                return 2.23694;
134            } 
135        } else if (inUnits.equalsIgnoreCase("miles per hour")) {
136            if (outUnits.equalsIgnoreCase("meters per second")) {
137                return 0.44704;
138            } 
139        }
140        return 1.0;
141    }
142
143    
144    /** The input DataFrame */ 
145    public TypedIOPort inData;
146        
147    /** The output DataFrame */
148    public TypedIOPort outData;
149    
150    /** First row of dataframe for debug purposes */ 
151    public TypedIOPort first;
152     
153    /** incoming units of value to convert */
154    public StringParameter inUnits;
155
156    /** outgoing units of value to convert */
157    public StringParameter outUnits;
158    
159    /** outgoing units of value to convert */
160    public StringParameter columnNames;
161    
162    /** value determined by getConvertFactor() from incoming/outgoing units */
163    private double _convertFactor;
164    
165   
166}
167
168