001/* 002 * Copyright (c) 2004-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $' 007 * '$Revision: 30295 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.job; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034 035/** 036 * Support class for PBS job manager support Class Job uses the methods of a 037 * supporter class to submit jobs and check status 038 */ 039public class JobSupportPBS implements JobSupport { 040 041 private static final Log log = LogFactory.getLog(JobSupportPBS.class 042 .getName()); 043 private static final boolean isDebugging = log.isDebugEnabled(); 044 045 public JobSupportPBS() { 046 } 047 048 public void init(String nccsBinPath) { 049 if (nccsBinPath != null && !nccsBinPath.trim().equals("")) { 050 String binPath = new String(nccsBinPath); 051 if (!nccsBinPath.endsWith("/")) 052 binPath += "/"; 053 _nccsSubmitCmd = binPath + _nccsSubmitCmd; 054 _nccsStatusCmd = binPath + _nccsStatusCmd; 055 _nccsDeleteCmd = binPath + _nccsDeleteCmd; 056 } 057 } 058 059 /** 060 * Create a submission file for the specific job manager, based on the 061 * information available in Job: - executable name - input files - output 062 * files - arguments for the job 063 */ 064 public boolean createSubmitFile(String filename, Job job) { 065 066 return false; 067 } 068 069 /** 070 * Submit command for PBS return: the command for submission 071 */ 072 public String getSubmitCmd(String submitFile, String options, Job job) throws JobException { 073 074 StringBuffer _commandStr = new StringBuffer(_nccsSubmitCmd); 075 076 // see if there are any dependent jobs 077 Job[] dependentJobs = job.getDependentJobs(); 078 if(dependentJobs != null) { 079 _commandStr.append("-W depend=afterok"); 080 for(Job dependentJob : dependentJobs) { 081 _commandStr.append(":" + dependentJob.status.jobID); 082 } 083 } 084 085 if (options != null) { 086 _commandStr.append(" " + options); 087 } 088 089 _commandStr.append(" " + submitFile); 090 091 return _commandStr.toString(); 092 } 093 094 /** 095 * Parse output of submission and get information: jobID return String jobID 096 * on success throws JobException at failure (will contain the error stream 097 * or output stream) 098 */ 099 public String parseSubmitOutput(String output, String error) 100 throws JobException { 101 102 // System.out.println("====PBS parse: picking the jobid from output..."); 103 /* 104 * PBS qsub output is simple: on success, it is the jobID in one single 105 * line. if submitfile does not exists or other error, messages are 106 * printed on stdout stderr is empty 107 */ 108 String jobID = null; 109 int idx = output.indexOf("\n"); 110 111 if (idx > -1) { 112 String firstrow = output.substring(0, idx); 113 if (firstrow.matches("[0-9]*.*")) { 114 jobID = firstrow; 115 } 116 if (isDebugging) 117 log.debug("PBS parse: jobID = " + jobID + " firstrow = " 118 + firstrow); 119 } 120 121 if (jobID == null) { 122 if (error != null && error.length() > 0) 123 throw new JobException("Error at submission of PBS job: " 124 + error); 125 else 126 throw new JobException("Error at submission of PBS job: " 127 + output); 128 } 129 return jobID; 130 } // end-of-submit 131 132 /** 133 * Get the command to ask the status of the job return: the String of 134 * command 135 */ 136 public String getStatusCmd(String jobID) { 137 String _commandStr = _nccsStatusCmd + jobID; 138 return _commandStr; 139 } 140 141 /** 142 * Parse output of status check command and get status info return: a 143 * JobStatusInfo object, or throws an JobException with the error output 144 */ 145 public JobStatusInfo parseStatusOutput(String jobID, int exitCode, 146 String output, String error) throws JobException { 147 148 // PBS status does not use exitCode. It can show error, but in real it 149 // can mean only that 150 // job is not in the queue anymore, which is good... 151 152 // System.out.println("+++++ status: picking the status from output" ); 153 JobStatusInfo stat = new JobStatusInfo(); 154 stat.statusCode = JobStatusCode.NotInQueue; 155 156 boolean foundStatus = false; 157 158 String sa[] = output.split("\n"); 159 int idx; 160 for (int i = 0; i < sa.length; i++) { 161 // System.out.println("PBS status string " + i + " = "+ sa[i]); 162 String vals[] = sa[i].trim().split("( )+", 9); 163 if (jobID.startsWith(vals[0].trim())) { // jobID may be longer than 164 // the first field which is 165 // limited in length 166 if (vals.length >= 5) { 167 stat.jobID = jobID; 168 String jobName = vals[1].trim(); 169 stat.owner = vals[2].trim(); 170 stat.runTime = vals[3].trim(); 171 String sts = vals[4].trim(); 172 switch (sts.charAt(0)) { 173 case 'C': 174 stat.statusCode = JobStatusCode.NotInQueue; 175 break; 176 case 'R': 177 case 'E': 178 stat.statusCode = JobStatusCode.Running; 179 break; 180 case 'Q': 181 case 'H': 182 case 'T': 183 case 'W': 184 case 'S': 185 stat.statusCode = JobStatusCode.Wait; 186 break; 187 default: 188 stat.statusCode = JobStatusCode.Wait; 189 } 190 foundStatus = true; 191 if (isDebugging) 192 log.debug("PBS status Values: jobid=" + stat.jobID 193 + " owner=" + stat.owner + " runTime=" 194 + stat.runTime + " status=[" + sts + "]"); 195 } 196 } 197 } 198 // System.out.println("PBS status = " + stat.statusCode); 199 200 if (!foundStatus) { 201 if (error != null && error.length() > 0) { 202 // it can be the message: qstat: Unknown Job Id ... 203 if (error.startsWith("qstat: Unknown Job Id")) { 204 stat.jobID = jobID; 205 stat.statusCode = JobStatusCode.NotInQueue; 206 } else { 207 log.warn("Error string = [" + error + "] len=" 208 + error.length()); 209 stat.jobID = jobID; 210 stat.statusCode = JobStatusCode.Error; 211 } 212 } else { // not an error, just job is not in the job queue now 213 stat.jobID = jobID; 214 stat.statusCode = JobStatusCode.NotInQueue; 215 } 216 } 217 218 return stat; 219 } 220 221 /** 222 * Get the command to remove a job from queue (either running or waiting 223 * jobs). return: the String of command 224 */ 225 public String getDeleteCmd(String jobID) { 226 String _commandStr = _nccsDeleteCmd + jobID; 227 return _commandStr; 228 } 229 230 /** 231 * Parse output of delete command. return: true or false indicating that the 232 * command was successful or not 233 */ 234 public boolean parseDeleteOutput(String jobID, int exitCode, String output, 235 String error) throws JobException { 236 if (exitCode == 0) 237 return true; 238 else 239 return false; 240 } 241 242 // //////////////////////////////////////////////////////////////////// 243 // // private variables //// 244 245 // The combined command to execute. 246 private String _nccsSubmitCmd = "qsub "; 247 private String _nccsStatusCmd = "qstat "; 248 private String _nccsDeleteCmd = "qdel "; 249 250 public String getTaskStatusCmd(String jobID) throws NotSupportedException { 251 //return job status command as PBS doesn't support task status command 252 return getStatusCmd(jobID); 253 } 254 255 public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID, 256 int numTasks, int exitCode, String output, String error) 257 throws JobException, NotSupportedException { 258 JobStatusInfo jobstatus = parseStatusOutput(jobID, exitCode, output, error); 259 TaskParallelJobStatusInfo taskStatus = new TaskParallelJobStatusInfo(jobstatus); 260 for(int i=0;i<numTasks;i++){ 261 taskStatus.taskStatusCodes.put(Integer.toString(i), jobstatus.statusCode); 262 } 263 return taskStatus; 264 } 265 266} // end-of-class-JobSupportPBS