/* HDFSUnLoader class to copy HDFS data into local file system.

/*
 * Copyright (c) 2010-2014 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.actor;

import java.io.File;
import java.io.IOException;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;

//////////////////////////////////////////////////////////////////////////
////HDFSUnLoader

/**
 * This class copies HDFS data into local file system.
 *
 * <p>On each firing the actor reads the source HDFS path and the target
 * local directory, optionally asks before replacing an existing local
 * file or directory, copies the data with
 * {@code FileSystem.copyToLocalFile}, and broadcasts the target directory
 * on the {@link #url} port.</p>
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: HDFSUnLoader.java 33862 2015-09-04 16:53:05Z crawl $
 */

public class HDFSUnLoader extends TypedAtomicActor {

    // /////////////////////////////////////////////////////////////////
    // // ports and parameters ////

    /**
     * The HDFS path of the file or directory to copy. This is a string
     * with any form accepted by PortParameter. A value that does not start
     * with "hdfs" is appended to the {@link #FS_NAME} URL read from the
     * Hadoop configuration.
     *
     * @see PortParameter
     */
    public PortParameter sourceHDFSPath;

    /**
     * The directory on the local file system into which the data is
     * copied. This is a string with any form accepted by PortParameter.
     *
     * @see PortParameter
     */
    public PortParameter targetLocalDir;

    /**
     * If <i>false</i>, then overwrite the specified file if it exists without
     * asking. If <i>true</i> (the default), then if the file exists, ask for
     * confirmation before overwriting.
     */
    public Parameter confirmOverwrite;

    /** DDP engine configuration directory. */
    public StringParameter configDir;

    /**
     * Output port on which the value of {@link #targetLocalDir} is
     * broadcast once the copy has completed.
     */
    public TypedIOPort url;

    /**
     * Construct an actor with the given container and name, creating its
     * parameters, its output port and its icon.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
     * @exception IllegalActionException
     *                If the actor cannot be contained by the proposed
     *                container, or a parameter cannot be initialized.
     */
    public HDFSUnLoader(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        sourceHDFSPath = new PortParameter(this, "sourceHDFSPath");
        sourceHDFSPath.setExpression("hdfs://localhost:9000/");
        sourceHDFSPath.setStringMode(true);

        targetLocalDir = new PortParameter(this, "targetLocalDir");
        targetLocalDir.setExpression("/home/ubuntu/");
        targetLocalDir.setStringMode(true);

        confirmOverwrite = new Parameter(this, "confirmOverwrite");
        confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
        confirmOverwrite.setToken(BooleanToken.TRUE);

        configDir = new StringParameter(this, "configDir");

        url = new TypedIOPort(this, "url", false, true);
        url.setTypeEquals(BaseType.STRING);
        url.setMultiport(false);

        _attachText("_iconDescription", "<svg>\n"
                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
                + "style=\"fill:white\"/>\n"
                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
                + "</svg>\n");
    }

    // /////////////////////////////////////////////////////////////////
    // // public methods ////

