001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $' 
007 * '$Revision: 30295 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.job;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035/**
036 * Support class for PBS job manager support Class Job uses the methods of a
037 * supporter class to submit jobs and check status
038 */
039public class JobSupportPBS implements JobSupport {
040
041        private static final Log log = LogFactory.getLog(JobSupportPBS.class
042                        .getName());
043        private static final boolean isDebugging = log.isDebugEnabled();
044
045        public JobSupportPBS() {
046        }
047
048        public void init(String nccsBinPath) {
049                if (nccsBinPath != null && !nccsBinPath.trim().equals("")) {
050                        String binPath = new String(nccsBinPath);
051                        if (!nccsBinPath.endsWith("/"))
052                                binPath += "/";
053                        _nccsSubmitCmd = binPath + _nccsSubmitCmd;
054                        _nccsStatusCmd = binPath + _nccsStatusCmd;
055                        _nccsDeleteCmd = binPath + _nccsDeleteCmd;
056                }
057        }
058
059        /**
060         * Create a submission file for the specific job manager, based on the
061         * information available in Job: - executable name - input files - output
062         * files - arguments for the job
063         */
064        public boolean createSubmitFile(String filename, Job job) {
065
066                return false;
067        }
068
069        /**
070         * Submit command for PBS return: the command for submission
071         */
072    public String getSubmitCmd(String submitFile, String options, Job job) throws JobException {
073
074                StringBuffer _commandStr = new StringBuffer(_nccsSubmitCmd);
075                
076                // see if there are any dependent jobs
077                Job[] dependentJobs = job.getDependentJobs();
078                if(dependentJobs != null) {
079                    _commandStr.append("-W depend=afterok");
080                    for(Job dependentJob : dependentJobs) {
081                        _commandStr.append(":" + dependentJob.status.jobID);
082                    }
083                }
084                
085                if (options != null) {
086                        _commandStr.append(" " + options);
087                }
088                
089                _commandStr.append(" " + submitFile);
090
091                return _commandStr.toString();
092        }
093
094        /**
095         * Parse output of submission and get information: jobID return String jobID
096         * on success throws JobException at failure (will contain the error stream
097         * or output stream)
098         */
099        public String parseSubmitOutput(String output, String error)
100                        throws JobException {
101
102                // System.out.println("====PBS parse: picking the jobid from output...");
103                /*
104                 * PBS qsub output is simple: on success, it is the jobID in one single
105                 * line. if submitfile does not exists or other error, messages are
106                 * printed on stdout stderr is empty
107                 */
108                String jobID = null;
109                int idx = output.indexOf("\n");
110
111                if (idx > -1) {
112                        String firstrow = output.substring(0, idx);
113                        if (firstrow.matches("[0-9]*.*")) {
114                                jobID = firstrow;
115                        }
116                        if (isDebugging)
117                                log.debug("PBS parse: jobID = " + jobID + " firstrow = "
118                                                + firstrow);
119                }
120
121                if (jobID == null) {
122                        if (error != null && error.length() > 0)
123                                throw new JobException("Error at submission of PBS job: "
124                                                + error);
125                        else
126                                throw new JobException("Error at submission of PBS job: "
127                                                + output);
128                }
129                return jobID;
130        } // end-of-submit
131
132        /**
133         * Get the command to ask the status of the job return: the String of
134         * command
135         */
136        public String getStatusCmd(String jobID) {
137                String _commandStr = _nccsStatusCmd + jobID;
138                return _commandStr;
139        }
140
141        /**
142         * Parse output of status check command and get status info return: a
143         * JobStatusInfo object, or throws an JobException with the error output
144         */
145        public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
146                        String output, String error) throws JobException {
147
148                // PBS status does not use exitCode. It can show error, but in real it
149                // can mean only that
150                // job is not in the queue anymore, which is good...
151
152                // System.out.println("+++++ status: picking the status from output" );
153                JobStatusInfo stat = new JobStatusInfo();
154                stat.statusCode = JobStatusCode.NotInQueue;
155
156                boolean foundStatus = false;
157
158                String sa[] = output.split("\n");
159                int idx;
160                for (int i = 0; i < sa.length; i++) {
161                        // System.out.println("PBS status string " + i + " = "+ sa[i]);
162                        String vals[] = sa[i].trim().split("( )+", 9);
163                        if (jobID.startsWith(vals[0].trim())) { // jobID may be longer than
164                                                                                                        // the first field which is
165                                                                                                        // limited in length
166                                if (vals.length >= 5) {
167                                        stat.jobID = jobID;
168                                        String jobName = vals[1].trim();
169                                        stat.owner = vals[2].trim();
170                                        stat.runTime = vals[3].trim();
171                                        String sts = vals[4].trim();
172                                        switch (sts.charAt(0)) {
173                                        case 'C':
174                                                stat.statusCode = JobStatusCode.NotInQueue;
175                                                break;
176                                        case 'R':
177                                        case 'E':
178                                                stat.statusCode = JobStatusCode.Running;
179                                                break;
180                                        case 'Q':
181                                        case 'H':
182                                        case 'T':
183                                        case 'W':
184                                        case 'S':
185                                                stat.statusCode = JobStatusCode.Wait;
186                                                break;
187                                        default:
188                                                stat.statusCode = JobStatusCode.Wait;
189                                        }
190                                        foundStatus = true;
191                                        if (isDebugging)
192                                                log.debug("PBS status Values: jobid=" + stat.jobID
193                                                                + " owner=" + stat.owner + " runTime="
194                                                                + stat.runTime + " status=[" + sts + "]");
195                                }
196                        }
197                }
198                // System.out.println("PBS status = " + stat.statusCode);
199
200                if (!foundStatus) {
201                        if (error != null && error.length() > 0) {
202                                // it can be the message: qstat: Unknown Job Id ...
203                                if (error.startsWith("qstat: Unknown Job Id")) {
204                                        stat.jobID = jobID;
205                                        stat.statusCode = JobStatusCode.NotInQueue;
206                                } else {
207                                        log.warn("Error string = [" + error + "] len="
208                                                        + error.length());
209                                        stat.jobID = jobID;
210                                        stat.statusCode = JobStatusCode.Error;
211                                }
212                        } else { // not an error, just job is not in the job queue now
213                                stat.jobID = jobID;
214                                stat.statusCode = JobStatusCode.NotInQueue;
215                        }
216                }
217
218                return stat;
219        }
220
221        /**
222         * Get the command to remove a job from queue (either running or waiting
223         * jobs). return: the String of command
224         */
225        public String getDeleteCmd(String jobID) {
226                String _commandStr = _nccsDeleteCmd + jobID;
227                return _commandStr;
228        }
229
230        /**
231         * Parse output of delete command. return: true or false indicating that the
232         * command was successful or not
233         */
234        public boolean parseDeleteOutput(String jobID, int exitCode, String output,
235                        String error) throws JobException {
236                if (exitCode == 0)
237                        return true;
238                else
239                        return false;
240        }
241
242        // ////////////////////////////////////////////////////////////////////
243        // // private variables ////
244
245        // The combined command to execute.
246        private String _nccsSubmitCmd = "qsub ";
247        private String _nccsStatusCmd = "qstat ";
248        private String _nccsDeleteCmd = "qdel ";
249
250        public String getTaskStatusCmd(String jobID) throws NotSupportedException {
251                //return job status command as PBS doesn't support task status command
252                return getStatusCmd(jobID);
253        }
254
255        public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
256                        int numTasks, int exitCode, String output, String error)
257                        throws JobException, NotSupportedException {
258                JobStatusInfo jobstatus = parseStatusOutput(jobID, exitCode, output, error);
259                TaskParallelJobStatusInfo taskStatus = new TaskParallelJobStatusInfo(jobstatus);
260                for(int i=0;i<numTasks;i++){
261                        taskStatus.taskStatusCodes.put(Integer.toString(i), jobstatus.statusCode);
262                }
263                return taskStatus;
264        }
265
266} // end-of-class-JobSupportPBS