001/* 002 * Copyright (c) 2016-2017 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2018-02-07 01:00:17 +0000 (Wed, 07 Feb 2018) $' 007 * '$Revision: 34658 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.spark.mllib; 031 032import org.apache.spark.sql.Dataset; 033import org.apache.spark.sql.Row; 034import org.kepler.spark.actor.SparkBaseActor; 035 036import ptolemy.actor.TypedIOPort; 037import ptolemy.data.ObjectToken; 038import ptolemy.data.expr.StringParameter; 039import ptolemy.data.type.ObjectType; 040import ptolemy.kernel.CompositeEntity; 041import ptolemy.kernel.util.IllegalActionException; 042import ptolemy.kernel.util.NameDuplicationException; 043import ptolemy.kernel.util.SingletonAttribute; 044 045 046/** 047 * @author Jiaxin Li 048 * 049 * Drops user-specified columns from the input Spark dataframe. 050 * Specify column names to drop, separated by comma (','). 051 */ 052public class DropColumns extends SparkBaseActor { 053 054 public DropColumns(CompositeEntity container, String name) 055 throws IllegalActionException, NameDuplicationException { 056 super(container, name); 057 058 inData = new TypedIOPort(this, "inData", true, false); 059 inData.setTypeEquals(new ObjectType(Dataset.class)); 060 new SingletonAttribute(inData, "_showName"); 061 062 outData = new TypedIOPort(this, "outData", false, true); 063 outData.setTypeEquals(new ObjectType(Dataset.class)); 064 new SingletonAttribute(outData, "_showName"); 065 066 colsToDrop = new StringParameter(this, "colsToDrop"); 067 068 } 069 070 071 /* 072 * Fire 073 */ 074 @Override 075 public void fire() throws IllegalActionException { 076 077 super.fire(); 078 079 // get incoming Spark dataframe 080 Dataset<Row> inDf = 081 (Dataset<Row>)((ObjectToken)inData.get(0)).getValue(); 082 083 // drop specified columns 084 String[] colsDropped = 085 colsToDrop.stringValue().trim().split("\\s*,\\s*"); 086 for(String col : colsDropped) 087 inDf = inDf.drop(col); 088 089 // output result dataframe 090 outData.broadcast(new ObjectToken(inDf, Dataset.class)); 091 092 } 093 094 /** Dataset<Row> input */ 095 public TypedIOPort inData; 096 097 /** Dataset<Row> output */ 098 public TypedIOPort outData; 099 100 /** Columns to drop */ 101 public StringParameter colsToDrop; 102 103}