/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: chandrika $'
 * '$Date: 2011-01-04 03:01:07 +0000 (Tue, 04 Jan 2011) $'
 * '$Revision: 26613 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.actor.job;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.build.modules.Module;
import org.kepler.configuration.ConfigurationManager;
import org.kepler.job.Job;
import org.kepler.job.JobStatusCode;
import org.kepler.job.JobStatusInfo;
import org.kepler.job.TaskParallelJobStatusInfo;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.IntToken;
import ptolemy.data.LongToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.StringToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Settable;

//////////////////////////////////////////////////////////////////////////
//// JobStatus

/**
 * <p>
 * Check the status of a Job.
 * </p>
 *
 * <p>
 * This actor uses the Job class to ask for the status of a submitted job.
 * </p>
 *
 * <p>
 * The input should be a previously submitted job, i.e. the output from a
 * JobSubmitter. When the job contains more than one task, also provide the
 * number of tasks using the numTasks parameter.
 * </p>
 * <p>
 * Optional inputs:
 * </p>
 * <ol>
 * <li>wait for one specific status, for example Running;</li>
 * <li>wait until one of several statuses is reached, for example
 * Error,NotInQueue;</li>
 * <li>send every status change as and when it is detected.</li>
 * </ol>
 *
 * <p>
 * The output is the status code of the job:
 * </p>
 * <ul>
 * <li>0: Error: some error occurred during the execution of the actor</li>
 * <li>1: NotInQueue: no such job in the queue, i.e. never was or already gone</li>
 * <li>2: Wait: the job is in the queue and it is not running yet</li>
 * <li>3: Running: the job is running</li>
 * </ul>
 * <p>
 * If no such job exists, the result will also be the Error status.
 * </p>
 *
 * <p>
 * For convenience, the job is also passed on output port <i>jobOut</i>. This
 * token can be used (delaying it with a Sleep actor) to ask its status again
 * and again until the job is finished or aborted.
 * </p>
 *
 * <p>
 * When the numTasks parameter is greater than zero, in addition to the job's
 * overall status code the actor also outputs the task id and task status of
 * individual tasks.
 * </p>
 *
 * @author Norbert Podhorszki, Chandrika Sivaramakrishnan, Jared Chase
 * @version $Id: JobStatus.java 26613 2011-01-04 03:01:07Z chandrika $
 * @since Ptolemy II 5.0.1
 */
public class JobStatus extends TypedAtomicActor {
    /**
     * Construct an actor with the given container and name.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @exception IllegalActionException
     *                If the actor cannot be contained by the proposed
     *                container.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
     */
    public JobStatus(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        // Uncomment the next line to see debugging statements
        // addDebugListener(new ptolemy.kernel.util.StreamListener());
        jobIn = new TypedIOPort(this, "jobIn", true, false);
        jobIn.setTypeEquals(BaseType.OBJECT);
        new Parameter(jobIn, "_showName", BooleanToken.FALSE);

        // Input parameters
        waitUntil = new Parameter(this, "Wait Until Status", new StringToken(
                "ANY"));
        waitUntil.setStringMode(true);
        waitUntil.addChoice("ANY");
        waitUntil.addChoice("NEXT");
        for (JobStatusCode code : JobStatusCode.values()) {
            waitUntil.addChoice(code.toString());
        }

        sleepWhileWaiting = new Parameter(this, "Wait Until Sleep",
                new LongToken(_sleepWhileWaitingVal));
        sleepWhileWaiting.setTypeEquals(BaseType.LONG);

        sendAllChanges = new Parameter(this, "Send all status changes",
                new BooleanToken(false));
        sendAllChanges.setTypeEquals(BaseType.BOOLEAN);

        // Output: the job that was queried
        jobOut = new TypedIOPort(this, "jobOut", false, true);
        jobOut.setTypeEquals(BaseType.OBJECT);
        new Parameter(jobOut, "_showName", BooleanToken.TRUE);

        // Output: task id
        taskId = new TypedIOPort(this, "taskId", false, true);
        taskId.setTypeEquals(BaseType.INT);

        // Output: task status code
        taskStatusCode = new TypedIOPort(this, "taskStatusCode", false, true);
        taskStatusCode.setTypeEquals(BaseType.INT);
        new Parameter(taskStatusCode, "_showName", BooleanToken.TRUE);

        // Output: status code
        statusCode = new TypedIOPort(this, "statusCode", false, true);
        statusCode.setTypeEquals(BaseType.INT);
        new Parameter(statusCode, "_showName", BooleanToken.TRUE);

        // Output: log
        logport = new TypedIOPort(this, "log", false, true);
        logport.setTypeEquals(BaseType.STRING);
        new Parameter(logport, "_showName", BooleanToken.TRUE);

        // Rate parameters are created in the same order as before.
        statuscode_tokenProdRate = _createRateParameter(statusCode);
        log_tokenProdRate = _createRateParameter(logport);
        job_tokenProdRate = _createRateParameter(jobOut);

        numTasks = new Parameter(this, "numTasks");
        numTasks.setExpression("0");
    }

