/*
 * Copyright (c) 2004-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2012-07-27 11:35:29 -0700 (Fri, 27 Jul 2012) $'
 * '$Revision: 30295 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
*/

package org.kepler.job;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Support class for the Load Sharing Facility (LSF) job manager. Class Job
 * uses the methods of a supporter class such as this one to build the
 * submit, status and delete commands for a job and to parse their output.
 *
 * @author Jianwu Wang
 * @version $Id: JobSupportlsf.java 30523 2012-08-24 23:13:55Z jianwu $
 */
public class JobSupportLSF implements JobSupport {

    private static final Log log = LogFactory.getLog(JobSupportLSF.class
            .getName());
    private static final boolean isDebugging = log.isDebugEnabled();

    /**
     * Matches the start of the line bsub prints on success, e.g.
     * {@code Job <1234> is submitted to default queue <normal>.}; group 1
     * is the numeric job ID.
     */
    private static final Pattern SUBMIT_JOB_ID_PATTERN = Pattern
            .compile("Job <([0-9]+)>");

    // LSF command names. Each keeps a trailing space so that arguments can be
    // appended directly; init() may prefix them with a bin directory.
    private String _lsfSubmitCmd = "bsub ";
    private String _lsfStatusCmd = "bjobs ";
    private String _lsfDeleteCmd = "bkill ";

    public JobSupportLSF() {
    }

    /**
     * Prefix the LSF commands with the directory that contains them. A null
     * or blank path leaves the commands unchanged. Each call prepends to the
     * current command strings, so this is meant to be called at most once
     * per instance.
     *
     * @param nccsBinPath
     *            directory holding bsub, bjobs and bkill; a trailing '/' is
     *            added when missing
     */
    public void init(String nccsBinPath) {
        if (nccsBinPath == null || nccsBinPath.trim().length() == 0) {
            return;
        }
        String binPath = nccsBinPath.endsWith("/") ? nccsBinPath
                : nccsBinPath + "/";
        _lsfSubmitCmd = binPath + _lsfSubmitCmd;
        _lsfStatusCmd = binPath + _lsfStatusCmd;
        _lsfDeleteCmd = binPath + _lsfDeleteCmd;
    }

    /**
     * Create a submission file for the specific job manager, based on the
     * information available in Job (executable name, input files, output
     * files, arguments for the job).
     * <p>
     * Not implemented for LSF: nothing is written and false is returned.
     *
     * @return always false
     */
    public boolean createSubmitFile(String filename, Job job) {
        return false;
    }

    /**
     * Build the bsub command that submits a job.
     * <p>
     * When the job has dependent jobs, an LSF dependency condition of the
     * form {@code -w 'ended(id1)&&ended(id2)'} is added, using the job ID
     * stored in each dependent job's status.
     *
     * @param submitFile
     *            file passed as the last argument of bsub
     * @param options
     *            extra bsub options, or null for none
     * @param job
     *            the job being submitted
     * @return the complete submit command line
     */
    public String getSubmitCmd(String submitFile, String options, Job job)
            throws JobException {

        StringBuilder commandStr = new StringBuilder(_lsfSubmitCmd);

        Job[] dependentJobs = (job == null) ? null : job.getDependentJobs();
        // Skip an empty array as well: emitting "-w" with no condition would
        // produce a malformed command.
        if (dependentJobs != null && dependentJobs.length > 0) {
            commandStr.append("-w '");
            for (int i = 0; i < dependentJobs.length; i++) {
                if (i > 0) {
                    commandStr.append("&&");
                }
                commandStr.append("ended(")
                        .append(dependentJobs[i].status.jobID).append(')');
            }
            commandStr.append("' ");
            log.info("command string after adding dependency jobs:"
                    + commandStr);
        }

        if (options != null) {
            commandStr.append(" ").append(options);
        }

        commandStr.append(" ").append(submitFile);

        return commandStr.toString();
    }

    /**
     * Parse the output of bsub and extract the job ID.
     * <p>
     * On success bsub prints a line such as
     * {@code Job <1234> is submitted to default queue <normal>.}; on error
     * the messages are printed on stderr. Every output line is checked, so
     * lines printed before the confirmation line and a missing trailing
     * newline do not prevent the job ID from being found.
     *
     * @return the job ID
     * @throws JobException
     *             if no job ID is found; the message contains the error
     *             stream when it is non-empty, otherwise the output stream
     */
    public String parseSubmitOutput(String output, String error)
            throws JobException {

        String jobID = null;
        if (output != null) {
            String[] rows = output.split("\n");
            for (int i = 0; i < rows.length && jobID == null; i++) {
                // trim() also drops a '\r' left by CRLF line endings.
                Matcher matcher = SUBMIT_JOB_ID_PATTERN.matcher(rows[i].trim());
                if (matcher.lookingAt()) {
                    jobID = matcher.group(1);
                }
            }
        }
        if (isDebugging) {
            log.debug("lsf parse: jobID = " + jobID + " output = " + output);
        }

        if (jobID == null) {
            if (error != null && error.length() > 0) {
                throw new JobException("Error at submission of lsf job: "
                        + error);
            } else {
                throw new JobException("Error at submission of lsf job: "
                        + output);
            }
        }
        return jobID;
    } // end-of-submit

