001/* 
002 * Copyright (c) 2015 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-06-22 18:30:58 +0000 (Mon, 22 Jun 2015) $' 
007 * '$Revision: 33494 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029package org.kepler.spark.mllib;
030
031import java.io.File;
032
033import org.apache.spark.api.java.JavaRDD;
034import org.apache.spark.api.java.function.Function;
035import org.kepler.spark.actor.SparkBaseActor;
036
037import ptolemy.actor.TypedIOPort;
038import ptolemy.actor.parameters.FilePortParameter;
039import ptolemy.actor.parameters.PortParameter;
040import ptolemy.data.LongToken;
041import ptolemy.data.ObjectToken;
042import ptolemy.data.StringToken;
043import ptolemy.data.type.BaseType;
044import ptolemy.data.type.ObjectType;
045import ptolemy.kernel.CompositeEntity;
046import ptolemy.kernel.util.IllegalActionException;
047import ptolemy.kernel.util.NameDuplicationException;
048import ptolemy.kernel.util.SingletonAttribute;
049
050/** Removes null values from a JavaRDD<String> dataset and creates a
051 *  new dataset.
052 * 
053 *  @author Tahereh Masoumi
054 *  @version $Id: RemoveNulls.java 33494 2015-06-22 18:30:58Z crawl $
055 */
056public class RemoveNulls extends SparkBaseActor {
057
058    public RemoveNulls(CompositeEntity container, String name)
059            throws IllegalActionException, NameDuplicationException {
060        super(container, name);
061        
062        in = new FilePortParameter(this, "in");
063        in.setTypeEquals(BaseType.STRING);
064        new SingletonAttribute(in.getPort(), "_showName");
065
066        nullDes = new PortParameter(this, "nullDes");
067        nullDes.setTypeEquals(BaseType.STRING);
068        nullDes.setStringMode(true);
069        new SingletonAttribute(nullDes.getPort(), "_showName");
070
071        out = new TypedIOPort(this, "out", false, true);
072        out.setTypeEquals(new ObjectType(JavaRDD.class));
073        new SingletonAttribute(out, "_showName");
074
075        count = new TypedIOPort(this, "count", false, true);
076        count.setTypeEquals(BaseType.LONG);
077        new SingletonAttribute(count, "_showName");
078
079    }
080
081    @Override
082    public void fire() throws IllegalActionException {
083        
084        super.fire();
085
086        in.update();
087        nullDes.update();
088        File file = in.asFile();
089        
090        if(!file.exists()) {
091            throw new IllegalActionException(this, "Input file does not exist: " + file);
092        }
093        
094        String nulldesig = ((StringToken)nullDes.getToken()).stringValue();
095        JavaRDD<String> lines = _context.textFile(file.getAbsolutePath());
096        JavaRDD<String> filterdLins = lines.filter(new FilterFunc(nulldesig));
097        out.broadcast(new ObjectToken(filterdLins, JavaRDD.class));
098        count.broadcast(new LongToken(filterdLins.count()));
099    }
100        
101    /** The input file name. */
102    public FilePortParameter in;
103    
104    /** An RDD of Vectors. */
105    public TypedIOPort out;
106    
107    /** Number of rows of the filtered dataset. */
108    public TypedIOPort count;
109    
110    /** Missing Values Designator: CANNOT be EMPTY STRING!. */
111    public PortParameter nullDes;
112    
113    private static class FilterFunc implements Function<String, Boolean> {
114        
115        private static String nulldes;
116        
117        public FilterFunc(String nulldesig) {
118                nulldes = nulldesig;
119        }
120
121        @Override
122        public Boolean call(String s) {
123            return (!s.contains(nulldes))&& (!s.isEmpty());
124        }
125    }
126    
127}