/* A Spark job that can be cancelled.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-06-28 16:49:47 +0000 (Sat, 28 Jun 2014) $'
 * '$Revision: 32789 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.director;

import java.util.HashSet;
import java.util.Set;

import org.apache.spark.api.java.JavaSparkContext;
import org.kepler.spark.operator.DataSink;

/** A Spark job that can be cancelled.
 *
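 * <p>A minimal usage sketch; {@code conf} and {@code sink} are
 * placeholders for a configured SparkConf and a DataSink
 * implementation, not part of this class:</p>
 *
 * <pre>{@code
 * JavaSparkContext context = new JavaSparkContext(conf);
 * SparkJob job = new SparkJob(context, sink, "example job");
 * job.start();
 * // calling job.stop() from another thread cancels the running job
 * job.waitForFinish();
 * }</pre>
 *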
 * @author Daniel Crawl
 * @version $Id: SparkJob.java 32789 2014-06-28 16:49:47Z crawl $
 *
 */
public class SparkJob {

    /** Create a new job.
     *  @param context the Spark context used to run and cancel the job
     *  @param operator sink operator
     *  @param name job name (currently unused)
     */
    public SparkJob(JavaSparkContext context, DataSink operator, String name) {
        _context = context;
        addDataSink(operator);
    }

    /** Start the job in a separate thread. */
    public void start() {
        _jobThread = new JobThread(_sinks);
        _jobThread.start();
    }

    /** Stop the job by cancelling all jobs running in the Spark context. */
    public void stop() {
        _context.cancelAllJobs();
    }
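
    // Note that cancelAllJobs() cancels every active job in this Spark
    // context, not only the ones started by this SparkJob. If finer
    // scoping were ever needed, Spark's job groups could be used instead.
    // A sketch of that alternative (not how this class currently works;
    // the group id "kepler-spark-job" is an arbitrary placeholder):
    //
    //   // in JobThread.run(), before writing output, since job groups
    //   // are per-thread:
    //   context.setJobGroup("kepler-spark-job", "SparkJob sinks", true);
    //
    //   // in stop(), to cancel only this job's group:
    //   _context.cancelJobGroup("kepler-spark-job");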

    /** Wait until the job is complete.
     *  @throws Exception if the job failed for a reason other than
     *  being cancelled.
     */
    public void waitForFinish() throws Exception {

        // wait for the job thread to finish
        _jobThread.join();

        // See if there was an error. A cancellation (e.g., from stop())
        // also surfaces as an exception, so ignore exceptions whose
        // messages match Spark's cancellation messages.
        Exception error = _jobThread.getException();
        if(error != null) {
            String message = error.getMessage();
            if(message == null ||
                    (!message.matches("Job \\d+ cancelled") &&
                    !message.matches("Job \\d+ cancelled as part of cancellation of all jobs"))) {
                throw error;
            }
        }
    }

    /** Add a sink operator to the job. */
    public void addDataSink(DataSink operator) {
        _sinks.add(operator);
    }

    ///////////////////////////////////////////////////////////////////
    ////                      private classes                    //////

    /** A class that submits the job to Spark and waits for completion. */
    private static class JobThread extends Thread {

        /** Create a new JobThread for a set of sink operators. */
        public JobThread(Set<DataSink> sinks) {
            _sinks = sinks;
        }

        /** If there was an exception running the job, returns the exception.
         *  Otherwise, returns null. (Callers read this after join(), which
         *  makes the write in run() visible without extra synchronization.)
         */
        public Exception getException() {
            return _exception;
        }

        /** Submit the job and wait until completion. */
        @Override
        public void run() {
            for(DataSink sink : _sinks) {
                try {
                    // writing a sink's output triggers the Spark action(s)
                    // for that sink
                    sink.writeOutput();
                } catch(Exception e) {
                    _exception = e;
                    break;
                }
            }
        }

        private final Set<DataSink> _sinks;
        private Exception _exception;
    }

    ///////////////////////////////////////////////////////////////////
    ////                      private fields                     //////

    /** The thread running the job. */
    private JobThread _jobThread;

    /** The sink operators. */
    private final Set<DataSink> _sinks = new HashSet<DataSink>();

    /** The Spark context used to run and cancel the job. */
    private final JavaSparkContext _context;
}