    /**
     * Get the command that asks for the status of one job.
     *
     * @return the bjobs command for the given job ID
     */
    public String getStatusCmd(String jobID) {
        // The output may still contain a header row; parseStatusOutput looks
        // for the row whose first column equals jobID.
        return _lsfStatusCmd + " " + jobID;
    }

    /**
     * Parse the output of bjobs and build the status of the given job.
     * <p>
     * Expected row layout (columns separated by spaces):
     * {@code JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME}.
     * RUN maps to Running, PEND to Wait and EXIT (non-zero exit) to Error;
     * any other state, e.g. DONE, leaves NotInQueue.
     * <p>
     * If no row for the job is found, the status is Error when stderr is
     * non-empty, except for the "is not found" message bjobs prints for an
     * unknown job, which maps to NotInQueue like empty stderr does.
     *
     * @return the parsed status; never null
     */
    public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
            String output, String error) throws JobException {

        JobStatusInfo stat = new JobStatusInfo();
        stat.statusCode = JobStatusCode.NotInQueue;

        boolean foundStatus = false;

        String[] rows = (output == null) ? new String[0] : output.split("\n");
        for (int i = 0; i < rows.length; i++) {
            // Limit 9 keeps the rest of SUBMIT_TIME (e.g. "27 11:35") in the
            // last token.
            String[] vals = rows[i].trim().split("( )+", 9);
            // Compare the whole JOBID column: a prefix test would let job "12"
            // match the row of job "123". This also skips the header row.
            if (vals.length < 7 || !vals[0].trim().equals(jobID)) {
                continue;
            }
            stat.jobID = vals[0].trim();
            stat.owner = vals[1].trim();
            // Use only the SUBMIT_TIME tokens that are actually present
            // instead of indexing past the end of the array.
            if (vals.length >= 9) {
                stat.submissionTime = vals[7].trim() + " " + vals[8].trim();
            } else if (vals.length == 8) {
                stat.submissionTime = vals[7].trim();
            }
            String sts = vals[2].trim();
            if (sts.equals("RUN")) {
                stat.statusCode = JobStatusCode.Running;
            } else if (sts.equals("PEND")) {
                stat.statusCode = JobStatusCode.Wait;
            } else if (sts.equals("EXIT")) {
                stat.statusCode = JobStatusCode.Error;
            }
            foundStatus = true;
            if (isDebugging) {
                log.debug("lsf status Values: jobid=" + stat.jobID
                        + " owner=" + stat.owner + " submit/startTime="
                        + stat.submissionTime + " status=[" + sts + "]");
            }
        }

        if (!foundStatus) {
            stat.jobID = jobID;
            boolean hasError = error != null && error.length() > 0;
            // bjobs reports an unknown (e.g. already purged) job on stderr as
            // "Job <id> is not found"; that only means it left the queue.
            if (hasError && error.indexOf("is not found") < 0) {
                log.warn("Error string = [" + error + "] len="
                        + error.length());
                stat.statusCode = JobStatusCode.Error;
            } else {
                stat.statusCode = JobStatusCode.NotInQueue;
            }
        }

        return stat;
    }

    /**
     * Get the command to remove a job from the queue (either running or
     * waiting jobs).
     *
     * @return the bkill command for the given job ID
     */
    public String getDeleteCmd(String jobID) {
        return _lsfDeleteCmd + jobID;
    }

    /**
     * Parse the output of the delete command. Only the exit code is checked;
     * stdout and stderr are ignored.
     *
     * @return true if exitCode is 0, otherwise false
     */
    public boolean parseDeleteOutput(String jobID, int exitCode, String output,
            String error) throws JobException {
        return exitCode == 0;
    }

    /**
     * Task parallel jobs are not supported for LSF.
     *
     * @throws NotSupportedException
     *             always
     */
    public String getTaskStatusCmd(String jobID) throws NotSupportedException {
        throw new NotSupportedException("Task parallel jobs are not supported");
    }

    /**
     * Task parallel jobs are not supported for LSF.
     *
     * @throws NotSupportedException
     *             always
     */
    public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
            int numTasks, int exitCode, String output, String error)
            throws JobException, NotSupportedException {
        throw new NotSupportedException("Task parallel jobs are not supported");
    }

} // end-of-class-JobSupportLSF