001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $' 
007 * '$Revision: 30295 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.job;
031
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
034
035/**
036 * Support class for Sun Grid Engine job manager support Class Job uses the
037 * methods of a supporter class to submit jobs and check status
038 */
039public class JobSupportSGE implements JobSupport {
040
041        private static final Log log = LogFactory.getLog(JobSupportSGE.class
042                        .getName());
043        private static final boolean isDebugging = log.isDebugEnabled();
044
045        public JobSupportSGE() {
046        }
047
048        public void init(String nccsBinPath) {
049                if (nccsBinPath != null && !nccsBinPath.trim().equals("")) {
050                        String binPath = new String(nccsBinPath);
051                        if (!nccsBinPath.endsWith("/"))
052                                binPath += "/";
053                        _sgeSubmitCmd = binPath + _sgeSubmitCmd;
054                        _sgeStatusCmd = binPath + _sgeStatusCmd;
055                        _sgeDeleteCmd = binPath + _sgeDeleteCmd;
056                }
057        }
058
059        /**
060         * Create a submission file for the specific job manager, based on the
061         * information available in Job: - executable name - input files - output
062         * files - arguments for the job
063         */
064        public boolean createSubmitFile(String filename, Job job) {
065
066                return false;
067        }
068
069        /**
070         * Submit command for SGE return: the command for submission
071         */
072        public String getSubmitCmd(String submitFile, String options, Job job) throws JobException {
073
074                StringBuilder _commandStr = new StringBuilder(_sgeSubmitCmd);
075                
076                // see if there are any dependent jobs
077        Job[] dependentJobs = job.getDependentJobs();
078        if(dependentJobs != null) {
079            _commandStr.append("-hold_jid ");
080            for(Job dependentJob : dependentJobs) {
081                _commandStr.append(dependentJob.status.jobID + ",");
082            }
083            // remove the trailing comma
084            _commandStr.deleteCharAt(_commandStr.length()-1);
085        }
086
087                if (options != null) {
088                        _commandStr.append(" " + options);
089                }
090                
091                _commandStr.append(" " + submitFile);
092
093                return _commandStr.toString();
094        }
095
096        /**
097         * Parse output of submission and get information: jobID return String jobID
098         * on success throws JobException at failure (will contain the error stream
099         * or output stream)
100         */
101        public String parseSubmitOutput(String output, String error)
102                        throws JobException {
103
104                // System.out.println("====SGE parse: picking the jobid from output...");
105                /*
106                 * SGE qsub output: on success, it is: Your job 102368 ("sge.cmd") has
107                 * been submitted. on error, messages are printed on stderr, stdout is
108                 * empty
109                 */
110                String jobID = null;
111                int idx = output.indexOf("\n");
112
113                if (idx > -1) {
114                        String firstrow = output.substring(0, idx);
115                        if (firstrow.matches("Your job [0-9]*.*")) {
116                                String s = firstrow.substring(9);
117                                int toIdx = s.indexOf(' ');
118                                jobID = s.substring(0, toIdx);
119                        }
120                        if (isDebugging)
121                                log.debug("SGE parse: jobID = " + jobID + " firstrow = "
122                                                + firstrow);
123                }
124
125                if (jobID == null) {
126                        if (error != null && error.length() > 0)
127                                throw new JobException("Error at submission of SGE job: "
128                                                + error);
129                        else
130                                throw new JobException("Error at submission of SGE job: "
131                                                + output);
132                }
133                return jobID;
134        } // end-of-submit
135
136        /**
137         * Get the command to ask the status of the job return: the String of
138         * command
139         */
140        public String getStatusCmd(String jobID) {
141                // simple 'qstat' which gives back list of all jobs!
142                // parseStatusOutput has to look for the given job
143                String _commandStr = _sgeStatusCmd;
144                return _commandStr;
145        }
146
147        /**
148         * Parse output of status check command and get status info return: a
149         * JobStatusInfo object, or throws an JobException with the error output
150         */
151        public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
152                        String output, String error) throws JobException {
153
154                // SGE's qsub gives back all jobs, one per line
155                // we have to look for the given jobID in the beginning of a line
156                // line format: jobid priority name user status date queue slots
157
158                // System.out.println("+++++ status: picking the status from output" );
159                JobStatusInfo stat = new JobStatusInfo();
160                stat.statusCode = JobStatusCode.NotInQueue;
161
162                boolean foundStatus = false;
163
164                String sa[] = output.split("\n");
165                int idx;
166                for (int i = 0; i < sa.length; i++) {
167                        // System.out.println("SGE status string " + i + " = "+ sa[i]);
168                        if (sa[i].trim().startsWith(jobID)) {
169                                String vals[] = sa[i].trim().split("( )+", 9);
170                                if (vals.length >= 7) {
171                                        stat.jobID = vals[0].trim();
172                                        String jobName = vals[2].trim();
173                                        stat.owner = vals[3].trim();
174                                        stat.submissionTime = vals[5].trim() + " " + vals[6].trim();
175                                        String sts = vals[4].trim();
176                                        switch (sts.charAt(0)) {
177                                        case 'r': // running
178                                        case 'R': // restarted
179                                        case 't': // transferred
180                                        case 'd': // deletion (under removal)
181                                                stat.statusCode = JobStatusCode.Running;
182                                                break;
183                                        case 's': // suspended
184                                        case 'S': // suspended
185                                        case 'w': // wait
186                                        case 'h': // hold
187                                        case 'T': // Threshold
188                                                stat.statusCode = JobStatusCode.Wait;
189                                                break;
190                                        default:
191                                                stat.statusCode = JobStatusCode.Wait;
192                                        }
193                                        foundStatus = true;
194                                        if (isDebugging)
195                                                log
196                                                                .debug("SGE status Values: jobid=" + stat.jobID
197                                                                                + " owner=" + stat.owner
198                                                                                + " submit/startTime="
199                                                                                + stat.submissionTime + " status=["
200                                                                                + sts + "]");
201                                }
202                        }
203                }
204                // System.out.println("SGE status = " + stat.statusCode);
205
206                if (!foundStatus) {
207                        if (error != null && error.length() > 0) {
208                                log
209                                                .warn("Error string = [" + error + "] len="
210                                                                + error.length());
211                                stat.jobID = jobID;
212                                stat.statusCode = JobStatusCode.Error;
213                        } else { // not an error, just job is not in the job queue now
214                                stat.jobID = jobID;
215                                stat.statusCode = JobStatusCode.NotInQueue;
216                        }
217                }
218
219                return stat;
220        }
221
222        /**
223         * Get the command to remove a job from queue (either running or waiting
224         * jobs). return: the String of command
225         */
226        public String getDeleteCmd(String jobID) {
227                String _commandStr = _sgeDeleteCmd + jobID;
228                return _commandStr;
229        }
230
231        /**
232         * Parse output of delete command. return: true if stdout is empty and
233         * exitCode is 0, otherwise return false
234         */
235        public boolean parseDeleteOutput(String jobID, int exitCode, String output,
236                        String error) throws JobException {
237                if (exitCode == 0 && output.length() == 0)
238                        return true;
239                else
240                        return false;
241        }
242
243        // ////////////////////////////////////////////////////////////////////
244        // // private variables ////
245
246        // The combined command to execute.
247        private String _sgeSubmitCmd = "qsub ";
248        private String _sgeStatusCmd = "qstat ";
249        private String _sgeDeleteCmd = "qdel ";
250
251        public String getTaskStatusCmd(String jobID) throws NotSupportedException {
252                throw new NotSupportedException("Task parallel jobs are not supported");
253        }
254
255        public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
256                        int numTasks, int exitCode, String output, String error)
257                        throws JobException, NotSupportedException {
258                throw new NotSupportedException("Task parallel jobs are not supported");
259        }
260
261} // end-of-class-JobSupportSGE