/* HDFSRemover class to remove files/directories on HDFS. */

/*
 * Copyright (c) 2010-2014 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.hadoop.actor;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;

//////////////////////////////////////////////////////////////////////////
//// HDFSRemover

/**
 * This class removes files/directories on HDFS.
 *
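 * <p>A minimal usage sketch (host and port below are illustrative, not
 * requirements): set <i>hdfsPaths</i> to a semicolon-separated list such as
 * "hdfs://localhost:9000/tmp/a.txt;hdfs://localhost:9000/tmp/outDir". On each
 * firing, the actor deletes every listed path that exists and broadcasts
 * <i>true</i> on <i>success</i> only if all deletions succeed.</p>
 *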
 * @author Jianwu Wang (jianwu@sdsc.edu)
 * @version $Id: HDFSRemover.java 33075 2014-11-13 17:08:52Z jianwu $
 */

public class HDFSRemover extends TypedAtomicActor {

    ///////////////////////////////////////////////////////////////////
    ////                   ports and parameters                    ////

    /**
     * The full paths on HDFS that need to be deleted. Multiple paths are
     * separated by semicolons.
     *
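     * For example, "hdfs://localhost:9000/a.txt;hdfs://localhost:9000/b.txt"
     * names two files to delete (hypothetical paths, shown only to
     * illustrate the semicolon-separated format).
     *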
     * @see PortParameter
     */
    public PortParameter hdfsPaths;

    /**
     * If <i>false</i> and a folder has non-empty sub-folders, an exception
     * will be thrown. If <i>true</i> (the default), folders will be deleted
     * recursively.
     */
    public Parameter recursive;

    /** DDP engine configuration directory. */
    public StringParameter configDir;

    /**
     * Whether the deletion succeeded. The value is false if an error occurs
     * while deleting an input path. An input path that does not exist does
     * not affect this value.
     */
    public TypedIOPort success;

    /**
     * The full paths on HDFS that need to be deleted.
     */
    public TypedIOPort hdfsPathsOut;

    public HDFSRemover(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);
        hdfsPaths = new PortParameter(this, "hdfsPaths");
        hdfsPaths.setExpression("hdfs://localhost:9000/a.txt");
        hdfsPaths.setStringMode(true);

        recursive = new Parameter(this, "recursive");
        recursive.setTypeEquals(BaseType.BOOLEAN);
        recursive.setToken(BooleanToken.TRUE);

        configDir = new StringParameter(this, "configDir");

        success = new TypedIOPort(this, "success", false, true);
        success.setTypeEquals(BaseType.BOOLEAN);
        success.setMultiport(false);

        hdfsPathsOut = new TypedIOPort(this, "hdfsPathsOut", false, true);
        hdfsPathsOut.setTypeEquals(BaseType.STRING);
        hdfsPathsOut.setMultiport(false);

        _attachText("_iconDescription", "<svg>\n"
                + "<rect x=\"-25\" y=\"-20\" " + "width=\"50\" height=\"40\" "
                + "style=\"fill:white\"/>\n"
                + "<polygon points=\"-15,-10 -12,-10 -8,-14 -1,-14 3,-10"
                + " 15,-10 15,10, -15,10\" " + "style=\"fill:red\"/>\n"
                + "</svg>\n");

    }
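
    // A minimal programmatic sketch of configuring this actor (hypothetical
    // test harness; in practice the actor is instantiated from a Kepler
    // workflow):
    //
    //   TypedCompositeActor top = new TypedCompositeActor();
    //   HDFSRemover remover = new HDFSRemover(top, "remover");
    //   remover.hdfsPaths.setExpression("hdfs://localhost:9000/tmp/old");
    //   remover.recursive.setToken(BooleanToken.FALSE);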

    ///////////////////////////////////////////////////////////////////
    ////                      public methods                       ////

    /**
     * Clone the actor into the specified workspace.
     *
     * @param workspace
     *            The workspace for the new object.
     * @return A new actor.
     * @exception CloneNotSupportedException
     *                If a derived class contains an attribute that cannot be
     *                cloned.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        HDFSRemover newObject = (HDFSRemover) super.clone(workspace);
        return newObject;
    }

    /**
     * Parse the input paths and delete them one by one.
     *
     * @exception IllegalActionException
     *                If an I/O error occurs while initializing the file
     *                system, checking path existence, deleting a path, or
     *                closing the file system.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        hdfsPaths.update();
        boolean allSucceeded = true;
        try {
            // Assign to the _fs member (not a local variable) so that
            // wrapup() can close the file system if this method fails
            // before closing it.
            _fs = FileSystem.get(_conf);
            _hdfsPaths = ((StringToken) hdfsPaths.getToken()).stringValue();
            String[] hdfsPathStrs = _hdfsPaths.split(";");
            boolean bRecursive = ((BooleanToken) recursive.getToken())
                    .booleanValue();

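            // Delete each listed path that exists. A path that is already
            // absent is not treated as an error, but any failed delete marks
            // the whole firing as unsuccessful.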
            for (String hdfsPathStr : hdfsPathStrs) {
                Path hdfsPath = new Path(hdfsPathStr);
                if (_fs.exists(hdfsPath) && !_fs.delete(hdfsPath, bRecursive)) {
                    allSucceeded = false;
                }
            }
            _fs.close();
            _fs = null;
            success.broadcast(new BooleanToken(allSucceeded));
            hdfsPathsOut.broadcast(new StringToken(_hdfsPaths));
        } catch (IOException e) {
            throw new IllegalActionException(this, e,
                    "Error deleting HDFS path(s).");
        }
        return super.postfire();
    }
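
    // For reference, the deletion above corresponds to this plain Hadoop
    // snippet (a minimal sketch, assuming a Configuration that points at the
    // target cluster; the path is hypothetical):
    //
    //   try (FileSystem fs = FileSystem.get(conf)) {
    //       Path p = new Path("hdfs://localhost:9000/a.txt");
    //       if (fs.exists(p) && !fs.delete(p, true)) { // true => recursive
    //           // deletion failed
    //       }
    //   }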

    /**
     * Build the Hadoop configuration, using a default configuration
     * directory if the <i>configDir</i> parameter is empty.
     *
     * @exception IllegalActionException
     *                If there is an error reading the configDir parameter.
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();
        _configDirStr = configDir.stringValue();
        if (_configDirStr.trim().isEmpty()) {
            // Set the default location of the config directory:
            // <hadoop.workflowdir>/tools/etc/hadoop
            String workflowDirStr = System.getProperty("hadoop.workflowdir");
            if (workflowDirStr == null) {
                throw new InternalErrorException(
                        "System property hadoop.workflowdir not set.");
            }
            _configDirStr = workflowDirStr + File.separator + "tools"
                    + File.separator + "etc" + File.separator + "hadoop";
        }
        _conf = new Configuration();
        _conf.addResource(new Path(_configDirStr + File.separator + CORE_SITE));
    }
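
    // core-site.xml typically supplies the default file system URI, e.g.
    // (illustrative values only; actual settings depend on the cluster):
    //
    //   <property>
    //     <name>fs.defaultFS</name>
    //     <value>hdfs://localhost:9000</value>
    //   </property>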

    /**
     * Close the file system if one is still open.
     *
     * @exception IllegalActionException
     *                If an IO error occurs.
     */
    @Override
    public void wrapup() throws IllegalActionException {
        try {
            if (_fs != null) {
                _fs.close();
                _fs = null;
            }
        } catch (IOException e) {
            throw new IllegalActionException(this, e,
                    "Error closing the HDFS file system.");
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                      public members                       ////

    /** Name of the Hadoop configuration file to load. */
    public static final String CORE_SITE = "core-site.xml";

    ///////////////////////////////////////////////////////////////////
    ////                      private members                      ////

    /** The HDFS file system handle, opened in postfire(). */
    private FileSystem _fs;

    /** The Hadoop configuration, built in preinitialize(). */
    private Configuration _conf;

    /** The configuration directory containing core-site.xml. */
    private String _configDirStr;

    /** The semicolon-separated input paths read in postfire(). */
    private String _hdfsPaths;

}