/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: aschultz $'
 * '$Date: 2010-02-22 16:21:40 -0800 (Mon, 22 Feb 2010) $'
 * '$Revision: 23182 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

package org.kepler.actor.job;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.configuration.ConfigurationManager;
import org.kepler.configuration.ConfigurationNamespace;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.job.Job;
import org.kepler.job.JobException;
import org.kepler.job.JobFactory;
import org.kepler.job.JobManagerFactory;
import org.kepler.job.JobStatusCode;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.parameters.PortParameter;
import ptolemy.data.BooleanToken;
import ptolemy.data.IntToken;
import ptolemy.data.ObjectToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Settable;

/**
 * A generic actor that reconnects to a job that was submitted earlier, using
 * the job id the scheduler assigned to it, and then polls the job status until
 * a user specified status is reached.
 * <p>
 * The scheduler name given by the user is mapped (case-insensitively) to a job
 * support class name prefix through the <i>JobLauncher</i> configuration
 * namespace of the <i>salssajob-module</i> module.
 * </p>
 * <p>
 * The actor is based on code from Norbert Podhorszki's JobCreator, JobManager,
 * JobStatus, and JobSubmitter actors.
 * </p>
 *
 * @author Frankie Kwok, Chandrika Sivaramakrishnan
 * @version $Id$
 */
@SuppressWarnings("serial")
public class GenericJobReconnect extends TypedAtomicActor {

    /**
     * Construct the actor and create all of its ports and parameters.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
     * @exception IllegalActionException
     *                If a port or parameter cannot be created.
     */
    public GenericJobReconnect(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        // --- Job creation parameters and ports ---

        // target selects the machine where the jobmanager is running
        target = new PortParameter(this, "target", new StringToken(
                "[local | [user]@host]"));
        new Parameter(target.getPort(), "_showName", BooleanToken.TRUE);

        // real job id generated by the scheduler
        realJobId = new PortParameter(this, "real job id", new StringToken(
                ""));
        new Parameter(realJobId.getPort(), "_showName", BooleanToken.TRUE);

        // working dir name parameter & port
        workdir = new PortParameter(this, "workdir", new StringToken(
                ".kepler-hpcc"));
        new Parameter(workdir.getPort(), "_showName", BooleanToken.TRUE);

        // --- Job manager parameters and ports ---

        // scheduler denotes the name of the actual job manager
        scheduler = new PortParameter(this, "scheduler", new StringToken(
                "One of [Condor | PBS | LoadLeveler | SGE | Fork]"));
        new Parameter(scheduler.getPort(), "_showName", BooleanToken.TRUE);

        // flag to set if you want the actor to stage the default fork script
        defaultForkScript = new Parameter(this, "Use default fork script",
                new BooleanToken(false));
        defaultForkScript.setTypeEquals(BaseType.BOOLEAN);
        defaultForkScript.setVisibility(Settable.EXPERT);

        // binPath is the full path to the jobmanager commands on the target
        // machine
        binPath = new Parameter(this, "binary path");
        binPath.setVisibility(Settable.EXPERT);

        // --- Job status parameters ---

        waitUntil = new Parameter(this, "Wait Until Status", new StringToken(
                "ANY"));
        waitUntil.setStringMode(true);
        waitUntil.addChoice("ANY");
        for (JobStatusCode code : JobStatusCode.values()) {
            waitUntil.addChoice(code.toString());
        }

        sleepWhileWaiting = new Parameter(this, "Wait Until Sleep (seconds)",
                new IntToken(_sleepWhileWaitingVal));
        sleepWhileWaiting.setTypeEquals(BaseType.INT);

        // --- Output ports ---

        // Output: the reconnected job object
        jobOut = new TypedIOPort(this, "jobOut", false, true);
        jobOut.setTypeEquals(BaseType.OBJECT);
        new Parameter(jobOut, "_showName", BooleanToken.TRUE);

        // Output: log
        logPort = new TypedIOPort(this, "logPort", false, true);
        logPort.setTypeEquals(BaseType.STRING);
        new Parameter(logPort, "_showName", BooleanToken.TRUE);

        // Output: success
        success = new TypedIOPort(this, "success", false, true);
        success.setTypeEquals(BaseType.BOOLEAN);
        new Parameter(success, "_showName", BooleanToken.TRUE);

        // Output: reconnect. True if the job was found in queue.
        // False if job was not in queue. Not in queue could be either
        // because the job completed or if the jobid passed was wrong.
        reconnect = new TypedIOPort(this, "reconnect", false, true);
        reconnect.setTypeEquals(BaseType.BOOLEAN);
        new Parameter(reconnect, "_showName", BooleanToken.TRUE);
    }

