001/* 002 * Copyright (c) 2004-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $' 007 * '$Revision: 30295 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.job; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034 035/** 036 * Support class for Sun Grid Engine job manager support Class Job uses the 037 * methods of a supporter class to submit jobs and check status 038 */ 039public class JobSupportSGE implements JobSupport { 040 041 private static final Log log = LogFactory.getLog(JobSupportSGE.class 042 .getName()); 043 private static final boolean isDebugging = log.isDebugEnabled(); 044 045 public JobSupportSGE() { 046 } 047 048 public void init(String nccsBinPath) { 049 if (nccsBinPath != null && !nccsBinPath.trim().equals("")) { 050 String binPath = new String(nccsBinPath); 051 if (!nccsBinPath.endsWith("/")) 052 binPath += "/"; 053 _sgeSubmitCmd = binPath + _sgeSubmitCmd; 054 _sgeStatusCmd = binPath + _sgeStatusCmd; 055 _sgeDeleteCmd = binPath + _sgeDeleteCmd; 056 } 057 } 058 059 /** 060 * Create a submission file for the specific job manager, based on the 061 * information available in Job: - executable name - input files - output 062 * files - arguments for the job 063 */ 064 public boolean createSubmitFile(String filename, Job job) { 065 066 return false; 067 } 068 069 /** 070 * Submit command for SGE return: the command for submission 071 */ 072 public String getSubmitCmd(String submitFile, String options, Job job) throws JobException { 073 074 StringBuilder _commandStr = new StringBuilder(_sgeSubmitCmd); 075 076 // see if there are any dependent jobs 077 Job[] dependentJobs = job.getDependentJobs(); 078 if(dependentJobs != null) { 079 _commandStr.append("-hold_jid "); 080 for(Job dependentJob : dependentJobs) { 081 _commandStr.append(dependentJob.status.jobID + ","); 082 } 083 // remove the trailing comma 084 _commandStr.deleteCharAt(_commandStr.length()-1); 085 } 086 087 if (options != null) { 088 _commandStr.append(" " + options); 089 } 090 091 _commandStr.append(" " + submitFile); 092 093 return _commandStr.toString(); 094 } 095 096 /** 097 * Parse output of submission and get information: jobID return String jobID 098 * on success throws JobException at failure (will contain the error stream 099 * or output stream) 100 */ 101 public String parseSubmitOutput(String output, String error) 102 throws JobException { 103 104 // System.out.println("====SGE parse: picking the jobid from output..."); 105 /* 106 * SGE qsub output: on success, it is: Your job 102368 ("sge.cmd") has 107 * been submitted. on error, messages are printed on stderr, stdout is 108 * empty 109 */ 110 String jobID = null; 111 int idx = output.indexOf("\n"); 112 113 if (idx > -1) { 114 String firstrow = output.substring(0, idx); 115 if (firstrow.matches("Your job [0-9]*.*")) { 116 String s = firstrow.substring(9); 117 int toIdx = s.indexOf(' '); 118 jobID = s.substring(0, toIdx); 119 } 120 if (isDebugging) 121 log.debug("SGE parse: jobID = " + jobID + " firstrow = " 122 + firstrow); 123 } 124 125 if (jobID == null) { 126 if (error != null && error.length() > 0) 127 throw new JobException("Error at submission of SGE job: " 128 + error); 129 else 130 throw new JobException("Error at submission of SGE job: " 131 + output); 132 } 133 return jobID; 134 } // end-of-submit 135 136 /** 137 * Get the command to ask the status of the job return: the String of 138 * command 139 */ 140 public String getStatusCmd(String jobID) { 141 // simple 'qstat' which gives back list of all jobs! 142 // parseStatusOutput has to look for the given job 143 String _commandStr = _sgeStatusCmd; 144 return _commandStr; 145 } 146 147 /** 148 * Parse output of status check command and get status info return: a 149 * JobStatusInfo object, or throws an JobException with the error output 150 */ 151 public JobStatusInfo parseStatusOutput(String jobID, int exitCode, 152 String output, String error) throws JobException { 153 154 // SGE's qsub gives back all jobs, one per line 155 // we have to look for the given jobID in the beginning of a line 156 // line format: jobid priority name user status date queue slots 157 158 // System.out.println("+++++ status: picking the status from output" ); 159 JobStatusInfo stat = new JobStatusInfo(); 160 stat.statusCode = JobStatusCode.NotInQueue; 161 162 boolean foundStatus = false; 163 164 String sa[] = output.split("\n"); 165 int idx; 166 for (int i = 0; i < sa.length; i++) { 167 // System.out.println("SGE status string " + i + " = "+ sa[i]); 168 if (sa[i].trim().startsWith(jobID)) { 169 String vals[] = sa[i].trim().split("( )+", 9); 170 if (vals.length >= 7) { 171 stat.jobID = vals[0].trim(); 172 String jobName = vals[2].trim(); 173 stat.owner = vals[3].trim(); 174 stat.submissionTime = vals[5].trim() + " " + vals[6].trim(); 175 String sts = vals[4].trim(); 176 switch (sts.charAt(0)) { 177 case 'r': // running 178 case 'R': // restarted 179 case 't': // transferred 180 case 'd': // deletion (under removal) 181 stat.statusCode = JobStatusCode.Running; 182 break; 183 case 's': // suspended 184 case 'S': // suspended 185 case 'w': // wait 186 case 'h': // hold 187 case 'T': // Threshold 188 stat.statusCode = JobStatusCode.Wait; 189 break; 190 default: 191 stat.statusCode = JobStatusCode.Wait; 192 } 193 foundStatus = true; 194 if (isDebugging) 195 log 196 .debug("SGE status Values: jobid=" + stat.jobID 197 + " owner=" + stat.owner 198 + " submit/startTime=" 199 + stat.submissionTime + " status=[" 200 + sts + "]"); 201 } 202 } 203 } 204 // System.out.println("SGE status = " + stat.statusCode); 205 206 if (!foundStatus) { 207 if (error != null && error.length() > 0) { 208 log 209 .warn("Error string = [" + error + "] len=" 210 + error.length()); 211 stat.jobID = jobID; 212 stat.statusCode = JobStatusCode.Error; 213 } else { // not an error, just job is not in the job queue now 214 stat.jobID = jobID; 215 stat.statusCode = JobStatusCode.NotInQueue; 216 } 217 } 218 219 return stat; 220 } 221 222 /** 223 * Get the command to remove a job from queue (either running or waiting 224 * jobs). return: the String of command 225 */ 226 public String getDeleteCmd(String jobID) { 227 String _commandStr = _sgeDeleteCmd + jobID; 228 return _commandStr; 229 } 230 231 /** 232 * Parse output of delete command. return: true if stdout is empty and 233 * exitCode is 0, otherwise return false 234 */ 235 public boolean parseDeleteOutput(String jobID, int exitCode, String output, 236 String error) throws JobException { 237 if (exitCode == 0 && output.length() == 0) 238 return true; 239 else 240 return false; 241 } 242 243 // //////////////////////////////////////////////////////////////////// 244 // // private variables //// 245 246 // The combined command to execute. 247 private String _sgeSubmitCmd = "qsub "; 248 private String _sgeStatusCmd = "qstat "; 249 private String _sgeDeleteCmd = "qdel "; 250 251 public String getTaskStatusCmd(String jobID) throws NotSupportedException { 252 throw new NotSupportedException("Task parallel jobs are not supported"); 253 } 254 255 public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID, 256 int numTasks, int exitCode, String output, String error) 257 throws JobException, NotSupportedException { 258 throw new NotSupportedException("Task parallel jobs are not supported"); 259 } 260 261} // end-of-class-JobSupportSGE