/* A Spark job that can be cancelled.
 *
 * Copyright (c) 2014 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2014-06-28 16:49:47 +0000 (Sat, 28 Jun 2014) $'
 * '$Revision: 32789 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.spark.director;

import java.util.HashSet;
import java.util.Set;

import org.apache.spark.api.java.JavaSparkContext;
import org.kepler.spark.operator.DataSink;

/** A Spark job that can be cancelled.
 *
 *  @author Daniel Crawl
 *  @version $Id: SparkJob.java 32789 2014-06-28 16:49:47Z crawl $
 *
 */
public class SparkJob {

    /** Create a new job.
     *  @param context the Spark context used to run and cancel the job
     *  @param operator sink operator
     *  @param name job name
     */
    public SparkJob(JavaSparkContext context, DataSink operator, String name) {
        _context = context;
        addDataSink(operator);
    }

    /** Start the job in a separate thread. */
    public void start() {

        _jobThread = new JobThread(_sinks);
        _jobThread.start();
    }

    /** Stop the job by cancelling all jobs in the Spark context. */
    public void stop() {
        _context.cancelAllJobs();
    }

    /** Wait until the job is complete. */
    public void waitForFinish() throws Exception {

        // nothing to wait for if the job was never started
        if(_jobThread == null) {
            return;
        }

        // wait for the job thread to finish
        _jobThread.join();

        // see if there was an error; exceptions caused by cancellation,
        // e.g., from stop(), are expected and are not rethrown
        Exception error = _jobThread.getException();
        if(error != null) {
            String message = error.getMessage();
            if(message == null ||
                (!message.matches("Job \\d+ cancelled") &&
                 !message.matches("Job \\d+ cancelled as part of cancellation of all jobs"))) {
                throw error;
            }
        }
    }

    /** Add a sink operator to the job. */
    public void addDataSink(DataSink operator) {
        _sinks.add(operator);
    }
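
    // A minimal usage sketch. MyDataSink and the SparkConf setup are
    // hypothetical stand-ins, not part of this class:
    //
    //   JavaSparkContext context = new JavaSparkContext(conf);
    //   SparkJob job = new SparkJob(context, new MyDataSink(...), "example");
    //   job.start();
    //   // ... from another thread, optionally: job.stop();
    //   job.waitForFinish();   // rethrows errors, swallows cancellations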

    ///////////////////////////////////////////////////////////////////
    ////                       private classes                     ////

    /** A class that submits the job to Spark and waits for completion. */
    private static class JobThread extends Thread {

        /** Create a new JobThread for the given sink operators. */
        public JobThread(Set<DataSink> sinks) {
            _sinks = sinks;
        }

        /** If there was an exception running the job, returns the exception.
         *  Otherwise, returns null.
         */
        public Exception getException() {
            return _exception;
        }

        /** Submit the job and wait until completion. */
        @Override
        public void run() {
            for(DataSink sink : _sinks) {
                try {
                    // writing a sink's output submits the Spark job(s)
                    // and blocks until they finish
                    sink.writeOutput();
                } catch(Exception e) {
                    // save the exception so waitForFinish() can rethrow it
                    _exception = e;
                    break;
                }
            }
        }

        /** The sink operators to run. */
        private final Set<DataSink> _sinks;

        /** The exception, if any, thrown while running the job. */
        private Exception _exception;
    }

    ///////////////////////////////////////////////////////////////////
    ////                       private fields                      ////

    /** The thread running the job. */
    private JobThread _jobThread;

    /** The data sinks. */
    private Set<DataSink> _sinks = new HashSet<DataSink>();

    /** The Spark context used to run and cancel jobs. */
    private JavaSparkContext _context;
}
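
// Note: stop() cancels *all* jobs on the shared Spark context. A sketch of a
// finer-grained alternative (not used by this class) would tag the work with
// a Spark job group and cancel only that group. setJobGroup() applies to the
// calling thread, so it would run inside JobThread.run() before the sinks
// write their output:
//
//   _context.setJobGroup("sparkjob-" + name, "Kepler SparkJob");  // in run()
//   ...
//   _context.cancelJobGroup("sparkjob-" + name);                  // in stop()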