    ///////////////////////////////////////////////////////////////////
    ////                     ports and parameters                  ////

    /**
     * The real job id generated by the scheduler when the job was
     * originally submitted. It must not be empty.
     *
     * <p>
     * This parameter is read each time in fire().
     * </p>
     */
    public PortParameter realJobId;

    /**
     * The working directory of the job (on the remote machine if the job
     * manager is a remote jobmanager). It must not be empty.
     *
     * <p>
     * It should be an absolute path, or a relative one. In the latter case on
     * remote machine, the directory path will be relative to the user's home
     * directory (coming from the use of ssh).
     * </p>
     * <p>
     * The given directory is used as the working directory as is; this actor
     * never asks for a job specific sub directory to be created inside it.
     * </p>
     * <p>
     * This parameter is read each time in fire().
     * </p>
     */
    public PortParameter workdir;

    /**
     * The name of the jobmanager to be used. It should be a name, for which a
     * supporter class exists as
     * <i>org.kepler.job.JobSupport&lt;jobManager&gt;.class</i>. The name is
     * matched case-insensitively against the configured scheduler names.
     *
     * <p>
     * This parameter is read each time in fire().
     * </p>
     */
    public PortParameter scheduler;

    /**
     * Boolean flag to indicate if the default fork script should be staged. If
     * bin path is provided the default script is uploaded to bin path, else it
     * is uploaded to the working directory. This is an expert parameter and is
     * only read when the actor is in expert mode.
     */
    public Parameter defaultForkScript;

    /**
     * The machine to be used at job submission. It should be null, "" or
     * "local" for the local machine or [user@]host to denote a remote machine
     * accessible with ssh.
     *
     * <p>
     * This parameter is read each time in fire().
     * </p>
     */
    public PortParameter target;

    /**
     * The path to the job manager commands on the target machines. Commands are
     * constructed as <i>binPath/command</i> and they should be executable this
     * way. This is an expert parameter and is only read, each time in fire(),
     * when the actor is in expert mode.
     */
    public Parameter binPath;

    /**
     * The reconnected job. This token can be used (delaying it with a Sleep
     * actor) to ask its status again and again until the job is finished or
     * aborted. This port is an output port of type Object. Nothing is sent on
     * this port when the reconnect or the status query fails.
     */
    public TypedIOPort jobOut;

    /**
     * Logging information of the reconnect and job status query. Useful to
     * inform user about problems at unsuccessful reconnect or status query but
     * it also prints out job status and job id on successful query. This port
     * is an output port of type String. The name of the port is 'logPort'.
     */
    public TypedIOPort logPort;

    /**
     * Wait until the job has reached one of the given statuses. The value is
     * either ANY (do not wait for a particular status) or a comma separated
     * list of job status names as offered in the parameter's choice list.
     * Waiting also ends as soon as the job status is Error or NotInQueue.
     */
    public Parameter waitUntil;

    /**
     * Amount of time (in seconds) to sleep between checking job status. Must
     * not be negative.
     */
    public Parameter sleepWhileWaiting;

    /**
     * true if the reconnect and the subsequent status query did not fail,
     * false otherwise. Note that this is also true when the job was not found
     * in the queue; check the port reconnect for that case.
     */
    public TypedIOPort success;

