001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: chandrika $'
006 * '$Date: 2011-01-04 03:01:07 +0000 (Tue, 04 Jan 2011) $' 
007 * '$Revision: 26613 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.actor.job;
031
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.HashMap;
035
036import org.apache.commons.logging.Log;
037import org.apache.commons.logging.LogFactory;
038import org.kepler.build.modules.Module;
039import org.kepler.configuration.ConfigurationManager;
040import org.kepler.job.Job;
041import org.kepler.job.JobStatusCode;
042import org.kepler.job.JobStatusInfo;
043import org.kepler.job.TaskParallelJobStatusInfo;
044
045import ptolemy.actor.TypedAtomicActor;
046import ptolemy.actor.TypedIOPort;
047import ptolemy.data.BooleanToken;
048import ptolemy.data.IntToken;
049import ptolemy.data.LongToken;
050import ptolemy.data.ObjectToken;
051import ptolemy.data.StringToken;
052import ptolemy.data.expr.Parameter;
053import ptolemy.data.type.BaseType;
054import ptolemy.kernel.CompositeEntity;
055import ptolemy.kernel.util.Attribute;
056import ptolemy.kernel.util.IllegalActionException;
057import ptolemy.kernel.util.NameDuplicationException;
058import ptolemy.kernel.util.Settable;
059
060//////////////////////////////////////////////////////////////////////////
061//// JobStatus
062
063/**
064 * <p>
065 * Check the status of a Job
066 * </p>
067 * 
068 * <p>
069 * This actor uses the Job class to ask for the status of a submitted job.
070 * </p>
071 * 
072 * <p>
073 * The input should be a previously submitted job. i.e. the output from a
074 * JobSubmitter. When the job contains more than one task also provide the
075 * number of tasks using the numTasks parameter
076 * </p>
077 * <p> 
078 * Optional inputs -
079 * 1. you could specify the actor to wait for a specific status - for example Running
080 * 2. or wait till one of many status - for example Error,NotInQueue
081 * 3. Or you could ask the actor to all status changes as and when it is detected.
082 * </p>
083 * 
084 * <p>
085 * The output is the status code of the job:
086 * </p>
087 * <ul>
088 * <li>0: Error: some error occured during the execution of the actor</li>
089 * <li>1: NotInQueue: no such job in the queue, i.e. never was or already gone</li>
090 * <li>2: Wait: the job is in the queue and it is not running yet</li>
091 * <li>3: Running: the job is running</li>
092 * </ul>
093 * <p>
094 * If not such job exists, the result will be also the Error status.
095 * </p>
096 * 
097 * <p>
098 * For convenience, the job is also passed on output port <i>jobOut</i> if the
099 * status is NOT Error and NOT NotInQueue. This token can be used (delaying it
100 * with a Sleep actor) to ask its Status again and again until the job is
101 * finished or aborted.
102 * </p>
103 * 
104 * <p>
105 * When numTasks input is greater than zero, in addition to job's overall
106 * status code the actor also outputs  task id and task status of individual tasks
107 * </p>
108 * 
109 * @author Norbert Podhorszki, Chandrika Sivaramakrishnan, Jared Chase
110 * @version $Id: JobStatus.java 26613 2011-01-04 03:01:07Z chandrika $
111 * @since Ptolemy II 5.0.1
112 */
113public class JobStatus extends TypedAtomicActor {
114        /**
115         * Construct an actor with the given container and name.
116         * 
117         * @param container
118         *            The container.
119         * @param name
120         *            The name of this actor.
121         * @exception IllegalActionException
122         *                If the actor cannot be contained by the proposed
123         *                container.
124         * @exception NameDuplicationException
125         *                If the container already has an actor with this name.
126         */
127        public JobStatus(CompositeEntity container, String name)
128                        throws NameDuplicationException, IllegalActionException {
129                super(container, name);
130
131                // Uncomment the next line to see debugging statements
132                // addDebugListener(new ptolemy.kernel.util.StreamListener());
133                jobIn = new TypedIOPort(this, "jobIn", true, false);
134                jobIn.setTypeEquals(BaseType.OBJECT);
135                new Parameter(jobIn, "_showName", BooleanToken.FALSE);
136
137                //Input Parameters
138                waitUntil = new Parameter(this, "Wait Until Status", new StringToken(
139                                "ANY"));
140                waitUntil.setStringMode(true);
141                waitUntil.addChoice("ANY");
142                waitUntil.addChoice("NEXT");
143                for (JobStatusCode code : JobStatusCode.values()) {
144                        waitUntil.addChoice(code.toString());
145                }
146
147                sleepWhileWaiting = new Parameter(this, "Wait Until Sleep",
148                                new LongToken(_sleepWhileWaitingVal));
149                sleepWhileWaiting.setTypeEquals(BaseType.LONG);
150                
151                sendAllChanges = new Parameter(this, "Send all status changes",
152                                new BooleanToken(false));
153                sendAllChanges.setTypeEquals(BaseType.BOOLEAN);
154                
155                // Output: jobID of the submitted job
156                jobOut = new TypedIOPort(this, "jobOut", false, true);
157                jobOut.setTypeEquals(BaseType.OBJECT);
158                new Parameter(jobOut, "_showName", BooleanToken.TRUE);
159
160                // Output: task id
161                taskId = new TypedIOPort(this, "taskId", false, true);
162                taskId.setTypeEquals(BaseType.INT);
163
164                // Output: task status code
165                taskStatusCode = new TypedIOPort(this, "taskStatusCode", false, true);
166                taskStatusCode.setTypeEquals(BaseType.INT);
167                new Parameter(taskStatusCode, "_showName", BooleanToken.TRUE);
168
169                // Output: status code
170                statusCode = new TypedIOPort(this, "statusCode", false, true);
171                statusCode.setTypeEquals(BaseType.INT);
172                new Parameter(statusCode, "_showName", BooleanToken.TRUE);
173
174                // Output: log
175                logport = new TypedIOPort(this, "log", false, true);
176                logport.setTypeEquals(BaseType.STRING);
177                new Parameter(logport, "_showName", BooleanToken.TRUE);
178
179                
180                statuscode_tokenProdRate = new Parameter(statusCode,
181        "tokenProductionRate");
182                statuscode_tokenProdRate.setExpression("4");
183                statuscode_tokenProdRate.setVisibility(Settable.NOT_EDITABLE);
184                statuscode_tokenProdRate.setTypeEquals(BaseType.INT);
185                statuscode_tokenProdRate.setPersistent(false);
186                
187                log_tokenProdRate = new Parameter(logport,
188        "tokenProductionRate");
189                log_tokenProdRate.setExpression("4");
190                log_tokenProdRate.setVisibility(Settable.NOT_EDITABLE);
191                log_tokenProdRate.setTypeEquals(BaseType.INT);
192                log_tokenProdRate.setPersistent(false);
193                
194                job_tokenProdRate = new Parameter(jobOut,
195        "tokenProductionRate");
196                job_tokenProdRate.setExpression("4");
197                job_tokenProdRate.setVisibility(Settable.NOT_EDITABLE);
198                job_tokenProdRate.setTypeEquals(BaseType.INT);
199                job_tokenProdRate.setPersistent(false);
200
201                numTasks = new Parameter(this,"numTasks");
202                numTasks.setExpression("0");
203        }
204
205        /***********************************************************
206         * ports and parameters
207         */
208        
209        /**
210         * A submitted job This port is an output port of type Object.
211         */
212        public TypedIOPort jobIn;
213
214        /**
215         * The job is passed on in this actor. This token can be used (delaying it
216         * with a Sleep actor) to ask its Status again and again until the job is
217         * finished or aborted. This port is an output port of type Object.
218         */
219        public TypedIOPort jobOut;
220
221        /**
222         * Status code of the job 0 : for some error during execution or if jobID is
223         * invalid 1 : not in queue: i.e. already finished if it had ever been there
224         * (this is good news!) 2 : job is waiting in the queue 3 : job is running
225         * This port is an output port of type Integer;
226         */
227        public TypedIOPort statusCode;
228
229        /**
230         * Task status code : for some error during execution or if jobID is
231         * invalid 1 : not in queue: i.e. already finished if it had ever been there
232         * (this is good news!) 2 : job is waiting in the queue 3 : job is running
233         * This port is an output port of type Integer;
234         */
235        public TypedIOPort taskStatusCode;
236
237        /**
238         * Task ID : this is the task id for the status code
239         */
240        public TypedIOPort taskId;
241
242        /**
243         * Logging information of job status query. Useful to inform user about
244         * problems at unsuccessful status query but it also prints out job status
245         * and job id on successful query. This port is an output port of type
246         * String. The name of port on canvas is 'log'
247         */
248        public TypedIOPort logport;
249
250        /** Wait until the job has a specific status. */
251        public Parameter waitUntil;
252
253        /**
254         * Amount of time (in milliseconds) to sleep between checking job status.
255         */
256        public Parameter sleepWhileWaiting;
257        
258        /**
259         * Parameter to set if you want job status to ignore waitUntil parameter
260         * and send out every status change
261         */
262        public Parameter sendAllChanges;
263
264        /**
265         * Parameter to set if the workflow is task parallel.  This allows for the
266         * tasks to be created and set to the submitted state
267         */
268        public Parameter numTasks;
269        
270         /** The rate parameter for the output port.
271     */
272    public Parameter statuscode_tokenProdRate;
273    public Parameter log_tokenProdRate;
274    public Parameter job_tokenProdRate;
275
276
277        /***********************************************************
278         * public methods
279         */
280
281        /** React to a change in an attribute. */
282        public void attributeChanged(Attribute attribute)
283                        throws IllegalActionException {
284                if (attribute == waitUntil) {
285                        String waitUntilStr = waitUntil.getExpression();
286                        waitUntilStr = waitUntilStr.trim();
287                        String[] split = waitUntilStr.split("\\s*,\\s*");
288                        _waitUntilCodes = new ArrayList<String>(Arrays.asList(split));
289                        // check validity
290                        if (_waitUntilCodes.contains("ANY")) {
291                                _waitUntilCodes.clear();
292                        } else if(!_waitUntilCodes.contains("NEXT")){
293                                for(int i=0;i<_waitUntilCodes.size();i++){
294                                        JobStatusCode waitUntilCode = JobStatusCode.getFromString(_waitUntilCodes.get(i));
295                                        if (waitUntilCode == null) {
296                                                throw new IllegalActionException(this,
297                                                                "Invalid job status type: " + _waitUntilCodes.get(i));
298                                        }
299                                }
300                        }
301                } else if (attribute == sleepWhileWaiting) {
302                        _sleepWhileWaitingVal = ((LongToken) sleepWhileWaiting.getToken())
303                                        .longValue();
304                        if (_sleepWhileWaitingVal < 0) {
305                                throw new IllegalActionException(this,
306                                                "Sleep While Waiting value cannot be negative.");
307                        }
308                } else {
309                        super.attributeChanged(attribute);
310                }
311        }
312
313        @Override
314        public void initialize(){
315                log.info("Initializing lastStatusCode to null");
316                //reset last recorded job and status 
317                lastJobID = null;
318                lastStatusCode = null;
319                lastTaskStatusCodes.clear();
320        }
321        /**
322         * fire
323         * 
324         * @exception IllegalActionException
325         *                Not thrown.
326         */
327        public void fire() throws IllegalActionException {
328                super.fire();
329                
330                Module module = ConfigurationManager.getModule("actors");
331                //System.out.println("KEPLER HOME IS " + System.getProperty("KEPLER"));
332                //System.out.println("Resource dir is " + module.getResourcesDir());
333                boolean bSendAll = ((BooleanToken) sendAllChanges.getToken()).booleanValue();
334                ObjectToken jobToken = (ObjectToken) jobIn.get(0);
335                Job job = (Job) jobToken.getValue();
336                log.info("****** In job status actor for job: "+ job.getJobID());
337                int numTasksVal = ((IntToken)numTasks.getToken()).intValue();
338                job.setNumTasks(numTasksVal);
339                
340                if(_waitUntilCodes.contains("NEXT")|| bSendAll){
341                        //if it is a new job and waitUntil is NEXT or if all status changes have to be sent
342                        //set lastJobID to current job and
343                        //set lastStatusCode  to null(we will start tracking status) 
344                        String realJobID = job.status.jobID;
345                        if(lastJobID==null || !realJobID.equalsIgnoreCase(lastJobID)){
346                                log.debug("lastJobId was " + lastJobID + " current job id is " + realJobID + "  " + job.getJobID() +" Reseting lastjobid and status");
347                                lastJobID = realJobID;
348                                lastStatusCode = null;
349                                lastTaskStatusCodes.clear();
350                        }
351                }
352                
353                JobStatusCode jobStatusCode;
354                JobStatusInfo jobStatusInfo; 
355                try {
356                        if(bSendAll){
357                                do{
358                                        log.debug("In send all loop of " + job.getJobID());
359                                        jobStatusInfo = getNextStatus(job,numTasksVal);
360                                        jobStatusCode = jobStatusInfo.statusCode;
361                                        sendResult(jobToken, job, jobStatusCode);
362                                        if(numTasksVal > 0){
363                                                sendTaskResults(jobStatusInfo);
364                                        }
365                                }while(jobStatusCode.ordinal()>1);
366                                
367                        } else if(_waitUntilCodes.contains("NEXT")){
368                                if(lastStatusCode!=null && lastStatusCode.ordinal()<2) {
369                                        //If last status=error or notinqueue there is no NEXT status
370                                        //Return last status(ERROR or NotInQueue)
371                                        jobStatusInfo = lastStatusInfo;
372                                        jobStatusCode = lastStatusCode;
373                                }else{
374                                        jobStatusInfo = getNextStatus(job,numTasksVal);
375                                        jobStatusCode = jobStatusInfo.statusCode;
376                                }
377                        }else {
378                                //Wait for a specific status
379                                jobStatusInfo = _checkStatus(job);
380                                jobStatusCode = jobStatusInfo.statusCode;
381                                //Modified to support multiple _waitUntil codes - Chandrika Sivaramakrishnan
382                                //while (_waitUntilCode != null && _waitUntilCode != jobStatusCode) {
383                                
384                                //Loop if there is no match and job status is NOT Error or NotInQueue.
385                                //Second check is necessary to avoid infinite loop in case where job
386                                //never gets to the user requested state or if the state goes undetected
387                                //(say during sleep between poll). 
388                                while(!matchStatus(jobStatusCode)&& jobStatusCode.ordinal()>1){
389                                        log.debug("cur status(" + jobStatusCode +
390                                          ") is not equal to any of the waitUntil codes (" + _waitUntilCodes.toString() + ")");
391                                        Thread.sleep(_sleepWhileWaitingVal);
392                                        jobStatusInfo = _checkStatus(job);
393                                        jobStatusCode = jobStatusInfo.statusCode;
394                                }
395                        }
396                } catch (Exception ex) {
397                        log.error(ex);
398                        jobStatusCode = JobStatusCode.Error;
399                        jobStatusInfo= new JobStatusInfo();
400                        ex.printStackTrace();
401                        throw new IllegalActionException("JobStatus Error: " + ex.toString());
402                }
403
404                if(!bSendAll){ //already sent in the while loop
405                        sendResult(jobToken, job, jobStatusCode);
406                        if(numTasksVal > 0){
407                                //sendTaskResults(job,(TaskParallelJobStatusInfo)lastStatusInfo);
408                                sendTaskResults(jobStatusInfo);
409                        }
410                }
411
412        }
413
414        private JobStatusInfo getNextStatus(
415                Job job, int numTasksVal) throws Exception, InterruptedException {
416                log.info("IN getNextStatus for job "+ job.getJobID());
417                JobStatusInfo jobStatusInfo = _checkStatus(job);
418                JobStatusCode jobStatusCode = jobStatusInfo.statusCode;
419                HashMap<String, JobStatusCode> changedTasks = new HashMap<String, JobStatusCode>();
420                if(numTasksVal>0){
421                        changedTasks = getTaskStatusChanges(job,numTasksVal);
422                }
423                log.debug("Before while loop for job " + job.getJobID()+ "  laststatus code = " +lastStatusCode);
424                // while the job status code hasn't changed AND
425                // either there are 0 tasks OR all the tasks have remained at the same status
426                while(  ( lastStatusCode != null && jobStatusCode.ordinal() > 1 && jobStatusCode == lastStatusCode ) && 
427                        ( numTasksVal == 0 || changedTasks.size() == 0 ) ) {
428                        log.debug("job " + job.getJobID() + " cur status(" + jobStatusCode +
429                                  ") is equal to lastStatusInfo(" + lastStatusCode + ")");
430                        Thread.sleep(_sleepWhileWaitingVal);
431                        jobStatusInfo = _checkStatus(job);
432                        jobStatusCode = jobStatusInfo.statusCode;
433
434                        if(numTasksVal>0){
435                                changedTasks = getTaskStatusChanges(job,numTasksVal);
436                        }
437
438                }
439                if(numTasksVal>0){
440                        //record last job and task status
441                        lastTaskStatusCodes.clear();
442                        lastTaskStatusCodes.putAll(
443                                ((TaskParallelJobStatusInfo)jobStatusInfo).taskStatusCodes); 
444                        lastStatusCode = jobStatusCode;
445                        //return only the changed task statuses
446                        ((TaskParallelJobStatusInfo)jobStatusInfo).taskStatusCodes = changedTasks;
447                } else {
448                        //record only job statuscode
449                        lastStatusCode = jobStatusCode;
450                        log.debug("Found Next state for job " + job.getJobID()+ " setting laststatus code = " +lastStatusCode);
451                }
452                return jobStatusInfo;
453        }
454        
455        
456
457        private HashMap<String,JobStatusCode> getTaskStatusChanges(
458                Job job,int numTasksVal) throws IllegalActionException {
459
460                HashMap<String,JobStatusCode> result = new HashMap<String,JobStatusCode>();
461            JobStatusCode jobcode = job.status.statusCode;
462            
463                for( int idx = 0; idx < numTasksVal; idx++ ) {
464                        String taskId = "" + idx;
465
466                        JobStatusCode code = 
467                                ((TaskParallelJobStatusInfo)job.status).taskStatusCodes.get(taskId);
468                        JobStatusCode oldCode = lastTaskStatusCodes.get(taskId);
469
470                        if(oldCode != code) {
471                                result.put(taskId,code); 
472                        }
473                }
474
475                return result;
476        }
477
478        private void sendResult(ObjectToken jobToken, Job job,
479                        JobStatusCode jobStatusCode) throws IllegalActionException {
480                String strLog;
481                strLog = new String("*******JobStatus: Status of job - " + job.getJobID() + ":" + job.getJobID() + ": "
482                                + jobStatusCode.toString());
483                log.info(this.getName() + ":Sending job status "+jobStatusCode.ordinal() + " for  " + job.getJobID() );
484
485                if (strLog != null)
486                        logport.send(0, new StringToken(strLog));
487                statusCode.send(0, new IntToken(jobStatusCode.ordinal()));
488                jobOut.send(0, jobToken);
489        }
490
491        private void sendTaskResults(JobStatusInfo jobStatusInfo) 
492                throws IllegalActionException {
493
494                HashMap<String,JobStatusCode> taskStatuses = 
495                                                        ((TaskParallelJobStatusInfo)jobStatusInfo).taskStatusCodes;
496                for(String taskIdStr : taskStatuses.keySet()) {
497                        taskId.send(0, new IntToken(Integer.parseInt(taskIdStr)));
498                        taskStatusCode.send(0,new IntToken(taskStatuses.get(taskIdStr).ordinal()));
499
500                        // set the old task status
501                        lastTaskStatusCodes.put(taskIdStr,taskStatuses.get(taskIdStr));
502                }
503        }
504
505        private boolean matchStatus(JobStatusCode jobStatusCode) {
506                String str = jobStatusCode.toString();
507                
508                if(_waitUntilCodes.size() == 0 || _waitUntilCodes.contains(str)){
509                        return true;
510                }
511                return false;
512        }
513
514        /***********************************************************
515         * private methods
516         */
517
518        private JobStatusInfo _checkStatus(Job job) throws Exception {
519                JobStatusCode jobStatusCode = JobStatusCode.Error;
520                if (job == null)
521                        throw new Exception("JobStatus: Job is null");
522
523                job.status(); // successful query or exception
524                log.info("Status of job " + job.getJobID() + ": "
525                                + job.status.statusCode.toString());
526
527                return job.status;
528        }
529
530        private static final Log log = LogFactory.getLog(JobStatus.class.getName());
531        private static final boolean isDebugging = log.isDebugEnabled();
532        private ArrayList<String> _waitUntilCodes = new ArrayList<String>();
533        //private JobStatusCode _waitUntilCode = null;
534        private TaskParallelJobStatusInfo lastStatusInfo = new TaskParallelJobStatusInfo();
535        private HashMap<String, JobStatusCode> lastTaskStatusCodes = new HashMap<String, JobStatusCode>();
536        private JobStatusCode lastStatusCode = null;
537        private long _sleepWhileWaitingVal = 5000;
538        private String lastJobID =null;
539}