/*
 * Copyright (c) 2004-2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: spurawat $'
 * '$Date: 2014-02-05 19:12:24 +0000 (Wed, 05 Feb 2014) $'
 * '$Revision: 32579 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
*/

package org.kepler.job;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.Iterator;
import java.util.Locale;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.kepler.ssh.ExecException;
import org.kepler.ssh.ExecFactory;
import org.kepler.ssh.ExecInterface;
import org.kepler.ssh.LocalExec;

/**
 * Submits, checks and deletes jobs through a job-manager-specific
 * {@link JobSupport} class ("org.kepler.job.JobSupport" + name).
 * <p>
 * The commands produced by that class are executed either on the local
 * machine ({@link LocalExec}) or on the host whose exec object is obtained
 * from {@link ExecFactory}.
 */
public class JobManager {

    /** Host name recorded when the job manager runs on the local machine. */
    private static final String LOCAL_HOST = "local";

    /** Job-manager-specific supporter; set by selectJobManager(). */
    private JobSupport jobSupport = null;

    /** Class for remote/local execution of commands and file transfers. */
    private ExecInterface execObject;

    /** Host of the job manager; {@value #LOCAL_HOST} for the local machine. */
    private String host;

    /** User on {@link #host} (the ssh login name when the host is remote). */
    private String user;

    /** Supporter name (JobSupport class suffix); used to generate Kepler ids. */
    private String jobManagerName;

    /** Path of the job manager commands; null or blank means default path. */
    private String managerBinPath;

    private static final Log log = LogFactory
            .getLog(JobManager.class.getName());
    private static final boolean isDebugging = log.isDebugEnabled();

    protected JobManager() {
        preloadSupporterClasses();
    }

    /**
     * Returns the key of this job manager, built by
     * JobManagerFactory.createKey from the supporter name and "user@host".
     */
    public String getID() {
        return JobManagerFactory.createKey(jobManagerName, user + "@" + host);
    }

    /**
     * References every known supporter class directly. According to the
     * original authors, without these direct references Java did not find
     * the classes at runtime when they were requested via Class.forName()
     * in selectJobManager() (the exact reason was not established).
     */
    private static void preloadSupporterClasses() {
        Class<?>[] supporters = {
                JobSupportFork.class,
                JobSupportCondor.class,
                JobSupportPBS.class,
                JobSupportNCCS.class, // Obsolete, same as PBS
                JobSupportLoadLeveler.class,
                JobSupportSGE.class,
                JobSupportMoab.class,
                JobSupportSLURM.class };
        for (Class<?> supporter : supporters) {
            supporter.getName(); // result intentionally unused
        }
    }

    /**
     * Chooses the job manager used for execution.
     *
     * @param jobmanager supporter name; the class
     *                   "org.kepler.job.JobSupport" + jobmanager (e.g.
     *                   "Condor") is instantiated and initialized with binPath
     * @param target     null, "", "local" or "localhost" for the local
     *                   machine, otherwise "host" or "user@host" of the machine
     *                   where the job manager runs (the local user name is
     *                   used when no user is given)
     * @param binPath    full path to the job manager commands on that
     *                   machine, or "" / null if they are in the default path
     * @throws JobException if the supporter class cannot be found,
     *                      instantiated or is not a JobSupport, or if
     *                      connecting to the remote target fails
     */
    protected void selectJobManager(String jobmanager, String target,
            String binPath) throws JobException {

        // instantiate the supporter class by naming convention
        String classname = "org.kepler.job.JobSupport" + jobmanager;
        try {
            jobSupport = (JobSupport) Class.forName(classname).newInstance();
        } catch (ClassNotFoundException cnf) {
            throw new JobException("Couldn't find class " + classname);
        } catch (InstantiationException ie) {
            throw new JobException("Couldn't instantiate an object of type "
                    + classname);
        } catch (IllegalAccessException ia) {
            throw new JobException("Couldn't access class " + classname);
        } catch (ClassCastException cce) {
            throw new JobException("Class " + classname
                    + " is not a JobSupport implementation");
        }

        jobSupport.init(binPath);
        managerBinPath = binPath;
        jobManagerName = jobmanager; // stored for Kepler ids

        if (target == null || target.trim().equals("")
                || target.equals("local") || target.equals("localhost")) {
            execObject = new LocalExec();
            user = System.getProperty("user.name");
            host = LOCAL_HOST;
            return;
        }

        int atPos = target.indexOf('@');
        user = (atPos >= 0) ? target.substring(0, atPos)
                : System.getProperty("user.name");
        host = target.substring(atPos + 1);
        try {
            execObject = ExecFactory.getExecObject(user, host);
        } catch (ExecException e) {
            throw new JobException("Error connecting to " + user + "@" + host
                    + " : " + e.toString());
        }
    }

