001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: spurawat $'
006 * '$Date: 2014-02-05 19:12:24 +0000 (Wed, 05 Feb 2014) $' 
007 * '$Revision: 32579 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.job;
031
032import java.io.ByteArrayOutputStream;
033import java.io.File;
034import java.util.Iterator;
035import java.util.Vector;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.kepler.ssh.ExecException;
040import org.kepler.ssh.ExecFactory;
041import org.kepler.ssh.ExecInterface;
042import org.kepler.ssh.LocalExec;
043
044public class JobManager {
045
046        private JobSupport jobSupport = null; // job support class
047
048        private ExecInterface execObject; // class for remote/local execution
049
050        private String host; // remote host of jobmanager
051        private String user; // user at remote host to log in with ssh
052        
053        private String jobManagerName; // the support class name (just to generate
054                                                                   // Kepler ids)
055        private String managerBinPath;
056        
057        private static final Log log = LogFactory
058                        .getLog(JobManager.class.getName());
059        private static final boolean isDebugging = log.isDebugEnabled();
060
061        protected JobManager() {
062                preloadSupporterClasses();
063        }
064
065        public String getID() {
066                return JobManagerFactory.createKey(jobManagerName, user + "@" + host);
067        }
068        
069        private static void preloadSupporterClasses() {
070                // We need to get the supporter class names directly, otherwise
071                // java will not find it at runtime ???
072                // System.out.println("Condor class: " + CondorSupport.class.getName());
073                String c;
074                c = JobSupportFork.class.getName();
075                c = JobSupportCondor.class.getName();
076                c = JobSupportPBS.class.getName();
077                c = JobSupportNCCS.class.getName(); // Obsolete, same as PBS
078                c = JobSupportLoadLeveler.class.getName();
079                c = JobSupportSGE.class.getName();
080                c = JobSupportMoab.class.getName();
081                c = JobSupportSLURM.class.getName();
082        }
083
084        /**
085         * Choose a jobmanager for execution <i>jobmanager</i> can be which is
086         * supported at that time: Condor <i>target</i> is either "localhost" or
087         * "user@host" is the machine where the jobmanager is running <binPath> is
088         * the full path to the jobmanager commands on that machine, or "" or null
089         * if they are in the default path if "'jobmanager'Support" class cannot be
090         * instantiated, a JobException is thrown
091         */
092        protected void selectJobManager(String jobmanager, String target,
093                        String binPath) throws JobException {
094
095                // instantiate the supporter class
096                String classname = "org.kepler.job.JobSupport" + jobmanager;
097                try {
098                        // System.out.println("Condor class: " +
099                        // Class.forName(classname).getName());
100                        jobSupport = (JobSupport) Class.forName(classname).newInstance();
101                } catch (ClassNotFoundException cnf) {
102                        throw new JobException("Couldn't find class " + classname);
103                } catch (InstantiationException ie) {
104                        throw new JobException("Couldn't instantiate an object of type "
105                                        + classname);
106                } catch (IllegalAccessException ia) {
107                        throw new JobException("Couldn't access class " + classname);
108                }
109
110                // initialize the supporter class
111                jobSupport.init(binPath);
112                
113                //Store the bin path
114                managerBinPath = binPath;
115                
116                // store supporter name for Kepler use
117                jobManagerName = jobmanager;
118
119                // process the target
120                if (target == null || target.trim().equals("")
121                                || target.equals("local")|| target.equals("localhost")) {
122                        // localhost, finished
123                        execObject = new LocalExec();
124                        user = System.getProperty("user.name");
125                        host = new String("local");
126                } else {
127                        int atPos = target.indexOf('@');
128                        if (atPos >= 0)
129                                user = target.substring(0, target.indexOf('@'));
130                        else
131                                user = System.getProperty("user.name");
132
133                        host = target.substring(atPos + 1);
134                        try {
135                                execObject = ExecFactory.getExecObject(user,host);
136                        } catch (ExecException e) {
137                                throw new JobException("Error connecting to " + user +"@"+host + " : " + e.toString());
138                        }
139                }
140        }
141
142        /**
143         * Submit a job, called from Job.submit(); boolean <i>overwrite</i>
144         * indicates whether old files that exist on the same directory should be
145         * removed before staging new files. As long jobIDs are not really unique,
146         * this is worth to be true. <i>options</i> can be a special options string
147         * for the actual jobmanager.
148         * 
149         * @return: jobID as String if submission is successful (it is submitted and
150         *          real jobID of the submitted job can be retrieved) real jobID can
151         *          be found in job.status.jobID, but you do not need actually on
152         *          error throws JobException
153         */
154
155        protected String submit(Job job, boolean overwrite, String options)
156                        throws JobException {
157
158                // first, get the submit file
159                String submitFilePath = job.getSubmitFile(); // predefined submitfile?
160                if (submitFilePath == null) { // no, create it now for the specific job
161                        // manager
162                        submitFilePath = new String(job.getLocalWorkdirPath()
163                                        + File.separator + "submitcmd." + job.getJobID());
164                        jobSupport.createSubmitFile(submitFilePath, job);
165                        job.setSubmitFile(submitFilePath, true);
166                }
167
168                // the submitfile will be in current working dir of job, so we have to
169                // get the name of the submitfile without path
170                File sf = new File(submitFilePath);
171                String submitFileName = sf.getName();
172
173                // job manager specific submission command
174                String commandStr = jobSupport.getSubmitCmd(job.getWorkdirPath() + "/" + submitFileName, options, job);
175
176                //System.out.println("submit command is: " + commandStr);
177                
178                String cdCmd = "cd " + job.getWorkdirPath() + "; ";
179                int exitCode = 0;
180
181                if (commandStr == null || commandStr.trim().equals("")) {
182                        throw new JobException(
183                                        "Supporter class could not give back meaningful command to submit your job");
184                }
185
186                // stage the files before submission
187    try {
188                        // delete the remote working directory if asked by the submitter
189                        if (overwrite)
190                                execObject.deleteFile(job.getWorkdirPath(), true, false);
191                        
192                        execObject.createDir(job.getWorkdirPath(), true);
193                        
194                        boolean cpLocalBinScript = false;
195                        boolean cpRemoteBinScript = false;
196                        boolean binPathSpecified = false;
197                        String binFileName = job.getBinFile();
198                        File binFile = null;
199                        //if binFile exists check where it should be staged.
200                        if(binFileName!=null) {
201                                binFile = new File(binFileName);
202                                if(managerBinPath!=null && !(managerBinPath.trim().equals("")))
203                                        binPathSpecified = true;
204                                else if(job.isBinFileLocal()){
205                                        cpLocalBinScript = true;
206                                } else {
207                                        cpRemoteBinScript = true;
208                                }
209                        }
210
211                        //If bin path is specified stage bin file to binpath
212                        if( binPathSpecified ){
213                                if(job.isBinFileLocal()){
214                                        execObject.copyTo(binFile,managerBinPath,false);
215                                }else{
216                                        StringBuffer cmd = new StringBuffer("cp ");
217                                        cmd.append(binFileName);
218                                        cmd.append(" ");
219                                        cmd.append(managerBinPath);
220                                        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
221                                        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
222                                        exitCode = _exec(new String(cmd), commandStdout, commandStderr);
223                                        if (exitCode != 0) {
224                                                throw new JobException(
225                                                                "Error at copying remote bin file into specified bin path."
226                                                                                + "\nStdout:\n" + commandStdout
227                                                                                + "\nStderr:\n" + commandStderr);
228                                        }
229                                }
230                                
231                                //format bin file if it was copied from mac or windows
232                                Vector<File> vector = new Vector<File>();
233                                vector.add(binFile);
234                                formatFiles(managerBinPath,vector);
235                        }
236                        
237                        // stage local files/directories to the remote working directory
238                        // while also stripping end of line meta-characters from each file
239                        // if the file comes from mac or windows
240                        Vector<File> files = job.getLocalFiles();
241                        if(cpLocalBinScript){
242                                //add bin file to the list of files to be copied to workingdir
243                                files.add(binFile); 
244                        }
245                        execObject.copyTo(files, job.getWorkdirPath(), true);
246                        
247                        //Now try to change the format of the copied files
248                        //if the copy was from Windows or Mac
249                        formatFiles(job.getWorkdirPath(), files);
250                        
251                        
252                        // copy already remote files/directories to the working directory
253                        Vector<String> rfiles = job.getRemoteFiles();
254                        if (rfiles.size() > 0) {
255                                StringBuffer cmd = new StringBuffer("cp -r ");
256                                Iterator<String> it = rfiles.iterator();
257                                while (it.hasNext()) {
258                                        cmd = cmd.append((String) it.next());
259                                        cmd = cmd.append(" ");
260                                }
261                                if(cpRemoteBinScript){
262                                        //add bin file to the list of files to be copied to workingdir
263                                        cmd = cmd.append(binFileName);
264                                        cmd = cmd.append(" ");
265                                }
266                                cmd = cmd.append(job.getWorkdirPath());
267
268                                if (isDebugging)
269                                        log.debug("Remote file copy command: " + cmd);
270
271                                ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
272                                ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
273                                exitCode = _exec(new String(cmd), commandStdout, commandStderr);
274                                if (exitCode != 0) {
275                                        //if the error is because same file, do not throw exception.
276                                        String[] stdErrArray = commandStderr.toString().split("\n");
277                                        boolean throwE = false;
278                                        for (int index = 0; index < stdErrArray.length; index++) {
279                                                if (!stdErrArray[index].trim().endsWith("are the same file")) {
280                                                        throwE = true;
281                                                        break;
282                                                }
283                                        }
284                                        if (throwE)
285                                                throw new JobException(
286                                                        "Error at copying remote files into the job directory."
287                                                                        + "\nStdout:\n" + commandStdout
288                                                                        + "\nStderr:\n" + commandStderr);
289                                }
290                        }
291
292                } catch (ExecException e) {
293                        throw new JobException(
294                                        "Jobmanager.submit: Error at staging files to " + user
295                                                        + "@" + host + "\n" + e);
296                }
297
298                // we have to enter the workdir before submitting the job
299                commandStr = new String(cdCmd + commandStr);
300
301                // submit the job finally
302                ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
303                ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
304
305                exitCode = _exec(commandStr, commandStdout, commandStderr);
306
307                if (exitCode != 0) {
308                        throw new JobException("Error at job submission." + "\nCommand:" + commandStr + "\nStdout:\n"
309                                        + commandStdout + "\nStderr:\n" + commandStderr);
310                }
311
312                // parse the output for real jobID
313                // This method can throw JobException as well!
314                String jobID = jobSupport.parseSubmitOutput(commandStdout.toString(),
315                                commandStderr.toString());
316
317                return jobID;
318        } // end-of-submit
319
320        private void formatFiles(String dir, Vector<File> files) 
321        throws JobException {
322                boolean isMac = System.getProperty("os.name").toLowerCase()
323                .contains("mac");
324                boolean isWindows = System.getProperty("os.name").toLowerCase()
325                .contains("win");
326                int exitCode;
327                String cdCmd = "cd " + dir + "; ";
328                
329                if (files.size() > 0
330                                && !this.host.equals("local")
331                                && (isMac || isWindows)) {
332                        
333                        StringBuffer cmd = new StringBuffer(cdCmd);
334                        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
335                        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
336                        
337                        String filename = "*";
338                        if(files.size()==1){
339                                //if it is just one file use name instead of *
340                                filename = files.get(0).getName(); 
341                        }
342                        // use dos/mac 2unix if available
343                        if (isMac) {
344                                cmd.append("mac2unix ");
345                                cmd.append(filename);
346                        }
347                        if (isWindows) {
348                                cmd.append("dos2unix ");
349                                cmd.append(filename);
350                        }
351                        exitCode = _exec(new String(cmd), commandStdout, commandStderr);
352        
353                        if (exitCode != 0) { // use sed -i if dos/mac 2unix not
354                                                                        // available
355                                cmd = new StringBuffer(cdCmd);
356                                if (isMac) {
357                                        cmd.append("sed -i 's/\\r/\\n/g' ");
358                                        cmd.append(filename);
359                                }
360                                if (isWindows) {
361                                        cmd.append("sed -i 's/\\r//g' ");
362                                        cmd.append(filename);
363                                }
364                                exitCode = _exec(new String(cmd), commandStdout,
365                                                commandStderr);
366                                // use tr as last resort
367                                if (exitCode != 0) {
368                                        cmd = new StringBuffer(cdCmd);
369                                        Iterator<File> it = files.iterator();
370                                        while (it.hasNext()) {
371                                                File aFile = it.next();
372                                                String fileName = aFile.getName();
373        
374                                                cmd.append("cp " + fileName);
375                                                cmd.append(" tmp" + fileName);
376        
377                                                if (isMac) {
378                                                        cmd.append("; tr '\\r' '\\n' <tmp" + fileName);
379                                                }
380                                                if (isWindows) {
381                                                        cmd.append("; tr -d '\\r' <tmp" + fileName);
382                                                }
383        
384                                                cmd.append(" >" + fileName);
385                                                cmd.append("; rm tmp" + fileName + "; ");
386                                        }
387                                        exitCode = _exec(new String(cmd), commandStdout,
388                                                        commandStderr);
389                                        if (exitCode != 0) {
390                                                throw new JobException(
391                                                                "Error at copying local files into the job directory."
392                                                                                + "\nStdout:\n" + commandStdout
393                                                                                + "\nStderr:\n" + commandStderr);
394                                        }
395                                }
396                        }
397                }
398        }
399        
400        /**
401         * Check the status of the job
402         * 
403         * @return: JobStatusInfo struct if succeeded throws JobException on error,
404         *          or you call for a non-submitted job
405         */
406        protected JobStatusInfo status(String jobID) throws JobException {
407                return status(jobID,0);
408        }
409        
410        /**
411         * Check the status of the job and tasks if numTasks>0
412         * 
413         * @return: JobStatusInfo struct if succeeded throws JobException on error,
414         *          or you call for a non-submitted job
415         */
416        protected JobStatusInfo status(String jobID, int numTasks) throws JobException {
417                JobStatusInfo stat;
418                if (jobID == null) {
419                        throw new JobException(
420                                        "JobManager.status() called with null argument");
421                }
422                try{
423                        //1. Get command to execute
424                        String commandStr;
425                        if(numTasks > 0) {
426                                //execute jobstatus cmd and task status command
427                                commandStr = jobSupport.getTaskStatusCmd(jobID);
428                        }else{
429                                //execute job status command
430                                commandStr = jobSupport.getStatusCmd(jobID);
431                        }
432        
433                        //2. Execute command
434                        int exitCode = 0;
435                        if (commandStr == null || commandStr.trim().equals("")) {
436                                throw new JobException(
437                                                "Supporter class could not give back meaningful"
438                                                                + "command to check the status of your job");
439                        }
440        
441                        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
442                        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
443        
444                        exitCode = _exec(commandStr, commandStdout, commandStderr);
445                        // Do not check the exitCode, as error can mean just: job not in queue
446                        // (e.g. PBS)
447                        // if (exitCode != 0)
448                        // throw new JobException("Error at checking job status. Stdout:\n" +
449                        // commandStdout +
450                        // "\nStderr:\n" + commandStderr);
451        
452                        //3. Parse the output for status info
453                        // This method can throw JobException as well!
454                        
455                        System.out.println("JobManager.status numTasks: " + numTasks);
456                        if(numTasks > 0) {
457                                // parse both job status and individual task status. Returns TaskParallelJobStatusInfo
458                    // This method can throw JobException as well!
459                                stat = jobSupport.parseTaskStatusOutput(
460                                        jobID, numTasks, exitCode, commandStdout.toString(), commandStderr.toString());
461                        }else{
462                                stat = jobSupport.parseStatusOutput(jobID, exitCode, commandStdout
463                                                .toString(), commandStderr.toString());
464                        }
465                }catch(NotSupportedException e){
466                        throw new JobException(e.toString());
467                }
468                return stat;
469                
470        }
471
472        /**
473         * delete a job from queue
474         * 
475         * @return: JobStatusInfo struct if succeeded throws JobException on error,
476         *          or you call for a non-submitted job
477         */
478        protected boolean delete(String jobID) throws JobException {
479
480                if (jobID == null) {
481                        throw new JobException(
482                                        "JobManager.status() called with null argument");
483                }
484
485                String commandStr = jobSupport.getDeleteCmd(jobID);
486
487                int exitCode = 0;
488
489                if (commandStr == null || commandStr.trim().equals("")) {
490                        throw new JobException(
491                                        "Supporter class could not give back meaningful"
492                                                        + "command to remove your job");
493                }
494
495                ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
496                ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
497
498                exitCode = _exec(commandStr, commandStdout, commandStderr);
499
500                // Do not check the exitCode, as error can mean just: job not in queue
501                // (e.g. PBS)
502                // if (exitCode != 0)
503                // throw new JobException("Error at checking job removel. Stdout:\n"
504                // + commandStdout + "\nStderr:\n" + commandStderr);
505
506                // parse the output for delete info
507                // This method can throw JobException as well!
508                boolean stat = jobSupport.parseDeleteOutput(jobID, exitCode,
509                                commandStdout.toString(), commandStderr.toString());
510
511                return stat;
512
513        }
514
515        /**
516         * Execute a command either locally (Java Runtime) or remotely (SSH).
517         * 
518         * @return exitCode of the command.
519         */
520        private int _exec(String commandStr, ByteArrayOutputStream commandStdout,
521                        ByteArrayOutputStream commandStderr) throws JobException {
522
523                int exitCode = 0;
524                try {
525                        if (isDebugging)
526                                log
527                                                .debug("Execute on " + user + "@" + host + ": "
528                                                                + commandStr);
529                        exitCode = execObject.executeCmd(commandStr, commandStdout,
530                                        commandStderr);
531
532                } catch (ExecException e) {
533                        throw new JobException("Jobmanager._exec: Error at execution on "
534                                        + user + "@" + host + " of command: " + commandStr + "\n"
535                                        + e);
536                }
537
538                return exitCode;
539        }
540
541} // end-of-class-JobManager
542