001/* Utilities for Spark.
002 * 
003 * Copyright (c) 2015 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-03 18:41:53 +0000 (Thu, 03 Sep 2015) $' 
008 * '$Revision: 33858 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.util;
031
032import java.io.File;
033import java.io.IOException;
034import java.net.URI;
035
036import org.apache.commons.io.FileUtils;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.fs.FileStatus;
039import org.apache.hadoop.fs.FileSystem;
040import org.apache.hadoop.fs.Path;
041import org.apache.hadoop.fs.PathFilter;
042
043import ptolemy.util.MessageHandler;
044
045/** Utilities for Spark.
046 * 
047 *  @author Daniel Crawl
048 *  @version $Id: SparkUtilities.java 33858 2015-09-03 18:41:53Z crawl $
049 */
050
051public class SparkUtilities {
052
053    /** This class cannot be instantiated. */
054    private SparkUtilities() { }
055
056    /** Delete an output directory. This method removes files like
057     *  "SUCCESS", "part-r-0000", etc., that are created as output from
058     *  a Spark job. 
059     */
060    public static void deleteDirectoryOutput(URI path, Configuration configuration) {
061        try {
062            Path outputFile = new Path(path);
063            try(FileSystem fileSystem = outputFile.getFileSystem(configuration);) {
064                if(fileSystem.exists(outputFile)) {
065                    if(fileSystem.isDirectory(outputFile)) {
066                        FileStatus[] toDelete = fileSystem.listStatus(outputFile, new PathFilter() {
067                            @Override
068                            public boolean accept(Path pathname) {
069                                String name = pathname.getName();
070                                return name.contains("SUCCESS") ||
071                                        name.contains("crc") ||
072                                        name.contains("metadata") ||
073                                        name.contains("part");
074                            }
075                        });
076                        for(FileStatus f : toDelete) {
077                            if(!fileSystem.delete(f.getPath(), false)) {
078                                System.err.println("WARNING: could not delete " + f);
079                            }
080                        }
081                        FileUtils.deleteDirectory(new File(new File(path), "_temporary"));
082                        //fileSystem.delete(new Path(outputFile, "_temporary" + File.separator + "0"), false);
083                        //fileSystem.delete(new Path(outputFile, "_temporary"), false);
084                    }
085                    if(!fileSystem.delete(outputFile, false)) {
086                        System.err.println("WARNING: could not delete " + outputFile);
087                    }
088                }
089            }
090        } catch(IOException e) {
091            MessageHandler.error("Error deleting existing output.", e);
092        }        
093    }
094}