    /***********************************************************
     * ports and parameters
     */

    /**
     * A submitted job. This port is an input port of type Object.
     */
    public TypedIOPort jobIn;

    /**
     * The job is passed on through this port. This token can be used
     * (delaying it with a Sleep actor) to ask its status again and again
     * until the job is finished or aborted. This port is an output port of
     * type Object.
     */
    public TypedIOPort jobOut;

    /**
     * Status code of the job. 0 : some error during execution or the jobID
     * is invalid; 1 : not in queue, i.e. already finished if it had ever
     * been there (this is good news!); 2 : job is waiting in the queue;
     * 3 : job is running. This port is an output port of type Integer.
     */
    public TypedIOPort statusCode;

    /**
     * Status code of a single task, using the same codes as
     * {@link #statusCode}. This port is an output port of type Integer.
     */
    public TypedIOPort taskStatusCode;

    /**
     * Task ID: the id of the task that the value sent on
     * {@link #taskStatusCode} belongs to. This port is an output port of
     * type Integer.
     */
    public TypedIOPort taskId;

    /**
     * Logging information of job status query. Useful to inform user about
     * problems at unsuccessful status query but it also prints out job status
     * and job id on successful query. This port is an output port of type
     * String. The name of port on canvas is 'log'.
     */
    public TypedIOPort logport;

    /**
     * Wait until the job has a specific status. The value is a
     * comma-separated list of status names, or one of the special values
     * "ANY" and "NEXT".
     */
    public Parameter waitUntil;

    /**
     * Amount of time (in milliseconds) to sleep between checking job status.
     */
    public Parameter sleepWhileWaiting;

    /**
     * Parameter to set if you want job status to ignore waitUntil parameter
     * and send out every status change.
     */
    public Parameter sendAllChanges;

    /**
     * Parameter to set if the workflow is task parallel. This allows for the
     * tasks to be created and set to the submitted state.
     */
    public Parameter numTasks;

    /** The rate parameter for the statusCode output port. */
    public Parameter statuscode_tokenProdRate;

    /** The rate parameter for the log output port. */
    public Parameter log_tokenProdRate;

    /** The rate parameter for the jobOut output port. */
    public Parameter job_tokenProdRate;

    /***********************************************************
     * public methods
     */

    /**
     * React to a change in an attribute.
     *
     * <p>
     * New values are validated before they are stored, so a rejected change
     * leaves the previously accepted value in effect.
     * </p>
     *
     * @param attribute
     *            The attribute that changed.
     * @exception IllegalActionException
     *                If the wait-until list names an unknown status, if the
     *                sleep time is negative, or if the base class rejects
     *                the change.
     */
    @Override
    public void attributeChanged(Attribute attribute)
            throws IllegalActionException {
        if (attribute == waitUntil) {
            String expression = waitUntil.getExpression().trim();
            ArrayList<String> codes = new ArrayList<String>(
                    Arrays.asList(expression.split("\\s*,\\s*")));
            if (codes.contains("ANY")) {
                // An empty list means "any status matches".
                codes.clear();
            } else if (!codes.contains("NEXT")) {
                for (String code : codes) {
                    if (JobStatusCode.getFromString(code) == null) {
                        throw new IllegalActionException(this,
                                "Invalid job status type: " + code);
                    }
                }
            }
            _waitUntilCodes = codes;
        } else if (attribute == sleepWhileWaiting) {
            long sleepMillis = ((LongToken) sleepWhileWaiting.getToken())
                    .longValue();
            if (sleepMillis < 0) {
                throw new IllegalActionException(this,
                        "Sleep While Waiting value cannot be negative.");
            }
            _sleepWhileWaitingVal = sleepMillis;
        } else {
            super.attributeChanged(attribute);
        }
    }

