001/*
002 * Copyright (c) 2004-2012 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2012-07-27 11:35:29 -0700 (Fri, 27 Jul 2012) $' 
007 * '$Revision: 30295 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.job;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035/**
036 * Support class for Load Sharing Facility (or simply LSF) job manager support Class Job uses the
037 * methods of a supporter class to submit jobs and check status
038 * @author Jianwu Wang
039 * @version $Id: JobSupportlsf.java 30523 2012-08-24 23:13:55Z jianwu $
040 */
041public class JobSupportLSF implements JobSupport {
042
043        private static final Log log = LogFactory.getLog(JobSupportLSF.class
044                        .getName());
045        private static final boolean isDebugging = log.isDebugEnabled();
046
047        public JobSupportLSF() {
048        }
049
050        public void init(String nccsBinPath) {
051                if (nccsBinPath != null && !nccsBinPath.trim().equals("")) {
052                        String binPath = new String(nccsBinPath);
053                        if (!nccsBinPath.endsWith("/"))
054                                binPath += "/";
055                        _lsfSubmitCmd = binPath + _lsfSubmitCmd;
056                        _lsfStatusCmd = binPath + _lsfStatusCmd;
057                        _lsfDeleteCmd = binPath + _lsfDeleteCmd;
058                }
059        }
060
061        /**
062         * Create a submission file for the specific job manager, based on the
063         * information available in Job: - executable name - input files - output
064         * files - arguments for the job
065         */
066        public boolean createSubmitFile(String filename, Job job) {
067
068                return false;
069        }
070
071        /**
072         * Submit command for lsf return: the command for submission
073         */
074        public String getSubmitCmd(String submitFile, String options, Job job) throws JobException {
075
076                StringBuilder _commandStr = new StringBuilder(_lsfSubmitCmd);
077                
078        Job[] dependentJobs = job.getDependentJobs();
079        if(job.getDependentJobs() != null) {
080            _commandStr.append("-w 'ended(");
081            for(Job dependentJob : dependentJobs) {
082                _commandStr.append(dependentJob.status.jobID + ")&&ended(");
083            }
084            // deal with the last letters.
085            _commandStr.delete(_commandStr.length()-8, _commandStr.length());
086            _commandStr.append("' ");
087            log.info("command string after adding dependency jobs:" + _commandStr);
088        }
089
090                if (options != null) {
091                        _commandStr.append(" " + options);
092                }
093                
094                _commandStr.append(" " + submitFile);
095
096                return _commandStr.toString();
097        }
098
099        /**
100         * Parse output of submission and get information: jobID return String jobID
101         * on success throws JobException at failure (will contain the error stream
102         * or output stream)
103         */
104        public String parseSubmitOutput(String output, String error)
105                        throws JobException {
106
107                // System.out.println("====lsf parse: picking the jobid from output...");
108                /*
109                 * lsf bsub output: on success, it is: Your job 102368 ("lsf.cmd") has
110                 * been submitted. on error, messages are printed on stderr, stdout is
111                 * empty
112                 */
113                String jobID = null;
114                int idx = output.indexOf("\n");
115
116                if (idx > -1) {
117                        String firstrow = output.substring(0, idx);
118                        if (firstrow.matches("Job <[0-9]*.*")) {
119                                String s = firstrow.substring(5);
120                                int toIdx = s.indexOf('>');
121                                jobID = s.substring(0, toIdx);
122                        }
123                        if (isDebugging)
124                                log.debug("lsf parse: jobID = " + jobID + " firstrow = "
125                                                + firstrow);
126                }
127
128                if (jobID == null) {
129                        if (error != null && error.length() > 0)
130                                throw new JobException("Error at submission of lsf job: "
131                                                + error);
132                        else
133                                throw new JobException("Error at submission of lsf job: "
134                                                + output);
135                }
136                return jobID;
137        } // end-of-submit
138
139        /**
140         * Get the command to ask the status of the job return: the String of
141         * command
142         */
143        public String getStatusCmd(String jobID) {
144                // simple 'qstat' which gives back list of all jobs!
145                // parseStatusOutput has to look for the given job
146                String _commandStr = _lsfStatusCmd + " " + jobID;
147                return _commandStr;
148        }
149
150        /**
151         * Parse output of status check command and get status info return: a
152         * JobStatusInfo object, or throws an JobException with the error output
153         */
154        public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
155                        String output, String error) throws JobException {
156
157                // System.out.println("+++++ status: picking the status from output" );
158                JobStatusInfo stat = new JobStatusInfo();
159                stat.statusCode = JobStatusCode.NotInQueue;
160
161                boolean foundStatus = false;
162
163                String sa[] = output.split("\n");
164                int idx;
165                for (int i = 0; i < sa.length; i++) {
166                        // System.out.println("lsf status string " + i + " = "+ sa[i]);
167                        if (sa[i].trim().startsWith(jobID)) {
168                                String vals[] = sa[i].trim().split("( )+", 9);
169                                if (vals.length >= 7) {
170                                        stat.jobID = vals[0].trim();
171                                        String jobName = vals[6].trim();
172                                        stat.owner = vals[1].trim();
173                                        stat.submissionTime = vals[7].trim() + " " + vals[8].trim();
174                                        String sts = vals[2].trim();
175                                        //"DONE" means correctly finished, "EXIT" means exit with non-zero result.
176                                        if (sts.equals("RUN")) {
177                                                stat.statusCode = JobStatusCode.Running;
178                                        }  else if (sts.equals("PEND")) {
179                                                stat.statusCode = JobStatusCode.Wait;
180                                        }  else if (sts.equals("EXIT")) {
181                                                stat.statusCode = JobStatusCode.Error;
182                                        }
183                                        foundStatus = true;
184                                        if (isDebugging)
185                                                log
186                                                                .debug("lsf status Values: jobid=" + stat.jobID
187                                                                                + " owner=" + stat.owner
188                                                                                + " submit/startTime="
189                                                                                + stat.submissionTime + " status=["
190                                                                                + sts + "]");
191                                }
192                        }
193                }
194                // System.out.println("lsf status = " + stat.statusCode);
195
196                if (!foundStatus) {
197                        if (error != null && error.length() > 0) {
198                                log
199                                                .warn("Error string = [" + error + "] len="
200                                                                + error.length());
201                                stat.jobID = jobID;
202                                stat.statusCode = JobStatusCode.Error;
203                        } else { // not an error, just job is not in the job queue now
204                                stat.jobID = jobID;
205                                stat.statusCode = JobStatusCode.NotInQueue;
206                        }
207                }
208
209                return stat;
210        }
211
212        /**
213         * Get the command to remove a job from queue (either running or waiting
214         * jobs). return: the String of command
215         */
216        public String getDeleteCmd(String jobID) {
217                String _commandStr = _lsfDeleteCmd + jobID;
218                return _commandStr;
219        }
220
221        /**
222         * Parse output of delete command. return: true if stdout is empty and
223         * exitCode is 0, otherwise return false
224         */
225        public boolean parseDeleteOutput(String jobID, int exitCode, String output,
226                        String error) throws JobException {
227                if (exitCode == 0)
228                        return true;
229                else
230                        return false;
231        }
232
233        // ////////////////////////////////////////////////////////////////////
234        // // private variables ////
235
236        // The combined command to execute.
237        private String _lsfSubmitCmd = "bsub ";
238        private String _lsfStatusCmd = "bjobs ";
239        private String _lsfDeleteCmd = "bkill ";
240
241        public String getTaskStatusCmd(String jobID) throws NotSupportedException {
242                throw new NotSupportedException("Task parallel jobs are not supported");
243        }
244
245        public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
246                        int numTasks, int exitCode, String output, String error)
247                        throws JobException, NotSupportedException {
248                throw new NotSupportedException("Task parallel jobs are not supported");
249        }
250
251} // end-of-class-JobSupportLSF