    /**
     * true if the job was found in queue.
     * False if job was not in queue. Not in queue could be either
     * because the job completed or if the jobid passed was wrong.
     * Workflows that get jobid from user might want to check this
     * flag in addition to the port success
     */
    public TypedIOPort reconnect;

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /**
     * Create a job object for the given real job id, reconnect it to the
     * selected job manager and poll its status until the requested status (or
     * Error / NotInQueue) is reached. The outcome is reported on the ports
     * success, reconnect, logPort and jobOut.
     *
     * @exception IllegalActionException
     *                If the job object or the job manager cannot be created
     *                from the given parameters, or if a parameter cannot be
     *                read or a token cannot be sent.
     */
    @Override
    public void fire() throws IllegalActionException {
        super.fire();

        if (isDebugging) {
            log.debug("KEPLER HOME IS " + System.getProperty("KEPLER"));
        }

        realJobId.update();
        workdir.update();

        // binPath and defaultForkScript have EXPERT visibility, so they are
        // only honoured when the actor carries the _expertMode attribute.
        String strBinPath = null;
        boolean bDefaultFork = false;
        if (getAttribute("_expertMode") != null) {
            strBinPath = (binPath != null) ? _trimmedString(binPath.getToken())
                    : null;
            bDefaultFork = ((BooleanToken) defaultForkScript.getToken())
                    .booleanValue();
        }

        scheduler.update();
        target.update();

        final String strRealJobId = _trimmedString(realJobId.getToken());
        final String strWorkdir = _trimmedString(workdir.getToken());
        final String strScheduler = _trimmedString(scheduler.getToken());
        final String strTarget = _trimmedString(target.getToken());

        /* Job creation */
        final String strJobID = JobFactory.create();
        final Job job = JobFactory.get(strJobID);
        try {
            if (job == null) {
                throw new JobException("Job object could not be created");
            }

            if (strWorkdir == null || strWorkdir.length() == 0) {
                throw new JobException(
                        "Please provide a valid working directory");
            }
            // never create a sub directory - use given dir as working dir
            job.setWorkdir(strWorkdir, false);

            if (strRealJobId == null || strRealJobId.length() == 0) {
                throw new JobException("Please provide a valid job id");
            }
            job.status.jobID = strRealJobId;

            if (bDefaultFork) {
                File resourcesDir = ConfigurationManager.getModule("actors")
                        .getResourcesDir();
                File binFile = new File(resourcesDir, "jmgr-fork.sh");
                if (!binFile.exists()) {
                    throw new JobException(
                            "Unable to locate default fork script - "
                                    + binFile.getAbsolutePath()
                                    + ". Please copy fork script manually.");
                }
                job.setBinFile(binFile.getAbsolutePath(), true);
            }

            // Set the bin path explicitly if it is not already set.
            // This is required because running jmgr-fork.sh without path
            // fails with command not found. It works only if there is an
            // absolute or relative path prefix. setWorkdir was already
            // called, so getWorkdirPath should return the right path.
            if ("Fork".equalsIgnoreCase(strScheduler)
                    && (strBinPath == null || strBinPath.length() == 0)) {
                strBinPath = job.getWorkdirPath();
            }
        } catch (JobException ex) {
            log.error("Error creating job", ex);
            JobFactory.remove(strJobID);
            throw new IllegalActionException(this, ex, "Error creating job: "
                    + ex.toString());
        }

        /* Process the input scheduler */
        final org.kepler.job.JobManager jobManager;
        try {
            jobManager = _getJobManager(strScheduler, strTarget, strBinPath);
        } catch (JobException ex) {
            log.error("Job manager object could not be created. " + ex, ex);
            JobFactory.remove(strJobID);
            throw new IllegalActionException(this, ex, "JobManager Error: "
                    + ex.toString());
        }

        /* Job Reconnect */
        try {
            if (isDebugging) {
                log.debug("JobReconnect: reconnect to job " + strRealJobId
                        + "...");
            }
            job.reconnect(jobManager);
        } catch (JobException ex) {
            log.error("JobReconnect Error", ex);
            logPort.send(0, new StringToken("JobReconnect Error: "
                    + ex.toString()));
            success.send(0, new BooleanToken(false));
            reconnect.send(0, new BooleanToken(false));
            return;
        }

        if (job.status.statusCode == JobStatusCode.NotInQueue) {
            // No job to do status check on: report and return.
            String notFound = "JobStatus: Status of job " + job.getJobID()
                    + ": " + JobStatusCode.NotInQueue
                    + "\nWarning:Initial status query couldn't find"
                    + " job in queue. Either job id is invalid or job is"
                    + " complete and not in queue anymore";
            success.send(0, new BooleanToken(true));
            reconnect.send(0, new BooleanToken(false));
            logPort.send(0, new StringToken(notFound));
            jobOut.send(0, new ObjectToken(job));
            return;
        }
        log.info("Reconnected successfully to job " + job.status.jobID);

        /* Job Status Checking */
        final JobStatusCode jobStatusCode;
        try {
            jobStatusCode = _waitForStatus(job);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Keep the interruption visible to the rest of the thread.
                Thread.currentThread().interrupt();
            }
            log.error("JobStatus Error", ex);
            success.send(0, new BooleanToken(false));
            reconnect.send(0, new BooleanToken(true));
            logPort.send(0, new StringToken("JobStatus Error: "
                    + ex.toString()));
            return;
        }

