001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $' 
007 * '$Revision: 30295 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.job;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035/**
036 * Support class for Condor job manager support. Class Job uses the methods of a
037 * supporter class to submit jobs and check status
038 */
039public class JobSupportCondor implements JobSupport {
040
041        private static final Log log = LogFactory.getLog(JobSupportCondor.class
042                        .getName());
043        private static final boolean isDebugging = log.isDebugEnabled();
044
045        public JobSupportCondor() {
046        }
047
048        public void init(String condorBinPath) {
049                if (condorBinPath != null && !condorBinPath.trim().equals("")) {
050                        String binPath = new String(condorBinPath);
051                        if (!condorBinPath.endsWith("/"))
052                                binPath += "/";
053                        _condorSubmitCmd = binPath + _condorSubmitCmd;
054                        _condorStatusCmd = binPath + _condorStatusCmd;
055                        _condorDeleteCmd = binPath + _condorDeleteCmd;
056                }
057        }
058
059        /**
060         * Create a submission file for the specific job manager, based on the
061         * information available in Job: - executable name - input files - output
062         * files - arguments for the job
063         */
064        public boolean createSubmitFile(String filename, Job job) {
065
066                return false;
067        }
068
069        /**
070         * Submit command for Condor. return: the command for submission
071         */
072    public String getSubmitCmd(String submitFile, String options, Job job) throws JobException {
073
074        if(job.getDependentJobs() != null) {
075            throw new JobException("Support for job dependencies with Condor has not been implemented.");
076        }
077
078                String _commandStr;
079                if (options != null)
080                        _commandStr = _condorSubmitCmd + " " + options + " " + submitFile;
081                else
082                        _commandStr = _condorSubmitCmd + " " + submitFile;
083
084                return _commandStr;
085        }
086
087        /**
088         * Parse output of submission and get information: jobID. return String
089         * jobID on success throws JobException at failure (will contain the error
090         * stream or output stream)
091         */
092        public String parseSubmitOutput(String output, String error)
093                        throws JobException {
094
095                // System.out.println("====Condor parse: picking the jobid from output...");
096                String jobID = null;
097                String sa[] = output.split("\n");
098                int idx;
099                for (int i = 0; i < sa.length; i++) {
100                        // System.out.println("string " + i + " = "+ sa[i]);
101                        idx = sa[i].indexOf("submitted to cluster ");
102                        if (idx >= 0) {
103                                jobID = sa[i].substring(idx + 21).trim();
104                                if (jobID.endsWith(".")) {
105                                        idx = jobID.lastIndexOf(".");
106                                        jobID = jobID.substring(0, idx);
107                                }
108                                // System.out.println("jobID = " + jobID);
109                        }
110                }
111
112                if (jobID == null) {
113                        if (error != null && error.length() > 0)
114                                throw new JobException("Error at submission of Condor job: "
115                                                + error);
116                        else
117                                throw new JobException("Error at submission of Condor job: "
118                                                + output);
119                }
120                return jobID;
121        } // end-of-submit
122
123        /**
124         * Get the command to ask the status of the job. return: the String of
125         * command
126         */
127        public String getStatusCmd(String jobID) {
128                String _commandStr = _condorStatusCmd;
129                return _commandStr;
130        }
131
132        /**
133         * Parse output of status check command and get status info. return: a
134         * JobStatusInfo object, or throws an JobException with the error output
135         */
136        public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
137                        String output, String error) throws JobException {
138
139                // System.out.println("+++++ status: picking the status from output" );
140                JobStatusInfo stat = new JobStatusInfo();
141
142                boolean foundStatus = false;
143
144                if (exitCode == 0) {
145                        String sa[] = output.split("\n");
146                        int idx;
147                        for (int i = 0; i < sa.length; i++) {
148                                // System.out.println("string " + i + " = "+ sa[i]);
149                                if (sa[i].trim().startsWith(jobID + ".")) {
150                                        String vals[] = sa[i].trim().split("( )+", 9);
151                                        if (vals.length >= 6) {
152                                                stat.jobID = vals[0].trim();
153                                                stat.owner = vals[1].trim();
154                                                stat.submissionTime = vals[2].trim() + " "
155                                                                + vals[3].trim();
156                                                stat.runTime = vals[4].trim();
157                                                String sts = vals[5].trim();
158                                                switch (sts.charAt(0)) {
159                                                case 'R':
160                                                        stat.statusCode = JobStatusCode.Running;
161                                                        break;
162                                                case 'I':
163                                                        stat.statusCode = JobStatusCode.Wait;
164                                                        break;
165                                                default:
166                                                        stat.statusCode = JobStatusCode.Wait;
167                                                }
168                                                foundStatus = true;
169                                                if (isDebugging)
170                                                        log.debug("Values: jobid=" + stat.jobID + " owner="
171                                                                        + stat.owner + " submTime="
172                                                                        + stat.submissionTime + " runTime="
173                                                                        + stat.runTime + " status=[" + sts + "]");
174                                        }
175                                }
176                        }
177                        // System.out.println("status = " + stat.statusCode);
178                }
179
180                if (!foundStatus) {
181                        if (error != null && error.length() > 0) {
182                                log
183                                                .warn("Error string = [" + error + "] len="
184                                                                + error.length());
185                                stat.jobID = jobID;
186                                stat.statusCode = JobStatusCode.Error;
187                        } else if (exitCode != 0) {
188                                log.warn("exitCode from execution = " + exitCode
189                                                + ", stdout = \n" + output);
190                                stat.jobID = jobID;
191                                stat.statusCode = JobStatusCode.Error;
192                        } else { // not an error, just job is not in the job queue now
193                                stat.jobID = jobID;
194                                stat.statusCode = JobStatusCode.NotInQueue;
195                        }
196                }
197
198                return stat;
199        }
200
201        /**
202         * Get the command to remove a job from queue (either running or waiting
203         * jobs). return: the String of command
204         */
205        public String getDeleteCmd(String jobID) {
206                String _commandStr = _condorDeleteCmd + jobID;
207                return _commandStr;
208        }
209
210        /**
211         * Parse output of delete command. return: true or false indicating that the
212         * command was successful or not
213         */
214        public boolean parseDeleteOutput(String jobID, int exitCode, String output,
215                        String error) throws JobException {
216                if (exitCode == 0)
217                        return true;
218                else
219                        return false;
220        }
221
222        // ////////////////////////////////////////////////////////////////////
223        // // private variables ////
224
225        // The combined command to execute.
226        private String _condorSubmitCmd = "condor_submit ";
227        private String _condorStatusCmd = "condor_q ";
228        private String _condorDeleteCmd = "condor_rm ";
229
230        public String getTaskStatusCmd(String jobID) throws NotSupportedException {
231                throw new NotSupportedException("Task parallel jobs are not supported");
232        }
233
234        public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
235                        int numTasks, int exitCode, String output, String error)
236                        throws JobException, NotSupportedException {
237                throw new NotSupportedException("Task parallel jobs are not supported");
238        }
239
240} // end-of-class-JobSupportCondor