    /**
     * Initialize the actor and forget the job, job status and task statuses
     * recorded during a previous run.
     *
     * @exception IllegalActionException
     *                If the base class throws it.
     */
    @Override
    public void initialize() throws IllegalActionException {
        super.initialize();
        log.info("Initializing lastStatusCode to null");
        // reset last recorded job and status
        lastJobID = null;
        lastStatusCode = null;
        lastTaskStatusCodes.clear();
    }

    /**
     * Read one job from <i>jobIn</i>, query its status and send the result.
     *
     * <p>
     * Depending on the parameters the actor either sends every status change
     * until the job reaches Error or NotInQueue (sendAllChanges), waits for
     * the next status change (waitUntil contains "NEXT"), or polls until one
     * of the requested statuses, Error or NotInQueue is seen. When numTasks
     * is greater than zero the task ids and task status codes are sent as
     * well.
     * </p>
     *
     * @exception IllegalActionException
     *                If the input token carries no job, if the status query
     *                fails or is interrupted, or if a token cannot be read
     *                or sent.
     */
    @Override
    public void fire() throws IllegalActionException {
        super.fire();

        boolean bSendAll = ((BooleanToken) sendAllChanges.getToken())
                .booleanValue();
        ObjectToken jobToken = (ObjectToken) jobIn.get(0);
        Job job = (Job) jobToken.getValue();
        if (job == null) {
            // Fail with a clear message instead of a NullPointerException.
            throw new IllegalActionException(this, "JobStatus: Job is null");
        }
        log.info("****** In job status actor for job: " + job.getJobID());
        int numTasksVal = ((IntToken) numTasks.getToken()).intValue();
        job.setNumTasks(numTasksVal);

        boolean waitForNext = _waitUntilCodes.contains("NEXT");
        if (waitForNext || bSendAll) {
            // If it is a new job, remember it and start tracking its status
            // from scratch.
            String realJobID = job.status.jobID;
            // Comparing from lastJobID keeps a null realJobID from throwing.
            if (lastJobID == null || !lastJobID.equalsIgnoreCase(realJobID)) {
                log.debug("lastJobId was " + lastJobID + " current job id is "
                        + realJobID + " " + job.getJobID()
                        + " Reseting lastjobid and status");
                lastJobID = realJobID;
                lastStatusCode = null;
                lastTaskStatusCodes.clear();
            }
        }

        JobStatusCode jobStatusCode;
        JobStatusInfo jobStatusInfo;
        try {
            if (bSendAll) {
                do {
                    log.debug("In send all loop of " + job.getJobID());
                    jobStatusInfo = getNextStatus(job, numTasksVal);
                    jobStatusCode = jobStatusInfo.statusCode;
                    sendResult(jobToken, job, jobStatusCode);
                    if (numTasksVal > 0) {
                        sendTaskResults(jobStatusInfo);
                    }
                } while (!_isFinished(jobStatusCode));
            } else if (waitForNext) {
                if (lastStatusCode != null && _isFinished(lastStatusCode)) {
                    // After Error or NotInQueue there is no NEXT status, so
                    // report the last status again.
                    // NOTE(review): lastStatusInfo is never updated after
                    // construction, so this is always the initial, empty
                    // status info - confirm that this is intended.
                    jobStatusInfo = lastStatusInfo;
                    jobStatusCode = lastStatusCode;
                } else {
                    jobStatusInfo = getNextStatus(job, numTasksVal);
                    jobStatusCode = jobStatusInfo.statusCode;
                }
            } else {
                // Wait for a specific status.
                jobStatusInfo = _checkStatus(job);
                jobStatusCode = jobStatusInfo.statusCode;
                // Loop while there is no match and the job status is NOT
                // Error or NotInQueue. The second check avoids an infinite
                // loop when the job never reaches the requested state or the
                // state goes undetected (say during the sleep between polls).
                while (!matchStatus(jobStatusCode)
                        && !_isFinished(jobStatusCode)) {
                    log.debug("cur status(" + jobStatusCode
                            + ") is not equal to any of the waitUntil codes ("
                            + _waitUntilCodes.toString() + ")");
                    Thread.sleep(_sleepWhileWaitingVal);
                    jobStatusInfo = _checkStatus(job);
                    jobStatusCode = jobStatusInfo.statusCode;
                }
            }
        } catch (IllegalActionException ex) {
            // Already the right type (e.g. a failed send); do not re-wrap.
            throw ex;
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so that callers can see it.
            Thread.currentThread().interrupt();
            log.error("JobStatus interrupted while waiting", ex);
            throw new IllegalActionException(this, ex, "JobStatus Error: "
                    + ex.toString());
        } catch (Exception ex) {
            log.error("JobStatus status query failed", ex);
            throw new IllegalActionException(this, ex, "JobStatus Error: "
                    + ex.toString());
        }

        if (!bSendAll) { // otherwise already sent in the loop above
            sendResult(jobToken, job, jobStatusCode);
            if (numTasksVal > 0) {
                sendTaskResults(jobStatusInfo);
            }
        }
    }

