001/* 002 * Copyright (c) 2004-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: jianwu $' 006 * '$Date: 2012-09-14 22:48:44 +0000 (Fri, 14 Sep 2012) $' 007 * '$Revision: 30678 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.actor.job; 031 032import org.apache.commons.logging.Log; 033import org.apache.commons.logging.LogFactory; 034import org.kepler.job.Job; 035import org.kepler.job.JobException; 036import org.kepler.job.JobManager; 037 038import ptolemy.actor.TypedAtomicActor; 039import ptolemy.actor.TypedIOPort; 040import ptolemy.actor.parameters.PortParameter; 041import ptolemy.data.BooleanToken; 042import ptolemy.data.ObjectToken; 043import ptolemy.data.StringToken; 044import ptolemy.data.expr.Parameter; 045import ptolemy.data.type.BaseType; 046import ptolemy.kernel.CompositeEntity; 047import ptolemy.kernel.util.IllegalActionException; 048import ptolemy.kernel.util.NameDuplicationException; 049 050////////////////////////////////////////////////////////////////////////// 051//// JobSubmitter 052 053/** 054 * <p> 055 * Submit a job into a jobmanager on the local/remote machine, using external 056 * execute command/ssh 057 * </p> 058 * 059 * <p> 060 * This actor uses the Job and JobManager classes that use<br/> 061 * - either java.lang.Runtime.exec() to invoke a subprocess - or org.kepler.ssh 062 * classes to use ssh/scp to submit a job and ask for the status of a submitted 063 * job. 064 * </p> 065 * 066 * <p> 067 * Inputs: a created Job, a JobManager and Submit options. Job can be created by JobCreator. 068 * JobManager can be created by JobManager. Submit options are different for different job scheduler. 069 * </p> 070 * 071 * <p> 072 * Outputs: - <i>jobOut</i> (Object): the job of the submitted job is passed on 073 * so that it can be used later to query its status in JobStatus, or destroyed 074 * (by ???) - <i>succ</i> (boolean): true means successful submission, false 075 * means error - <i>log</i> (String): for log/error message 076 * </p> 077 * 078 * @author Norbert Podhorszki 079 * @version $Id: JobSubmitter.java 30678 2012-09-14 22:48:44Z jianwu $ 080 * @since Ptolemy II 5.0.1 081 */ 082public class JobSubmitter extends TypedAtomicActor { 083 /** 084 * Construct an actor with the given container and name. 085 * 086 * @param container 087 * The container. 088 * @param name 089 * The name of this actor. 090 * @exception IllegalActionException 091 * If the actor cannot be contained by the proposed 092 * container. 093 * @exception NameDuplicationException 094 * If the container already has an actor with this name. 095 */ 096 public JobSubmitter(CompositeEntity container, String name) 097 throws NameDuplicationException, IllegalActionException { 098 super(container, name); 099 100 // Uncomment the next line to see debugging statements 101 // addDebugListener(new ptolemy.kernel.util.StreamListener()); 102 103 // job as input 104 jobIn = new TypedIOPort(this, "jobIn", true, false); 105 jobIn.setTypeEquals(BaseType.OBJECT); 106 new Parameter(jobIn, "_showName", BooleanToken.TRUE); 107 108 // jobManager 109 jobManager = new TypedIOPort(this, "jobManager", true, false); 110 jobManager.setTypeEquals(BaseType.OBJECT); 111 new Parameter(jobManager, "_showName", BooleanToken.TRUE); 112 113 // Output: the submitted job (actually the same as jobIn) 114 jobOut = new TypedIOPort(this, "jobOut", false, true); 115 jobOut.setTypeEquals(BaseType.OBJECT); 116 new Parameter(jobOut, "_showName", BooleanToken.TRUE); 117 118 // Output: boolean succ 119 succ = new TypedIOPort(this, "succ", false, true); 120 succ.setTypeEquals(BaseType.BOOLEAN); 121 new Parameter(succ, "_showName", BooleanToken.TRUE); 122 123 // Output: log 124 logport = new TypedIOPort(this, "log", false, true); 125 logport.setTypeEquals(BaseType.STRING); 126 new Parameter(logport, "_showName", BooleanToken.TRUE); 127 128 129 // submit options, which are different for different job scheduler. 130 jobSubmitOptions = new PortParameter(this, "jobSubmitOptions", new StringToken("")); 131 jobSubmitOptions.setStringMode(true); 132 133 new Parameter(jobSubmitOptions.getPort(), "_showName", BooleanToken.TRUE); 134 135 } 136 137 /*********************************************************** 138 * ports and parameters 139 */ 140 141 /** 142 * The job to be submitted Such a job can be created by the JobCreator. This 143 * port is an input port of type Object. 144 */ 145 public TypedIOPort jobIn; 146 147 /** 148 * The selected jobmanager (result of JobManager) This port is an input port 149 * of type Object; 150 */ 151 public TypedIOPort jobManager; 152 153 /** 154 * The job, which is passed on in this actor for later use (status, destroy, 155 * etc) This port is an output port of type Object. 156 */ 157 public TypedIOPort jobOut; 158 159 /** 160 * The status of the job submission. True if submission succeeded, false 161 * otherwise. This port is an output port of type Boolean. 162 */ 163 public TypedIOPort succ; 164 165 /** 166 * Logging information of job submission. (to inform user about problems at 167 * unsuccessful submission). This port is an output port of type String. 168 * It's name on canvas is 'log' 169 */ 170 public TypedIOPort logport; 171 172 /** 173 * The Options of the job submission. Such as "-o /u/joboutput/ -j y -l h_rt=24:00:00" 174 * for SGE job scheduler. Its default value is empty. 175 */ 176 public PortParameter jobSubmitOptions; 177 178 /** 179 * The rate parameter for the log port. 180 */ 181 public Parameter log_tokenProductionRate; 182 183 /*********************************************************** 184 * public methods 185 */ 186 187 /** 188 * fire 189 * 190 * @exception IllegalActionException 191 * Not thrown 192 */ 193 public void fire() throws IllegalActionException { 194 super.fire(); 195 jobSubmitOptions.update(); 196 197 ObjectToken jobToken = (ObjectToken) jobIn.get(0); 198 Job job = (Job) jobToken.getValue(); 199 JobManager jmgr = (JobManager) ((ObjectToken) jobManager.get(0)) 200 .getValue(); 201 boolean bSucc = false; 202 String strLog = null; 203 String strJobOptions = ((StringToken) jobSubmitOptions.getToken()).stringValue(); 204 //back compatibility, remove the double quotes at the very beginning and at the very last. 205 strJobOptions = strJobOptions.replaceAll("^\"|\"$", ""); 206 207 try { 208 if (job == null) 209 throw new Exception("JobSubmitter: incoming Job is null"); 210 211 if (jmgr == null) 212 throw new Exception("JobSubmitter: JobManager is null"); 213 214 if (isDebugging) 215 log.debug("JobSubmit: submit job " + job.getJobID() + "..."); 216 217 String realJobID = job.submit(jmgr, true, strJobOptions); 218 219 strLog = new String("JobSubmitter: Job " + job.getJobID() 220 + " is submitted, it's real jobID is: " + realJobID); 221 log.info(strLog); 222 223 bSucc = true; 224 225 } catch (JobException ex) { 226 log.error(ex); 227 strLog = "JobSubmitter Error: " + ex.toString(); 228 ex.printStackTrace(); 229 throw new IllegalActionException("JobSubmitter Error: " + ex.toString()); 230 } catch (Exception ex) { 231 log.error(ex); 232 strLog = "JobSubmitter Error: " + ex.toString(); 233 ex.printStackTrace(); 234 throw new IllegalActionException("JobSubmitter Error: " + ex.toString()); 235 } 236 237 succ.send(0, new BooleanToken(bSucc)); 238 logport.send(0, new StringToken(strLog)); 239 jobOut.send(0, jobToken); 240 } 241 242 private static final Log log = LogFactory.getLog(JobSubmitter.class 243 .getName()); 244 private static final boolean isDebugging = log.isDebugEnabled(); 245 246}