001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2012-07-27 18:35:29 +0000 (Fri, 27 Jul 2012) $' 
007 * '$Revision: 30295 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.job;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034
035/**
036 * Support class for the fork job manager support. Uses the jobmgr-fork.sh
037 * script to fork processes, which should be installed in path on the target
038 * machine. Class Job uses the methods of a supporter class to submit jobs and
039 * check status
040 */
041public class JobSupportFork implements JobSupport {
042
043        private static final Log log = LogFactory.getLog(JobSupportFork.class
044                        .getName());
045        private static final boolean isDebugging = log.isDebugEnabled();
046
047        public JobSupportFork() {
048        }
049
050        public void init(String nccsBinPath) {
051                if (nccsBinPath != null && !nccsBinPath.trim().equals("")) {
052                        String binPath = new String(nccsBinPath);
053                        if (!nccsBinPath.endsWith("/"))
054                                binPath += "/";
055                        _forkMgrCmd = binPath + _forkMgrCmd;
056                }
057        }
058
059        /**
060         * Create a submission file for the specific job manager, based on the
061         * information available in Job: - executable name - input files - output
062         * files - arguments for the job
063         */
064        public boolean createSubmitFile(String filename, Job job) {
065
066                return false;
067        }
068
069        /**
070         * Submit command for fork return: the command for submission
071         */
072    public String getSubmitCmd(String submitFile, String options, Job job) throws JobException {
073
074        if(job.getDependentJobs() != null) {
075            throw new JobException("Support for job dependencies with Fork has not been implemented.");
076        }
077
078                String _commandStr;
079                if (options != null)
080                        _commandStr = _forkMgrCmd + " " + options + " -s " + submitFile;
081                else
082                        _commandStr = _forkMgrCmd + " -s " + submitFile;
083
084                return _commandStr;
085        }
086
087        /**
088         * Parse output of submission and get information: jobID return String jobID
089         * on success throws JobException at failure (will contain the error stream
090         * or output stream)
091         */
092        public String parseSubmitOutput(String output, String error)
093                        throws JobException {
094
095                /*
096                 * jmgr-fork.sh submission output: on success, it is like: Submitted job
097                 * 102368 on error, messages are printed on stderr, stdout is empty
098                 */
099                String jobID = null;
100                int idx = output.indexOf("\n");
101
102                if (idx > -1) {
103                        String firstrow = output.substring(0, idx);
104                        if (firstrow.matches("Submitted job [0-9]*.*")) {
105                                jobID = firstrow.substring(14);
106                        }
107                        if (isDebugging)
108                                log.debug("Fork parse: jobID = " + jobID + " firstrow = "
109                                                + firstrow);
110                }
111
112                if (jobID == null) {
113                        if (error != null && error.length() > 0)
114                                throw new JobException("Error at submission of fork job: "
115                                                + error);
116                        else
117                                throw new JobException("Error at submission of fork job: "
118                                                + output);
119                }
120                return jobID;
121        } // end-of-submit
122
123        /**
124         * Get the command to ask the status of the job return: the String of
125         * command
126         */
127        public String getStatusCmd(String jobID) {
128                // jmgr-fork.sh -t <jobID>
129                // parseStatusOutput has to look for the given job
130                String _commandStr = _forkMgrCmd + " -t " + jobID;
131                return _commandStr;
132        }
133
134        /**
135         * Parse output of status check command and get status info return: a
136         * JobStatusInfo object, or throws an JobException with the error output
137         */
138        public JobStatusInfo parseStatusOutput(String jobID, int exitCode,
139                        String output, String error) throws JobException {
140
141                // Fork status gives back the unix ps command string on the specific
142                // process id
143                // with the format: PID USER STAT COMMAND START
144                // e.g. 1625 pnorbert Ss -bash 15:09
145                // 5087 root Ss /share/apps/merc Jun 14
146                // note that start-up time can be two fields
147                // If no such job found, the exit code is 1
148                // (and error log is: No job with id 24135 found)
149
150                JobStatusInfo stat = new JobStatusInfo();
151                stat.statusCode = JobStatusCode.NotInQueue;
152
153                if (exitCode == 1) {
154                        // no such job, which is not an error for us
155                        if (isDebugging)
156                                log.debug("No such process found with process ID: " + jobID);
157                        return stat;
158                }
159
160                boolean foundStatus = false;
161
162                if (output.trim().startsWith(jobID)) {
163
164                        String vals[] = output.trim().split("( )+", 6);
165                        if (vals.length >= 5) {
166                                stat.jobID = vals[0].trim();
167                                stat.owner = vals[1].trim();
168                                String jobName = vals[3].trim();
169                                stat.submissionTime = vals[4].trim();
170                                if (vals.length > 5)
171                                        stat.submissionTime += " " + vals[5].trim();
172                                stat.statusCode = JobStatusCode.Running;
173
174                                foundStatus = true;
175                                if (isDebugging)
176                                        log.debug("Fork status Values: jobid=" + stat.jobID
177                                                        + " owner=" + stat.owner + " jobname=" + jobName
178                                                        + " submit/startTime=" + stat.submissionTime
179                                                        + " status=[" + vals[2].trim() + "]");
180                        }
181                }
182
183                if (!foundStatus) {
184                        if (error != null && error.length() > 0) {
185                                log
186                                                .warn("Error string = [" + error + "] len="
187                                                                + error.length());
188                                stat.jobID = jobID;
189                                stat.statusCode = JobStatusCode.Error;
190                        } else { // have no idea what happened, not even an error string?
191                                log.warn("Unexpected case: No process found with ID " + jobID
192                                                + " but the command did not return with error code 1");
193                                stat.jobID = jobID;
194                                stat.statusCode = JobStatusCode.Error;
195                        }
196                }
197
198                return stat;
199        }
200
201        /**
202         * Get the command to remove a job (kill the process) return: the String of
203         * command
204         */
205        public String getDeleteCmd(String jobID) {
206                String _commandStr = _forkMgrCmd + " -r " + jobID;
207                return _commandStr;
208        }
209
210        /**
211         * Parse output of delete command. return: true if stdout is empty and
212         * exitCode is 0, otherwise return false
213         */
214        public boolean parseDeleteOutput(String jobID, int exitCode, String output,
215                        String error) throws JobException {
216                if (exitCode == 0 && output.length() == 0)
217                        return true;
218                else
219                        return false;
220        }
221
222        // ////////////////////////////////////////////////////////////////////
223        // // private variables ////
224
225        // The jobmanager command to execute.
226        private String _forkMgrCmd = "jmgr-fork.sh";
227
228        public String getTaskStatusCmd(String jobID) throws NotSupportedException {
229                throw new NotSupportedException("Task parallel jobs are not supported");
230        }
231
232        public TaskParallelJobStatusInfo parseTaskStatusOutput(String jobID,
233                        int numTasks, int exitCode, String output, String error)
234                        throws JobException, NotSupportedException {
235                throw new NotSupportedException("Task parallel jobs are not supported");
236        }
237
238} // end-of-class-JobSupportFork