    /**
     * Submits a job; called from Job.submit().
     * <p>
     * If the job has no predefined submit file, the supporter creates one in
     * the job's local working directory. The job's files are then staged
     * into its working directory, and the supporter's submit command is run
     * from inside that directory.
     *
     * @param job       the job to submit
     * @param overwrite if true, the working directory is deleted before the
     *                  new files are staged. As long as job IDs are not really
     *                  unique, this is worth setting to true.
     * @param options   special options string for the actual job manager
     * @return the job ID parsed from the output of the submit command
     * @throws JobException if the supporter gives no submit command, staging
     *                      fails, the submit command exits non-zero, or the
     *                      output cannot be parsed
     */
    protected String submit(Job job, boolean overwrite, String options)
            throws JobException {

        // first, get the submit file: predefined, or created now for the
        // specific job manager
        String submitFilePath = job.getSubmitFile();
        if (submitFilePath == null) {
            submitFilePath = job.getLocalWorkdirPath() + File.separator
                    + "submitcmd." + job.getJobID();
            jobSupport.createSubmitFile(submitFilePath, job);
            job.setSubmitFile(submitFilePath, true);
        }

        // the submit file is expected in the job's working directory on the
        // target, so only its name (without the local path) is used there
        String submitFileName = new File(submitFilePath).getName();

        String commandStr = jobSupport.getSubmitCmd(job.getWorkdirPath() + "/"
                + submitFileName, options, job);
        if (commandStr == null || commandStr.trim().equals("")) {
            throw new JobException(
                    "Supporter class could not give back meaningful command to submit your job");
        }

        try {
            stageFiles(job, overwrite);
        } catch (ExecException e) {
            throw new JobException(
                    "Jobmanager.submit: Error at staging files to " + user
                            + "@" + host + "\n" + e);
        }

        // we have to enter the workdir before submitting the job
        commandStr = "cd " + job.getWorkdirPath() + "; " + commandStr;

        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
        int exitCode = _exec(commandStr, commandStdout, commandStderr);
        if (exitCode != 0) {
            throw new JobException("Error at job submission." + "\nCommand:"
                    + commandStr + "\nStdout:\n" + commandStdout
                    + "\nStderr:\n" + commandStderr);
        }

        // parse the output for the real jobID (can throw JobException too)
        return jobSupport.parseSubmitOutput(commandStdout.toString(),
                commandStderr.toString());
    }

    /**
     * Stages all files of a job into its working directory on the target.
     * <p>
     * Steps:
     * <ol>
     * <li>Deletes the directory first if overwrite is true, then creates it.</li>
     * <li>Places the bin file into managerBinPath when one is set, otherwise
     * into the working directory.</li>
     * <li>Copies the local files and converts their line endings.</li>
     * <li>Copies the files that are already on the target.</li>
     * </ol>
     */
    private void stageFiles(Job job, boolean overwrite) throws ExecException,
            JobException {
        String workdir = job.getWorkdirPath();

        if (overwrite) {
            execObject.deleteFile(workdir, true, false);
        }
        execObject.createDir(workdir, true);

        String binFileName = job.getBinFile();
        File binFile = (binFileName == null) ? null : new File(binFileName);
        boolean binPathSpecified = managerBinPath != null
                && !managerBinPath.trim().equals("");

        if (binFile != null && binPathSpecified) {
            stageBinFileToBinPath(job, binFile, binFileName);
        }

        // Work on a copy so that adding the bin file does not modify the
        // job's own list (which would grow on every submission).
        Vector<File> files = new Vector<File>(job.getLocalFiles());
        if (binFile != null && !binPathSpecified && job.isBinFileLocal()) {
            files.add(binFile);
        }
        execObject.copyTo(files, workdir, true);
        // strip end-of-line meta-characters if copied from Mac or Windows
        formatFiles(workdir, files);

        // A remote bin file is copied even when the job has no other remote
        // files (previously it was skipped in that case).
        Vector<String> rfiles = new Vector<String>(job.getRemoteFiles());
        if (binFile != null && !binPathSpecified && !job.isBinFileLocal()) {
            rfiles.add(binFileName);
        }
        copyRemoteFiles(rfiles, workdir);
    }

