/* HDFSLoader class to load data from the local file system to HDFS.

/*
 * Copyright (c) 2010-2014 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.actor;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;

//////////////////////////////////////////////////////////////////////////
//// HDFSLoader

/**
 * This actor loads files and directories from the local file system to HDFS.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: HDFSLoader.java 33862 2015-09-04 16:53:05Z crawl $
 */

public class HDFSLoader extends TypedAtomicActor {

    // /////////////////////////////////////////////////////////////////
    // // ports and parameters ////

    /**
     * The local file path(s) from which to read, separated by semicolons if
     * there is more than one. This is a string with any form accepted by
     * PortParameter.
     *
     * @see PortParameter
     */
    public PortParameter sourceLocalPaths;

    /**
     * The target path on HDFS to which to write. This is a string with any
     * form accepted by PortParameter.
     *
     * @see PortParameter
     */
    public PortParameter targetHDFSPath;

    /**
     * If <i>false</i>, then overwrite the specified file if it exists
     * without asking. If <i>true</i> (the default), then if the file exists,
     * ask for confirmation before overwriting.
     */
    public Parameter confirmOverwrite;

    /** DDP engine configuration directory. */
    public StringParameter configDir;

    /**
     * The HDFS URL to which the files were written.
     */
    public TypedIOPort url;
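    // Illustrative parameter values (hypothetical paths, extending the
    // defaults set in the constructor below):
    //
    //   sourceLocalPaths : /home/ubuntu/a.txt;/home/ubuntu/data
    //   targetHDFSPath   : hdfs://localhost:9000/user/ubuntu/
    //
    // Each semicolon-separated source path is copied under targetHDFSPath,
    // keeping its file or directory name.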
    public HDFSLoader(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);
        sourceLocalPaths = new PortParameter(this, "sourceLocalPaths");
        sourceLocalPaths.setExpression("/home/ubuntu/a.txt");
        sourceLocalPaths.setStringMode(true);

        targetHDFSPath = new PortParameter(this, "targetHDFSPath");
        targetHDFSPath.setExpression("hdfs://localhost:9000/");
        targetHDFSPath.setStringMode(true);

        confirmOverwrite = new Parameter(this, "confirmOverwrite");
        confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
        confirmOverwrite.setToken(BooleanToken.TRUE);

        configDir = new StringParameter(this, "configDir");

        url = new TypedIOPort(this, "url", false, true);
        url.setTypeEquals(BaseType.STRING);
        url.setMultiport(false);

        _attachText("_iconDescription", "<svg>\n"
                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
                + "style=\"fill:white\"/>\n"
                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
                + "</svg>\n");

    }

    // /////////////////////////////////////////////////////////////////
    // // public methods ////

    /**
     * Clone the actor into the specified workspace.
     *
     * @param workspace
     *            The workspace for the new object.
     * @return A new actor.
     * @exception CloneNotSupportedException
     *                If a derived class contains an attribute that cannot be
     *                cloned.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        HDFSLoader newObject = (HDFSLoader) super.clone(workspace);
        return newObject;
    }
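    /**
     * A minimal standalone sketch of the copy that postfire() performs,
     * using only the Hadoop FileSystem API. The paths and URL below are
     * illustrative assumptions, not values used by this actor; the actor
     * reads its own values from sourceLocalPaths, targetHDFSPath, and the
     * core-site.xml found in preinitialize().
     */
    private static void _copySketch() throws IOException {
        Configuration conf = new Configuration();
        // Illustrative location of the Hadoop client configuration.
        conf.addResource(new Path("/etc/hadoop/core-site.xml"));
        FileSystem fs = FileSystem.get(conf);
        Path[] sources = { new Path("/home/ubuntu/a.txt") };
        Path target = new Path("hdfs://localhost:9000/user/ubuntu/");
        // delSrc=false keeps the local files; overwrite=true replaces any
        // existing files of the same names under the target directory.
        fs.copyFromLocalFile(false, true, sources, target);
        fs.close();
    }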
    /**
     * Load the local files from the input port to HDFS. If a target file
     * does not exist, then create it. If a target file already exists, then
     * query the user for overwrite.
     *
     * @exception IllegalActionException
     *                If the file cannot be opened or created, or if the user
     *                refuses to overwrite an existing file.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        sourceLocalPaths.update();
        targetHDFSPath.update();
        try {
            // Assign to the field (not a shadowing local variable) so that
            // wrapup() can close the file system if postfire() does not
            // finish normally.
            _fs = FileSystem.get(_conf);

            _targetHDFSPath = ((StringToken) targetHDFSPath.getToken())
                    .stringValue();
            if (!_targetHDFSPath.startsWith("hdfs"))
                _targetHDFSPath = _hdfsURL + "/" + _targetHDFSPath;
            Path targetPath = new Path(_targetHDFSPath);

            _srcLocalPath = ((StringToken) sourceLocalPaths.getToken())
                    .stringValue();
            String[] localPathStrs = _srcLocalPath.split(";");
            Path[] targetPaths = new Path[localPathStrs.length];
            Path[] localPaths = new Path[localPathStrs.length];
            String srcName;
            for (int i = 0; i < localPathStrs.length; i++) {
                localPaths[i] = new Path(localPathStrs[i]);
                // Find the file or folder name to be copied, dropping a
                // trailing separator if there is one.
                srcName = localPathStrs[i];
                if (srcName.endsWith(File.separator))
                    srcName = srcName.substring(0, srcName.length() - 1);
                // Keep the leading separator so the name can be appended to
                // the target path; add one if the path has no separator.
                int sepIndex = srcName.lastIndexOf(File.separator);
                srcName = (sepIndex >= 0) ? srcName.substring(sepIndex)
                        : File.separator + srcName;
                targetPaths[i] = new Path(_targetHDFSPath + srcName);
            }

            boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
                    .getToken()).booleanValue();
            boolean overwrite = false;

            if (confirmOverwriteValue) {
                for (int i = 0; i < targetPaths.length; i++) {
                    if (_fs.exists(targetPaths[i])) {
                        overwrite = MessageHandler
                                .yesNoQuestion("OK to overwrite "
                                        + targetPaths[i] + "?");
                        if (!overwrite) {
                            throw new IllegalActionException(this,
                                    "Please select another file name.");
                        }
                    }
                }
            }

            for (int i = 0; i < targetPaths.length; i++) {
                if (_fs.exists(targetPaths[i])) {
                    _fs.delete(targetPaths[i], true);
                }
            }
            _fs.copyFromLocalFile(false, true, localPaths, targetPath);
            _fs.close();
            // The file system is closed; clear the field so that wrapup()
            // does not close it a second time.
            _fs = null;
            url.broadcast(new StringToken(_targetHDFSPath));
        } catch (IOException e) {
            throw new IllegalActionException(this, e, e.getMessage());
        }
        return super.postfire();
    }

    /**
     * Read the HDFS configuration and the default HDFS URL.
     *
     * @exception IllegalActionException
     *                If there is an error reading the configDir parameter.
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();
        _configDirStr = configDir.stringValue();
        if (_configDirStr.trim().isEmpty()) {
            // Set the default location of the config directory.
            String workflowDirStr = System.getProperty("hadoop.workflowdir");
            if (workflowDirStr == null) {
                throw new InternalErrorException(
                        "System property hadoop.workflowdir not set.");
            }
            _configDirStr = workflowDirStr + File.separator + "tools"
                    + File.separator + "etc" + File.separator + "hadoop";
        }
        _conf = new Configuration();
        _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
        _hdfsURL = _conf.get(FS_NAME);
    }

    /**
     * Close the file system if it is open.
     *
     * @exception IllegalActionException
     *                If an IO error occurs.
     */
    @Override
    public void wrapup() throws IllegalActionException {
        try {
            if (_fs != null)
                _fs.close();
        } catch (IOException e) {
            throw new IllegalActionException(this, e, e.getMessage());
        }
    }

    // /////////////////////////////////////////////////////////////////
    // // public members ////

    public static final String CORE_SITE = "core-site.xml";
    public static final String FS_NAME = "fs.defaultFS";

    // /////////////////////////////////////////////////////////////////
    // // private members ////

    /** The HDFS file system. */
    private FileSystem _fs;

    /** The Hadoop configuration read in preinitialize(). */
    private Configuration _conf;

    /** The DDP engine configuration directory. */
    private String _configDirStr;

    /** The default HDFS URL from fs.defaultFS. */
    private String _hdfsURL;

    /** The target HDFS path for the current firing. */
    private String _targetHDFSPath;

    /** The semicolon-separated source paths for the current firing. */
    private String _srcLocalPath;

}
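// An illustrative core-site.xml entry that preinitialize() reads to obtain
// the default HDFS URL (the host and port here are assumptions):
//
//   <configuration>
//     <property>
//       <name>fs.defaultFS</name>
//       <value>hdfs://localhost:9000</value>
//     </property>
//   </configuration>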