001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: aschultz $'
006 * '$Date: 2010-02-22 16:21:40 -0800 (Mon, 22 Feb 2010) $' 
007 * '$Revision: 23182 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.actor.job;
031
032import java.io.File;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.List;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039import org.kepler.configuration.ConfigurationManager;
040import org.kepler.configuration.ConfigurationNamespace;
041import org.kepler.configuration.ConfigurationProperty;
042import org.kepler.job.Job;
043import org.kepler.job.JobException;
044import org.kepler.job.JobFactory;
045import org.kepler.job.JobManagerFactory;
046import org.kepler.job.JobStatusCode;
047
048import ptolemy.actor.TypedAtomicActor;
049import ptolemy.actor.TypedIOPort;
050import ptolemy.actor.parameters.PortParameter;
051import ptolemy.data.BooleanToken;
052import ptolemy.data.IntToken;
053import ptolemy.data.ObjectToken;
054import ptolemy.data.StringToken;
055import ptolemy.data.Token;
056import ptolemy.data.expr.Parameter;
057import ptolemy.data.type.BaseType;
058import ptolemy.kernel.CompositeEntity;
059import ptolemy.kernel.util.Attribute;
060import ptolemy.kernel.util.IllegalActionException;
061import ptolemy.kernel.util.NameDuplicationException;
062import ptolemy.kernel.util.Settable;
063
064/**
065 * A generic job launcher actor that can launch a job using PBS, NCCS, Condor,
066 * Loadleveler, or SGE, and wait till a user specified status.
067 * <p> 
068 * JobLauncher actor is based on code from Norbert Podhorszki's JobCreator, JobManager, 
069 * JobStatus, and JobSubmitter actors. It uses JobLauncher.properties to 
070 * find the list of supported job schedulers and the corresponding support class.
071 * @author Frankie Kwok, Chandrika Sivaramakrishnan
072 * @version $Id$
073
074 */
075@SuppressWarnings("serial")
076public class GenericJobReconnect extends TypedAtomicActor {
077        public GenericJobReconnect(CompositeEntity container, String name)
078                        throws NameDuplicationException, IllegalActionException {
079                super(container, name);
080
081                /** Job creator parameter and port */
082                // target selects the machine where the jobmanager is running
083                target = new PortParameter(this, "target", new StringToken(
084                                "[local | [user]@host]"));
085                new Parameter(target.getPort(), "_showName", BooleanToken.TRUE);
086
087                // real job id generated by the scheduler
088                realJobId = new PortParameter(this, "real job id", new StringToken(
089                                ""));
090                new Parameter(realJobId.getPort(), "_showName", BooleanToken.TRUE);
091                
092                // working dir name parameter & port
093                workdir = new PortParameter(this, "workdir", new StringToken(
094                                ".kepler-hpcc"));
095                new Parameter(workdir.getPort(), "_showName", BooleanToken.TRUE);
096                
097                /** Job Manager parameter and port */
098                // jobManager denotes the name of the actual job manager
099                scheduler = new PortParameter(this, "scheduler", new StringToken(
100                                "One of [Condor | PBS | LoadLeveler | SGE | Fork]"));
101                // jobManager.setStringMode(true); // string mode (no "s, but no
102                // variables as well!
103                new Parameter(scheduler.getPort(), "_showName", BooleanToken.TRUE);
104
105                // flag to set if you want the actor to stage the default fork script
106                defaultForkScript = new Parameter(this, "Use default fork script",
107                                new BooleanToken(false));
108                defaultForkScript.setTypeEquals(BaseType.BOOLEAN);
109                defaultForkScript.setVisibility(Settable.EXPERT);
110                // binPath is the full path to the jobmanager commands on the target
111                // machine
112                binPath = new Parameter(this, "binary path");
113                binPath.setVisibility(Settable.EXPERT);
114
115                /** Job Status parameter and port */
116                
117                waitUntil = new Parameter(this, "Wait Until Status", new StringToken(
118                "ANY"));
119                waitUntil.setStringMode(true);
120                waitUntil.addChoice("ANY");
121                for (JobStatusCode code : JobStatusCode.values()) {
122                        waitUntil.addChoice(code.toString());
123                }
124                
125                sleepWhileWaiting = new Parameter(this, "Wait Until Sleep (seconds)",
126                                new IntToken(_sleepWhileWaitingVal));
127                sleepWhileWaiting.setTypeEquals(BaseType.INT);
128
129                // Output: jobID of the submitted job
130                jobOut = new TypedIOPort(this, "jobOut", false, true);
131                jobOut.setTypeEquals(BaseType.OBJECT);
132                new Parameter(jobOut, "_showName", BooleanToken.TRUE);
133
134                // Output: log
135                logPort = new TypedIOPort(this, "logPort", false, true);
136                logPort.setTypeEquals(BaseType.STRING);
137                new Parameter(logPort, "_showName", BooleanToken.TRUE);
138
139                //Output: success
140                success = new TypedIOPort(this, "success", false, true);
141                success.setTypeEquals(BaseType.BOOLEAN);
142                new Parameter(success, "_showName", BooleanToken.TRUE);
143                
144                //Output: reconnect. True if the job was found in queue. 
145                //False if job was not in queue. Not in queue could be either
146                //because the job completed or if the jobid passed was wrong. 
147                reconnect = new TypedIOPort(this, "reconnect", false, true);
148                reconnect.setTypeEquals(BaseType.BOOLEAN);
149                new Parameter(reconnect, "_showName", BooleanToken.TRUE);
150        }
151
152        /***********************************************************
153         * ports and parameters
154         */
155
156        /**
157         * The real job id generated by the scheduler when the job was
158         * originally submitted
159         * 
160         * <p>
161         * This parameter is read each time in fire().
162         * </p>
163         */
164        public PortParameter realJobId;
165
166        /**
167         * The working directory in which the actual job submission command will be
168         * executed (on the remote machine if the job manager is a remote
169         * jobmanager).
170         * 
171         * <p>
172         * It should be an absolute path, or a relative one. In the latter case on
173         * remote machine, the directory path will be relative to the user's home
174         * directory (coming from the use of ssh).
175         * </p>
176         * By default, a new unique sub directory is created within this workdir based on 
177         * the job id created by kepler. Job is run from this sub directory. This can
178         * be overwritten by setting the parameter "use given workdir" 
179         * <p>
180         * This parameter is read each time in fire().
181         * </p>
182         */
183        public PortParameter workdir;
184
185        /**
186         * The name of the jobmanager to be used It should be a name, for which a
187         * supporter class exist as <i>org.kepler.job.JobSupport<jobManager>.class
188         * 
189         * This parameter is read each time in fire().
190         */
191        public PortParameter scheduler;
192
193        /**
194         * Boolean flag to indicate if the default fork script should be staged. If
195         * bin path is provided the default script is uploaded to bin path, else it
196         * is uploaded to the working directory
197         */
198        public Parameter defaultForkScript;
199
200        /**
201         * The machine to be used at job submission. It should be null, "" or
202         * "local" for the local machine or [user@]host to denote a remote machine
203         * accessible with ssh.
204         * 
205         * This parameter is read each time in fire().
206         */
207        public PortParameter target;
208
209        /**
210         * The path to the job manager commands on the target machines. Commands are
211         * constructed as <i>binPath/command</i> and they should be executable this
212         * way. This parameter is read each time in fire().
213         */
214        public Parameter binPath;
215
216        /**
217         * The job is passed on in this actor. This token can be used (delaying it
218         * with a Sleep actor) to ask its Status again and again until the job is
219         * finished or aborted. This port is an output port of type Object.
220         */
221        public TypedIOPort jobOut;
222
223        /**
224         * Logging information of job status query. Useful to inform user about
225         * problems at unsuccessful status query but it also prints out job status
226         * and job id on successful query. This port is an output port of type
227         * String. The name of port on canvas is 'log'
228         */
229        public TypedIOPort logPort;
230
231        /**
232         * Wait until the job has a reached specific status. The available status'
233         * that can be reached are: any, wait, running, not in queue, and error.
234         */
235        public Parameter waitUntil;
236
237        /**
238         * Amount of time (in seconds) to sleep between checking job status.
239         */
240        public Parameter sleepWhileWaiting;
241
242        /**
243         * The exit code of the command. If the exit code is 0, the command was
244         * performed successfully. If the exit code is anything other than a 0, an
245         * error occured.
246         */
247        // public TypedIOPort exitcode;
248
249        /**
250         * true if reconnect was successful, false otherwise  
251         */
252         public TypedIOPort success;
253         
254         /**
255          * true if the job was found in queue.
256          * False if job was not in queue. Not in queue could be either 
257          * because the job completed or if the jobid passed was wrong.
258          * Workflows that get jobid from user might want to check this
259          * flag in addition to the port success 
260          */
261         public TypedIOPort reconnect;
262         
263        /**
264         * fire
265         * 
266         * @exception IllegalActionException
267         *                Not thrown.
268         */
269        public void fire() throws IllegalActionException {
270                super.fire();
271
272                /* Job creation by processing port parameters */
273                System.out.println("KEPLER HOME IS "+System.getProperty("KEPLER"));
274                realJobId.update();
275                workdir.update();
276
277                String strLog = null;
278                String strBinPath = null;
279                
280                boolean bDefaultFork = false;
281                
282                if (this.getAttribute("_expertMode") != null) {
283                        Token temp = null;
284
285                        temp = (binPath != null) ? binPath.getToken() : null;
286                        strBinPath = (temp != null) ? ((StringToken) temp).stringValue()
287                                        .trim() : null;
288                        bDefaultFork = ((BooleanToken) defaultForkScript.getToken())
289                                        .booleanValue();
290                }
291
292                scheduler.update();
293                target.update();
294
295                Token token = realJobId.getToken();
296                String strRealJobId = (token==null) ? null : ((StringToken)token ).stringValue().trim();
297                
298                token = workdir.getToken();
299                String strWorkdir = (token==null) ? null :((StringToken) token).stringValue().trim();
300                
301                token = scheduler.getToken();
302                String strScheduler = (token==null) ? null :((StringToken)token).stringValue().trim();
303                
304                token = target.getToken();
305                String strTarget = (token==null) ? null :((StringToken)token).stringValue().trim();
306                
307                // create job object
308                String strJobID = JobFactory.create();
309                Job _job = JobFactory.get(strJobID);
310                try{
311                        // set _job's  working dir 
312                        if (strWorkdir != null && strWorkdir.trim().length() > 0) {
313                                //never create a sub directory - use given dir as working dir
314                                _job.setWorkdir(strWorkdir,false);  
315                        } else {
316                                throw new JobException("Please provide a valid working directory");
317                        }
318        
319                        if (strRealJobId != null && strRealJobId.trim().length() > 0) {
320                                _job.status.jobID = strRealJobId ;
321                        } else {
322                                throw new JobException("Please provide a valid job id");
323                        }
324                        //Set binfile
325                        if(bDefaultFork){
326                                File resourcesDir = ConfigurationManager.getModule("actors").getResourcesDir();
327                                File binFile = new File(resourcesDir,"jmgr-fork.sh");
328                                if(!binFile.exists()){
329                                        throw new JobException("Unable to locate default fork script - "
330                                                        + binFile.getAbsolutePath() + ". Please copy fork script manually.");
331                                }
332                                _job.setBinFile(binFile.getAbsolutePath(), true);
333
334                        }
335                        if("Fork".equalsIgnoreCase(strScheduler)){
336                                //Set the bin path explicitly if it is not already set
337                                //This is required because running jmgr-fork.sh without path 
338                                // fails with command not found. 
339                                //It works only if there is an absolute or relative path prefix
340                                if(strBinPath == null || strBinPath.trim().equals("")){
341                                        strBinPath = _job.getWorkdirPath(); //setWorkdir was already called 
342                                                                                                        //so, this method should return the right path
343                                }
344                        }
345
346                }catch(JobException ex){
347                        log.error(ex);
348                        JobFactory.remove(strJobID);
349                        strJobID = "";
350                        _job = null;
351                        throw new IllegalActionException("Error creating job: "
352                                        + ex.toString());
353                }
354                
355                /* Process the input scheduler */
356                org.kepler.job.JobManager myJmgr = null;
357                try{
358                        if (strScheduler == null || strScheduler.equals("")) {
359                                throw new JobException(
360                                "Please provide a valid input for the port/parameter scheduler.");
361                        }
362                        //Find the job support class name prefix for a given job scheduler name 
363                        //This is done to keep the parameter case insensitive. 
364                        //For example the string pbs or PBS or Pbs would map to the right class name prefix say PBS
365                        ConfigurationProperty cp = ConfigurationManager.getInstance().
366                        getProperty(ConfigurationManager.getModule("salssajob-module"), 
367                          new ConfigurationNamespace("JobLauncher"));
368                        List<ConfigurationProperty> properties = cp.findProperties("name",
369                                        strScheduler.toLowerCase(), true);
370                        String jobsSupported = null;
371                        if(properties.size() != 0){
372                                ConfigurationProperty prop = properties.get(0);
373                                jobsSupported = prop.getProperty("value").getValue();
374                        }
375                    if (jobsSupported != null) {
376                      strScheduler = jobsSupported;
377                    } else {
378                        throw new JobException("Job Scheduler " + strScheduler
379                                                + " is not supported."); 
380                    }
381        
382                    // Create a JobManager object or get it if it was already created
383                        if (isDebugging)
384                                log.debug("Create/get JobManager object. Name = "
385                                                + strScheduler + "; target = " + strTarget
386                                                + "; binPath = " + strBinPath);
387                        myJmgr = JobManagerFactory.get(strScheduler, strTarget, strBinPath);
388
389                } catch (JobException ex) {
390                        log.error("Job manager object could not be created. " + ex);
391                        myJmgr = null;
392                        JobFactory.remove(strJobID);
393                        strJobID = "";
394                        throw new IllegalActionException("JobManager Error: "
395                                        + ex.toString());
396                }
397
398                /* Job Reconnect */
399                boolean bSucc = false;
400                boolean bReconnect = false;
401                try {
402                        if (_job == null) {
403                                throw new JobException("JobSubmitter: incoming Job is null");
404                        }
405
406                        if (isDebugging) {
407                                log.debug("JobSubmit: reconnect to job " + strRealJobId + "...");
408                        }
409                        _job.reconnect(myJmgr);
410                        if(_job.status.statusCode == JobStatusCode.NotInQueue){
411                                StringBuffer sblog = new StringBuffer(100);
412                                sblog.append("JobStatus: Status of job ")
413                                        .append(_job.getJobID())
414                                        .append( ": ")
415                                        .append(JobStatusCode.NotInQueue)
416                                        .append("\nWarning:Initial status query couldn't find")
417                                        .append(" job in queue. Either job id is invalid or job is")
418                                        .append(" complete and not in queue anymore");
419                                success.send(0, new BooleanToken(true));
420                                reconnect.send(0, new BooleanToken(bReconnect));
421                                logPort.send(0, new StringToken(sblog.toString()));
422                                jobOut.send(0, new ObjectToken(_job));
423                                //No job to do status check on. Return
424                                return;
425                        }
426                        log.info("Reconnected successfully to "+ strRealJobId);
427                        bReconnect = true;
428                        log.info("Reconnected to job " + _job.status.jobID);
429                } catch (JobException ex) {
430                        log.error(ex);
431                        strLog = "JobReconnect Error: " + ex.toString();
432                        logPort.send(0, new StringToken(ex.toString()));
433                        success.send(0, new BooleanToken(bSucc));
434                        reconnect.send(0, new BooleanToken(bReconnect));
435                        return;
436                }
437                
438
439                /* Job Status Checking */
440                JobStatusCode jobStatusCode;
441                try {
442                        jobStatusCode = _checkStatus(_job);
443
444                        // while (_waitUntilCode != null && _waitUntilCode != jobStatusCode)
445                        // {
446                        //Loop if there is no match and job status is NOT Error or NotInQueue.
447                        //Second check is necessary to avoid infinite loop in case where job
448                        //never gets to the user requested state or if the state goes undetected
449                        //(say during sleep between poll). 
450                        while (!matchStatus(jobStatusCode) && jobStatusCode.ordinal()>1) {
451                                Long time = 1000L * _sleepWhileWaitingVal;
452                                Thread.sleep(time);
453                                jobStatusCode = _checkStatus(_job);
454                        }
455                        bSucc = true;
456                } catch (Exception ex) {
457                        log.error(ex);
458                        jobStatusCode = JobStatusCode.Error;
459                        strLog = "JobStatus Error: " + ex.toString();
460                        bSucc = false;
461                        success.send(0, new BooleanToken(bSucc));
462                        reconnect.send(0, new BooleanToken(bReconnect));
463                        logPort.send(0, new StringToken(strLog));
464                        return;
465                }
466
467                if (_job != null) {
468                        strLog = new String("JobStatus: Status of job " + _job.getJobID()
469                                        + ": " + jobStatusCode.toString());
470                        jobOut.send(0, new ObjectToken(_job));
471                }
472                success.send(0, new BooleanToken(bSucc));
473                reconnect.send(0, new BooleanToken(bReconnect));
474                logPort.send(0, new StringToken(strLog));
475        }
476
477        /** React to a change in an attribute. */
478        public void attributeChanged(Attribute attribute)
479                        throws IllegalActionException {
480                if (attribute == waitUntil) {
481                        String waitUntilStr = waitUntil.getExpression();
482                        waitUntilStr = waitUntilStr.trim();
483                        String[] split = waitUntilStr.split("\\s*,\\s*");
484                        _waitUntilCodes = new ArrayList<String>(Arrays.asList(split));
485                        // check validity
486                        if (_waitUntilCodes.contains("ANY")) {
487                                _waitUntilCodes.clear();
488                        } else {
489                                for (int i = 0; i < _waitUntilCodes.size(); i++) {
490                                        JobStatusCode waitUntilCode = JobStatusCode
491                                                        .getFromString(_waitUntilCodes.get(i));
492                                        if (waitUntilCode == null) {
493                                                throw new IllegalActionException(this,
494                                                                "Invalid job status type: "
495                                                                                + _waitUntilCodes.get(i));
496                                        }
497                                }
498                        }
499                } else if (attribute == sleepWhileWaiting) {
500                        if ((IntToken) sleepWhileWaiting.getToken() != null) {
501                                _sleepWhileWaitingVal = ((IntToken) sleepWhileWaiting
502                                                .getToken()).intValue();
503                                if (_sleepWhileWaitingVal < 0) {
504                                        throw new IllegalActionException(this,
505                                                        "Sleep While Waiting value cannot be negative.");
506                                }
507                        }
508                } else {
509                        super.attributeChanged(attribute);
510                }
511        }
512
513        private JobStatusCode _checkStatus(Job job) throws Exception {
514                JobStatusCode jobStatusCode = JobStatusCode.Error;
515                if (job == null) {
516                        throw new Exception("JobStatus: Job is null");
517                }
518
519                job.status(); // successful query or exception
520
521                jobStatusCode = job.status.statusCode;
522                log.info("Status of job " + job.getJobID() + ": "
523                                + jobStatusCode.toString());
524                return jobStatusCode;
525        }
526        
527        private boolean matchStatus(JobStatusCode jobStatusCode) {
528                String str = jobStatusCode.toString();
529
530                if (_waitUntilCodes.size() == 0 || _waitUntilCodes.contains(str)) {
531                        return true;
532                }
533                return false;
534        }
535        
536        private static final Log log = LogFactory.getLog(GenericJobLauncher.class
537                        .getName());
538        private static final boolean isDebugging = log.isDebugEnabled();
539        //private JobStatusCode _waitUntilCode = null;
540        private ArrayList<String> _waitUntilCodes = new ArrayList<String>();
541        private int _sleepWhileWaitingVal = 5;
542}