001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-10-23 19:16:46 +0000 (Fri, 23 Oct 2015) $' 
007 * '$Revision: 34054 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.actor.job;
031
032import java.io.File;
033import java.io.FileWriter;
034import java.io.IOException;
035import java.io.Writer;
036import java.util.ArrayList;
037import java.util.Arrays;
038import java.util.List;
039import java.util.regex.Matcher;
040import java.util.regex.Pattern;
041
042import org.apache.commons.logging.Log;
043import org.apache.commons.logging.LogFactory;
044import org.kepler.configuration.ConfigurationManager;
045import org.kepler.configuration.ConfigurationNamespace;
046import org.kepler.configuration.ConfigurationProperty;
047import org.kepler.job.Job;
048import org.kepler.job.JobException;
049import org.kepler.job.JobFactory;
050import org.kepler.job.JobManagerFactory;
051import org.kepler.job.JobStatusCode;
052import org.kepler.job.TaskParallelJobStatusInfo;
053
054import ptolemy.actor.TypedAtomicActor;
055import ptolemy.actor.TypedIOPort;
056import ptolemy.actor.gui.style.TextStyle;
057import ptolemy.actor.parameters.PortParameter;
058import ptolemy.data.ArrayToken;
059import ptolemy.data.BooleanToken;
060import ptolemy.data.IntToken;
061import ptolemy.data.ObjectToken;
062import ptolemy.data.StringToken;
063import ptolemy.data.Token;
064import ptolemy.data.expr.Parameter;
065import ptolemy.data.expr.StringParameter;
066import ptolemy.data.type.BaseType;
067import ptolemy.kernel.CompositeEntity;
068import ptolemy.kernel.util.Attribute;
069import ptolemy.kernel.util.IllegalActionException;
070import ptolemy.kernel.util.NameDuplicationException;
071import ptolemy.kernel.util.Settable;
072
073/**
074 * A generic job launcher actor that can launch a job using PBS, NCCS, Condor,
075 * Loadleveler, SGE, Moab or LSF, and wait till a user specified status.
076 * <p>
077 * JobLauncher actor is based on code from Norbert Podhorszki's JobCreator, JobManager,
078 * JobStatus, and JobSubmitter actors. It uses JobLauncher.properties to find
079 * the list of supported job schedulers and the corresponding support class.
080 * 
081 * Additionally it can support multi task jobs
082 * @author Frankie Kwok, Chandrika Sivaramakrishnan, Jared Chase
083 * @version $Id: GenericJobLauncher.java 34054 2015-10-23 19:16:46Z crawl $
084 */
085@SuppressWarnings("serial")
086public class GenericJobLauncher extends TypedAtomicActor {
087        public GenericJobLauncher(CompositeEntity container, String name)
088                        throws NameDuplicationException, IllegalActionException {
089                super(container, name);
090
091                /** Job creator parameter and port */
092                // target selects the machine where the jobmanager is running
093                target = new PortParameter(this, "target", new StringToken(
094                                "[local | [user]@host]"));
095                new Parameter(target.getPort(), "_showName", BooleanToken.TRUE);
096                target.setStringMode(true);
097
098                // submission file parameter & port
099                cmdFile = new PortParameter(this, "cmdFile", new StringToken(
100                                "/path/to/job.submit"));
101                cmdFile.setTypeEquals(BaseType.STRING);
102                cmdFile.getPort().setTypeEquals(BaseType.STRING);
103                new Parameter(cmdFile.getPort(), "_showName", BooleanToken.TRUE);
104                cmdFile.setStringMode(true);
105
106                // local/remote submission file flag parameter
107                cmdFileLocal = new Parameter(this, "cmdFileLocal", BooleanToken.TRUE);
108                cmdFileLocal.setTypeEquals(BaseType.BOOLEAN);
109
110                // executable file's name parameter & port
111                executable = new StringParameter(this, "executable file");
112                executable.setVisibility(Settable.EXPERT);
113
114                // working dir name parameter & port
115                workdir = new PortParameter(this, "workdir", new StringToken(
116                                ".kepler-hpcc"));
117                new Parameter(workdir.getPort(), "_showName", BooleanToken.TRUE);
118                workdir.setStringMode(true);
119                
120                //if true - actor doesn't create a unique sub directory, assumes that the
121                //user given workdir is unique and to be used as such.
122                usegivendir = new Parameter(this, "use given workdir",
123                                new BooleanToken(false));
124                usegivendir.setTypeEquals(BaseType.BOOLEAN);
125                usegivendir.setVisibility(Settable.EXPERT);
126
127                // list of input files' names parameter & port
128                inputfiles = new PortParameter(this, "inputfiles", new ArrayToken(
129                                BaseType.STRING));
130                new Parameter(inputfiles.getPort(), "_showName", BooleanToken.TRUE);
131
132                // list of remote input files' names parameter & port
133                remotefiles = new PortParameter(this, "remotefiles", new ArrayToken(
134                                BaseType.STRING));
135                new Parameter(remotefiles.getPort(), "_showName", BooleanToken.TRUE);
136
137                /** Job Manager parameter and port */
138                // jobManager denotes the name of the actual job manager
139                scheduler = new PortParameter(this, "scheduler", new StringToken(
140                                "SGE"));
141                scheduler.setStringMode(true);
142                cp = ConfigurationManager.getInstance()
143                                .getProperty(ConfigurationManager.getModule("actors"),
144                                                new ConfigurationNamespace("JobLauncher"));
145                properties = cp.getProperties("value", true);
146                for (ConfigurationProperty property : properties) {
147                        scheduler.addChoice(property.getValue());
148                }
149                
150                
151                // jobManager.setStringMode(true); // string mode (no "s, but no
152                // variables as well!
153                new Parameter(scheduler.getPort(), "_showName", BooleanToken.TRUE);
154
155                // flag to set if you want the actor to stage the default fork script
156                defaultForkScript = new Parameter(this, "Use default fork script",
157                                new BooleanToken(false));
158                defaultForkScript.setTypeEquals(BaseType.BOOLEAN);
159                defaultForkScript.setVisibility(Settable.EXPERT);
160
161                // binPath is the full path to the jobmanager commands on the target
162                // machine
163                binPath = new StringParameter(this, "binary path");
164                binPath.setVisibility(Settable.EXPERT);
165
166                // jobSubmitOptions are optional parameters to pass to
167                // submitting a job
168                jobSubmitOptions = new StringParameter(this, "job submit options");
169                jobSubmitOptions.setVisibility(Settable.EXPERT);
170
171                // numTasks is the number of tasks for this job
172                numTasks = new Parameter(this, "numTasks");
173                numTasks.setExpression("0");
174
175                /** Job Status parameter and port */
176                
177                waitUntil = new Parameter(this, "Wait Until Status", new StringToken(
178                "ANY"));
179                waitUntil.setStringMode(true);
180                waitUntil.addChoice("ANY");
181                for (JobStatusCode code : JobStatusCode.values()) {
182                        waitUntil.addChoice(code.toString());
183                }
184
185                sleepWhileWaiting = new Parameter(this, "Wait Until Sleep (seconds)",
186                new IntToken(_sleepWhileWaitingVal));
187                sleepWhileWaiting.setTypeEquals(BaseType.INT);
188                
189                // Output: jobID of the submitted job
190                jobOut = new TypedIOPort(this, "jobOut", false, true);
191                jobOut.setTypeEquals(BaseType.OBJECT);
192                new Parameter(jobOut, "_showName", BooleanToken.TRUE);
193
194                // Output: log
195                logPort = new TypedIOPort(this, "logPort", false, true);
196                logPort.setTypeEquals(BaseType.STRING);
197                new Parameter(logPort, "_showName", BooleanToken.TRUE);
198
199                //Output: success
200                success = new TypedIOPort(this, "success", false, true);
201                success.setTypeEquals(BaseType.BOOLEAN);
202                new Parameter(success, "_showName", BooleanToken.TRUE);
203                
204                cmdText = new PortParameter(this, "cmdText");
205                cmdText.setTypeEquals(BaseType.STRING);
206                cmdText.setStringMode(true);
207                new TextStyle(cmdText, "_style");
208                cmdText.getPort().setTypeEquals(BaseType.STRING);
209                new Parameter(cmdText.getPort(), "_showName");
210                
211                dependentJob = new TypedIOPort(this, "dependentJob", true, false);
212                dependentJob.setTypeEquals(BaseType.OBJECT);
213                dependentJob.setMultiport(true);
214                new Parameter(dependentJob, "_showName");
215                
216                jobID = new TypedIOPort(this, "jobID", false, true);
217                jobID.setTypeEquals(BaseType.STRING);
218                new Parameter(jobID, "_showName", BooleanToken.TRUE);
219                
220        }
221
222        /***************************************************************************
223         * ports and parameters
224         */
225
226        /**
227         * The submit file to be used at job submission. Absolute (or relative to
228         * current dir of Java) file path should be provided. The job file must
229         * be provided here, or the contents can be specified in cmdText.
230         * 
231         * <p>
232         * This parameter is read each time in fire().
233         * </p>
234         */
235        public PortParameter cmdFile;
236
237        /**
238         * Specifying whether the cmdFile is locally stored or on the remote target.
239         * 
240         * <p>
241         * This parameter is read each time in fire().
242         * </p>
243         */
244        public Parameter cmdFileLocal;
245
246        /**
247         * The executable file to be used at job submission. Absolute path names, or
248         * relative to current dir of the running java virtual machine, should be
249         * provided. If it is "" then it is considered to be already at the remote
250         * site, otherwise the actor will look for it locally and stage it to the
251         * <i>workdir</i> before job submission.
252         * 
253         * <p>
254         * This parameter is read each time in fire().
255         * </p>
256         */
257        public Parameter executable;
258
259        /**
260         * The working directory in which the actual job submission command will be
261         * executed (on the remote machine if the job manager is a remote
262         * jobmanager).
263         * 
264         * <p>
265         * It should be an absolute path, or a relative one. In the latter case on
266         * remote machine, the directory path will be relative to the user's home
267         * directory (coming from the use of ssh).
268         * </p>
269         * By default, a new unique sub directory is created within this workdir
270         * based on the job id created by kepler. Job is run from this sub
271         * directory. This can be overwritten by setting the parameter "use given
272         * workdir"
273         * <p>
274         * This parameter is read each time in fire().
275         * </p>
276         */
277        public PortParameter workdir;
278
279        /**
280         * By default, Kepler creates a unique sub directory within workdir based on
281         * the the job id it creates for the job. Job is run from this sub
282         * directory. Set this flag to true if you want job to be run directly from
283         * workdir instead of a subdir
284         * 
285         * <p>
286         * This parameter is read each time in fire().
287         * </p>
288         */
289        public Parameter usegivendir;
290
291        /**
292         * The string array of inputfiles. Absolute path names, or relative to
293         * current dir of the running java virtual machine, should be provided.
294         * 
295         * <p>
296         * This parameter is read each time in fire().
297         * </p>
298         */
299        public PortParameter inputfiles;
300
301        /**
302         * The string array of remote input files. Absolute path names, or relative
303         * to the user home dir on the remote host should be provided.
304         * 
305         * <p>
306         * This parameter is read each time in fire().
307         * </p>
308         */
309        public PortParameter remotefiles;
310
311        /**
312         * The name of the jobmanager to be used It should be a name, for which a
313         * supporter class exist as <i>org.kepler.job.JobSupport<jobManager>.class
314         * 
315         * This parameter is read each time in fire().
316         */
317        public PortParameter scheduler;
318
319        /**
320         * Boolean flag to indicate if the default fork script should be staged. If
321         * bin path is provided the default script is uploaded to bin path, else it
322         * is uploaded to the working directory
323         */
324        public Parameter defaultForkScript;
325
326        /**
327         * The machine to be used at job submission. It should be null, "" or
328         * "local" for the local machine or [user@]host to denote a remote machine
329         * accessible with ssh.
330         * 
331         * This parameter is read each time in fire().
332         */
333        public PortParameter target;
334
335        /**
336         * The path to the job manager commands on the target machines. Commands are
337         * constructed as <i>binPath/command</i> and they should be executable this
338         * way. This parameter is read each time in fire().
339         */
340        public Parameter binPath;
341
342        /**
343         * The number of tasks for the job - used in a task parallel job
344         */
345        public Parameter numTasks;
346
347         /** 
348         * The Options of the job submission. Such as "-o /u/joboutput/ -j y -l
349         * h_rt=24:00:00" for SGE job scheduler. Its default value is empty.
350         */
351        public Parameter jobSubmitOptions;
352
353        /**
354         * The job is passed on in this actor. This token can be used (delaying it
355         * with a Sleep actor) to ask its Status again and again until the job is
356         * finished or aborted. This port is an output port of type Object.
357         */
358        public TypedIOPort jobOut;
359        
360        /**
361         * The real job ID generated from the job scheduler.
362         */
363        public TypedIOPort jobID;
364
365        /**
366         * Logging information of job status query. Useful to inform user about
367         * problems at unsuccessful status query but it also prints out job status
368         * and job id on successful query. This port is an output port of type
369         * String. The name of port on canvas is 'log'
370         */
371        public TypedIOPort logPort;
372
373        /**
374         * Wait until the job has a reached specific status. The available status'
375         * that can be reached are: any, wait, running, not in queue, and error.
376         */
377        public Parameter waitUntil;
378
379        /**
380         * Amount of time (in seconds) to sleep between checking job status.
381         */
382        public Parameter sleepWhileWaiting;
383
384        /**
385         * The exit code of the command. If the exit code is 0, the command was
386         * performed successfully. If the exit code is anything other than a 0, an
387         * error occured.
388         */
389        // public TypedIOPort exitcode;
390        
391        /**
392         * boolean flag to indicate if job launch was successful
393         */
394        public TypedIOPort success;
395        
396        
397        /** The text of the job specification. The job specification must either
398         *  be provided in this parameter or the file name in cmdFile. 
399         */
400        public PortParameter cmdText;
401        
402        /** One or more jobs that must successfully complete before this job can run. */
403        public TypedIOPort dependentJob;
404
405        /**
406         * fire
407         * 
408         * @exception IllegalActionException
409         *                Not thrown.
410         */
411        public void fire() throws IllegalActionException {
412                super.fire();
413
414                /* Job creation by processing port parameters */
415                System.out.println("KEPLER HOME IS " + System.getProperty("KEPLER"));
416                System.out.println("USER DIR IS "+ System.getProperty("user.dir"));
417                cmdFile.update();
418                cmdText.update();
419                workdir.update();
420                inputfiles.update();
421                remotefiles.update();
422                
423                String strLog = null;
424                String strExecutable = null;
425                String strJobOptions = null;
426                String strBinPath = null;
427                boolean bUseGivenDir = false;
428                boolean bDefaultFork = false;
429                
430                // read any dependent jobs
431                Job[] dependentJobArray = null;
432                if(dependentJob.numberOfSources() > 0) {
433                    dependentJobArray = new Job[dependentJob.getWidth()];
434                    for(int i = 0; i < dependentJob.getWidth(); i++) {
435                        dependentJobArray[i] = (Job) ((ObjectToken)dependentJob.get(i)).getValue();
436                    }
437                }
438
439                if (this.getAttribute("_expertMode") != null) {
440                        Token temp = null;
441                        temp = (executable != null) ? executable.getToken() : null;
442                        strExecutable = (temp != null) ? ((StringToken) temp).stringValue()
443                                        .trim() : null;
444                        //back compatibility, remove the double quotes at the very beginning and at the very last.
445                        strExecutable = strExecutable.replaceAll("^\"|\"$", "");
446
447                        temp = (binPath != null) ? binPath.getToken() : null;
448                        strBinPath = (temp != null) ? ((StringToken) temp).stringValue()
449                                        .trim() : null;
450                        //back compatibility, remove the double quotes at the very beginning and at the very last.
451                        strBinPath = strBinPath.replaceAll("^\"|\"$", "");
452
453                        temp = (jobSubmitOptions != null) ? jobSubmitOptions.getToken()
454                                        : null;
455                        strJobOptions = (temp != null) ? ((StringToken) temp).stringValue()
456                                        .trim() : null;
457                        //back compatibility, remove the double quotes at the very beginning and at the very last.
458                        strJobOptions = strJobOptions.replaceAll("^\"|\"$", "");                        
459
460                        bUseGivenDir = ((BooleanToken) usegivendir.getToken())
461                                        .booleanValue();
462                        bDefaultFork = ((BooleanToken) defaultForkScript.getToken())
463                                        .booleanValue();
464                }
465
466                scheduler.update();
467                target.update();
468
469            String strCmdFile = null;
470            String strCmdText = null;
471                StringToken token = (StringToken) cmdFile.getToken();
472                if(token != null) {
473                    strCmdFile = token.stringValue().trim();
474                        //back compatibility, remove the double quotes at the very beginning and at the very last.
475                    strCmdFile = strCmdFile.replaceAll("^\"|\"$", "");
476                    if(strCmdFile.isEmpty()) {
477                        strCmdFile = null;
478                    }
479                }
480
481                token = (StringToken) cmdText.getToken();
482                if(token != null) {
483                    strCmdText = token.stringValue().trim();
484                    if(strCmdText.isEmpty()) {
485                        strCmdText = null;
486                    }
487                }
488                
489                boolean bCmdFileLocal = ((BooleanToken) cmdFileLocal.getToken())
490                                .booleanValue();
491
492                StringToken temp = ((StringToken) workdir.getToken());
493                String strWorkdir = temp==null? null :temp.stringValue().trim();
494                //back compatibility, remove the double quotes at the very beginning and at the very last.
495                strWorkdir = strWorkdir.replaceAll("^\"|\"$", "");
496
497                temp = ((StringToken) scheduler.getToken());
498                strScheduler =  temp==null? null :temp.stringValue().trim();
499                //back compatibility, remove the double quotes at the very beginning and at the very last.
500                strScheduler = strScheduler.replaceAll("^\"|\"$", "");
501                
502                temp = ((StringToken) target.getToken());
503                strTarget = temp==null? null :temp.stringValue().trim();
504                //back compatibility, remove the double quotes at the very beginning and at the very last.
505                strTarget = strTarget.replaceAll("^\"|\"$", "");
506                
507                // Process the inputfiles parameter.
508                ArrayToken inputTokens = (ArrayToken) inputfiles.getToken();
509                String[] inputArray = null;
510
511                try {
512                        if (inputTokens.length() >= 1) {
513                                int i;
514
515                                ArrayList<String> iFiles = new ArrayList<String>();
516
517                                for (i = 0; i < inputTokens.length(); i++) {
518                                        boolean fileFound = false;
519                                        File pattern = new File(((StringToken) inputTokens
520                                                        .getElement(i)).stringValue().trim());
521                                        String[] contents = (pattern.getParent() != null) ? new File(
522                                                        pattern.getParent()).list()
523                                                        : null;
524                                        String fileName = pattern.getName();
525
526                                        if (!fileName.equals("")) {
527                                                fileName = fileName.replaceAll("[*]", ".*").replaceAll(
528                                                                "[?]", ".?").replaceAll("[+]", ".+");
529
530                                                Pattern p = Pattern.compile(fileName);
531                                                if (contents != null) {
532                                                        for (int index = 0; index < contents.length; index++) {
533                                                                Matcher m = p.matcher(contents[index].trim());
534                                                                if (m.matches()) {
535                                                                        iFiles
536                                                                                        .add(pattern.getParent()
537                                                                                                        + System
538                                                                                                                        .getProperty("file.separator")
539                                                                                                        + contents[index]);
540                                                                        fileFound = true;
541                                                                }
542                                                        }
543                                                }
544                                                if (!fileFound) {
545                                                        throw new JobException(
546                                                                        "No matching file found for "
547                                                                                        + pattern.toString());
548                                                }
549                                        }
550                                }
551
552                                if (iFiles.size() != 0) {
553                                        inputArray = new String[iFiles.size()];
554                                        iFiles.toArray(inputArray);
555                                }
556                        }
557                } catch (JobException ex) {
558                        log.error(ex);
559                        // ex.printStackTrace();
560                        throw new IllegalActionException(ex.toString());
561                }
562
563                // Process the remotefiles parameter.
564                ArrayToken remoteTokens = (ArrayToken) remotefiles.getToken();
565                String[] remoteArray = null;
566                if (remoteTokens.length() >= 1) {
567                        remoteArray = new String[remoteTokens.length()];
568                        int i;
569                        for (i = 0; i < remoteTokens.length(); i++) {
570                                remoteArray[i] = (((StringToken) remoteTokens.getElement(i))
571                                                .stringValue().trim());
572                        }
573                        // process empty array
574                        if (i == 0 || remoteArray[0] == "") {
575                                remoteArray = null;
576                        }
577                }
578
579                // create job
580                String strJobID = JobFactory.create();
581                Job _job = JobFactory.get(strJobID);
582
583                // set the dependencies, if any
584                if(dependentJobArray != null) {
585                    _job.setDependentJobs(dependentJobArray);
586                }
587                
588                try {
589                        // set _job's executable, working dir and input files
590                        if (strExecutable != null && strExecutable.trim().length() > 0) {
591                                _job.setExecutable(strExecutable, true, strJobOptions);
592                        }
593
594                        if (strWorkdir != null && strWorkdir.trim().length() > 0) {
595                                _job.setWorkdir(strWorkdir, !bUseGivenDir);
596                        } else {
597                                if (bUseGivenDir) {
598                                        throw new JobException(
599                                                        "The flag 'use given workdir' is set to true. " +
600                                                        "Please provide a valid working directory. \n " + 
601                                                        "Or you could uncheck the flag and let the actor create a " +
602                                                        "unique working directory for your job");
603                                }
604                                if (strTarget == null || strTarget.trim().equals("")
605                                                || strTarget.equals("local")|| strTarget.equals("localhost")) {
606                                        //If submitting to localhost, find home dir using java
607                                        strWorkdir = System.getProperty("user.home");
608                                        if ( System.getProperty("os.name").toLowerCase().indexOf("win") >= 0 ) {
609                                                strWorkdir = System.getenv().get("HOMEPATH");
610                                        }
611                                }else{
612                                        strWorkdir = "$HOME";
613                                }
614                                _job.setWorkdir(strWorkdir);
615                        }
616
617                        // make sure both cmdFile and cmdText were not used
618                        if(strCmdText != null && strCmdFile != null) {
619                            throw new IllegalActionException(this, "Do not specify both cmdText and cmdFile.");
620                        }
621                        
622                        // make sure at least one of cmdFile and cmdText were used
623                        if(strCmdText == null && strCmdFile == null) {
624                            throw new IllegalActionException(this, "Must specify either cmdText or cmdFile.");
625                        }
626                        
627                        // if the commands were specified as text, write to a temporary file
628                        if (strCmdText != null) {
629                            File file;
630                try {
631                    file = File.createTempFile("job", null);
632                } catch (IOException e) {
633                    throw new IllegalActionException(this, e,
634                            "Error creating temporary file for cmd text.");
635                }
636                            Writer writer = null;
637                            try {
638                                writer = new FileWriter(file);
639                                writer.write(strCmdText);
640                            } catch(IOException e) {
641                                throw new IllegalActionException(this, e, "Error write cmd text to file.");
642                            } finally {
643                                if(writer != null) {
644                                    try {
645                            writer.close();
646                        } catch (IOException e) {
647                            throw new IllegalActionException(this, e, "Error closing cmd text file.");
648                        }
649                                }
650                            }
651                            strCmdFile = file.getAbsolutePath();
652                            bCmdFileLocal = true; //if commands are text, bCmdFileLocal should be true so the file will be transfered to the target cluster.
653                        }
654                        
655                        _job.setSubmitFile(strCmdFile, bCmdFileLocal);
656                        
657                        if(bDefaultFork && "Fork".equalsIgnoreCase(strScheduler)){
658                                File resourcesDir = ConfigurationManager.getModule("job").getResourcesDir();
659                                File binFile = new File(resourcesDir,"jmgr-fork.sh");
660                                if(!binFile.exists()){
661                                        throw new JobException("Unable to locate default fork script - "
662                                                        + binFile.getAbsolutePath() + ". Please copy fork script manually.");
663                                }
664                                _job.setBinFile(binFile.getAbsolutePath(), true);
665                                //Set the bin path explicitly if it is not already set
666                                //This is required because jmgr-fork.sh fails with command not found. 
667                                //It works only if there is an absolute or relative path prefix to "jmgr-fork.sh"
668                                if(strBinPath == null || strBinPath.trim().equals("")){
669                                        strBinPath = _job.getWorkdirPath(); //setWorkdir was already called 
670                                                                                                        //so, this method should return the right path
671                                }
672                        }
673
674                        if (inputArray != null) {
675                                for (int i = 0; i < inputArray.length; i++) {
676                                        if (inputArray[i] != null
677                                                        && inputArray[i].trim().length() > 0)
678                                                _job.setInputFile(inputArray[i], true);
679                                }
680                        }
681
682                        if (remoteArray != null) {
683                                for (int i = 0; i < remoteArray.length; i++) {
684                                        if (remoteArray[i] != null
685                                                        && remoteArray[i].trim().length() > 0)
686                                                _job.setInputFile(remoteArray[i], false);
687                                }
688                        }
689                } catch (JobException ex) {
690                        log.error(ex);
691                        JobFactory.remove(strJobID);
692                        strJobID = "";
693                        _job = null;
694                        throw new IllegalActionException(this, ex, "Error creating job.");
695                }
696
697                /* Job Manager processing */
698                
699                org.kepler.job.JobManager myJmgr = null;
700
701                try {
702                        if (strScheduler==null || strScheduler.equals("")) {
703                                throw new JobException(
704                                                "Please provide a valid input for the port/parameter scheduler.");
705                        }
706                        // Read properties file.
707                        // String filePath = System.getProperty("KEPLER");
708                        // filePath = filePath + "/common/configs/ptolemy/configs"
709                        // + "/kepler/JobLauncher.properties";
710
711                        // File myFile = new File(filePath);
712                        // Properties properties = new Properties();
713                        // properties.load(new FileInputStream(filePath));
714                        // String jobsSupported = properties.getProperty(strScheduler
715                        // .toLowerCase());
716
717//                      ConfigurationProperty cp = ConfigurationManager.getInstance()
718//                                      .getProperty(ConfigurationManager.getModule("actors"),
719//                                                      new ConfigurationNamespace("JobLauncher"));
720                        properties = cp.findProperties("name",
721                                        strScheduler.toLowerCase(), true);
722                        String jobsSupported = null;
723                        if(properties.size() != 0){
724                                ConfigurationProperty prop = properties.get(0);
725                                jobsSupported = prop.getProperty("value").getValue();
726                        }
727                        if (jobsSupported != null) {
728                                strScheduler = jobsSupported;
729                        } else {
730                                throw new JobException("Job Scheduler " + strScheduler
731                                                + " is not supported.");
732                        }
733
734                        // Create a JobManager object or get it if it was already created
735                        if (isDebugging)
736                                log.debug("Create/get JobManager object. Name = "
737                                                + strScheduler + "; target = " + strTarget
738                                                + "; binPath = " + strBinPath);
739                        myJmgr = JobManagerFactory.get(strScheduler, strTarget, strBinPath);
740
741                        
742                        // Note that myJmgr.getID can give back a String reference to the
743                        // object that can be used with JobManagerFactory.get
744                } catch (JobException ex) {
745                        log.error("Job manager object could not be created. " + ex);
746                        myJmgr = null;
747                        JobFactory.remove(strJobID);
748                        strJobID = "";
749                        throw new IllegalActionException("JobManager Error: "
750                                        + ex.toString());
751                }
752
753                /* Job Submission */
754                boolean bSucc = false;
755                try {
756                        if (_job == null) {
757                                throw new JobException("JobSubmitter: incoming Job is null");
758                        }
759
760                        if (isDebugging) {
761                                log.debug("JobSubmit: submit job " + _job.getJobID() + "...");
762                        }
763                        String realJobID;
764
765                        int numTasksVal = ((IntToken)numTasks.getToken()).intValue();
766                        if(numTasksVal > 0) {
767                                _job.status = new TaskParallelJobStatusInfo();
768                                _job.setNumTasks(numTasksVal);
769                        }
770                        if(bUseGivenDir){
771                                //do not overwrite existing folder. create if not present
772                                realJobID = _job.submit(myJmgr, false, strJobOptions);
773                        }else {
774                                realJobID = _job.submit(myJmgr, true, strJobOptions);
775                        }
776                        strLog = new String("JobSubmitter: Job " + _job.getJobID()
777                                        + " is submitted, it's real jobID is: " + realJobID);
778                        log.info(strLog);
779                        jobID.send(0, new StringToken(realJobID));
780                        bSucc = true;
781                } catch (JobException ex) {
782                        log.error(ex);
783                        strLog = "JobSubmitter Error: " + ex.toString();
784                        success.send(0, new BooleanToken(bSucc));
785                        logPort.send(0, new StringToken(strLog));
786                        return;
787                } catch (Exception ex) {
788                        log.error(ex);
789                        strLog = "JobSubmitter Error: " + ex.toString();
790                        success.send(0, new BooleanToken(bSucc));
791                        logPort.send(0, new StringToken(strLog));
792                        return;
793                }
794
795                /* Job Status Checking */
796                JobStatusCode jobStatusCode;
797
798                try {
799
800              do {
801                //System.out.println("BEFORE CHECK JOB STATUS");
802                //jobStatusCode = _checkStatus(_job);
803                //System.out.println("AFTER CHECK JOB STATUS " + jobStatusCode);
804                // while (_waitUntilCode != null && _waitUntilCode != jobStatusCode)
805                // {
806                //Loop if there is no match and job status is NOT Error or NotInQueue.
807                //Second check is necessary to avoid infinite loop in case where job
808                //never gets to the user requested state or if the state goes undetected
809                //(say during sleep between poll).
810                Long time = 1000L * _sleepWhileWaitingVal;
811                Thread.sleep(time);
812                jobStatusCode = _checkStatus(_job);
813              } while (!matchStatus(jobStatusCode) && jobStatusCode.ordinal()>1);
814
815                } catch (Exception ex) {
816                        log.error(ex);
817                        jobStatusCode = JobStatusCode.Error;
818                        strLog = "JobStatus Error: " + ex.toString();
819                        bSucc = false;
820                        success.send(0, new BooleanToken(bSucc));
821                        logPort.send(0, new StringToken(strLog));
822                        return;
823                }
824
825                if (_job != null) {
826                        strLog = new String("JobStatus: Status of job " + _job.getJobID()
827                                        + ": " + jobStatusCode.toString());
828                        jobOut.send(0, new ObjectToken(_job));
829                }
830                success.send(0, new BooleanToken(bSucc));
831                logPort.send(0, new StringToken(strLog));
832        }
833
834        private JobStatusCode _checkStatus(Job job) throws Exception {
835                JobStatusCode jobStatusCode = JobStatusCode.Error;
836                if (job == null) {
837                        throw new Exception("JobStatus: Job is null");
838                }
839
840                job.status(); // successful query or exception
841
842                jobStatusCode = job.status.statusCode;
843                log.info("Status of job " + job.getJobID() + ": "
844                                + jobStatusCode.toString());
845                return jobStatusCode;
846        }
847
848        private boolean matchStatus(JobStatusCode jobStatusCode) {
849                String str = jobStatusCode.toString();
850
851      if (_waitUntilCodes.size() == 0 || _waitUntilCodes.contains(str)) {
852                        return true;
853                }
854                return false;
855        }
856
857        /** React to a change in an attribute. */
858        public void attributeChanged(Attribute attribute)
859                        throws IllegalActionException {
860                if (attribute == waitUntil) {
861                        String waitUntilStr = waitUntil.getExpression();
862                        waitUntilStr = waitUntilStr.trim();
863                        String[] split = waitUntilStr.split("\\s*,\\s*");
864                        _waitUntilCodes = new ArrayList<String>(Arrays.asList(split));
865                        // check validity
866                        if (_waitUntilCodes.contains("ANY")) {
867                                _waitUntilCodes.clear();
868                        } else {
869                                for (int i = 0; i < _waitUntilCodes.size(); i++) {
870                                        JobStatusCode waitUntilCode = JobStatusCode
871                                                        .getFromString(_waitUntilCodes.get(i));
872                                        if (waitUntilCode == null) {
873                                                throw new IllegalActionException(this,
874                                                                "Invalid job status type: "
875                                                                                + _waitUntilCodes.get(i));
876                                        }
877                                }
878                        }
879                } else if (attribute == sleepWhileWaiting) {
880                        if ((IntToken) sleepWhileWaiting.getToken() != null) {
881                                _sleepWhileWaitingVal = ((IntToken) sleepWhileWaiting
882                                                .getToken()).intValue();
883                                if (_sleepWhileWaitingVal < 0) {
884                                        throw new IllegalActionException(this,
885                                                        "Sleep While Waiting value cannot be negative.");
886                                }
887                        }
888                } else if (attribute == binPath) {
889                        //if binPath is changed, we should remove JobManager from the table to force it update.
890                        if (strScheduler != null && !strScheduler.isEmpty() && strTarget != null && !strTarget.isEmpty())
891                                JobManagerFactory.instance.removeJmgrFromTable(strScheduler, strTarget);
892                } else {
893                        super.attributeChanged(attribute);
894                }
895        }
896
897        private static final Log log = LogFactory.getLog(GenericJobLauncher.class
898                        .getName());
899        private static final boolean isDebugging = log.isDebugEnabled();
900        // private JobStatusCode _waitUntilCode = null;
901        private ArrayList<String> _waitUntilCodes = new ArrayList<String>();
902        private int _sleepWhileWaitingVal = 5;
903        private List<ConfigurationProperty> properties;
904        private ConfigurationProperty cp;
905        private String strScheduler, strTarget;
906}