001/* HDFSLoader class to load data from local file system to HDFS.
002
003/*
004 * Copyright (c) 2010-2014 The Regents of the University of California.
005 * All rights reserved.
006 *
007 * Permission is hereby granted, without written agreement and without
008 * license or royalty fees, to use, copy, modify, and distribute this
009 * software and its documentation for any purpose, provided that the above
010 * copyright notice and the following two paragraphs appear in all copies
011 * of this software.
012 *
013 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
014 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
015 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
016 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
017 * SUCH DAMAGE.
018 *
019 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
020 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
021 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
022 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
023 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
024 * ENHANCEMENTS, OR MODIFICATIONS.
025 *
026 */
027
028package org.kepler.hadoop.actor;
029
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;
051
052//////////////////////////////////////////////////////////////////////////
053////HDFSLoader
054
055/**
056 * This class load files/directories to HDFS.
057 * 
058 * @author Jianwu Wang (jianwu@sdsc.edu)
059 * @version $Id: HDFSLoader.java 33862 2015-09-04 16:53:05Z crawl $
060 */
061
062public class HDFSLoader extends TypedAtomicActor {
063
064        // /////////////////////////////////////////////////////////////////
065        // // ports and parameters ////
066
067        /**
068         * The local file path to which to read. This is a string with any form
069         * accepted by PortParameter.
070         * 
071         * @see PortParameter
072         */
073        public PortParameter sourceLocalPaths;
074
075        /**
076         * The target file path on HDFS to which to write. This is a string with any
077         * form accepted by PortParameter.
078         * 
079         * @see PortParameter
080         */
081        public PortParameter targetHDFSPath;
082
083        /**
084         * If <i>false</i>, then overwrite the specified file if it exists without
085         * asking. If <i>true</i> (the default), then if the file exists, ask for
086         * confirmation before overwriting.
087         */
088        public Parameter confirmOverwrite;
089
090        /** DDP engine configuration directory. */
091        public StringParameter configDir;
092
093        /**
094         * Written file URL
095         */
096        public TypedIOPort url;
097
098        public HDFSLoader(CompositeEntity container, String name)
099                        throws NameDuplicationException, IllegalActionException {
100                super(container, name);
101                sourceLocalPaths = new PortParameter(this, "sourceLocalPaths");
102                sourceLocalPaths.setExpression("/home/ubuntu/a.txt");
103                sourceLocalPaths.setStringMode(true);
104
105                targetHDFSPath = new PortParameter(this, "targetHDFSPath");
106                targetHDFSPath.setExpression("hdfs://localhost:9000/");
107                targetHDFSPath.setStringMode(true);
108
109                confirmOverwrite = new Parameter(this, "confirmOverwrite");
110                confirmOverwrite.setTypeEquals(BaseType.BOOLEAN);
111                confirmOverwrite.setToken(BooleanToken.TRUE);
112
113                configDir = new StringParameter(this, "configDir");
114
115                url = new TypedIOPort(this, "url", false, true);
116                url.setTypeEquals(BaseType.STRING);
117                url.setMultiport(false);
118
119                _attachText("_iconDescription", "<svg>\n"
120                                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
121                                + "style=\"fill:white\"/>\n"
122                                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
123                                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
124                                + "</svg>\n");
125
126        }
127
128        // /////////////////////////////////////////////////////////////////
129        // // public methods ////
130
131        /**
132         * Clone the actor into the specified workspace.
133         * 
134         * @param workspace
135         *            The workspace for the new object.
136         * @return A new actor.
137         * @exception CloneNotSupportedException
138         *                If a derived class contains an attribute that cannot be
139         *                cloned.
140         */
141        @Override
142    public Object clone(Workspace workspace) throws CloneNotSupportedException {
143                HDFSLoader newObject = (HDFSLoader) super.clone(workspace);
144                return newObject;
145        }
146
147        /**
148         * Load local files from input port to HDFS. If the file does not exist, then
149         * create it. If the file already exists, then query the user for overwrite.
150         * 
151         * @exception IllegalActionException
152         *                If the file cannot be opened or created, or if the user
153         *                refuses to overwrite an existing file.
154         */
155        @Override
156    public boolean postfire() throws IllegalActionException {
157                sourceLocalPaths.update();
158                targetHDFSPath.update();
159                try {
160                        FileSystem _fs = FileSystem.get(_conf);
161
162                        _targetHDFSPath = ((StringToken) targetHDFSPath.getToken())
163                                        .stringValue();
164                        if (!_targetHDFSPath.startsWith("hdfs"))
165                                _targetHDFSPath = _hdfsURL + "/" + _targetHDFSPath;
166                        Path targetPath = new Path(_targetHDFSPath);
167
168                        _srcLocalPath = ((StringToken) sourceLocalPaths.getToken())
169                                        .stringValue();
170                        String[] localPathStrs = _srcLocalPath.split(";");
171                        Path[] targePaths = new Path[localPathStrs.length];
172                        Path[] localPaths = new Path[localPathStrs.length];
173                        String srcName;
174                        for (int i = 0; i < localPathStrs.length; i++) {
175                                localPaths[i] = new Path(localPathStrs[i]);
176                                // find file or folder name to be copied.
177                                srcName = localPathStrs[i];
178                                if (localPathStrs[i].endsWith(File.separator))
179                                        srcName = localPathStrs[i].substring(0,
180                                                        localPathStrs[i].length() - 1);
181                                srcName = srcName.substring(
182                                                srcName.lastIndexOf(File.separator), srcName.length());
183                                targePaths[i] = new Path(_targetHDFSPath + srcName);
184                        }
185
186                        boolean confirmOverwriteValue = ((BooleanToken) confirmOverwrite
187                                        .getToken()).booleanValue();
188                        boolean overwrite = false;
189
190                        if (confirmOverwriteValue) {
191                                for (int i = 0; i < targePaths.length; i++) {
192                                        if (_fs.exists(targePaths[i])) {
193                                                overwrite = MessageHandler
194                                                                .yesNoQuestion("OK to overwrite "
195                                                                                + targePaths[i] + "?");
196                                                if (!overwrite) {
197                                                        throw new IllegalActionException(this,
198                                                                        "Please select another file name.");
199                                                }
200                                        }
201                                }
202                        }
203
204                        for (int i = 0; i < targePaths.length; i++) {
205                                if (_fs.exists(targePaths[i])) {
206                                        _fs.delete(targePaths[i], true);
207                                }
208                        }
209                        _fs.copyFromLocalFile(false, true, localPaths, targetPath);
210                        _fs.close();
211                        url.broadcast(new StringToken(_targetHDFSPath));
212                } catch (IOException e) {
213                        e.printStackTrace();
214                        throw new IllegalActionException(e.getMessage());
215                }
216                return super.postfire();
217        }
218
219        /**
220         * Get default value of HDFS Configure.
221         * 
222         * @exception IllegalActionException
223         *                If there is an error reading the alwaysFlush parameter.
224         */
225        @Override
226    public void preinitialize() throws IllegalActionException {
227                super.preinitialize();
228                _configDirStr = configDir.stringValue();
229                if (_configDirStr.trim().isEmpty()) {
230                        // set the default location of the config directory
231                        String workflowDirStr = System.getProperty("hadoop.workflowdir");
232                        if (workflowDirStr == null) {
233                                throw new InternalErrorException(
234                                                "System property hadoop.workflowdir not set.");
235                        }
236                        _configDirStr = workflowDirStr + File.separator + "tools"
237                                        + File.separator + "etc" + File.separator + "hadoop";
238                }
239                _conf = new Configuration();
240                _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
241                _hdfsURL = _conf.get(FS_NAME);
242        }
243
244        /**
245         * Close the writer if there is one.
246         * 
247         * @exception IllegalActionException
248         *                If an IO error occurs.
249         */
250        @Override
251    public void wrapup() throws IllegalActionException {
252                try {
253                        if (_fs != null)
254                                _fs.close();
255                } catch (IOException e) {
256                        e.printStackTrace();
257                        throw new IllegalActionException(e.getMessage());
258                }
259        }
260
261        // /////////////////////////////////////////////////////////////////
262        // // public members ////
263
264        public static final String CORE_SITE = "core-site.xml";
265        public static final String FS_NAME = "fs.defaultFS";
266
267        // /////////////////////////////////////////////////////////////////
268        // // private members ////
269
270        /** FileSystem. */
271        private FileSystem _fs;
272
273        private Configuration _conf;
274
275        private String _configDirStr;
276
277        private String _hdfsURL;
278
279        private String _targetHDFSPath;
280
281        private String _srcLocalPath;
282
283}