        String strLog = "JobStatus: Status of job " + job.getJobID() + ": "
                + jobStatusCode.toString();
        jobOut.send(0, new ObjectToken(job));
        success.send(0, new BooleanToken(true));
        reconnect.send(0, new BooleanToken(true));
        logPort.send(0, new StringToken(strLog));
    }

    /**
     * React to a change in an attribute. Changes of waitUntil and
     * sleepWhileWaiting are validated and cached; the cached values are only
     * replaced when the new value is valid.
     *
     * @param attribute
     *            The attribute that changed.
     * @exception IllegalActionException
     *                If waitUntil names an unknown job status, or if
     *                sleepWhileWaiting is negative.
     */
    @Override
    public void attributeChanged(Attribute attribute)
            throws IllegalActionException {
        if (attribute == waitUntil) {
            String[] requested = waitUntil.getExpression().trim()
                    .split("\\s*,\\s*");
            List<String> codes = new ArrayList<String>(requested.length);
            // "ANY" anywhere in the list means: no particular status to
            // wait for, which is represented by an empty list.
            if (!Arrays.asList(requested).contains("ANY")) {
                for (String statusName : requested) {
                    JobStatusCode code = JobStatusCode
                            .getFromString(statusName);
                    if (code == null) {
                        throw new IllegalActionException(this,
                                "Invalid job status type: " + statusName);
                    }
                    // Store the canonical name, since matchStatus compares
                    // against JobStatusCode.toString().
                    codes.add(code.toString());
                }
            }
            _waitUntilCodes = codes;
        } else if (attribute == sleepWhileWaiting) {
            IntToken sleepToken = (IntToken) sleepWhileWaiting.getToken();
            if (sleepToken != null) {
                int seconds = sleepToken.intValue();
                if (seconds < 0) {
                    throw new IllegalActionException(this,
                            "Sleep While Waiting value cannot be negative.");
                }
                _sleepWhileWaitingVal = seconds;
            }
        } else {
            super.attributeChanged(attribute);
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Return the trimmed string value of a StringToken, or null for a null token. */
    private static String _trimmedString(Token token) {
        return (token == null) ? null : ((StringToken) token).stringValue()
                .trim();
    }

    /**
     * Map the user supplied scheduler name to the configured job support name
     * and create (or get the already created) job manager for it.
     *
     * @param schedulerName
     *            Scheduler name as given by the user (any case).
     * @param targetHost
     *            Target machine, passed on to the job manager factory.
     * @param binaryPath
     *            Path of the job manager commands, passed on to the job
     *            manager factory.
     * @return The job manager returned by JobManagerFactory.
     * @exception JobException
     *                If no scheduler name is given, the scheduler is not
     *                found in the configuration, or the factory fails.
     */
    private org.kepler.job.JobManager _getJobManager(String schedulerName,
            String targetHost, String binaryPath) throws JobException {
        if (schedulerName == null || schedulerName.equals("")) {
            throw new JobException(
                    "Please provide a valid input for the port/parameter scheduler.");
        }

        // Find the job support class name prefix for a given job scheduler
        // name. This is done to keep the parameter case insensitive. For
        // example the string pbs or PBS or Pbs would map to the right class
        // name prefix, say PBS.
        ConfigurationProperty cp = ConfigurationManager.getInstance()
                .getProperty(
                        ConfigurationManager.getModule("salssajob-module"),
                        new ConfigurationNamespace("JobLauncher"));
        String supportName = null;
        if (cp != null) {
            List<ConfigurationProperty> properties = cp.findProperties(
                    "name", schedulerName.toLowerCase(), true);
            if (properties != null && !properties.isEmpty()) {
                ConfigurationProperty value = properties.get(0).getProperty(
                        "value");
                if (value != null) {
                    supportName = value.getValue();
                }
            }
        }
        if (supportName == null) {
            throw new JobException("Job Scheduler " + schedulerName
                    + " is not supported.");
        }

        if (isDebugging) {
            log.debug("Create/get JobManager object. Name = " + supportName
                    + "; target = " + targetHost + "; binPath = "
                    + binaryPath);
        }
        return JobManagerFactory.get(supportName, targetHost, binaryPath);
    }

    /**
     * Poll the job status until it matches one of the requested statuses.
     * Polling also stops when the status is Error or NotInQueue, to avoid an
     * infinite loop in case the job never gets to the requested state or the
     * state goes undetected (say during the sleep between two polls).
     *
     * @param job
     *            The reconnected job.
     * @return The last status that was queried.
     * @exception Exception
     *                If the status query fails or the sleep is interrupted.
     */
    private JobStatusCode _waitForStatus(Job job) throws Exception {
        JobStatusCode code = _checkStatus(job);
        while (!matchStatus(code) && code != JobStatusCode.Error
                && code != JobStatusCode.NotInQueue) {
            Thread.sleep(1000L * _sleepWhileWaitingVal);
            code = _checkStatus(job);
        }
        return code;
    }

    /**
     * Query the status of the job once and log it.
     *
     * @param job
     *            The job to query.
     * @return The status code stored in the job after the query.
     * @exception Exception
     *                If the job is null or the status query fails.
     */
    private JobStatusCode _checkStatus(Job job) throws Exception {
        if (job == null) {
            throw new JobException("JobStatus: Job is null");
        }

        job.status(); // successful query or exception

        JobStatusCode jobStatusCode = job.status.statusCode;
        log.info("Status of job " + job.getJobID() + ": "
                + jobStatusCode.toString());
        return jobStatusCode;
    }

    /**
     * Return true if no particular status was requested (ANY) or if the given
     * status is one of the requested statuses.
     */
    private boolean matchStatus(JobStatusCode jobStatusCode) {
        return _waitUntilCodes.isEmpty()
                || _waitUntilCodes.contains(jobStatusCode.toString());
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private variables                 ////

    private static final Log log = LogFactory
            .getLog(GenericJobReconnect.class.getName());
    private static final boolean isDebugging = log.isDebugEnabled();

    /** Canonical names of the statuses to wait for; empty means ANY. */
    private List<String> _waitUntilCodes = new ArrayList<String>();

    /** Seconds to sleep between two status queries. */
    private int _sleepWhileWaitingVal = 5;
}