/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $'
 * '$Revision: 30295 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.job;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Support class for the fork job manager. Builds command lines for the
 * {@code jmgr-fork.sh} script (expected to be installed in the path on the
 * target machine, or under the directory given to {@link #init(String)}) and
 * parses what that script prints. Class Job uses the methods of a supporter
 * class to submit jobs and check their status.
 */
public class JobSupportFork implements JobSupport {

    private static final Log log = LogFactory.getLog(JobSupportFork.class
            .getName());
    private static final boolean isDebugging = log.isDebugEnabled();

    /** Bare name of the fork job manager script. */
    private static final String FORK_MGR_SCRIPT = "jmgr-fork.sh";

    /**
     * Success line printed by the script on submission; group 1 is the job
     * ID. Anchored at the start of the row, at least one digit is required.
     */
    private static final Pattern SUBMITTED_JOB = Pattern
            .compile("^Submitted job ([0-9]+)");

    /** The job manager command to execute (optionally path-qualified). */
    private String _forkMgrCmd = FORK_MGR_SCRIPT;

    public JobSupportFork() {
    }

    /**
     * Set the directory in which the fork job manager script is located.
     * A null or blank path leaves the current command untouched. The command
     * is always rebuilt from the bare script name, so calling this method
     * repeatedly does not stack directory prefixes.
     *
     * @param nccsBinPath directory containing the script; a trailing "/" is
     *                    appended when missing
     */
    public void init(String nccsBinPath) {
        if (nccsBinPath != null && !nccsBinPath.trim().equals("")) {
            String binPath = nccsBinPath;
            if (!binPath.endsWith("/")) {
                binPath += "/";
            }
            _forkMgrCmd = binPath + FORK_MGR_SCRIPT;
        }
    }

    /**
     * Create a submission file for the specific job manager, based on the
     * information available in Job. This implementation creates nothing and
     * always reports that.
     *
     * @param filename name of the submission file (ignored)
     * @param job      the job to describe (ignored)
     * @return always false
     */
    public boolean createSubmitFile(String filename, Job job) {
        return false;
    }

    /**
     * Build the submission command for fork.
     *
     * @param submitFile the submission file passed with {@code -s}
     * @param options    extra options inserted before {@code -s}; may be null
     * @param job        the job being submitted
     * @return the command for submission
     * @throws JobException if the job declares dependent jobs, which fork
     *                      does not support
     */
    public String getSubmitCmd(String submitFile, String options, Job job)
            throws JobException {

        if (job.getDependentJobs() != null) {
            throw new JobException(
                    "Support for job dependencies with Fork has not been implemented.");
        }

        if (options != null) {
            return _forkMgrCmd + " " + options + " -s " + submitFile;
        }
        return _forkMgrCmd + " -s " + submitFile;
    }

    /**
     * Parse the output of the submission command and extract the job ID.
     * <p>
     * jmgr-fork.sh prints "Submitted job 102368" on success; on error the
     * messages go to stderr and stdout is empty. Only the first row of the
     * output is examined. The row does not have to be newline-terminated,
     * and only the digits following "Submitted job " are returned, so a
     * trailing carriage return or extra text never ends up in the ID.
     *
     * @param output standard output of the submission command; may be null
     * @param error  standard error of the submission command; may be null
     * @return the job ID
     * @throws JobException if no job ID is found; the message carries the
     *                      error stream when it is non-empty, otherwise the
     *                      output stream
     */
    public String parseSubmitOutput(String output, String error)
            throws JobException {

        String jobID = null;

        if (output != null) {
            int idx = output.indexOf('\n');
            String firstrow = (idx > -1) ? output.substring(0, idx) : output;
            Matcher matcher = SUBMITTED_JOB.matcher(firstrow);
            if (matcher.find()) {
                jobID = matcher.group(1);
            }
            if (isDebugging) {
                log.debug("Fork parse: jobID = " + jobID + " firstrow = "
                        + firstrow);
            }
        }

        if (jobID == null) {
            if (error != null && error.length() > 0) {
                throw new JobException("Error at submission of fork job: "
                        + error);
            }
            throw new JobException("Error at submission of fork job: "
                    + output);
        }
        return jobID;
    }

    /**
     * Get the command that asks for the status of a job
     * ({@code jmgr-fork.sh -t jobID}).
     *
     * @param jobID the job to query
     * @return the status command
     */
    public String getStatusCmd(String jobID) {
        return _forkMgrCmd + " -t " + jobID;
    }

    /**
     * Parse the output of the status command.
     * <p>
     * The fork status is the unix ps row of the process, in the format
     * PID USER STAT COMMAND START, e.g.
     *
     * <pre>
     * 1625 pnorbert Ss -bash 15:09
     * 5087 root Ss /share/apps/merc Jun 14
     * </pre>
     *
     * Note that the start time can span two fields. When no such job exists
     * the exit code is 1 (and the error log is "No job with id 24135 found").
     *
     * @param jobID    the job that was queried
     * @param exitCode exit code of the status command
     * @param output   standard output of the status command; may be null
     * @param error    standard error of the status command; may be null
     * @return status NotInQueue when exitCode is 1; Running (with job ID,
     *         owner and start time filled in) when the output is a row for
     *         this job; Error otherwise
     * @throws JobException declared for the interface; not thrown here
     */
    public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
            String output, String error) throws JobException {

        JobStatusInfo stat = new JobStatusInfo();
        stat.statusCode = JobStatusCode.NotInQueue;

        if (exitCode == 1) {
            // no such job, which is not an error for us
            if (isDebugging) {
                log.debug("No such process found with process ID: " + jobID);
            }
            return stat;
        }

        String row = (output == null) ? "" : output.trim();
        // At most 6 pieces: the 6th keeps the rest of a two-field start time.
        String[] vals = row.split("( )+", 6);

        // Compare the whole PID token: a prefix test would let job "12"
        // claim the row that belongs to process "1234".
        boolean foundStatus = jobID != null && vals.length >= 5
                && vals[0].trim().equals(jobID.trim());

        if (foundStatus) {
            stat.jobID = vals[0].trim();
            stat.owner = vals[1].trim();
            String jobName = vals[3].trim();
            stat.submissionTime = vals[4].trim();
            if (vals.length > 5) {
                stat.submissionTime += " " + vals[5].trim();
            }
            stat.statusCode = JobStatusCode.Running;

            if (isDebugging) {
                log.debug("Fork status Values: jobid=" + stat.jobID
                        + " owner=" + stat.owner + " jobname=" + jobName
                        + " submit/startTime=" + stat.submissionTime
                        + " status=[" + vals[2].trim() + "]");
            }
            return stat;
        }

        if (error != null && error.length() > 0) {
            log.warn("Error string = [" + error + "] len=" + error.length());
        } else {
            // have no idea what happened, not even an error string?
            log.warn("Unexpected case: No process found with ID " + jobID
                    + " but the command did not return with error code 1");
        }
        stat.jobID = jobID;
        stat.statusCode = JobStatusCode.Error;
        return stat;
    }

    /**
     * Get the command that removes a job, i.e. kills the process
     * ({@code jmgr-fork.sh -r jobID}).
     *
     * @param jobID the job to remove
     * @return the delete command
     */
    public String getDeleteCmd(String jobID) {
        return _forkMgrCmd + " -r " + jobID;
    }

    /**
     * Parse the output of the delete command.
     *
     * @param jobID    the job that was removed (unused)
     * @param exitCode exit code of the delete command
     * @param output   standard output of the delete command; null is treated
     *                 as empty
     * @param error    standard error of the delete command (unused)
     * @return true if exitCode is 0 and stdout is empty, otherwise false
     * @throws JobException declared for the interface; not thrown here
     */
    public boolean parseDeleteOutput(String jobID, int exitCode, String output,
            String error) throws JobException {
        return exitCode == 0 && (output == null || output.length() == 0);
    }

    /**
     * Task parallel jobs are not supported by the fork job manager.
     *
     * @param jobID the job to query (unused)
     * @return never returns normally
     * @throws NotSupportedException always
     */
    public String getTaskStatusCmd(String jobID) throws NotSupportedException {
        throw new NotSupportedException("Task parallel jobs are not supported");
    }

    /**
     * Task parallel jobs are not supported by the fork job manager.
     *
     * @return never returns normally
     * @throws NotSupportedException always
     */
    public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
            int numTasks, int exitCode, String output, String error)
            throws JobException, NotSupportedException {
        throw new NotSupportedException("Task parallel jobs are not supported");
    }

} // end-of-class-JobSupportFork