001/*
002 * Copyright (c) 2011-2012 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * 
006 * Permission is hereby granted, without written agreement and without
007 * license or royalty fees, to use, copy, modify, and distribute this
008 * software and its documentation for any purpose, provided that the above
009 * copyright notice and the following two paragraphs appear in all copies
010 * of this software.
011 *
012 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
013 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
014 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
015 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
016 * SUCH DAMAGE.
017 *
018 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
019 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
020 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
021 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
022 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
023 * ENHANCEMENTS, OR MODIFICATIONS.
024 *
025 */
026
027package org.kepler.job;
028
import java.util.HashMap;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
035
036/** Support class for MOAB job manager support used on Chinook.
037 *  Class Job uses the methods of a supporter class to
038 *  submit jobs and check status
039 *
040 * History: Copied from JobSubmitPBS and modified.
041 * Settings taken from Ecce.
042 */
043public class JobSupportMoab implements JobSupport
044{
045
046    private static final Log log = LogFactory.getLog( JobSupportMoab.class.getName() );
047    private static final boolean isDebugging = log.isDebugEnabled();
048    private String _moabSubmitCmd="msub ";
049    private String _moabStatusCmd="checkjob -A "; // to be followed by jobid
050    private String _moabTaskStatusCmd="squeue -h -as -o %i | grep "; // to be followed by jobid
051    private String _moabDeleteCmd="mjobctl -c "; // to be followed by jobid
052
053    public JobSupportMoab()
054    {
055    }
056
057
058    public void init( String moabBinPath )
059    {
060       if ( moabBinPath != null && !moabBinPath.trim().equals("") )  {
061          String binPath = new String(moabBinPath);
062          if ( ! moabBinPath.endsWith("/") )
063             binPath += "/";
064          _moabSubmitCmd = binPath + _moabSubmitCmd;
065          _moabStatusCmd = binPath + _moabStatusCmd;
066          _moabDeleteCmd = binPath + _moabDeleteCmd;
067       }
068    }
069
070    /** Create a submission file for the specific job manager,
071     *  based on the information available in Job:
072     *   - executable name
073     *   - input files
074     *   - output files
075     *   - arguments for the job
076     */
077    public boolean createSubmitFile ( String filename, Job job )
078    {
079       return false;
080    }
081
082
083
084    /** Submit command for Moab
085     *   return: the command for submission
086     */
087    public String getSubmitCmd(String submitFile, String options, Job job) throws JobException
088    {
089        
090        if(job.getDependentJobs() != null) {
091            throw new JobException("Support for job dependencies with Moab has not been implemented.");
092        }
093
094       String _commandStr;
095       if (options != null)
096          _commandStr = _moabSubmitCmd + " " + options + " " + submitFile;
097       else
098          _commandStr = _moabSubmitCmd + " " + submitFile;
099
100       return _commandStr;
101    }
102
103
104    /** Parse output of submission and get information: jobID
105     *  return String jobID on success
106     *  throws JobException at failure (will contain the error stream or output stream)
107     */
108    public String parseSubmitOutput (
109          String output,
110          String error ) throws JobException
111    {
112
113       // For successful submissions, the interactive session looks like:
114       // [d39974@cu0login1 mpp-moabtesting]$ msub submit__mpp-moabtesting
115       //
116       // 106165
117       // Ecce uses the following parse expresssion [0-9]+
118       // Don't know what the error condtion looks like....
119       String jobID = null;
120       Pattern pattern = Pattern.compile("([0-9]+).*");
121
122       String lines[] = output.split("\n");
123       for (int idx=0; idx<lines.length; idx++) {
124           Matcher matcher = pattern.matcher(lines[idx]);
125          if (matcher.matches()) {
126             jobID = matcher.group(1);
127             break;
128          }
129       }
130
131       if (isDebugging) {
132          log.debug("Moab submit output: "+output);
133          log.debug("Moab jobID = " + jobID);
134       }
135
136       if (jobID == null) {
137          if (error != null && error.length() > 0)
138             throw new JobException("Error submitting Moab job: " + error);
139          else
140             throw new JobException("Error submitting Moab job: " + output);
141       }
142       return jobID;
143    }
144
145
146    /** Get the command to ask the status of the job
147     *   return: the String of command
148     */
149    public String getStatusCmd (String jobID)
150    {
151       return _moabStatusCmd + jobID;
152    }
153
154    /** Get the command to ask the status of each task
155     *   return: the String of command
156     */
157    public String getTaskStatusCmd (String jobID)
158    {
159       return getStatusCmd(jobID) + ";" + _moabTaskStatusCmd + jobID;
160    }
161
162    /**
163     * Parse output of status check command and get status info
164     * @return: a JobStatusInfo object, or throws an JobException with the error output
165     */
166    public JobStatusInfo parseStatusOutput (
167        String jobID,
168        int exitCode,
169        String output,
170        String error )  throws JobException
171    {
172       // Output should be a single word indicating the status.
173       // If the job doesn't exist, the output will be empty
174       // The known values include:
175       //    RUNNING
176       //    COMPLETING
177       //    PENDING
178       //    IDLE
179       //    STARTING
180       //    BATCHHOLD
181       //    SYSTEMHOLD
182       //    USERHOLD
183       //    DEFERRED
184       //    MIGRATED
185       //    STAGING
186
187       // PBS status does not use exitCode. It can show error, but in real it can mean only that
188       // job is not in the queue anymore, which is good...
189
190       String lines[] = output.split("\n");
191       for (int idx=0; idx<lines.length; idx++) {
192           Pattern pattern = Pattern.compile(".*STATE\\=(.+);UNAME.*");
193           Matcher matcher = pattern.matcher(lines[idx]);
194           if (matcher.matches()) {
195              output = matcher.group(1).toUpperCase();
196              idx = lines.length;
197           }
198       }
199
200       JobStatusInfo stat = new TaskParallelJobStatusInfo();
201       stat.statusCode = JobStatusCode.NotInQueue;
202       stat.jobID = jobID;
203
204       boolean foundStatus = false;
205       if (output.length() > 0) {
206          if (output.equals("PENDING") ||
207                output.equals("IDLE") ||
208                output.equals("STARTING") ||
209                output.equals("BATCHHOLD") ||
210                output.equals("SYSTEMHOLD") ||
211                output.equals("USERHOLD") ||
212                output.equals("DEFERRED") ||
213                output.equals("MIGRATED") ||
214                output.equals("STAGING")) {
215             foundStatus = true;
216             stat.statusCode = JobStatusCode.Wait;
217          } else if (output.equals("RUNNING")) {
218             foundStatus = true;
219             stat.statusCode = JobStatusCode.Running;
220          } else if (output.equals("COMPLETED") ||
221                                 output.equals("REMOVED")) {
222             // Note sure - leave it at not in queue?
223             foundStatus = true;
224             stat.statusCode = JobStatusCode.NotInQueue;
225          } else {
226             foundStatus = true;
227             stat.statusCode = JobStatusCode.Wait;
228          }
229       } else {
230          stat.statusCode = JobStatusCode.NotInQueue;
231       }
232
233       if (!foundStatus) {
234          // May want to look at err string or something here
235       }
236
237       return stat;
238    }
239
240
241
242    /**
243     * Parse output of task status check command and get status info
244     * @return: a JobStatusInfo object, or throws an JobException with the error output
245     */
246    public TaskParallelJobStatusInfo parseTaskStatusOutput (
247        String jobID,
248        int numTasks,
249        int exitCode,
250        String output,
251        String error )  throws JobException
252    {
253
254       String[] lines = output.split("\n");
255
256       TaskParallelJobStatusInfo jobStatus = 
257           (TaskParallelJobStatusInfo)parseStatusOutput (jobID, exitCode, lines[0], error);
258
259       jobStatus.taskStatusCodes = new HashMap<String,JobStatusCode>(numTasks);
260
261       /*
262        * if(code == JobStatusCode.Running){
263                                        // this is the only unambiguous state so record it
264                                        
265                                } else if(oldCode == null) {
266                                        result.put(taskId,JobStatusCode.Wait);
267                                } else if(oldCode == JobStatusCode.Running && 
268                                        code == JobStatusCode.NotInQueue) {
269                                        result.put(taskId,JobStatusCode.NotInQueue);
270                                } else if(code == JobStatusCode.Running) {
271                                        result.put(taskId,JobStatusCode.Running);
272                                }
273        */
274       if( jobStatus.statusCode == JobStatusCode.Running ) {
275         for (int idx=1; idx<lines.length; idx++) {
276
277            Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+)");
278            Matcher matcher = pattern.matcher(lines[idx]);
279            if (matcher.matches()) {
280               String jobid = matcher.group(1);
281               if( jobid.equals(jobID) ) {
282                  String taskId = matcher.group(2);
283                  jobStatus.taskStatusCodes.put(taskId,JobStatusCode.Running);
284               }
285            }
286         }
287         for( int idx = 0; idx < numTasks; idx++ ) {
288            if(! jobStatus.taskStatusCodes.containsKey(Integer.toString(idx))) {
289                jobStatus.taskStatusCodes.put(Integer.toString(idx),JobStatusCode.NotInQueue);
290            }
291         }
292       } else {
293         for( int idx = 0; idx < numTasks; idx++ ) {
294                 jobStatus.taskStatusCodes.put(Integer.toString(idx),jobStatus.statusCode) ;
295         }
296       }
297       
298       return jobStatus;
299    }
300
301
302    /**
303     * @return: the String of command
304     */
305    public String getDeleteCmd (String jobID)
306    {
307       return  _moabDeleteCmd + jobID;
308    }
309
310
311
312
313    /**
314     * Parse output of delete command.
315     * @return: true or false indicating that the command was successful or not
316     */
317    public boolean parseDeleteOutput( String jobID,
318          int exitCode,
319          String output,
320          String error ) throws JobException
321    {
322       if (exitCode == 0)
323          return true;
324       else
325          return false;
326    }
327
328}