001/* 002 * Copyright (c) 2004-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: jianwu $' 006 * '$Date: 2012-09-14 22:48:44 +0000 (Fri, 14 Sep 2012) $' 007 * '$Revision: 30678 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.actor.job; 031 032//import org.kepler.job.JobManager; // same name... 033import java.util.List; 034 035import org.apache.commons.logging.Log; 036import org.apache.commons.logging.LogFactory; 037import org.kepler.configuration.ConfigurationManager; 038import org.kepler.configuration.ConfigurationNamespace; 039import org.kepler.configuration.ConfigurationProperty; 040import org.kepler.job.JobException; 041import org.kepler.job.JobManagerFactory; 042 043import ptolemy.actor.TypedAtomicActor; 044import ptolemy.actor.TypedIOPort; 045import ptolemy.actor.parameters.PortParameter; 046import ptolemy.data.BooleanToken; 047import ptolemy.data.ObjectToken; 048import ptolemy.data.StringToken; 049import ptolemy.data.expr.Parameter; 050import ptolemy.data.type.BaseType; 051import ptolemy.kernel.CompositeEntity; 052import ptolemy.kernel.util.IllegalActionException; 053import ptolemy.kernel.util.NameDuplicationException; 054 055////////////////////////////////////////////////////////////////////////// 056//// JobManager 057 058/** 059 * <p> 060 * Define a jobmanager on the local/remote machine 061 * </p> 062 * 063 * <p> 064 * This actor uses the org.kepler.job.JobManagerFactory and org.kepler.job 065 * JobManager classes to define a job manager 066 * </p> 067 * 068 * <p> 069 * The input should be: 070 * </p> 071 * <ul> 072 * <li>the supporter class' name, e.g. Condor, PBS, LoadLeveler or whatever for 073 * that a class org.kepler.job.JobSupporter<name>.class exists</li> 074 * <li>the target machine, either null, "" or "local" to denote a local 075 * jobmanager to be used by local execution command, OR "[user@]host" to denote 076 * a remote jobmanager to be used by an ssh connection.</li> 077 * </ul> 078 * 079 * <p> 080 * The output is the created jobmanager of type ObjectToken, which should be 081 * given as parameter to a JobSubmitter. 082 * </p> 083 * 084 * <p> 085 * On error, an empty string is returned. 086 * </p> 087 * 088 * @author Norbert Podhorszki 089 * @version $Id: JobManager.java 30678 2012-09-14 22:48:44Z jianwu $ 090 * @since Ptolemy II 5.0.1 091 */ 092public class JobManager extends TypedAtomicActor { 093 /** 094 * Construct an actor with the given container and name. 095 * 096 * @param container 097 * The container. 098 * @param name 099 * The name of this actor. 100 * @exception IllegalActionException 101 * If the actor cannot be contained by the proposed 102 * container. 103 * @exception NameDuplicationException 104 * If the container already has an actor with this name. 105 */ 106 public JobManager(CompositeEntity container, String name) 107 throws NameDuplicationException, IllegalActionException { 108 super(container, name); 109 110 // Uncomment the next line to see debugging statements 111 // addDebugListener(new ptolemy.kernel.util.StreamListener()); 112 113 // jobManager denotes the name of the actual job manager 114 jobManager = new PortParameter(this, "jobManager", new StringToken( 115 "SGE")); 116 // jobManager.setStringMode(true); // string mode (no "s, but no 117 // variables as well! 118 jobManager.setStringMode(true); 119 cp = ConfigurationManager.getInstance() 120 .getProperty(ConfigurationManager.getModule("actors"), 121 new ConfigurationNamespace("JobLauncher")); 122 properties = cp.getProperties("value", true); 123 for (ConfigurationProperty property : properties) { 124 jobManager.addChoice(property.getValue()); 125 } 126 new Parameter(jobManager.getPort(), "_showName", BooleanToken.TRUE); 127 128 // target selects the machine where the jobmanager is running 129 target = new PortParameter(this, "target", new StringToken( 130 "[local | [user]@host]")); 131 new Parameter(target.getPort(), "_showName", BooleanToken.TRUE); 132 target.setStringMode(true); 133 134 // binPath is the full path to the jobmanager commands on the target 135 // machine 136 binPath = new PortParameter(this, "binPath", new StringToken( 137 "/path/to/[remote]jobmanager/bin")); 138 new Parameter(binPath.getPort(), "_showName", BooleanToken.TRUE); 139 binPath.setStringMode(true); 140 141 jmgr = new TypedIOPort(this, "jmgr", false, true); 142 jmgr.setTypeEquals(BaseType.OBJECT); 143 new Parameter(jmgr, "_showName", BooleanToken.FALSE); 144 } 145 146 /*********************************************************** 147 * ports and parameters 148 */ 149 150 /** 151 * The name of the jobmanager to be used It should be a name, for which a 152 * supporter class exist as <i>org.kepler.job.JobSupport<jobManager>.class 153 * 154 * This parameter is read each time in fire(). 155 */ 156 public PortParameter jobManager; 157 158 /** 159 * The machine to be used at job submission. It should be null, "" or 160 * "local" for the local machine or [user@]host to denote a remote machine 161 * accessible with ssh. 162 * 163 * This parameter is read each time in fire(). 164 */ 165 public PortParameter target; 166 167 /** 168 * The path to the job manager commands on the target machines. Commands are 169 * constructed as <i>binPath/command</i> and they should be executable this 170 * way. This parameter is read each time in fire(). 171 */ 172 public PortParameter binPath; 173 174 /** 175 * The created org.kepler.job.JobManager object as ObjectToken. This jmgr 176 * should be used in JobSubmitter to submit a job. This port is an output 177 * port of type ObjectToken. 178 */ 179 public TypedIOPort jmgr; 180 181 /*********************************************************** 182 * public methods 183 */ 184 185 /** 186 * fire 187 * 188 * @exception IllegalActionException 189 * If the subprocess cannot be started, if the input of the 190 * subprocess cannot be written, if the subprocess gets 191 * interrupted, or if the return value of the process is 192 * non-zero. 193 */ 194 public void fire() throws IllegalActionException { 195 super.fire(); 196 197 // update PortParameters 198 jobManager.update(); 199 target.update(); 200 binPath.update(); 201 202 String strJobManager = ((StringToken) jobManager.getToken()) 203 .stringValue(); 204 String strTarget = ((StringToken) target.getToken()).stringValue(); 205 String strBinPath = ((StringToken) binPath.getToken()).stringValue(); 206 207 //back compatibility, remove the double quotes at the very beginning and at the very last. 208 strJobManager = strJobManager.replaceAll("^\"|\"$", ""); 209 strTarget = strTarget.replaceAll("^\"|\"$", ""); 210 strBinPath = strBinPath.replaceAll("^\"|\"$", ""); 211 212 // Create a JobManager object or get it if it was already created 213 org.kepler.job.JobManager myJmgr = null; 214 try { 215 if (isDebugging) 216 log.debug("Create/get JobManager object. Name = " 217 + strJobManager + "; target = " + strTarget 218 + "; binPath = " + strBinPath); 219 JobManagerFactory factory = JobManagerFactory.instance; 220 myJmgr = factory.get(strJobManager, strTarget, strBinPath); 221 // Note that myJmgr.getID can give back a String reference to the 222 // object 223 // that can be used with JobManagerFactory.get 224 } catch (JobException ex) { 225 log.error("Job manager object could not be created. " + ex); 226 myJmgr = null; 227 ex.printStackTrace(); 228 throw new IllegalActionException("JobManager Error: " + ex.toString()); 229 } 230 231 jmgr.send(0, new ObjectToken(myJmgr)); 232 } 233 234 private static final Log log = LogFactory 235 .getLog(JobManager.class.getName()); 236 private static final boolean isDebugging = log.isDebugEnabled(); 237 private List<ConfigurationProperty> properties; 238 private ConfigurationProperty cp; 239 240}