/*
 * Copyright (c) 2016-2017 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $'
 * '$Revision: 34658 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.spark.mllib;

import java.util.NoSuchElementException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.kepler.spark.actor.SparkBaseActor;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.LongToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.ObjectType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.SingletonAttribute;

/**
 * Returns a new DataFrame, removing all Rows with any null values from the
 * input DataFrame.
 *
 * <p>The filtering is delegated to {@code Dataset.na().drop()}. When the
 * {@link #suppressEmptyDF} parameter is true and the filtered DataFrame has
 * no rows, nothing is sent on any output port and a warning is printed to
 * stderr instead.
 *
 * @author Dylan Uys, Jiaxin Li
 */
public class RemoveRowsWithNulls extends SparkBaseActor {

    /**
     * Creates the actor together with its ports and parameter.
     *
     * @param container the composite entity that will contain this actor
     * @param name the name of this actor within the container
     * @throws IllegalActionException if the superclass constructor or the
     *     creation/configuration of a port or parameter fails
     * @throws NameDuplicationException if the superclass constructor or the
     *     creation of a port or parameter reports a name clash
     */
    public RemoveRowsWithNulls(CompositeEntity container, String name)
        throws IllegalActionException, NameDuplicationException {
        super(container, name);

        inData = new TypedIOPort(this, "inData", true, false);
        inData.setTypeEquals(new ObjectType(Dataset.class));
        new SingletonAttribute(inData, "_showName");

        outData = new TypedIOPort(this, "outData", false, true);
        outData.setTypeEquals(new ObjectType(Dataset.class));
        new SingletonAttribute(outData, "_showName");

        count = new TypedIOPort(this, "count", false, true);
        count.setTypeEquals(BaseType.LONG);
        new SingletonAttribute(count, "_showName");

        suppressEmptyDF = new Parameter(this, "suppressEmptyDF");
        suppressEmptyDF.setTypeEquals(BaseType.BOOLEAN);
        suppressEmptyDF.setToken(BooleanToken.FALSE);
    }

    /**
     * Reads one DataFrame from {@link #inData}, drops the rows that contain
     * null values and broadcasts the result on {@link #outData}. The row
     * count is additionally broadcast on {@link #count}, but only when that
     * port has at least one sink, because counting forces a Spark job.
     *
     * <p>If {@link #suppressEmptyDF} is true and the filtered DataFrame is
     * empty, no token is produced on either output port and a warning is
     * written to stderr.
     *
     * @throws IllegalActionException if reading the input, reading the
     *     parameter, or broadcasting an output fails
     */
    @Override
    public void fire() throws IllegalActionException {

        super.fire();

        // The port type only guarantees a Dataset; the Row element type is
        // an assumption about what upstream actors send.
        @SuppressWarnings("unchecked")
        Dataset<Row> df =
            (Dataset<Row>) ((ObjectToken) inData.get(0)).getValue();

        // Drop rows with null values
        Dataset<Row> filtered = df.na().drop();

        // Compare by value (and null-safely) rather than by identity, so a
        // true-valued token that is not the TRUE singleton is honoured too.
        boolean suppress =
            BooleanToken.TRUE.equals(suppressEmptyDF.getToken());

        // Only pay for the emptiness probe when suppression was requested.
        if (suppress && _isEmpty(filtered)) {
            System.err.println("\nWARNING: suppressing empty data frame!\n");
            return;
        }

        outData.broadcast(new ObjectToken(filtered, Dataset.class));

        if (count.numberOfSinks() > 0) {  // Debug output if port hooked
            count.broadcast(new LongToken(filtered.count()));
        }
    }

    /**
     * Tells whether a DataFrame has no rows by fetching at most one row,
     * instead of relying on an exception from {@code first()}.
     */
    private static boolean _isEmpty(Dataset<Row> data) {
        return data.takeAsList(1).isEmpty();
    }

    /** Incoming DataFrame */
    public TypedIOPort inData;

    /** New DataFrame without null rows */
    public TypedIOPort outData;

    /** Number of rows of the filtered dataset. */
    public TypedIOPort count;

    /** Option to suppress empty data frame after dropna */
    public Parameter suppressEmptyDF;

}