001/* An operator for file sinks.
002 * 
003 * Copyright (c) 2014 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-03 18:41:53 +0000 (Thu, 03 Sep 2015) $' 
008 * '$Revision: 33858 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.spark.operator;
031
032import java.net.URI;
033
034import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
035import org.apache.spark.api.java.JavaPairRDD;
036import org.kepler.spark.util.SparkUtilities;
037
038/** A data sink that writes to files.
039 * 
040 *  @author Daniel Crawl
041 *  @version $Id: FileDataSink.java 33858 2015-09-03 18:41:53Z crawl $
042 */
043public class FileDataSink extends DataSink {
044
045    /** Create a new FileDataSink.
046     *  @param outputFormatClass the format class to write the file
047     *  @param path the output file path
048     *  @param name the name of the operator  
049     */
050    public FileDataSink(Class<? extends FileOutputFormat<?,?>> outputFormatClass, URI path, String name) {
051        super(1, outputFormatClass, name);
052        _path = path;        
053    }
054    
055    /** Execute the operator. */
056    @Override
057    public JavaPairRDD<Object, ?> execute() {
058        
059        // delete the output path if it exists.
060        SparkUtilities.deleteDirectoryOutput(_path, _configuration);
061        
062        _inputData[0].saveAsNewAPIHadoopFile(_path.toString(), Object.class, Object.class,
063                (Class<? extends FileOutputFormat>) _stub, _configuration);
064        return null;
065    }
066    
067    ///////////////////////////////////////////////////////////////////
068    ////                      private fields                     //////
069
070    /** The path of the output file. */
071    private URI _path;
072
073}