    /**
     * Copies the job's bin file into managerBinPath on the target, then
     * converts its line endings.
     * <p>
     * A local bin file is transferred with copyTo; a remote one is copied
     * with "cp" on the target.
     */
    private void stageBinFileToBinPath(Job job, File binFile,
            String binFileName) throws ExecException, JobException {
        if (job.isBinFileLocal()) {
            execObject.copyTo(binFile, managerBinPath, false);
        } else {
            ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
            ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
            int exitCode = _exec("cp " + binFileName + " " + managerBinPath,
                    commandStdout, commandStderr);
            if (exitCode != 0) {
                throw new JobException(
                        "Error at copying remote bin file into specified bin path."
                                + "\nStdout:\n" + commandStdout
                                + "\nStderr:\n" + commandStderr);
            }
        }

        // format bin file if it was copied from mac or windows
        Vector<File> binFiles = new Vector<File>();
        binFiles.add(binFile);
        formatFiles(managerBinPath, binFiles);
    }

    /**
     * Copies files already on the target into workdir with one "cp -r".
     * <p>
     * Does nothing for an empty list. A non-zero exit code is only tolerated
     * when every stderr line reports that source and destination "are the
     * same file".
     */
    private void copyRemoteFiles(Vector<String> rfiles, String workdir)
            throws JobException {
        if (rfiles.isEmpty()) {
            return;
        }

        StringBuilder cmd = new StringBuilder("cp -r ");
        for (String rfile : rfiles) {
            cmd.append(rfile).append(" ");
        }
        cmd.append(workdir);

        if (isDebugging)
            log.debug("Remote file copy command: " + cmd);

        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
        int exitCode = _exec(cmd.toString(), commandStdout, commandStderr);
        if (exitCode != 0 && !onlySameFileErrors(commandStderr.toString())) {
            throw new JobException(
                    "Error at copying remote files into the job directory."
                            + "\nStdout:\n" + commandStdout
                            + "\nStderr:\n" + commandStderr);
        }
    }

    /**
     * Returns true if every line of stderr (trimmed) ends with "are the same
     * file". An empty stderr yields false.
     */
    private static boolean onlySameFileErrors(String stderr) {
        for (String line : stderr.split("\n")) {
            if (!line.trim().endsWith("are the same file")) {
                return false;
            }
        }
        return true;
    }

    /**
     * Converts the line endings of files that were copied into dir on a
     * remote host from a local Mac or Windows machine.
     * <p>
     * Runs only when files is non-empty, the host is not local, and the
     * local "os.name" contains "mac" or "win". Tries the following in order:
     * <ol>
     * <li>mac2unix / dos2unix</li>
     * <li>"sed -i"</li>
     * <li>a per-file tr pass through a temporary copy</li>
     * </ol>
     * With a single file the first two attempts name it explicitly. With
     * more files they operate on "*", i.e. on every entry in dir.
     *
     * @throws JobException if all three attempts fail
     */
    private void formatFiles(String dir, Vector<File> files)
            throws JobException {
        String osName = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
        boolean isMac = osName.contains("mac");
        boolean isWindows = osName.contains("win");

        if (files.size() == 0 || host.equals(LOCAL_HOST)
                || !(isMac || isWindows)) {
            return;
        }

        String cdCmd = "cd " + dir + "; ";
        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();

        // if it is just one file use its name instead of *
        String fileArg = (files.size() == 1) ? files.get(0).getName() : "*";

        // 1) use dos/mac 2unix if available
        StringBuilder cmd = new StringBuilder(cdCmd);
        if (isMac) {
            cmd.append("mac2unix ").append(fileArg);
        }
        if (isWindows) {
            cmd.append("dos2unix ").append(fileArg);
        }
        int exitCode = _exec(cmd.toString(), commandStdout, commandStderr);
        if (exitCode == 0) {
            return;
        }

        // 2) use sed -i if dos/mac 2unix is not available
        cmd = new StringBuilder(cdCmd);
        if (isMac) {
            cmd.append("sed -i 's/\\r/\\n/g' ").append(fileArg);
        }
        if (isWindows) {
            cmd.append("sed -i 's/\\r//g' ").append(fileArg);
        }
        exitCode = _exec(cmd.toString(), commandStdout, commandStderr);
        if (exitCode == 0) {
            return;
        }

        // 3) use tr as last resort, one file at a time via a temporary copy
        cmd = new StringBuilder(cdCmd);
        for (File aFile : files) {
            String fileName = aFile.getName();
            cmd.append("cp " + fileName);
            cmd.append(" tmp" + fileName);
            if (isMac) {
                cmd.append("; tr '\\r' '\\n' <tmp" + fileName);
            }
            if (isWindows) {
                cmd.append("; tr -d '\\r' <tmp" + fileName);
            }
            cmd.append(" >" + fileName);
            cmd.append("; rm tmp" + fileName + "; ");
        }
        exitCode = _exec(cmd.toString(), commandStdout, commandStderr);
        if (exitCode != 0) {
            throw new JobException(
                    "Error at copying local files into the job directory."
                            + "\nStdout:\n" + commandStdout
                            + "\nStderr:\n" + commandStderr);
        }
    }

