001/*
002 * Copyright (c) 2004-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: jianwu $'
006 * '$Date: 2012-09-14 22:48:44 +0000 (Fri, 14 Sep 2012) $' 
007 * '$Revision: 30678 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.actor.job;
031
032import org.apache.commons.logging.Log;
033import org.apache.commons.logging.LogFactory;
034import org.kepler.job.Job;
035import org.kepler.job.JobException;
036import org.kepler.job.JobManager;
037
038import ptolemy.actor.TypedAtomicActor;
039import ptolemy.actor.TypedIOPort;
040import ptolemy.actor.parameters.PortParameter;
041import ptolemy.data.BooleanToken;
042import ptolemy.data.ObjectToken;
043import ptolemy.data.StringToken;
044import ptolemy.data.expr.Parameter;
045import ptolemy.data.type.BaseType;
046import ptolemy.kernel.CompositeEntity;
047import ptolemy.kernel.util.IllegalActionException;
048import ptolemy.kernel.util.NameDuplicationException;
049
050//////////////////////////////////////////////////////////////////////////
051//// JobSubmitter
052
053/**
054 * <p>
055 * Submit a job into a jobmanager on the local/remote machine, using external
056 * execute command/ssh
057 * </p>
058 * 
059 * <p>
060 * This actor uses the Job and JobManager classes that use<br/>
061 * - either java.lang.Runtime.exec() to invoke a subprocess - or org.kepler.ssh
062 * classes to use ssh/scp to submit a job and ask for the status of a submitted
063 * job.
064 * </p>
065 * 
066 * <p>
067 * Inputs: a created Job, a JobManager and Submit options. Job can be created by JobCreator.
068 * JobManager can be created by JobManager. Submit options are different for different job scheduler. 
069 * </p>
070 * 
071 * <p>
072 * Outputs: - <i>jobOut</i> (Object): the job of the submitted job is passed on
073 * so that it can be used later to query its status in JobStatus, or destroyed
074 * (by ???) - <i>succ</i> (boolean): true means successful submission, false
075 * means error - <i>log</i> (String): for log/error message
076 * </p>
077 * 
078 * @author Norbert Podhorszki
079 * @version $Id: JobSubmitter.java 30678 2012-09-14 22:48:44Z jianwu $
080 * @since Ptolemy II 5.0.1
081 */
082public class JobSubmitter extends TypedAtomicActor {
083        /**
084         * Construct an actor with the given container and name.
085         * 
086         * @param container
087         *            The container.
088         * @param name
089         *            The name of this actor.
090         * @exception IllegalActionException
091         *                If the actor cannot be contained by the proposed
092         *                container.
093         * @exception NameDuplicationException
094         *                If the container already has an actor with this name.
095         */
096        public JobSubmitter(CompositeEntity container, String name)
097                        throws NameDuplicationException, IllegalActionException {
098                super(container, name);
099
100                // Uncomment the next line to see debugging statements
101                // addDebugListener(new ptolemy.kernel.util.StreamListener());
102
103                // job as input
104                jobIn = new TypedIOPort(this, "jobIn", true, false);
105                jobIn.setTypeEquals(BaseType.OBJECT);
106                new Parameter(jobIn, "_showName", BooleanToken.TRUE);
107
108                // jobManager
109                jobManager = new TypedIOPort(this, "jobManager", true, false);
110                jobManager.setTypeEquals(BaseType.OBJECT);
111                new Parameter(jobManager, "_showName", BooleanToken.TRUE);
112
113                // Output: the submitted job (actually the same as jobIn)
114                jobOut = new TypedIOPort(this, "jobOut", false, true);
115                jobOut.setTypeEquals(BaseType.OBJECT);
116                new Parameter(jobOut, "_showName", BooleanToken.TRUE);
117
118                // Output: boolean succ
119                succ = new TypedIOPort(this, "succ", false, true);
120                succ.setTypeEquals(BaseType.BOOLEAN);
121                new Parameter(succ, "_showName", BooleanToken.TRUE);
122
123                // Output: log
124                logport = new TypedIOPort(this, "log", false, true);
125                logport.setTypeEquals(BaseType.STRING);
126                new Parameter(logport, "_showName", BooleanToken.TRUE);
127                
128                
129                // submit options, which are different for different job scheduler. 
130                jobSubmitOptions = new PortParameter(this, "jobSubmitOptions", new StringToken(""));
131                jobSubmitOptions.setStringMode(true);
132
133                new Parameter(jobSubmitOptions.getPort(), "_showName", BooleanToken.TRUE);
134
135        }
136
137        /***********************************************************
138         * ports and parameters
139         */
140
141        /**
142         * The job to be submitted Such a job can be created by the JobCreator. This
143         * port is an input port of type Object.
144         */
145        public TypedIOPort jobIn;
146
147        /**
148         * The selected jobmanager (result of JobManager) This port is an input port
149         * of type Object;
150         */
151        public TypedIOPort jobManager;
152
153        /**
154         * The job, which is passed on in this actor for later use (status, destroy,
155         * etc) This port is an output port of type Object.
156         */
157        public TypedIOPort jobOut;
158
159        /**
160         * The status of the job submission. True if submission succeeded, false
161         * otherwise. This port is an output port of type Boolean.
162         */
163        public TypedIOPort succ;
164
165        /**
166         * Logging information of job submission. (to inform user about problems at
167         * unsuccessful submission). This port is an output port of type String.
168         * It's name on canvas is 'log'
169         */
170        public TypedIOPort logport;     
171        
172        /**
173         * The Options of the job submission. Such as "-o /u/joboutput/ -j y -l h_rt=24:00:00"
174         * for SGE job scheduler. Its default value is empty.
175         */
176        public PortParameter jobSubmitOptions;
177
178        /**
179         * The rate parameter for the log port.
180         */
181        public Parameter log_tokenProductionRate;
182
183        /***********************************************************
184         * public methods
185         */
186
187        /**
188         * fire
189         * 
190         * @exception IllegalActionException
191         *                Not thrown
192         */
193        public void fire() throws IllegalActionException {
194                super.fire();
195                jobSubmitOptions.update();
196
197                ObjectToken jobToken = (ObjectToken) jobIn.get(0);
198                Job job = (Job) jobToken.getValue();
199                JobManager jmgr = (JobManager) ((ObjectToken) jobManager.get(0))
200                                .getValue();
201                boolean bSucc = false;
202                String strLog = null;
203                String strJobOptions = ((StringToken) jobSubmitOptions.getToken()).stringValue();
204                //back compatibility, remove the double quotes at the very beginning and at the very last.
205                strJobOptions  = strJobOptions.replaceAll("^\"|\"$", "");
206
207                try {
208                        if (job == null)
209                                throw new Exception("JobSubmitter: incoming Job is null");
210
211                        if (jmgr == null)
212                                throw new Exception("JobSubmitter: JobManager is null");
213
214                        if (isDebugging)
215                                log.debug("JobSubmit: submit job " + job.getJobID() + "...");
216
217                        String realJobID = job.submit(jmgr, true, strJobOptions);
218
219                        strLog = new String("JobSubmitter: Job " + job.getJobID()
220                                        + " is submitted, it's real jobID is: " + realJobID);
221                        log.info(strLog);
222
223                        bSucc = true;
224
225                } catch (JobException ex) {
226                        log.error(ex);
227                        strLog = "JobSubmitter Error: " + ex.toString();
228                        ex.printStackTrace();
229                        throw new IllegalActionException("JobSubmitter Error: " + ex.toString());
230                } catch (Exception ex) {
231                        log.error(ex);
232                        strLog = "JobSubmitter Error: " + ex.toString();
233                        ex.printStackTrace();
234                        throw new IllegalActionException("JobSubmitter Error: " + ex.toString());
235                }
236
237                succ.send(0, new BooleanToken(bSucc));
238                logport.send(0, new StringToken(strLog));
239                jobOut.send(0, jobToken);
240        }
241
242        private static final Log log = LogFactory.getLog(JobSubmitter.class
243                        .getName());
244        private static final boolean isDebugging = log.isDebugEnabled();
245
246}