001/* 002 * Copyright (c) 2011-2012 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * 006 * Permission is hereby granted, without written agreement and without 007 * license or royalty fees, to use, copy, modify, and distribute this 008 * software and its documentation for any purpose, provided that the above 009 * copyright notice and the following two paragraphs appear in all copies 010 * of this software. 011 * 012 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 013 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 014 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 015 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 016 * SUCH DAMAGE. 017 * 018 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 019 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 020 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 021 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 022 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 023 * ENHANCEMENTS, OR MODIFICATIONS. 024 * 025 */ 026 027package org.kepler.job; 028 029import java.util.HashMap; 030import java.util.regex.Matcher; 031import java.util.regex.Pattern; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035 036/** Support class for MOAB job manager support used on Chinook. 037 * Class Job uses the methods of a supporter class to 038 * submit jobs and check status 039 * 040 * History: Copied from JobSubmitPBS and modified. 041 * Settings taken from Ecce. 042 */ 043public class JobSupportMoab implements JobSupport 044{ 045 046 private static final Log log = LogFactory.getLog( JobSupportMoab.class.getName() ); 047 private static final boolean isDebugging = log.isDebugEnabled(); 048 private String _moabSubmitCmd="msub "; 049 private String _moabStatusCmd="checkjob -A "; // to be followed by jobid 050 private String _moabTaskStatusCmd="squeue -h -as -o %i | grep "; // to be followed by jobid 051 private String _moabDeleteCmd="mjobctl -c "; // to be followed by jobid 052 053 public JobSupportMoab() 054 { 055 } 056 057 058 public void init( String moabBinPath ) 059 { 060 if ( moabBinPath != null && !moabBinPath.trim().equals("") ) { 061 String binPath = new String(moabBinPath); 062 if ( ! moabBinPath.endsWith("/") ) 063 binPath += "/"; 064 _moabSubmitCmd = binPath + _moabSubmitCmd; 065 _moabStatusCmd = binPath + _moabStatusCmd; 066 _moabDeleteCmd = binPath + _moabDeleteCmd; 067 } 068 } 069 070 /** Create a submission file for the specific job manager, 071 * based on the information available in Job: 072 * - executable name 073 * - input files 074 * - output files 075 * - arguments for the job 076 */ 077 public boolean createSubmitFile ( String filename, Job job ) 078 { 079 return false; 080 } 081 082 083 084 /** Submit command for Moab 085 * return: the command for submission 086 */ 087 public String getSubmitCmd(String submitFile, String options, Job job) throws JobException 088 { 089 090 if(job.getDependentJobs() != null) { 091 throw new JobException("Support for job dependencies with Moab has not been implemented."); 092 } 093 094 String _commandStr; 095 if (options != null) 096 _commandStr = _moabSubmitCmd + " " + options + " " + submitFile; 097 else 098 _commandStr = _moabSubmitCmd + " " + submitFile; 099 100 return _commandStr; 101 } 102 103 104 /** Parse output of submission and get information: jobID 105 * return String jobID on success 106 * throws JobException at failure (will contain the error stream or output stream) 107 */ 108 public String parseSubmitOutput ( 109 String output, 110 String error ) throws JobException 111 { 112 113 // For successful submissions, the interactive session looks like: 114 // [d39974@cu0login1 mpp-moabtesting]$ msub submit__mpp-moabtesting 115 // 116 // 106165 117 // Ecce uses the following parse expresssion [0-9]+ 118 // Don't know what the error condtion looks like.... 119 String jobID = null; 120 Pattern pattern = Pattern.compile("([0-9]+).*"); 121 122 String lines[] = output.split("\n"); 123 for (int idx=0; idx<lines.length; idx++) { 124 Matcher matcher = pattern.matcher(lines[idx]); 125 if (matcher.matches()) { 126 jobID = matcher.group(1); 127 break; 128 } 129 } 130 131 if (isDebugging) { 132 log.debug("Moab submit output: "+output); 133 log.debug("Moab jobID = " + jobID); 134 } 135 136 if (jobID == null) { 137 if (error != null && error.length() > 0) 138 throw new JobException("Error submitting Moab job: " + error); 139 else 140 throw new JobException("Error submitting Moab job: " + output); 141 } 142 return jobID; 143 } 144 145 146 /** Get the command to ask the status of the job 147 * return: the String of command 148 */ 149 public String getStatusCmd (String jobID) 150 { 151 return _moabStatusCmd + jobID; 152 } 153 154 /** Get the command to ask the status of each task 155 * return: the String of command 156 */ 157 public String getTaskStatusCmd (String jobID) 158 { 159 return getStatusCmd(jobID) + ";" + _moabTaskStatusCmd + jobID; 160 } 161 162 /** 163 * Parse output of status check command and get status info 164 * @return: a JobStatusInfo object, or throws an JobException with the error output 165 */ 166 public JobStatusInfo parseStatusOutput ( 167 String jobID, 168 int exitCode, 169 String output, 170 String error ) throws JobException 171 { 172 // Output should be a single word indicating the status. 173 // If the job doesn't exist, the output will be empty 174 // The known values include: 175 // RUNNING 176 // COMPLETING 177 // PENDING 178 // IDLE 179 // STARTING 180 // BATCHHOLD 181 // SYSTEMHOLD 182 // USERHOLD 183 // DEFERRED 184 // MIGRATED 185 // STAGING 186 187 // PBS status does not use exitCode. It can show error, but in real it can mean only that 188 // job is not in the queue anymore, which is good... 189 190 String lines[] = output.split("\n"); 191 for (int idx=0; idx<lines.length; idx++) { 192 Pattern pattern = Pattern.compile(".*STATE\\=(.+);UNAME.*"); 193 Matcher matcher = pattern.matcher(lines[idx]); 194 if (matcher.matches()) { 195 output = matcher.group(1).toUpperCase(); 196 idx = lines.length; 197 } 198 } 199 200 JobStatusInfo stat = new TaskParallelJobStatusInfo(); 201 stat.statusCode = JobStatusCode.NotInQueue; 202 stat.jobID = jobID; 203 204 boolean foundStatus = false; 205 if (output.length() > 0) { 206 if (output.equals("PENDING") || 207 output.equals("IDLE") || 208 output.equals("STARTING") || 209 output.equals("BATCHHOLD") || 210 output.equals("SYSTEMHOLD") || 211 output.equals("USERHOLD") || 212 output.equals("DEFERRED") || 213 output.equals("MIGRATED") || 214 output.equals("STAGING")) { 215 foundStatus = true; 216 stat.statusCode = JobStatusCode.Wait; 217 } else if (output.equals("RUNNING")) { 218 foundStatus = true; 219 stat.statusCode = JobStatusCode.Running; 220 } else if (output.equals("COMPLETED") || 221 output.equals("REMOVED")) { 222 // Note sure - leave it at not in queue? 223 foundStatus = true; 224 stat.statusCode = JobStatusCode.NotInQueue; 225 } else { 226 foundStatus = true; 227 stat.statusCode = JobStatusCode.Wait; 228 } 229 } else { 230 stat.statusCode = JobStatusCode.NotInQueue; 231 } 232 233 if (!foundStatus) { 234 // May want to look at err string or something here 235 } 236 237 return stat; 238 } 239 240 241 242 /** 243 * Parse output of task status check command and get status info 244 * @return: a JobStatusInfo object, or throws an JobException with the error output 245 */ 246 public TaskParallelJobStatusInfo parseTaskStatusOutput ( 247 String jobID, 248 int numTasks, 249 int exitCode, 250 String output, 251 String error ) throws JobException 252 { 253 254 String[] lines = output.split("\n"); 255 256 TaskParallelJobStatusInfo jobStatus = 257 (TaskParallelJobStatusInfo)parseStatusOutput (jobID, exitCode, lines[0], error); 258 259 jobStatus.taskStatusCodes = new HashMap<String,JobStatusCode>(numTasks); 260 261 /* 262 * if(code == JobStatusCode.Running){ 263 // this is the only unambiguous state so record it 264 265 } else if(oldCode == null) { 266 result.put(taskId,JobStatusCode.Wait); 267 } else if(oldCode == JobStatusCode.Running && 268 code == JobStatusCode.NotInQueue) { 269 result.put(taskId,JobStatusCode.NotInQueue); 270 } else if(code == JobStatusCode.Running) { 271 result.put(taskId,JobStatusCode.Running); 272 } 273 */ 274 if( jobStatus.statusCode == JobStatusCode.Running ) { 275 for (int idx=1; idx<lines.length; idx++) { 276 277 Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+)"); 278 Matcher matcher = pattern.matcher(lines[idx]); 279 if (matcher.matches()) { 280 String jobid = matcher.group(1); 281 if( jobid.equals(jobID) ) { 282 String taskId = matcher.group(2); 283 jobStatus.taskStatusCodes.put(taskId,JobStatusCode.Running); 284 } 285 } 286 } 287 for( int idx = 0; idx < numTasks; idx++ ) { 288 if(! jobStatus.taskStatusCodes.containsKey(Integer.toString(idx))) { 289 jobStatus.taskStatusCodes.put(Integer.toString(idx),JobStatusCode.NotInQueue); 290 } 291 } 292 } else { 293 for( int idx = 0; idx < numTasks; idx++ ) { 294 jobStatus.taskStatusCodes.put(Integer.toString(idx),jobStatus.statusCode) ; 295 } 296 } 297 298 return jobStatus; 299 } 300 301 302 /** 303 * @return: the String of command 304 */ 305 public String getDeleteCmd (String jobID) 306 { 307 return _moabDeleteCmd + jobID; 308 } 309 310 311 312 313 /** 314 * Parse output of delete command. 315 * @return: true or false indicating that the command was successful or not 316 */ 317 public boolean parseDeleteOutput( String jobID, 318 int exitCode, 319 String output, 320 String error ) throws JobException 321 { 322 if (exitCode == 0) 323 return true; 324 else 325 return false; 326 } 327 328}