    /**
     * Checks the status of a job (no task-level status).
     *
     * @return the status parsed by the supporter class
     * @throws JobException on error, or if jobID is null
     */
    protected JobStatusInfo status(String jobID) throws JobException {
        return status(jobID, 0);
    }

    /**
     * Checks the status of a job and, if numTasks &gt; 0, of its tasks.
     *
     * @param jobID    ID of a submitted job
     * @param numTasks number of tasks; when &gt; 0 the supporter's task
     *                 status command and parser are used (per the original
     *                 comment the parser returns a TaskParallelJobStatusInfo)
     * @return the status parsed by the supporter class
     * @throws JobException on error, if jobID is null, if the supporter gives
     *                      no command, or (wrapped) when the supporter throws
     *                      NotSupportedException
     */
    protected JobStatusInfo status(String jobID, int numTasks)
            throws JobException {
        if (jobID == null) {
            throw new JobException(
                    "JobManager.status() called with null argument");
        }
        try {
            // 1. get the command to execute
            String commandStr = (numTasks > 0)
                    ? jobSupport.getTaskStatusCmd(jobID)
                    : jobSupport.getStatusCmd(jobID);
            if (commandStr == null || commandStr.trim().equals("")) {
                throw new JobException(
                        "Supporter class could not give back meaningful "
                                + "command to check the status of your job");
            }

            // 2. execute it. The exit code is deliberately not checked, as
            // an error can just mean "job not in queue" (e.g. PBS); it is
            // passed to the parser instead.
            ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
            ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
            int exitCode = _exec(commandStr, commandStdout, commandStderr);

            if (isDebugging)
                log.debug("JobManager.status numTasks: " + numTasks);

            // 3. parse the output for status info (can throw JobException)
            if (numTasks > 0) {
                return jobSupport.parseTaskStatusOutput(jobID, numTasks,
                        exitCode, commandStdout.toString(),
                        commandStderr.toString());
            }
            return jobSupport.parseStatusOutput(jobID, exitCode,
                    commandStdout.toString(), commandStderr.toString());
        } catch (NotSupportedException e) {
            throw new JobException(e.toString());
        }
    }

    /**
     * Deletes a job from the queue.
     *
     * @param jobID ID of a submitted job
     * @return the result of the supporter's parseDeleteOutput
     * @throws JobException on error, if jobID is null, or if the supporter
     *                      gives no delete command
     */
    protected boolean delete(String jobID) throws JobException {
        if (jobID == null) {
            throw new JobException(
                    "JobManager.delete() called with null argument");
        }

        String commandStr = jobSupport.getDeleteCmd(jobID);
        if (commandStr == null || commandStr.trim().equals("")) {
            throw new JobException(
                    "Supporter class could not give back meaningful "
                            + "command to remove your job");
        }

        ByteArrayOutputStream commandStdout = new ByteArrayOutputStream();
        ByteArrayOutputStream commandStderr = new ByteArrayOutputStream();
        // The exit code is not checked here, as an error can just mean "job
        // not in queue" (e.g. PBS); the parser receives it instead.
        int exitCode = _exec(commandStr, commandStdout, commandStderr);

        // parse the output for delete info (can throw JobException as well)
        return jobSupport.parseDeleteOutput(jobID, exitCode,
                commandStdout.toString(), commandStderr.toString());
    }

    /**
     * Executes a command through execObject, i.e. locally or remotely
     * depending on the target chosen in selectJobManager().
     *
     * @return exitCode of the command.
     * @throws JobException wrapping any ExecException from execObject
     */
    private int _exec(String commandStr, ByteArrayOutputStream commandStdout,
            ByteArrayOutputStream commandStderr) throws JobException {
        try {
            if (isDebugging)
                log.debug("Execute on " + user + "@" + host + ": "
                        + commandStr);
            return execObject.executeCmd(commandStr, commandStdout,
                    commandStderr);
        } catch (ExecException e) {
            throw new JobException("Jobmanager._exec: Error at execution on "
                    + user + "@" + host + " of command: " + commandStr + "\n"
                    + e);
        }
    }

} // end-of-class-JobManager