/* HDFSWriter class to write token data into HDFS.
 *
 * Copyright (c) 2010-2014 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.actor;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import ptolemy.actor.TypedIOPort;
import ptolemy.actor.lib.Sink;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.SingletonParameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;

//////////////////////////////////////////////////////////////////////////
//// HDFSWriter

/**
 * This class writes string tokens to HDFS, one line per token.
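 *
 * <p>
 * A minimal sketch of configuring the actor programmatically; the
 * <code>container</code> and the parameter values below are placeholders, and
 * workflows would normally set these in the actor's parameter dialog instead:
 * </p>
 *
 * <pre>
 * HDFSWriter writer = new HDFSWriter(container, "HDFSWriter");
 * writer.targetHDFSPath.setExpression("hdfs://localhost:9000/output/result.txt");
 * writer.configDir.setExpression("/opt/hadoop/etc/hadoop");
 * writer.confirmOverwrite.setToken(BooleanToken.FALSE);
 * </pre>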
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: HDFSWriter.java 33862 2015-09-04 16:53:05Z crawl $
 */
public class HDFSWriter extends Sink {

    // /////////////////////////////////////////////////////////////////
    // // ports and parameters ////

    /**
     * If <i>true</i>, then append to the specified file. If <i>false</i> (the
     * default), then overwrite any preexisting file after asking the user for
     * permission.
     */
    // append is temporarily disabled because it always throws an exception.
    // public Parameter append;

    /**
     * The HDFS path to which to write. This is a string with any form accepted
     * by PortParameter.
     *
     * @see PortParameter
     */
    public PortParameter targetHDFSPath;

    /**
     * If <i>false</i>, then overwrite the specified file if it exists without
     * asking. If <i>true</i> (the default), then if the file exists, ask for
     * confirmation before overwriting.
     */
    public Parameter confirmOverwrite;

    /** DDP engine configuration directory. */
    public StringParameter configDir;

    /**
     * End-of-token character to use. This is a string that defaults to null,
     * which results in the current platform's standard end-of-line character
     * being used. If an empty string is specified, then no end-of-token
     * character is written after each output token.
     */
    public Parameter endOfTokenCharacter;

    /** URL of the written file. */
    public TypedIOPort url;

    public HDFSWriter(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);
        input.setTypeEquals(BaseType.STRING);
        new SingletonParameter(input, "_showName").setToken(BooleanToken.TRUE);

        targetHDFSPath = new PortParameter(this, "targetHDFSPath");
        targetHDFSPath.setExpression("hdfs://localhost:9000/tmp.txt");
        targetHDFSPath.setStringMode(true);

        confirmOverwrite = new Parameter(this, "confirmOverwrite");
        confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
        confirmOverwrite.setToken(BooleanToken.TRUE);

        endOfTokenCharacter = new Parameter(this, "endOfTokenCharacter");
        endOfTokenCharacter.setTypeEquals(BaseType.STRING);

        configDir = new StringParameter(this, "configDir");

        url = new TypedIOPort(this, "url", false, true);
        url.setTypeEquals(BaseType.STRING);
        url.setMultiport(false);

        _attachText("_iconDescription", "<svg>\n"
                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
                + "style=\"fill:white\"/>\n"
                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
                + " 15,-10 15,10 -15,10\" " + "style=\"fill:red\"/>\n"
                + "</svg>\n");
    }

    // /////////////////////////////////////////////////////////////////
    // // public methods ////

    /**
     * If the specified attribute is <i>targetHDFSPath</i> and there is an open
     * file being written, then close that file. The new file will be opened or
     * created when it is next written to.
     *
     * @param attribute
     *            The attribute that has changed.
     * @exception IllegalActionException
     *                If the specified attribute is <i>targetHDFSPath</i> and
     *                the previously opened file cannot be closed.
     */
    @Override
    public void attributeChanged(Attribute attribute)
            throws IllegalActionException {
        if (attribute == targetHDFSPath) {
            // Do not close the file if it is the same file.
            String newFileName = ((StringToken) targetHDFSPath.getToken())
                    .stringValue();

            if (_previousFileName != null
                    && !newFileName.equals(_previousFileName)) {
                _previousFileName = newFileName;
                // Close the previously opened file, if any, so the new file
                // is opened on the next write.
                if (_writer != null) {
                    try {
                        _writer.close();
                    } catch (IOException e) {
                        throw new IllegalActionException(this, e,
                                "Cannot close the previously opened file.");
                    }
                }
                _writer = null;
            }
        } else {
            super.attributeChanged(attribute);
        }
    }

    /**
     * Clone the actor into the specified workspace.
     *
     * @param workspace
     *            The workspace for the new object.
     * @return A new actor.
     * @exception CloneNotSupportedException
     *                If a derived class contains an attribute that cannot be
     *                cloned.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        HDFSWriter newObject = (HDFSWriter) super.clone(workspace);
        newObject._writer = null;
        return newObject;
    }

    /**
     * Read an input string token from each input channel and write it to the
     * file, one line per token. If there is no input, do nothing. If the file
     * is not open for writing then open it. If the file does not exist, then
     * create it. If the file already exists, then query the user for
     * overwrite, unless the <i>confirmOverwrite</i> parameter has value
     * <i>false</i>.
     *
     * @exception IllegalActionException
     *                If the file cannot be opened or created, or if the user
     *                refuses to overwrite an existing file.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        targetHDFSPath.update();
        try {
            // Open the file system configured in preinitialize(). Assign to
            // the member so that wrapup() can close it if needed.
            _fs = FileSystem.get(_conf);

            for (int i = 0; i < input.getWidth(); i++) {
                if (input.hasToken(i)) {
                    Token token = input.get(i);

                    // Appending is currently disabled; see the append
                    // parameter declaration above.
                    // boolean appendValue = ((BooleanToken) append.getToken())
                    //         .booleanValue();

                    // Resolve relative paths against the default file system.
                    _fileNameValue = targetHDFSPath.getExpression();
                    if (!_fileNameValue.startsWith("hdfs"))
                        _fileNameValue = _hdfsURL + "/" + _fileNameValue;
                    Path filePath = new Path(_fileNameValue);

                    // If previousFileName is null, we have never opened a
                    // file.
                    if (_previousFileName == null) {
                        _previousFileName = _fileNameValue;
                    }

                    boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
                            .getToken()).booleanValue();

                    if (_fs.exists(filePath) && confirmOverwriteValue) {
                        // Query for overwrite.
                        if (!MessageHandler.yesNoQuestion("OK to overwrite "
                                + filePath + "?")) {
                            throw new IllegalActionException(this,
                                    "Please select another file name.");
                        }
                    }

                    _writer = new BufferedWriter(new OutputStreamWriter(
                            _fs.create(filePath, true)));

                    String eol = "\n";
                    Token eolToken = endOfTokenCharacter.getToken();
                    if (eolToken != null) {
                        eol = ((StringToken) eolToken).stringValue();
                    }
                    _writer.write(((StringToken) token).stringValue() + eol);
                    _writer.flush();
                    _writer.close();
                    url.broadcast(new StringToken(_fileNameValue));
                }
            }
            _fs.close();
        } catch (IOException e) {
            e.printStackTrace();
            throw new IllegalActionException(this, e, e.getMessage());
        }
        return super.postfire();
    }
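
    // Example of the path resolution in postfire() (values are illustrative):
    // with targetHDFSPath = "output/result.txt" and fs.defaultFS =
    // "hdfs://localhost:9000" in core-site.xml, the token is written to
    // hdfs://localhost:9000/output/result.txt, and that URL is broadcast on
    // the url output port.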

    /**
     * Get the default HDFS configuration. The configuration is read from the
     * directory given by the <i>configDir</i> parameter, or, if that parameter
     * is empty, from the default location under the directory named by the
     * hadoop.workflowdir system property.
     *
     * @exception IllegalActionException
     *                If there is an error reading the configDir parameter.
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();
        _configDirStr = configDir.stringValue();
        if (_configDirStr.trim().isEmpty()) {
            // set the default location of the config directory
            String workflowDirStr = System.getProperty("hadoop.workflowdir");
            if (workflowDirStr == null) {
                throw new InternalErrorException(
                        "System property hadoop.workflowdir not set.");
            }
            _configDirStr = workflowDirStr + File.separator + "tools"
                    + File.separator + "etc" + File.separator + "hadoop";
        }
        _conf = new Configuration();
        _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
        _hdfsURL = _conf.get(FS_NAME);
    }
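
    // The configuration directory is expected to contain a Hadoop
    // core-site.xml that defines the default file system, along the lines of
    // the sketch below (the value is a placeholder):
    //
    //   <configuration>
    //     <property>
    //       <name>fs.defaultFS</name>
    //       <value>hdfs://localhost:9000</value>
    //     </property>
    //   </configuration>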
292 */ 293 @Override 294 public void wrapup() throws IllegalActionException { 295 try { 296 if (_writer != null) 297 _writer.close(); 298 if (_fs != null) 299 _fs.close(); 300 _writer = null; 301 } catch (IOException e) { 302 e.printStackTrace(); 303 throw new IllegalActionException(e.getMessage()); 304 } 305 } 306 307 // ///////////////////////////////////////////////////////////////// 308 // // public members //// 309 310 public static final String CORE_SITE = "core-site.xml"; 311 public static final String HDFS_NAME = "fs.default.name"; 312 public static final String FS_NAME = "fs.defaultFS"; 313 314 // ///////////////////////////////////////////////////////////////// 315 // // protected members //// 316 317 /** The current writer. */ 318 protected BufferedWriter _writer; 319 320 // ///////////////////////////////////////////////////////////////// 321 // // private members //// 322 323 /** Previous value of fileName parameter. */ 324 private String _previousFileName; 325 326 /** FileSystem. */ 327 private FileSystem _fs; 328 329 private Configuration _conf; 330 331 private String _configDirStr; 332 333 private String _hdfsURL; 334 335 private String _fileNameValue; 336 337}