    /***********************************************************
     * private methods
     */

    /**
     * Create the non-persistent, non-editable "tokenProductionRate"
     * parameter of an output port, with the expression "4".
     */
    private Parameter _createRateParameter(TypedIOPort port)
            throws NameDuplicationException, IllegalActionException {
        Parameter rate = new Parameter(port, "tokenProductionRate");
        rate.setExpression("4");
        rate.setVisibility(Settable.NOT_EDITABLE);
        rate.setTypeEquals(BaseType.INT);
        rate.setPersistent(false);
        return rate;
    }

    /**
     * Return true if the code is one of the first two status codes (0 or 1,
     * documented on this actor as Error and NotInQueue), after which polling
     * stops.
     */
    private static boolean _isFinished(JobStatusCode code) {
        return code.ordinal() <= 1;
    }

    /**
     * Poll the job until its status, or the status of at least one of its
     * tasks, differs from the last recorded one, then record and return the
     * new status.
     *
     * <p>
     * When numTasksVal is greater than zero the returned status info is
     * modified so that its task status map holds only the changed tasks.
     * </p>
     */
    private JobStatusInfo getNextStatus(Job job, int numTasksVal)
            throws Exception {
        log.info("IN getNextStatus for job " + job.getJobID());
        JobStatusInfo jobStatusInfo = _checkStatus(job);
        JobStatusCode jobStatusCode = jobStatusInfo.statusCode;
        HashMap<String, JobStatusCode> changedTasks = new HashMap<String, JobStatusCode>();
        if (numTasksVal > 0) {
            changedTasks = getTaskStatusChanges(job, numTasksVal);
        }
        log.debug("Before while loop for job " + job.getJobID()
                + " laststatus code = " + lastStatusCode);
        // While the job status code hasn't changed AND
        // either there are 0 tasks OR all the tasks kept their status.
        while ((lastStatusCode != null && !_isFinished(jobStatusCode)
                && jobStatusCode == lastStatusCode)
                && (numTasksVal == 0 || changedTasks.isEmpty())) {
            log.debug("job " + job.getJobID() + " cur status(" + jobStatusCode
                    + ") is equal to lastStatusInfo(" + lastStatusCode + ")");
            Thread.sleep(_sleepWhileWaitingVal);
            jobStatusInfo = _checkStatus(job);
            jobStatusCode = jobStatusInfo.statusCode;

            if (numTasksVal > 0) {
                changedTasks = getTaskStatusChanges(job, numTasksVal);
            }
        }

        lastStatusCode = jobStatusCode;
        if (numTasksVal > 0) {
            TaskParallelJobStatusInfo taskInfo = (TaskParallelJobStatusInfo) jobStatusInfo;
            // record the last status of all tasks
            lastTaskStatusCodes.clear();
            lastTaskStatusCodes.putAll(taskInfo.taskStatusCodes);
            // return only the changed task statuses
            taskInfo.taskStatusCodes = changedTasks;
        } else {
            log.debug("Found Next state for job " + job.getJobID()
                    + " setting laststatus code = " + lastStatusCode);
        }
        return jobStatusInfo;
    }

