001/* HDFSWriter class to write token data into HDFS.
002
003/*
004 * Copyright (c) 2010-2014 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027
028package org.kepler.hadoop.actor;
029
030import java.io.BufferedWriter;
031import java.io.File;
032import java.io.IOException;
033import java.io.OutputStreamWriter;
034
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038
039import ptolemy.actor.TypedIOPort;
040import ptolemy.actor.lib.Sink;
041import ptolemy.actor.parameters.PortParameter;
042import ptolemy.data.BooleanToken;
043import ptolemy.data.StringToken;
044import ptolemy.data.Token;
045import ptolemy.data.expr.Parameter;
046import ptolemy.data.expr.SingletonParameter;
047import ptolemy.data.expr.StringParameter;
048import ptolemy.data.type.BaseType;
049import ptolemy.kernel.CompositeEntity;
050import ptolemy.kernel.util.Attribute;
051import ptolemy.kernel.util.IllegalActionException;
052import ptolemy.kernel.util.InternalErrorException;
053import ptolemy.kernel.util.NameDuplicationException;
054import ptolemy.kernel.util.Workspace;
055import ptolemy.util.MessageHandler;
056
057//////////////////////////////////////////////////////////////////////////
058////HDFSWriter
059
/**
 * This class writes tokens to HDFS.
 *
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: HDFSWriter.java 33862 2015-09-04 16:53:05Z crawl $
 */
