/* HDFSUnLoader class to copy HDFS data into local file system. */

/*
004 * Copyright (c) 2010-2014 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027
028package org.kepler.hadoop.actor;
029
030import java.io.File;
031import java.io.IOException;
032
033import org.apache.commons.io.FileUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037
038import ptolemy.actor.TypedAtomicActor;
039import ptolemy.actor.TypedIOPort;
040import ptolemy.actor.parameters.PortParameter;
041import ptolemy.data.BooleanToken;
042import ptolemy.data.StringToken;
043import ptolemy.data.expr.Parameter;
044import ptolemy.data.expr.StringParameter;
045import ptolemy.data.type.BaseType;
046import ptolemy.kernel.CompositeEntity;
047import ptolemy.kernel.util.IllegalActionException;
048import ptolemy.kernel.util.InternalErrorException;
049import ptolemy.kernel.util.NameDuplicationException;
050import ptolemy.kernel.util.Workspace;
051import ptolemy.util.MessageHandler;
052
053//////////////////////////////////////////////////////////////////////////
054////HDFSUnLoader
055
056/**
057 * This class copies HDFS data into local file system.
058 * 
059 * @author Jianwu Wang (jianwu@sdsc.edu)
060 * @version $Id: HDFSUnLoader.java 33862 2015-09-04 16:53:05Z crawl $
061 */
062
063public class HDFSUnLoader extends TypedAtomicActor {
064
065        // /////////////////////////////////////////////////////////////////
066        // // ports and parameters ////
067
068        /**
069         * The local file path to which to read. This is a string with any form
070         * accepted by PortParameter.
071         * 
072         * @see PortParameter
073         */
074        public PortParameter sourceHDFSPath;
075
076        /**
077         * The target file path on HDFS to which to write. This is a string with any
078         * form accepted by PortParameter.
079         * 
080         * @see PortParameter
081         */
082        public PortParameter targetLocalDir;
083
084        /**
085         * If <i>false</i>, then overwrite the specified file if it exists without
086         * asking. If <i>true</i> (the default), then if the file exists, ask for
087         * confirmation before overwriting.
088         */
089        public Parameter confirmOverwrite;
090
091        /** DDP engine configuration directory. */
092        public StringParameter configDir;
093
094        /**
095         * Written file URL
096         */
097        public TypedIOPort url;
098
099        public HDFSUnLoader(CompositeEntity container, String name)
100                        throws NameDuplicationException, IllegalActionException {
101                super(container, name);
102                sourceHDFSPath = new PortParameter(this, "sourceHDFSPath");
103                sourceHDFSPath.setExpression("hdfs://localhost:9000/");
104                sourceHDFSPath.setStringMode(true);
105
106                targetLocalDir = new PortParameter(this, "targetLocalDir");
107                targetLocalDir.setExpression("/home/ubuntu/");
108                targetLocalDir.setStringMode(true);
109
110                confirmOverwrite = new Parameter(this, "confirmOverwrite");
111                confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
112                confirmOverwrite.setToken(BooleanToken.TRUE);
113
114                configDir = new StringParameter(this, "configDir");
115
116                url = new TypedIOPort(this, "url", false, true);
117                url.setTypeEquals(BaseType.STRING);
118                url.setMultiport(false);
119
120                _attachText("_iconDescription", "<svg>\n"
121                                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
122                                + "style=\"fill:white\"/>\n"
123                                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
124                                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
125                                + "</svg>\n");
126
127        }
128
129        // /////////////////////////////////////////////////////////////////
130        // // public methods ////
131
132        /**
133         * Clone the actor into the specified workspace.
134         * 
135         * @param workspace
136         *            The workspace for the new object.
137         * @return A new actor.
138         * @exception CloneNotSupportedException
139         *                If a derived class contains an attribute that cannot be
140         *                cloned.
141         */
142        @Override
143    public Object clone(Workspace workspace) throws CloneNotSupportedException {
144                HDFSUnLoader newObject = (HDFSUnLoader) super.clone(workspace);
145                return newObject;
146        }
147
148        /**
149         * Load HDFS files from input port to local file system. 
150         * If the file does not exist, then create it. 
151         * If the file already exists, then query the user for overwrite,
152         * 
153         * @exception IllegalActionException
154         *                If the file cannot be opened or created, or if the user
155         *                refuses to overwrite an existing file.
156         */
157        @Override
158    public boolean postfire() throws IllegalActionException {
159                sourceHDFSPath.update();
160                targetLocalDir.update();
161                try {
162                        FileSystem _fs = FileSystem.get(_conf);
163
164                        _srcPath = ((StringToken) sourceHDFSPath.getToken()).stringValue();
165
166                        _targetDir = ((StringToken) targetLocalDir.getToken()).stringValue();
167                        if (!_srcPath.startsWith("hdfs"))
168                                _srcPath = _hdfsURL + "/" + _srcPath;
169
170                        // find file or folder name to be copied.
171                        String srcName = _srcPath;
172                        if (_srcPath.endsWith("/"))
173                                srcName = _srcPath.substring(0, _srcPath.length() - 1);
174                        if (srcName.lastIndexOf("/") == srcName.lastIndexOf("//") + 1) 
175                                // The only '/' is '//', which means src file/folder is empty.
176                                srcName = "";
177                        else
178                                srcName = srcName.substring(srcName.lastIndexOf("/"),
179                                                srcName.length());
180                        String targetFilePath = _targetDir + srcName;
181
182                        Path targetDirPath = new Path(_targetDir);
183
184                        boolean overwrite = false;
185
186                        boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
187                                        .getToken()).booleanValue();
188                        if(new File(targetFilePath).exists()){
189                                if (confirmOverwriteValue) {
190                                        // Query for overwrite.
191                                        overwrite = MessageHandler.yesNoQuestion("OK to overwrite "
192                                                        + targetFilePath + "?");
193                                        if (!overwrite) {
194                                                throw new IllegalActionException(this,
195                                                                "Please select another file name.");
196                                        }
197                                }
198                                FileUtils.forceDelete(new File(targetFilePath));
199                        }
200
201                        _fs.copyToLocalFile(new Path(_srcPath), targetDirPath);
202                        _fs.close();
203                        url.broadcast(new StringToken(_targetDir));
204                } catch (IOException e) {
205                        e.printStackTrace();
206                        throw new IllegalActionException(e.getMessage());
207                }
208                return super.postfire();
209        }
210
211        /**
212         * Get default value of HDFS Configure.
213         * 
214         * @exception IllegalActionException
215         *                If there is an error reading the alwaysFlush parameter.
216         */
217        @Override
218    public void preinitialize() throws IllegalActionException {
219                super.preinitialize();
220                _configDirStr = configDir.stringValue();
221                if (_configDirStr.trim().isEmpty()) {
222                        // set the default location of the config directory
223                        String workflowDirStr = System.getProperty("hadoop.workflowdir");
224                        if (workflowDirStr == null) {
225                                throw new InternalErrorException(
226                                                "System property hadoop.workflowdir not set.");
227                        }
228                        _configDirStr = workflowDirStr + File.separator + "tools"
229                                        + File.separator + "etc" + File.separator + "hadoop";
230                }
231                _conf = new Configuration();
232                _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
233                _hdfsURL = _conf.get(FS_NAME);
234        }
235
236        /**
237         * Close the writer if there is one.
238         * 
239         * @exception IllegalActionException
240         *                If an IO error occurs.
241         */
242        @Override
243    public void wrapup() throws IllegalActionException {
244                try {
245                        if (_fs != null)
246                                _fs.close();
247                } catch (IOException e) {
248                        e.printStackTrace();
249                        throw new IllegalActionException(e.getMessage());
250                }
251        }
252
253        // /////////////////////////////////////////////////////////////////
254        // // public members ////
255
256        public static final String CORE_SITE = "core-site.xml";
257        public static final String FS_NAME = "fs.defaultFS";
258
259        // /////////////////////////////////////////////////////////////////
260        // // private members ////
261
262        /** FileSystem. */
263        private FileSystem _fs;
264
265        private Configuration _conf;
266
267        private String _configDirStr;
268
269        private String _hdfsURL;
270
271        private String _targetDir;
272
273        private String _srcPath;
274
275}