    /**
     * Compare the current status of tasks "0" to "numTasksVal - 1" with the
     * last recorded task statuses and return the ones that differ, keyed by
     * task id.
     */
    private HashMap<String, JobStatusCode> getTaskStatusChanges(Job job,
            int numTasksVal) {
        HashMap<String, JobStatusCode> result = new HashMap<String, JobStatusCode>();
        HashMap<String, JobStatusCode> current = ((TaskParallelJobStatusInfo) job.status).taskStatusCodes;

        for (int idx = 0; idx < numTasksVal; idx++) {
            String id = "" + idx;
            JobStatusCode code = current.get(id);
            // Enum constants are singletons, so identity comparison is safe.
            if (lastTaskStatusCodes.get(id) != code) {
                result.put(id, code);
            }
        }
        return result;
    }

    /**
     * Send the log message, the numeric status code and the job token on
     * the log, statusCode and jobOut ports.
     */
    private void sendResult(ObjectToken jobToken, Job job,
            JobStatusCode jobStatusCode) throws IllegalActionException {
        // NOTE(review): the job id is printed twice; kept so that the
        // emitted log text does not change.
        String strLog = "*******JobStatus: Status of job - " + job.getJobID()
                + ":" + job.getJobID() + ": " + jobStatusCode.toString();
        log.info(this.getName() + ":Sending job status "
                + jobStatusCode.ordinal() + " for " + job.getJobID());

        logport.send(0, new StringToken(strLog));
        statusCode.send(0, new IntToken(jobStatusCode.ordinal()));
        jobOut.send(0, jobToken);
    }

    /**
     * Send the id and status code of every task in the given status info on
     * the taskId and taskStatusCode ports, and record each sent status as
     * the last known status of that task.
     */
    private void sendTaskResults(JobStatusInfo jobStatusInfo)
            throws IllegalActionException {
        HashMap<String, JobStatusCode> taskStatuses = ((TaskParallelJobStatusInfo) jobStatusInfo).taskStatusCodes;
        for (String taskIdStr : taskStatuses.keySet()) {
            JobStatusCode code = taskStatuses.get(taskIdStr);
            taskId.send(0, new IntToken(Integer.parseInt(taskIdStr)));
            taskStatusCode.send(0, new IntToken(code.ordinal()));

            // remember the status that was just reported
            lastTaskStatusCodes.put(taskIdStr, code);
        }
    }

    /**
     * Return true if no specific status was requested or if the given
     * status is one of the requested ones.
     */
    private boolean matchStatus(JobStatusCode jobStatusCode) {
        return _waitUntilCodes.isEmpty()
                || _waitUntilCodes.contains(jobStatusCode.toString());
    }

    /**
     * Query the status of the job and return the job's status object.
     *
     * @exception Exception
     *                If the job is null or the status query fails.
     */
    private JobStatusInfo _checkStatus(Job job) throws Exception {
        if (job == null) {
            throw new Exception("JobStatus: Job is null");
        }

        job.status(); // successful query or exception
        log.info("Status of job " + job.getJobID() + ": "
                + job.status.statusCode.toString());

        return job.status;
    }

    /***********************************************************
     * private variables
     */

    private static final Log log = LogFactory.getLog(JobStatus.class.getName());

    /** Requested statuses; empty means that any status matches. */
    private ArrayList<String> _waitUntilCodes = new ArrayList<String>();

    /** Status info reported when NEXT is requested after a final status. */
    private TaskParallelJobStatusInfo lastStatusInfo = new TaskParallelJobStatusInfo();

    /** Last known status of each task, keyed by task id. */
    private HashMap<String, JobStatusCode> lastTaskStatusCodes = new HashMap<String, JobStatusCode>();

    /** Last recorded status of the tracked job, or null if none yet. */
    private JobStatusCode lastStatusCode = null;

    /** Sleep time between two status queries, in milliseconds. */
    private long _sleepWhileWaitingVal = 5000;

    /** Id of the job whose status changes are being tracked. */
    private String lastJobID = null;
}