066
067public class HDFSWriter extends Sink {
068
069        // /////////////////////////////////////////////////////////////////
070        // // ports and parameters ////
071
072        /**
073         * If <i>true</i>, then append to the specified file. If <i>false</i> (the
074         * default), then overwrite any preexisting file after asking the user for
075         * permission.
076         */
077        // disable append temporally because it always has exception.
078        // public Parameter append;
079
080        /**
081         * The file path to which to write. This is a string with any form accepted
082         * by PortParameter.
083         * 
084         * @see PortParameter
085         */
086        public PortParameter targetHDFSPath;
087
088        /**
089         * If <i>false</i>, then overwrite the specified file if it exists without
090         * asking. If <i>true</i> (the default), then if the file exists, ask for
091         * confirmation before overwriting.
092         */
093        public Parameter confirmOverwrite;
094
095        /** DDP engine configuration directory. */
096        public StringParameter configDir;
097
098        /**
099         * End of token to use. This is a string that defaults to null, which
100         * results in the current platform's standard end-of-line character being
101         * used. If an empty string is specified, then no end of token character is
102         * used after each output written to the file.
103         */
104        public Parameter endOfTokenCharacter;
105
106        /**
107         * Written file URL
108         */
109        public TypedIOPort url;
110
111        public HDFSWriter(CompositeEntity container, String name)
112                        throws NameDuplicationException, IllegalActionException {
113                super(container, name);
114                input.setTypeEquals(BaseType.STRING);
115                new SingletonParameter(input, "_showName").setToken(BooleanToken.TRUE);
116
117                targetHDFSPath = new PortParameter(this, "targetHDFSPath");
118                targetHDFSPath.setExpression("hdfs://localhost:9000/tmp.txt");
119                targetHDFSPath.setStringMode(true);
120
121                confirmOverwrite = new Parameter(this, "confirmOverwrite");
122                confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
123                confirmOverwrite.setToken(BooleanToken.TRUE);
124
125                endOfTokenCharacter = new Parameter(this, "endOfTokenCharacter");
126                endOfTokenCharacter.setTypeEquals(BaseType.STRING);
127
128                configDir = new StringParameter(this, "configDir");
129
130                url = new TypedIOPort(this, "url", false, true);
131                url.setTypeEquals(BaseType.STRING);
132                url.setMultiport(false);
133
134                _attachText("_iconDescription", "<svg>\n"
135                                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
136                                + "style=\"fill:white\"/>\n"
137                                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
138                                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
139                                + "</svg>\n");
140
141        }
142
143        // /////////////////////////////////////////////////////////////////
144        // // public methods ////
145
146        /**
147         * If the specified attribute is <i>fileName</i> and there is an open file
148         * being written, then close that file. The new file will be opened or
149         * created when it is next written to.
150         * 
151         * @param attribute
152         *            The attribute that has changed.
153         * @exception IllegalActionException
154         *                If the specified attribute is <i>fileName</i> and the
155         *                previously opened file cannot be closed.
156         */
157        @Override
158    public void attributeChanged(Attribute attribute)
159                        throws IllegalActionException {
160                if (attribute == targetHDFSPath) {
161                        // Do not close the file if it is the same file.
162                        String newFileName = ((StringToken) targetHDFSPath.getToken())
163                                        .stringValue();
164
165                        if (_previousFileName != null
166                                        && !newFileName.equals(_previousFileName)) {
167                                _previousFileName = newFileName;
168                                _writer = null;
169                        }
170                } else {
171                        super.attributeChanged(attribute);
172                }
173        }
174
175        /**
176         * Clone the actor into the specified workspace.
177         * 
178         * @param workspace
179         *            The workspace for the new object.
180         * @return A new actor.
181         * @exception CloneNotSupportedException
182         *                If a derived class contains an attribute that cannot be
183         *                cloned.
184         */
185        @Override
186    public Object clone(Workspace workspace) throws CloneNotSupportedException {
187                HDFSWriter newObject = (HDFSWriter) super.clone(workspace);
188                newObject._writer = null;
189                return newObject;
190        }
191
192        /**
193         * Read an input string token from each input channel and write it to the
194         * file, one line per token. If there is no input, do nothing. If the file
195         * is not open for writing then open it. If the file does not exist, then
196         * create it. If the file already exists, then query the user for overwrite,
197         * unless the <i>append</i> parameter has value <i>true</i>.
198         * 
199         * @exception IllegalActionException
200         *                If the file cannot be opened or created, or if the user
201         *                refuses to overwrite an existing file.
202         */
203        @Override
204    public boolean postfire() throws IllegalActionException {
205                targetHDFSPath.update();
206                try {
207                        FileSystem _fs = FileSystem.get(_conf);
208                        Token token = null;
209
210                        for (int i = 0; i < input.getWidth(); i++) {
211                                if (input.hasToken(i)) {
212                                        token = input.get(i);
213                                        // File has not been opened.
214                                        // boolean appendValue = ((BooleanToken) append.getToken())
215                                        // .booleanValue();
216
217                                        _fileNameValue = targetHDFSPath.getExpression();
218                                        if (!_fileNameValue.startsWith("hdfs"))
219                                                _fileNameValue = _hdfsURL + "/" + _fileNameValue;
220                                        Path filePath = new Path(_fileNameValue);
221
222                                        // If previousFileName is null, we have never opened a
223                                        // file.
224                                        if (_previousFileName == null) {
225                                                _previousFileName = _fileNameValue;
226                                        }
227
228                                        boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
229                                                        .getToken()).booleanValue();
230
231                                        if (_fs.exists(filePath) && confirmOverwriteValue) {
232                                                // Query for overwrite.
233                                                if (!MessageHandler.yesNoQuestion("OK to overwrite "
234                                                                + filePath + "?")) {
235                                                        throw new IllegalActionException(this,
236                                                                        "Please select another file name.");
237                                                }
238                                        }
239                                        
240                                        _writer = new BufferedWriter(new OutputStreamWriter(
241                                                        _fs.create(filePath, true)));
242
243                                }
244                                String eol = "\n";
245                                Token eolToken = endOfTokenCharacter.getToken();
246                                if (eolToken != null) {
247                                        eol = ((StringToken) eolToken).stringValue();
248                                }
249                                _writer.write(((StringToken) token).stringValue() + eol);
250                                _writer.flush();
251                                _writer.close();
252                                _fs.close();
253                                url.broadcast(new StringToken(_fileNameValue));
254                        }
255                } catch (IOException e) {
256                        e.printStackTrace();
257                        throw new IllegalActionException(e.getMessage());
258                }
259                return super.postfire();
260        }
261
262        /**
263         * Get default value of HDFS Configure.
264         * 
265         * @exception IllegalActionException
266         *                If there is an error reading the alwaysFlush parameter.
267         */
268        @Override
269    public void preinitialize() throws IllegalActionException {
270                super.preinitialize();
271                _configDirStr = configDir.stringValue();
272                if (_configDirStr.trim().isEmpty()) {
273                        // set the default location of the config directory
274                        String workflowDirStr = System.getProperty("hadoop.workflowdir");
275                        if (workflowDirStr == null) {
276                                throw new InternalErrorException(
277                                                "System property hadoop.workflowdir not set.");
278                        }
279                        _configDirStr = workflowDirStr + File.separator + "tools"
280                                        + File.separator + "etc" + File.separator + "hadoop";
281                }
282                _conf = new Configuration();
283                _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
284                _hdfsURL = _conf.get(FS_NAME);
285        }
286
287        /**
288         * Close the writer if there is one.
289         * 
290         * @exception IllegalActionException
291         *                If an IO error occurs.
292         */
293        @Override
294    public void wrapup() throws IllegalActionException {
295                try {
296                        if (_writer != null)
297                                _writer.close();
298                        if (_fs != null)
299                                _fs.close();
300                        _writer = null;
301                } catch (IOException e) {
302                        e.printStackTrace();
303                        throw new IllegalActionException(e.getMessage());
304                }
305        }
306
307        // /////////////////////////////////////////////////////////////////
308        // // public members ////
309
310        public static final String CORE_SITE = "core-site.xml";
311        public static final String HDFS_NAME = "fs.default.name";
312        public static final String FS_NAME = "fs.defaultFS";
313
314        // /////////////////////////////////////////////////////////////////
315        // // protected members ////
316
317        /** The current writer. */
318        protected BufferedWriter _writer;
319
320        // /////////////////////////////////////////////////////////////////
321        // // private members ////
322
323        /** Previous value of fileName parameter. */
324        private String _previousFileName;
325
326        /** FileSystem. */
327        private FileSystem _fs;
328
329        private Configuration _conf;
330
331        private String _configDirStr;
332
333        private String _hdfsURL;
334
335        private String _fileNameValue;
336
337}