001/* 
002 * Copyright (c) 2016-2017 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $' 
007 * '$Revision: 34658 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.spark.mllib;
031
032import java.util.NoSuchElementException;
033
034import org.apache.spark.sql.Dataset;
035import org.apache.spark.sql.Row;
036import org.kepler.spark.actor.SparkBaseActor;
037
038import ptolemy.actor.TypedIOPort;
039import ptolemy.data.BooleanToken;
040import ptolemy.data.LongToken;
041import ptolemy.data.ObjectToken;
042import ptolemy.data.expr.Parameter;
043import ptolemy.data.type.BaseType;
044import ptolemy.data.type.ObjectType;
045import ptolemy.kernel.CompositeEntity;
046import ptolemy.kernel.util.IllegalActionException;
047import ptolemy.kernel.util.NameDuplicationException;
048import ptolemy.kernel.util.SingletonAttribute;
049
050/**
051 * @author Dylan Uys, Jiaxin Li
052 *
053 * Returns a new DataFrame, removing all Rows with any null values from the 
054 * input DataFrame
055 */
056public class RemoveRowsWithNulls extends SparkBaseActor {
057
058    public RemoveRowsWithNulls(CompositeEntity container, String name)
059        throws IllegalActionException, NameDuplicationException {
060        super(container, name);
061        
062        inData = new TypedIOPort(this, "inData", true, false);
063        inData.setTypeEquals(new ObjectType(Dataset.class));
064        new SingletonAttribute(inData, "_showName");
065
066        outData = new TypedIOPort(this, "outData", false, true);
067        outData.setTypeEquals(new ObjectType(Dataset.class));
068        new SingletonAttribute(outData, "_showName");
069        
070        count = new TypedIOPort(this, "count", false, true);
071        count.setTypeEquals(BaseType.LONG);
072        new SingletonAttribute(count, "_showName");
073
074        suppressEmptyDF = new Parameter(this, "suppressEmptyDF");
075        suppressEmptyDF.setTypeEquals(BaseType.BOOLEAN);
076        suppressEmptyDF.setToken(BooleanToken.FALSE);
077
078    }
079
080    @Override
081    public void fire() throws IllegalActionException {
082        
083        super.fire();
084
085        Dataset<Row> df =(Dataset<Row>)((ObjectToken)inData.get(0)).getValue();
086
087        // Drop rows with null values
088        Dataset<Row> filtered = df.na().drop();
089
090        // if dataset contains rows, output token; otherwise err msg to stderr
091        try{
092
093            // if specified, detect if output data frame is empty
094            if(suppressEmptyDF.getToken() == BooleanToken.TRUE) {
095                Row test = filtered.first();  
096            }
097
098            outData.broadcast(new ObjectToken(filtered, Dataset.class));
099
100            if (count.numberOfSinks() > 0) { // Debug output if port hooked
101                count.broadcast(new LongToken(filtered.count()));
102            }
103        
104        } catch (NoSuchElementException e) {
105            System.err.println("\nWARNING: suppressing empty data frame!\n"); 
106        }
107    }
108        
109    /** Incoming DataFrame */
110    public TypedIOPort inData;
111    
112    /** New DataFrame without null rows */
113    public TypedIOPort outData;
114    
115    /** Number of rows of the filtered dataset. */
116    public TypedIOPort count;
117
118    /** Option to suppress empty data frame after dropna */
119    public Parameter suppressEmptyDF;
120    
121}
122