/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-10-23 19:16:46 +0000 (Fri, 23 Oct 2015) $'
 * '$Revision: 34054 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
027 * 028 */ 029 030package org.kepler.actor.job; 031 032import java.io.File; 033import java.io.FileWriter; 034import java.io.IOException; 035import java.io.Writer; 036import java.util.ArrayList; 037import java.util.Arrays; 038import java.util.List; 039import java.util.regex.Matcher; 040import java.util.regex.Pattern; 041 042import org.apache.commons.logging.Log; 043import org.apache.commons.logging.LogFactory; 044import org.kepler.configuration.ConfigurationManager; 045import org.kepler.configuration.ConfigurationNamespace; 046import org.kepler.configuration.ConfigurationProperty; 047import org.kepler.job.Job; 048import org.kepler.job.JobException; 049import org.kepler.job.JobFactory; 050import org.kepler.job.JobManagerFactory; 051import org.kepler.job.JobStatusCode; 052import org.kepler.job.TaskParallelJobStatusInfo; 053 054import ptolemy.actor.TypedAtomicActor; 055import ptolemy.actor.TypedIOPort; 056import ptolemy.actor.gui.style.TextStyle; 057import ptolemy.actor.parameters.PortParameter; 058import ptolemy.data.ArrayToken; 059import ptolemy.data.BooleanToken; 060import ptolemy.data.IntToken; 061import ptolemy.data.ObjectToken; 062import ptolemy.data.StringToken; 063import ptolemy.data.Token; 064import ptolemy.data.expr.Parameter; 065import ptolemy.data.expr.StringParameter; 066import ptolemy.data.type.BaseType; 067import ptolemy.kernel.CompositeEntity; 068import ptolemy.kernel.util.Attribute; 069import ptolemy.kernel.util.IllegalActionException; 070import ptolemy.kernel.util.NameDuplicationException; 071import ptolemy.kernel.util.Settable; 072 073/** 074 * A generic job launcher actor that can launch a job using PBS, NCCS, Condor, 075 * Loadleveler, SGE, Moab or LSF, and wait till a user specified status. 076 * <p> 077 * JobLauncher actor is based on code from Norbert Podhorszki's JobCreator, JobManager, 078 * JobStatus, and JobSubmitter actors. 
It uses JobLauncher.properties to find 079 * the list of supported job schedulers and the corresponding support class. 080 * 081 * Additionally it can support multi task jobs 082 * @author Frankie Kwok, Chandrika Sivaramakrishnan, Jared Chase 083 * @version $Id: GenericJobLauncher.java 34054 2015-10-23 19:16:46Z crawl $ 084 */ 085@SuppressWarnings("serial") 086public class GenericJobLauncher extends TypedAtomicActor { 087 public GenericJobLauncher(CompositeEntity container, String name) 088 throws NameDuplicationException, IllegalActionException { 089 super(container, name); 090 091 /** Job creator parameter and port */ 092 // target selects the machine where the jobmanager is running 093 target = new PortParameter(this, "target", new StringToken( 094 "[local | [user]@host]")); 095 new Parameter(target.getPort(), "_showName", BooleanToken.TRUE); 096 target.setStringMode(true); 097 098 // submission file parameter & port 099 cmdFile = new PortParameter(this, "cmdFile", new StringToken( 100 "/path/to/job.submit")); 101 cmdFile.setTypeEquals(BaseType.STRING); 102 cmdFile.getPort().setTypeEquals(BaseType.STRING); 103 new Parameter(cmdFile.getPort(), "_showName", BooleanToken.TRUE); 104 cmdFile.setStringMode(true); 105 106 // local/remote submission file flag parameter 107 cmdFileLocal = new Parameter(this, "cmdFileLocal", BooleanToken.TRUE); 108 cmdFileLocal.setTypeEquals(BaseType.BOOLEAN); 109 110 // executable file's name parameter & port 111 executable = new StringParameter(this, "executable file"); 112 executable.setVisibility(Settable.EXPERT); 113 114 // working dir name parameter & port 115 workdir = new PortParameter(this, "workdir", new StringToken( 116 ".kepler-hpcc")); 117 new Parameter(workdir.getPort(), "_showName", BooleanToken.TRUE); 118 workdir.setStringMode(true); 119 120 //if true - actor doesn't create a unique sub directory, assumes that the 121 //user given workdir is unique and to be used as such. 
122 usegivendir = new Parameter(this, "use given workdir", 123 new BooleanToken(false)); 124 usegivendir.setTypeEquals(BaseType.BOOLEAN); 125 usegivendir.setVisibility(Settable.EXPERT); 126 127 // list of input files' names parameter & port 128 inputfiles = new PortParameter(this, "inputfiles", new ArrayToken( 129 BaseType.STRING)); 130 new Parameter(inputfiles.getPort(), "_showName", BooleanToken.TRUE); 131 132 // list of remote input files' names parameter & port 133 remotefiles = new PortParameter(this, "remotefiles", new ArrayToken( 134 BaseType.STRING)); 135 new Parameter(remotefiles.getPort(), "_showName", BooleanToken.TRUE); 136 137 /** Job Manager parameter and port */ 138 // jobManager denotes the name of the actual job manager 139 scheduler = new PortParameter(this, "scheduler", new StringToken( 140 "SGE")); 141 scheduler.setStringMode(true); 142 cp = ConfigurationManager.getInstance() 143 .getProperty(ConfigurationManager.getModule("actors"), 144 new ConfigurationNamespace("JobLauncher")); 145 properties = cp.getProperties("value", true); 146 for (ConfigurationProperty property : properties) { 147 scheduler.addChoice(property.getValue()); 148 } 149 150 151 // jobManager.setStringMode(true); // string mode (no "s, but no 152 // variables as well! 
153 new Parameter(scheduler.getPort(), "_showName", BooleanToken.TRUE); 154 155 // flag to set if you want the actor to stage the default fork script 156 defaultForkScript = new Parameter(this, "Use default fork script", 157 new BooleanToken(false)); 158 defaultForkScript.setTypeEquals(BaseType.BOOLEAN); 159 defaultForkScript.setVisibility(Settable.EXPERT); 160 161 // binPath is the full path to the jobmanager commands on the target 162 // machine 163 binPath = new StringParameter(this, "binary path"); 164 binPath.setVisibility(Settable.EXPERT); 165 166 // jobSubmitOptions are optional parameters to pass to 167 // submitting a job 168 jobSubmitOptions = new StringParameter(this, "job submit options"); 169 jobSubmitOptions.setVisibility(Settable.EXPERT); 170 171 // numTasks is the number of tasks for this job 172 numTasks = new Parameter(this, "numTasks"); 173 numTasks.setExpression("0"); 174 175 /** Job Status parameter and port */ 176 177 waitUntil = new Parameter(this, "Wait Until Status", new StringToken( 178 "ANY")); 179 waitUntil.setStringMode(true); 180 waitUntil.addChoice("ANY"); 181 for (JobStatusCode code : JobStatusCode.values()) { 182 waitUntil.addChoice(code.toString()); 183 } 184 185 sleepWhileWaiting = new Parameter(this, "Wait Until Sleep (seconds)", 186 new IntToken(_sleepWhileWaitingVal)); 187 sleepWhileWaiting.setTypeEquals(BaseType.INT); 188 189 // Output: jobID of the submitted job 190 jobOut = new TypedIOPort(this, "jobOut", false, true); 191 jobOut.setTypeEquals(BaseType.OBJECT); 192 new Parameter(jobOut, "_showName", BooleanToken.TRUE); 193 194 // Output: log 195 logPort = new TypedIOPort(this, "logPort", false, true); 196 logPort.setTypeEquals(BaseType.STRING); 197 new Parameter(logPort, "_showName", BooleanToken.TRUE); 198 199 //Output: success 200 success = new TypedIOPort(this, "success", false, true); 201 success.setTypeEquals(BaseType.BOOLEAN); 202 new Parameter(success, "_showName", BooleanToken.TRUE); 203 204 cmdText = new 
PortParameter(this, "cmdText"); 205 cmdText.setTypeEquals(BaseType.STRING); 206 cmdText.setStringMode(true); 207 new TextStyle(cmdText, "_style"); 208 cmdText.getPort().setTypeEquals(BaseType.STRING); 209 new Parameter(cmdText.getPort(), "_showName"); 210 211 dependentJob = new TypedIOPort(this, "dependentJob", true, false); 212 dependentJob.setTypeEquals(BaseType.OBJECT); 213 dependentJob.setMultiport(true); 214 new Parameter(dependentJob, "_showName"); 215 216 jobID = new TypedIOPort(this, "jobID", false, true); 217 jobID.setTypeEquals(BaseType.STRING); 218 new Parameter(jobID, "_showName", BooleanToken.TRUE); 219 220 } 221 222 /*************************************************************************** 223 * ports and parameters 224 */ 225 226 /** 227 * The submit file to be used at job submission. Absolute (or relative to 228 * current dir of Java) file path should be provided. The job file must 229 * be provided here, or the contents can be specified in cmdText. 230 * 231 * <p> 232 * This parameter is read each time in fire(). 233 * </p> 234 */ 235 public PortParameter cmdFile; 236 237 /** 238 * Specifying whether the cmdFile is locally stored or on the remote target. 239 * 240 * <p> 241 * This parameter is read each time in fire(). 242 * </p> 243 */ 244 public Parameter cmdFileLocal; 245 246 /** 247 * The executable file to be used at job submission. Absolute path names, or 248 * relative to current dir of the running java virtual machine, should be 249 * provided. If it is "" then it is considered to be already at the remote 250 * site, otherwise the actor will look for it locally and stage it to the 251 * <i>workdir</i> before job submission. 252 * 253 * <p> 254 * This parameter is read each time in fire(). 255 * </p> 256 */ 257 public Parameter executable; 258 259 /** 260 * The working directory in which the actual job submission command will be 261 * executed (on the remote machine if the job manager is a remote 262 * jobmanager). 
263 * 264 * <p> 265 * It should be an absolute path, or a relative one. In the latter case on 266 * remote machine, the directory path will be relative to the user's home 267 * directory (coming from the use of ssh). 268 * </p> 269 * By default, a new unique sub directory is created within this workdir 270 * based on the job id created by kepler. Job is run from this sub 271 * directory. This can be overwritten by setting the parameter "use given 272 * workdir" 273 * <p> 274 * This parameter is read each time in fire(). 275 * </p> 276 */ 277 public PortParameter workdir; 278 279 /** 280 * By default, Kepler creates a unique sub directory within workdir based on 281 * the the job id it creates for the job. Job is run from this sub 282 * directory. Set this flag to true if you want job to be run directly from 283 * workdir instead of a subdir 284 * 285 * <p> 286 * This parameter is read each time in fire(). 287 * </p> 288 */ 289 public Parameter usegivendir; 290 291 /** 292 * The string array of inputfiles. Absolute path names, or relative to 293 * current dir of the running java virtual machine, should be provided. 294 * 295 * <p> 296 * This parameter is read each time in fire(). 297 * </p> 298 */ 299 public PortParameter inputfiles; 300 301 /** 302 * The string array of remote input files. Absolute path names, or relative 303 * to the user home dir on the remote host should be provided. 304 * 305 * <p> 306 * This parameter is read each time in fire(). 307 * </p> 308 */ 309 public PortParameter remotefiles; 310 311 /** 312 * The name of the jobmanager to be used It should be a name, for which a 313 * supporter class exist as <i>org.kepler.job.JobSupport<jobManager>.class 314 * 315 * This parameter is read each time in fire(). 316 */ 317 public PortParameter scheduler; 318 319 /** 320 * Boolean flag to indicate if the default fork script should be staged. 
If 321 * bin path is provided the default script is uploaded to bin path, else it 322 * is uploaded to the working directory 323 */ 324 public Parameter defaultForkScript; 325 326 /** 327 * The machine to be used at job submission. It should be null, "" or 328 * "local" for the local machine or [user@]host to denote a remote machine 329 * accessible with ssh. 330 * 331 * This parameter is read each time in fire(). 332 */ 333 public PortParameter target; 334 335 /** 336 * The path to the job manager commands on the target machines. Commands are 337 * constructed as <i>binPath/command</i> and they should be executable this 338 * way. This parameter is read each time in fire(). 339 */ 340 public Parameter binPath; 341 342 /** 343 * The number of tasks for the job - used in a task parallel job 344 */ 345 public Parameter numTasks; 346 347 /** 348 * The Options of the job submission. Such as "-o /u/joboutput/ -j y -l 349 * h_rt=24:00:00" for SGE job scheduler. Its default value is empty. 350 */ 351 public Parameter jobSubmitOptions; 352 353 /** 354 * The job is passed on in this actor. This token can be used (delaying it 355 * with a Sleep actor) to ask its Status again and again until the job is 356 * finished or aborted. This port is an output port of type Object. 357 */ 358 public TypedIOPort jobOut; 359 360 /** 361 * The real job ID generated from the job scheduler. 362 */ 363 public TypedIOPort jobID; 364 365 /** 366 * Logging information of job status query. Useful to inform user about 367 * problems at unsuccessful status query but it also prints out job status 368 * and job id on successful query. This port is an output port of type 369 * String. The name of port on canvas is 'log' 370 */ 371 public TypedIOPort logPort; 372 373 /** 374 * Wait until the job has a reached specific status. The available status' 375 * that can be reached are: any, wait, running, not in queue, and error. 
376 */ 377 public Parameter waitUntil; 378 379 /** 380 * Amount of time (in seconds) to sleep between checking job status. 381 */ 382 public Parameter sleepWhileWaiting; 383 384 /** 385 * The exit code of the command. If the exit code is 0, the command was 386 * performed successfully. If the exit code is anything other than a 0, an 387 * error occured. 388 */ 389 // public TypedIOPort exitcode; 390 391 /** 392 * boolean flag to indicate if job launch was successful 393 */ 394 public TypedIOPort success; 395 396 397 /** The text of the job specification. The job specification must either 398 * be provided in this parameter or the file name in cmdFile. 399 */ 400 public PortParameter cmdText; 401 402 /** One or more jobs that must successfully complete before this job can run. */ 403 public TypedIOPort dependentJob; 404 405 /** 406 * fire 407 * 408 * @exception IllegalActionException 409 * Not thrown. 410 */ 411 public void fire() throws IllegalActionException { 412 super.fire(); 413 414 /* Job creation by processing port parameters */ 415 System.out.println("KEPLER HOME IS " + System.getProperty("KEPLER")); 416 System.out.println("USER DIR IS "+ System.getProperty("user.dir")); 417 cmdFile.update(); 418 cmdText.update(); 419 workdir.update(); 420 inputfiles.update(); 421 remotefiles.update(); 422 423 String strLog = null; 424 String strExecutable = null; 425 String strJobOptions = null; 426 String strBinPath = null; 427 boolean bUseGivenDir = false; 428 boolean bDefaultFork = false; 429 430 // read any dependent jobs 431 Job[] dependentJobArray = null; 432 if(dependentJob.numberOfSources() > 0) { 433 dependentJobArray = new Job[dependentJob.getWidth()]; 434 for(int i = 0; i < dependentJob.getWidth(); i++) { 435 dependentJobArray[i] = (Job) ((ObjectToken)dependentJob.get(i)).getValue(); 436 } 437 } 438 439 if (this.getAttribute("_expertMode") != null) { 440 Token temp = null; 441 temp = (executable != null) ? 
executable.getToken() : null; 442 strExecutable = (temp != null) ? ((StringToken) temp).stringValue() 443 .trim() : null; 444 //back compatibility, remove the double quotes at the very beginning and at the very last. 445 strExecutable = strExecutable.replaceAll("^\"|\"$", ""); 446 447 temp = (binPath != null) ? binPath.getToken() : null; 448 strBinPath = (temp != null) ? ((StringToken) temp).stringValue() 449 .trim() : null; 450 //back compatibility, remove the double quotes at the very beginning and at the very last. 451 strBinPath = strBinPath.replaceAll("^\"|\"$", ""); 452 453 temp = (jobSubmitOptions != null) ? jobSubmitOptions.getToken() 454 : null; 455 strJobOptions = (temp != null) ? ((StringToken) temp).stringValue() 456 .trim() : null; 457 //back compatibility, remove the double quotes at the very beginning and at the very last. 458 strJobOptions = strJobOptions.replaceAll("^\"|\"$", ""); 459 460 bUseGivenDir = ((BooleanToken) usegivendir.getToken()) 461 .booleanValue(); 462 bDefaultFork = ((BooleanToken) defaultForkScript.getToken()) 463 .booleanValue(); 464 } 465 466 scheduler.update(); 467 target.update(); 468 469 String strCmdFile = null; 470 String strCmdText = null; 471 StringToken token = (StringToken) cmdFile.getToken(); 472 if(token != null) { 473 strCmdFile = token.stringValue().trim(); 474 //back compatibility, remove the double quotes at the very beginning and at the very last. 475 strCmdFile = strCmdFile.replaceAll("^\"|\"$", ""); 476 if(strCmdFile.isEmpty()) { 477 strCmdFile = null; 478 } 479 } 480 481 token = (StringToken) cmdText.getToken(); 482 if(token != null) { 483 strCmdText = token.stringValue().trim(); 484 if(strCmdText.isEmpty()) { 485 strCmdText = null; 486 } 487 } 488 489 boolean bCmdFileLocal = ((BooleanToken) cmdFileLocal.getToken()) 490 .booleanValue(); 491 492 StringToken temp = ((StringToken) workdir.getToken()); 493 String strWorkdir = temp==null? 
null :temp.stringValue().trim(); 494 //back compatibility, remove the double quotes at the very beginning and at the very last. 495 strWorkdir = strWorkdir.replaceAll("^\"|\"$", ""); 496 497 temp = ((StringToken) scheduler.getToken()); 498 strScheduler = temp==null? null :temp.stringValue().trim(); 499 //back compatibility, remove the double quotes at the very beginning and at the very last. 500 strScheduler = strScheduler.replaceAll("^\"|\"$", ""); 501 502 temp = ((StringToken) target.getToken()); 503 strTarget = temp==null? null :temp.stringValue().trim(); 504 //back compatibility, remove the double quotes at the very beginning and at the very last. 505 strTarget = strTarget.replaceAll("^\"|\"$", ""); 506 507 // Process the inputfiles parameter. 508 ArrayToken inputTokens = (ArrayToken) inputfiles.getToken(); 509 String[] inputArray = null; 510 511 try { 512 if (inputTokens.length() >= 1) { 513 int i; 514 515 ArrayList<String> iFiles = new ArrayList<String>(); 516 517 for (i = 0; i < inputTokens.length(); i++) { 518 boolean fileFound = false; 519 File pattern = new File(((StringToken) inputTokens 520 .getElement(i)).stringValue().trim()); 521 String[] contents = (pattern.getParent() != null) ? 
new File( 522 pattern.getParent()).list() 523 : null; 524 String fileName = pattern.getName(); 525 526 if (!fileName.equals("")) { 527 fileName = fileName.replaceAll("[*]", ".*").replaceAll( 528 "[?]", ".?").replaceAll("[+]", ".+"); 529 530 Pattern p = Pattern.compile(fileName); 531 if (contents != null) { 532 for (int index = 0; index < contents.length; index++) { 533 Matcher m = p.matcher(contents[index].trim()); 534 if (m.matches()) { 535 iFiles 536 .add(pattern.getParent() 537 + System 538 .getProperty("file.separator") 539 + contents[index]); 540 fileFound = true; 541 } 542 } 543 } 544 if (!fileFound) { 545 throw new JobException( 546 "No matching file found for " 547 + pattern.toString()); 548 } 549 } 550 } 551 552 if (iFiles.size() != 0) { 553 inputArray = new String[iFiles.size()]; 554 iFiles.toArray(inputArray); 555 } 556 } 557 } catch (JobException ex) { 558 log.error(ex); 559 // ex.printStackTrace(); 560 throw new IllegalActionException(ex.toString()); 561 } 562 563 // Process the remotefiles parameter. 
564 ArrayToken remoteTokens = (ArrayToken) remotefiles.getToken(); 565 String[] remoteArray = null; 566 if (remoteTokens.length() >= 1) { 567 remoteArray = new String[remoteTokens.length()]; 568 int i; 569 for (i = 0; i < remoteTokens.length(); i++) { 570 remoteArray[i] = (((StringToken) remoteTokens.getElement(i)) 571 .stringValue().trim()); 572 } 573 // process empty array 574 if (i == 0 || remoteArray[0] == "") { 575 remoteArray = null; 576 } 577 } 578 579 // create job 580 String strJobID = JobFactory.create(); 581 Job _job = JobFactory.get(strJobID); 582 583 // set the dependencies, if any 584 if(dependentJobArray != null) { 585 _job.setDependentJobs(dependentJobArray); 586 } 587 588 try { 589 // set _job's executable, working dir and input files 590 if (strExecutable != null && strExecutable.trim().length() > 0) { 591 _job.setExecutable(strExecutable, true, strJobOptions); 592 } 593 594 if (strWorkdir != null && strWorkdir.trim().length() > 0) { 595 _job.setWorkdir(strWorkdir, !bUseGivenDir); 596 } else { 597 if (bUseGivenDir) { 598 throw new JobException( 599 "The flag 'use given workdir' is set to true. " + 600 "Please provide a valid working directory. 
\n " + 601 "Or you could uncheck the flag and let the actor create a " + 602 "unique working directory for your job"); 603 } 604 if (strTarget == null || strTarget.trim().equals("") 605 || strTarget.equals("local")|| strTarget.equals("localhost")) { 606 //If submitting to localhost, find home dir using java 607 strWorkdir = System.getProperty("user.home"); 608 if ( System.getProperty("os.name").toLowerCase().indexOf("win") >= 0 ) { 609 strWorkdir = System.getenv().get("HOMEPATH"); 610 } 611 }else{ 612 strWorkdir = "$HOME"; 613 } 614 _job.setWorkdir(strWorkdir); 615 } 616 617 // make sure both cmdFile and cmdText were not used 618 if(strCmdText != null && strCmdFile != null) { 619 throw new IllegalActionException(this, "Do not specify both cmdText and cmdFile."); 620 } 621 622 // make sure at least one of cmdFile and cmdText were used 623 if(strCmdText == null && strCmdFile == null) { 624 throw new IllegalActionException(this, "Must specify either cmdText or cmdFile."); 625 } 626 627 // if the commands were specified as text, write to a temporary file 628 if (strCmdText != null) { 629 File file; 630 try { 631 file = File.createTempFile("job", null); 632 } catch (IOException e) { 633 throw new IllegalActionException(this, e, 634 "Error creating temporary file for cmd text."); 635 } 636 Writer writer = null; 637 try { 638 writer = new FileWriter(file); 639 writer.write(strCmdText); 640 } catch(IOException e) { 641 throw new IllegalActionException(this, e, "Error write cmd text to file."); 642 } finally { 643 if(writer != null) { 644 try { 645 writer.close(); 646 } catch (IOException e) { 647 throw new IllegalActionException(this, e, "Error closing cmd text file."); 648 } 649 } 650 } 651 strCmdFile = file.getAbsolutePath(); 652 bCmdFileLocal = true; //if commands are text, bCmdFileLocal should be true so the file will be transfered to the target cluster. 
653 } 654 655 _job.setSubmitFile(strCmdFile, bCmdFileLocal); 656 657 if(bDefaultFork && "Fork".equalsIgnoreCase(strScheduler)){ 658 File resourcesDir = ConfigurationManager.getModule("job").getResourcesDir(); 659 File binFile = new File(resourcesDir,"jmgr-fork.sh"); 660 if(!binFile.exists()){ 661 throw new JobException("Unable to locate default fork script - " 662 + binFile.getAbsolutePath() + ". Please copy fork script manually."); 663 } 664 _job.setBinFile(binFile.getAbsolutePath(), true); 665 //Set the bin path explicitly if it is not already set 666 //This is required because jmgr-fork.sh fails with command not found. 667 //It works only if there is an absolute or relative path prefix to "jmgr-fork.sh" 668 if(strBinPath == null || strBinPath.trim().equals("")){ 669 strBinPath = _job.getWorkdirPath(); //setWorkdir was already called 670 //so, this method should return the right path 671 } 672 } 673 674 if (inputArray != null) { 675 for (int i = 0; i < inputArray.length; i++) { 676 if (inputArray[i] != null 677 && inputArray[i].trim().length() > 0) 678 _job.setInputFile(inputArray[i], true); 679 } 680 } 681 682 if (remoteArray != null) { 683 for (int i = 0; i < remoteArray.length; i++) { 684 if (remoteArray[i] != null 685 && remoteArray[i].trim().length() > 0) 686 _job.setInputFile(remoteArray[i], false); 687 } 688 } 689 } catch (JobException ex) { 690 log.error(ex); 691 JobFactory.remove(strJobID); 692 strJobID = ""; 693 _job = null; 694 throw new IllegalActionException(this, ex, "Error creating job."); 695 } 696 697 /* Job Manager processing */ 698 699 org.kepler.job.JobManager myJmgr = null; 700 701 try { 702 if (strScheduler==null || strScheduler.equals("")) { 703 throw new JobException( 704 "Please provide a valid input for the port/parameter scheduler."); 705 } 706 // Read properties file. 
707 // String filePath = System.getProperty("KEPLER"); 708 // filePath = filePath + "/common/configs/ptolemy/configs" 709 // + "/kepler/JobLauncher.properties"; 710 711 // File myFile = new File(filePath); 712 // Properties properties = new Properties(); 713 // properties.load(new FileInputStream(filePath)); 714 // String jobsSupported = properties.getProperty(strScheduler 715 // .toLowerCase()); 716 717// ConfigurationProperty cp = ConfigurationManager.getInstance() 718// .getProperty(ConfigurationManager.getModule("actors"), 719// new ConfigurationNamespace("JobLauncher")); 720 properties = cp.findProperties("name", 721 strScheduler.toLowerCase(), true); 722 String jobsSupported = null; 723 if(properties.size() != 0){ 724 ConfigurationProperty prop = properties.get(0); 725 jobsSupported = prop.getProperty("value").getValue(); 726 } 727 if (jobsSupported != null) { 728 strScheduler = jobsSupported; 729 } else { 730 throw new JobException("Job Scheduler " + strScheduler 731 + " is not supported."); 732 } 733 734 // Create a JobManager object or get it if it was already created 735 if (isDebugging) 736 log.debug("Create/get JobManager object. Name = " 737 + strScheduler + "; target = " + strTarget 738 + "; binPath = " + strBinPath); 739 myJmgr = JobManagerFactory.get(strScheduler, strTarget, strBinPath); 740 741 742 // Note that myJmgr.getID can give back a String reference to the 743 // object that can be used with JobManagerFactory.get 744 } catch (JobException ex) { 745 log.error("Job manager object could not be created. 
" + ex); 746 myJmgr = null; 747 JobFactory.remove(strJobID); 748 strJobID = ""; 749 throw new IllegalActionException("JobManager Error: " 750 + ex.toString()); 751 } 752 753 /* Job Submission */ 754 boolean bSucc = false; 755 try { 756 if (_job == null) { 757 throw new JobException("JobSubmitter: incoming Job is null"); 758 } 759 760 if (isDebugging) { 761 log.debug("JobSubmit: submit job " + _job.getJobID() + "..."); 762 } 763 String realJobID; 764 765 int numTasksVal = ((IntToken)numTasks.getToken()).intValue(); 766 if(numTasksVal > 0) { 767 _job.status = new TaskParallelJobStatusInfo(); 768 _job.setNumTasks(numTasksVal); 769 } 770 if(bUseGivenDir){ 771 //do not overwrite existing folder. create if not present 772 realJobID = _job.submit(myJmgr, false, strJobOptions); 773 }else { 774 realJobID = _job.submit(myJmgr, true, strJobOptions); 775 } 776 strLog = new String("JobSubmitter: Job " + _job.getJobID() 777 + " is submitted, it's real jobID is: " + realJobID); 778 log.info(strLog); 779 jobID.send(0, new StringToken(realJobID)); 780 bSucc = true; 781 } catch (JobException ex) { 782 log.error(ex); 783 strLog = "JobSubmitter Error: " + ex.toString(); 784 success.send(0, new BooleanToken(bSucc)); 785 logPort.send(0, new StringToken(strLog)); 786 return; 787 } catch (Exception ex) { 788 log.error(ex); 789 strLog = "JobSubmitter Error: " + ex.toString(); 790 success.send(0, new BooleanToken(bSucc)); 791 logPort.send(0, new StringToken(strLog)); 792 return; 793 } 794 795 /* Job Status Checking */ 796 JobStatusCode jobStatusCode; 797 798 try { 799 800 do { 801 //System.out.println("BEFORE CHECK JOB STATUS"); 802 //jobStatusCode = _checkStatus(_job); 803 //System.out.println("AFTER CHECK JOB STATUS " + jobStatusCode); 804 // while (_waitUntilCode != null && _waitUntilCode != jobStatusCode) 805 // { 806 //Loop if there is no match and job status is NOT Error or NotInQueue. 
807 //Second check is necessary to avoid infinite loop in case where job 808 //never gets to the user requested state or if the state goes undetected 809 //(say during sleep between poll). 810 Long time = 1000L * _sleepWhileWaitingVal; 811 Thread.sleep(time); 812 jobStatusCode = _checkStatus(_job); 813 } while (!matchStatus(jobStatusCode) && jobStatusCode.ordinal()>1); 814 815 } catch (Exception ex) { 816 log.error(ex); 817 jobStatusCode = JobStatusCode.Error; 818 strLog = "JobStatus Error: " + ex.toString(); 819 bSucc = false; 820 success.send(0, new BooleanToken(bSucc)); 821 logPort.send(0, new StringToken(strLog)); 822 return; 823 } 824 825 if (_job != null) { 826 strLog = new String("JobStatus: Status of job " + _job.getJobID() 827 + ": " + jobStatusCode.toString()); 828 jobOut.send(0, new ObjectToken(_job)); 829 } 830 success.send(0, new BooleanToken(bSucc)); 831 logPort.send(0, new StringToken(strLog)); 832 } 833 834 private JobStatusCode _checkStatus(Job job) throws Exception { 835 JobStatusCode jobStatusCode = JobStatusCode.Error; 836 if (job == null) { 837 throw new Exception("JobStatus: Job is null"); 838 } 839 840 job.status(); // successful query or exception 841 842 jobStatusCode = job.status.statusCode; 843 log.info("Status of job " + job.getJobID() + ": " 844 + jobStatusCode.toString()); 845 return jobStatusCode; 846 } 847 848 private boolean matchStatus(JobStatusCode jobStatusCode) { 849 String str = jobStatusCode.toString(); 850 851 if (_waitUntilCodes.size() == 0 || _waitUntilCodes.contains(str)) { 852 return true; 853 } 854 return false; 855 } 856 857 /** React to a change in an attribute. 
*/ 858 public void attributeChanged(Attribute attribute) 859 throws IllegalActionException { 860 if (attribute == waitUntil) { 861 String waitUntilStr = waitUntil.getExpression(); 862 waitUntilStr = waitUntilStr.trim(); 863 String[] split = waitUntilStr.split("\\s*,\\s*"); 864 _waitUntilCodes = new ArrayList<String>(Arrays.asList(split)); 865 // check validity 866 if (_waitUntilCodes.contains("ANY")) { 867 _waitUntilCodes.clear(); 868 } else { 869 for (int i = 0; i < _waitUntilCodes.size(); i++) { 870 JobStatusCode waitUntilCode = JobStatusCode 871 .getFromString(_waitUntilCodes.get(i)); 872 if (waitUntilCode == null) { 873 throw new IllegalActionException(this, 874 "Invalid job status type: " 875 + _waitUntilCodes.get(i)); 876 } 877 } 878 } 879 } else if (attribute == sleepWhileWaiting) { 880 if ((IntToken) sleepWhileWaiting.getToken() != null) { 881 _sleepWhileWaitingVal = ((IntToken) sleepWhileWaiting 882 .getToken()).intValue(); 883 if (_sleepWhileWaitingVal < 0) { 884 throw new IllegalActionException(this, 885 "Sleep While Waiting value cannot be negative."); 886 } 887 } 888 } else if (attribute == binPath) { 889 //if binPath is changed, we should remove JobManager from the table to force it update. 890 if (strScheduler != null && !strScheduler.isEmpty() && strTarget != null && !strTarget.isEmpty()) 891 JobManagerFactory.instance.removeJmgrFromTable(strScheduler, strTarget); 892 } else { 893 super.attributeChanged(attribute); 894 } 895 } 896 897 private static final Log log = LogFactory.getLog(GenericJobLauncher.class 898 .getName()); 899 private static final boolean isDebugging = log.isDebugEnabled(); 900 // private JobStatusCode _waitUntilCode = null; 901 private ArrayList<String> _waitUntilCodes = new ArrayList<String>(); 902 private int _sleepWhileWaitingVal = 5; 903 private List<ConfigurationProperty> properties; 904 private ConfigurationProperty cp; 905 private String strScheduler, strTarget; 906}