/* HDFSRemover class to remove files/directories on HDFS.

/*
 * Copyright (c) 2010-2014 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.actor;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;

//////////////////////////////////////////////////////////////////////////
//// HDFSRemover

/**
 * This class removes files/directories on HDFS.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: HDFSRemover.java 33075 2014-11-13 17:08:52Z jianwu $
 */

public class HDFSRemover extends TypedAtomicActor {

	// /////////////////////////////////////////////////////////////////
	// // ports and parameters ////

	/**
	 * The full paths on HDFS that need to be deleted. Multiple paths are
	 * separated by ';'.
	 *
	 * @see PortParameter
	 */
	public PortParameter hdfsPaths;

	/**
	 * If <i>false</i> and the folder has non-empty sub-folders, an exception
	 * will be thrown. If <i>true</i> (the default), the folder will be
	 * deleted recursively.
	 */
	public Parameter recursive;

	/** DDP engine configuration directory. */
	public StringParameter configDir;

	/**
	 * Whether the deletion succeeded. The value is false if there is an
	 * error when deleting an input path. An input path that does not exist
	 * does not affect this value.
	 */
	public TypedIOPort success;

	/**
	 * The full paths on HDFS that need to be deleted.
	 */
	public TypedIOPort hdfsPathsOut;

	public HDFSRemover(CompositeEntity container, String name)
			throws NameDuplicationException, IllegalActionException {
		super(container, name);
		hdfsPaths = new PortParameter(this, "hdfsPaths");
		hdfsPaths.setExpression("hdfs://localhost:9000/a.txt");
		hdfsPaths.setStringMode(true);

		recursive = new Parameter(this, "recursive");
		recursive.setTypeEquals(BaseType.BOOLEAN);
		recursive.setToken(BooleanToken.TRUE);

		configDir = new StringParameter(this, "configDir");

		success = new TypedIOPort(this, "success", false, true);
		success.setTypeEquals(BaseType.BOOLEAN);
		success.setMultiport(false);

		hdfsPathsOut = new TypedIOPort(this, "hdfsPathsOut", false, true);
		hdfsPathsOut.setTypeEquals(BaseType.STRING);
		hdfsPathsOut.setMultiport(false);

		_attachText("_iconDescription", "<svg>\n"
				+ "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
				+ "style=\"fill:white\"/>\n"
				+ "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
				+ " 15,-10 15,10 -15,10\" " + "style=\"fill:red\"/>\n"
				+ "</svg>\n");

	}

	// /////////////////////////////////////////////////////////////////
	// // public methods ////

	/**
	 * Clone the actor into the specified workspace.
	 *
	 * @param workspace
	 *            The workspace for the new object.
	 * @return A new actor.
	 * @exception CloneNotSupportedException
	 *                If a derived class contains an attribute that cannot be
	 *                cloned.
	 */
	@Override
	public Object clone(Workspace workspace) throws CloneNotSupportedException {
		HDFSRemover newObject = (HDFSRemover) super.clone(workspace);
		return newObject;
	}

	/**
	 * Parse the input paths and delete them one by one.
	 *
	 * @exception IllegalActionException
	 *                If a path cannot be deleted, or if the file system
	 *                throws an exception during initialization, path
	 *                existence checking, or closing.
	 */
	@Override
	public boolean postfire() throws IllegalActionException {
		hdfsPaths.update();
		boolean _success = true;
		try {
			// Assign the member field, not a local variable, so that
			// wrapup() can close the file system if an error occurs before
			// the close below.
			_fs = FileSystem.get(_conf);
			_hdfsPaths = ((StringToken) hdfsPaths.getToken()).stringValue();
			String[] hdfsPathStrs = _hdfsPaths.split(";");
			boolean bRecursive = ((BooleanToken) recursive.getToken())
					.booleanValue();

			// Delete each path. A path that does not exist is skipped; an
			// existing path that cannot be deleted sets the flag to false.
			for (int i = 0; i < hdfsPathStrs.length; i++) {
				Path hdfsPath = new Path(hdfsPathStrs[i]);
				if (_fs.exists(hdfsPath) && !_fs.delete(hdfsPath, bRecursive))
					_success = false;
			}
			_fs.close();
			// Mark the file system closed so wrapup() does not close it
			// again.
			_fs = null;
			success.broadcast(new BooleanToken(_success));
			hdfsPathsOut.broadcast(new StringToken(_hdfsPaths));
		} catch (IOException e) {
			e.printStackTrace();
			throw new IllegalActionException(this, e, e.getMessage());
		}
		return super.postfire();
	}
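	// Example (hypothetical paths): with hdfsPaths set to
	//   hdfs://localhost:9000/tmp/a.txt;hdfs://localhost:9000/tmp/out
	// postfire() splits the value on ';' and deletes each entry, the
	// directory recursively when the recursive parameter is true. A path
	// that does not exist is skipped and leaves the success flag true.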
	/**
	 * Build the HDFS configuration, reading core-site.xml from the
	 * directory given by the configDir parameter.
	 *
	 * @exception IllegalActionException
	 *                If there is an error reading the configDir parameter.
	 */
	@Override
	public void preinitialize() throws IllegalActionException {
		super.preinitialize();
		_configDirStr = configDir.stringValue();
		if (_configDirStr.trim().isEmpty()) {
			// set the default location of the config directory
			String workflowDirStr = System.getProperty("hadoop.workflowdir");
			if (workflowDirStr == null) {
				throw new InternalErrorException(
						"System property hadoop.workflowdir not set.");
			}
			_configDirStr = workflowDirStr + File.separator + "tools"
					+ File.separator + "etc" + File.separator + "hadoop";
		}
		_conf = new Configuration();
		_conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
	}

	/**
	 * Close the file system if there is one.
	 *
	 * @exception IllegalActionException
	 *                If an IO error occurs.
	 */
	@Override
	public void wrapup() throws IllegalActionException {
		try {
			if (_fs != null)
				_fs.close();
		} catch (IOException e) {
			e.printStackTrace();
			throw new IllegalActionException(this, e, e.getMessage());
		}
	}

	// /////////////////////////////////////////////////////////////////
	// // public members ////

	public static final String CORE_SITE = "core-site.xml";

	// /////////////////////////////////////////////////////////////////
	// // private members ////

	/** The HDFS file system, opened in postfire(). */
	private FileSystem _fs;

	/** The Hadoop configuration, built in preinitialize(). */
	private Configuration _conf;

	/** The configuration directory read from configDir. */
	private String _configDirStr;

	/** The semicolon-separated list of paths to delete. */
	private String _hdfsPaths;

}
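// A minimal usage sketch (the container, names, and paths below are
// hypothetical, for illustration only; in a Kepler workflow the actor is
// normally configured through the GUI):
//
//   CompositeEntity top = new TypedCompositeActor();
//   HDFSRemover remover = new HDFSRemover(top, "HDFSRemover");
//   remover.hdfsPaths.setExpression(
//       "hdfs://localhost:9000/tmp/a.txt;hdfs://localhost:9000/tmp/out");
//   remover.recursive.setToken(BooleanToken.TRUE);
//
// After preinitialize() and postfire() run, the success port reports whether
// all deletions of existing paths succeeded, and hdfsPathsOut echoes the
// input paths.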