    /**
     * Clone the actor into the specified workspace.
     *
     * @param workspace
     *            The workspace for the new object.
     * @return A new actor.
     * @exception CloneNotSupportedException
     *                If a derived class contains an attribute that cannot be
     *                cloned.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        HDFSUnLoader newObject = (HDFSUnLoader) super.clone(workspace);
        return newObject;
    }

    /**
     * Copy the HDFS file or directory named by {@link #sourceHDFSPath}
     * into the local directory named by {@link #targetLocalDir}, then
     * broadcast the target directory on the {@link #url} port.
     *
     * <p>If a local file or directory with the source's name already
     * exists under the target directory it is deleted first; when
     * {@link #confirmOverwrite} is true the user is asked beforehand.</p>
     *
     * <p>The file system is closed on every exit path, including when the
     * copy fails or the user declines to overwrite.</p>
     *
     * @return Whatever the superclass postfire() returns.
     * @exception IllegalActionException
     *                If the copy or the deletion of an existing local file
     *                fails, if the file system cannot be closed, or if the
     *                user refuses to overwrite an existing file.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        sourceHDFSPath.update();
        targetLocalDir.update();
        try {
            // Store the handle in the field (the original declared a local
            // that shadowed it) so wrapup() can release it as a safety net.
            // NOTE(review): FileSystem.get() may return an instance shared
            // through Hadoop's cache; closing it here mirrors the original
            // behavior -- confirm no other actor relies on the same instance.
            _fs = FileSystem.get(_conf);

            _srcPath = ((StringToken) sourceHDFSPath.getToken()).stringValue();
            _targetDir = ((StringToken) targetLocalDir.getToken())
                    .stringValue();
            if (!_srcPath.startsWith("hdfs")) {
                _srcPath = _hdfsURL + "/" + _srcPath;
            }

            // Local path that the copy is expected to create.
            String targetFilePath = _targetDir + _sourceName(_srcPath);
            File targetFile = new File(targetFilePath);

            boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
                    .getToken()).booleanValue();
            if (targetFile.exists()) {
                // NOTE(review): when the source has no name component,
                // targetFilePath equals the target directory itself, so the
                // whole directory is deleted below -- confirm this is intended.
                if (confirmOverwriteValue
                        && !MessageHandler.yesNoQuestion("OK to overwrite "
                                + targetFilePath + "?")) {
                    throw new IllegalActionException(this,
                            "Please select another file name.");
                }
                FileUtils.forceDelete(targetFile);
            }

            _fs.copyToLocalFile(new Path(_srcPath), new Path(_targetDir));
            // Close before broadcasting, as the original did on success.
            _closeFileSystem();
            url.broadcast(new StringToken(_targetDir));
        } catch (IOException e) {
            // Keep the cause and the actor context instead of only the message.
            throw new IllegalActionException(this, e, "Failed to copy "
                    + _srcPath + " to " + _targetDir + ".");
        } finally {
            // No-op after a successful close; releases the handle on failure.
            _closeFileSystem();
        }
        return super.postfire();
    }

    /**
     * Load the Hadoop configuration. The configuration directory is taken
     * from {@link #configDir}; if that is blank, it defaults to
     * <code>tools/etc/hadoop</code> under the directory named by the
     * <code>hadoop.workflowdir</code> system property. The
     * {@link #CORE_SITE} file in that directory is added as a resource and
     * the {@link #FS_NAME} property is read as the default HDFS URL.
     *
     * @exception IllegalActionException
     *                If the superclass throws it or the configDir parameter
     *                cannot be read.
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();
        _configDirStr = configDir.stringValue();
        if (_configDirStr.trim().isEmpty()) {
            // set the default location of the config directory
            String workflowDirStr = System.getProperty("hadoop.workflowdir");
            if (workflowDirStr == null) {
                throw new InternalErrorException(
                        "System property hadoop.workflowdir not set.");
            }
            _configDirStr = workflowDirStr + File.separator + "tools"
                    + File.separator + "etc" + File.separator + "hadoop";
        }
        _conf = new Configuration();
        _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
        _hdfsURL = _conf.get(FS_NAME);
    }

    /**
     * Invoke the superclass wrapup and close the file system if one is
     * still open.
     *
     * @exception IllegalActionException
     *                If the superclass throws it or an IO error occurs while
     *                closing the file system.
     */
    @Override
    public void wrapup() throws IllegalActionException {
        try {
            super.wrapup();
        } finally {
            _closeFileSystem();
        }
    }

    // /////////////////////////////////////////////////////////////////
    // // public members ////

    /** Name of the Hadoop core configuration file. */
    public static final String CORE_SITE = "core-site.xml";

    /** Configuration property holding the default file system URL. */
    public static final String FS_NAME = "fs.defaultFS";

    // /////////////////////////////////////////////////////////////////
    // // private methods ////

    /**
     * Close the file system held in {@link #_fs}, if any, and clear the
     * field so that repeated calls are harmless.
     *
     * @exception IllegalActionException
     *                If closing the file system fails.
     */
    private void _closeFileSystem() throws IllegalActionException {
        if (_fs == null) {
            return;
        }
        FileSystem fs = _fs;
        // Clear first so a failing close is not retried.
        _fs = null;
        try {
            fs.close();
        } catch (IOException e) {
            throw new IllegalActionException(this, e,
                    "Failed to close the HDFS file system.");
        }
    }

    /**
     * Return the last component of the given source path, including its
     * leading '/', or the empty string when the only slashes belong to the
     * "//" after the scheme (the path names no file or folder).
     *
     * @param srcPath
     *            The source path.
     * @return The name suffix to append to the target directory.
     */
    private static String _sourceName(String srcPath) {
        String trimmed = srcPath;
        if (trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        int lastSlash = trimmed.lastIndexOf('/');
        if (lastSlash < 0) {
            // No separator at all: the whole string is the name. The
            // original code failed here with an index-out-of-bounds error.
            return "/" + trimmed;
        }
        if (lastSlash == trimmed.lastIndexOf("//") + 1) {
            // The only '/' is '//', which means src file/folder is empty.
            return "";
        }
        return trimmed.substring(lastSlash);
    }

    // /////////////////////////////////////////////////////////////////
    // // private members ////

    /** FileSystem opened by postfire(); null when none is open. */
    private FileSystem _fs;

    /** Hadoop configuration created in preinitialize(). */
    private Configuration _conf;

    /** Directory from which the Hadoop configuration is loaded. */
    private String _configDirStr;

    /** Value of the FS_NAME property, prefixed to non-"hdfs" source paths. */
    private String _hdfsURL;

    /** Target local directory read in the most recent postfire(). */
    private String _targetDir;

    /** Source HDFS path used in the most recent postfire(). */
    private String _srcPath;

}