001/* A base class for DDP engines.
002 * 
003 * Copyright (c) 2013 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-11-23 20:26:41 +0000 (Mon, 23 Nov 2015) $' 
008 * '$Revision: 34245 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.director;
031
032import java.io.File;
033import java.io.IOException;
034import java.io.InputStream;
035import java.net.InetSocketAddress;
036import java.net.MalformedURLException;
037import java.net.Socket;
038import java.net.URI;
039import java.net.URL;
040import java.net.URLClassLoader;
041import java.util.Collections;
042import java.util.HashMap;
043import java.util.HashSet;
044import java.util.LinkedList;
045import java.util.List;
046import java.util.Map;
047import java.util.Random;
048import java.util.Set;
049
050import org.apache.commons.logging.Log;
051import org.apache.commons.logging.LogFactory;
052import org.kepler.build.modules.Module;
053import org.kepler.build.modules.ModuleTree;
054import org.kepler.configuration.ConfigurationProperty;
055import org.kepler.ddp.Utilities;
056import org.kepler.ddp.actor.pattern.DDPDataSink;
057import org.kepler.ddp.actor.pattern.DDPDataSource;
058import org.kepler.ddp.actor.pattern.DDPPatternActor;
059import org.kepler.ddp.actor.pattern.SingleInputPatternActor;
060import org.kepler.sms.SemanticType;
061
062import ptolemy.actor.CompositeActor;
063import ptolemy.actor.Executable;
064import ptolemy.actor.TypeAttribute;
065import ptolemy.actor.TypedIOPort;
066import ptolemy.actor.gui.Effigy;
067import ptolemy.actor.gui.ModelDirectory;
068import ptolemy.actor.gui.PtolemyEffigy;
069import ptolemy.actor.parameters.PortParameter;
070import ptolemy.data.BooleanToken;
071import ptolemy.data.IntToken;
072import ptolemy.data.StringToken;
073import ptolemy.data.expr.Parameter;
074import ptolemy.data.expr.Variable;
075import ptolemy.data.type.Type;
076import ptolemy.kernel.util.Attribute;
077import ptolemy.kernel.util.IllegalActionException;
078import ptolemy.kernel.util.NameDuplicationException;
079import ptolemy.kernel.util.NamedObj;
080import ptolemy.kernel.util.ScopeExtender;
081import ptolemy.kernel.util.Workspace;
082
083/** A base class for DDP engines.
084 * 
085 * @author Daniel Crawl
086 * @version $Id: DDPEngine.java 34245 2015-11-23 20:26:41Z crawl $
087 * 
088 */
089public abstract class DDPEngine implements Cloneable {
090
091        /** Create a new DDPEngine.
092         *  @param director The director containing this engine.
093         */
094    public DDPEngine(DDPDirector director) throws IllegalActionException, NameDuplicationException {
095        _director = director;
096        _addParameters();
097    }
098    
099    /** React to a parameter change. */
100        public void attributeChanged(Attribute attribute) throws IllegalActionException {
101            
102            if(attribute == _director.jobArguments) {
103            _jobArgumentsMap.clear();
104                String val = _director.jobArguments.stringValue();
105                if (val != null && !val.isEmpty()) {
106                    for(String entry : val.split(",")) {
107                        if (entry != null && !val.isEmpty()) {
108                                String[] nameVal = entry.split("=");
109                                if(nameVal.length != 2) {
110                                    throw new IllegalActionException(_director, 
111                                            "Job arguments must be of the form: name1 = value1, name2 = value2");
112                                }
113                                _jobArgumentsMap.put(nameVal[0].trim(), nameVal[1].trim());
114                        }
115                    }
116                }
117            }
118        }
119
120        /** Clone the engine into the containing director's workspace. */
121        @Override
122        public Object clone() throws CloneNotSupportedException {
123                return clone(_director.workspace());
124        }
125        
126    /** Clone the object into the specified workspace.
127     *  @param workspace The workspace for the cloned object.
128     *  @exception CloneNotSupportedException Not thrown in this base class
129     *  @return The new Attribute.
130     */
131    public Object clone(Workspace workspace) throws CloneNotSupportedException {
132        DDPEngine newObject = (DDPEngine) super.clone();
133        newObject._additionalJars = new HashSet<String>();
134        newObject._classLoader = null;
135        newObject._configDirStr = null;
136        newObject._container = null;
137        newObject._degreeOfParallelism = 1;
138        // newObject._director is set in DDPDirector.clone().
139        newObject._displayRedirectDir = "";
140        newObject._jobArgumentsMap = new HashMap<String,String>();
141        newObject._jobDir = null;
142        newObject._jobLock = new Object();
143        newObject._random = new Random();
144        newObject._stopRequested = false;
145        newObject._writeSubWorkflowsToFiles = false;
146        return newObject;
147    }
148
149    /** Close all the effigies created. */
150    public static void closeAllEffigies() throws IllegalActionException {
151        synchronized(_effigiesForStubModels) {
152            for(Effigy effigy : _effigiesForStubModels) {
153                _setEffigiesToNotModified(effigy);
154                effigy.closeTableaux();
155                try {
156                    effigy.setContainer(null);
157                } catch(NameDuplicationException e) {
158                    throw new IllegalActionException(
159                        "Error setting effigy container to null: " +
160                            e.getMessage());
161                }
162            }
163            _effigiesForStubModels.clear();
164        }
165    }
166
167    /** Create an Effigy for a model so that windows may be opened by gui actors. */
168    public static synchronized void createEffigy(CompositeActor model) {
169        
170        List<?> configurations = ptolemy.actor.gui.Configuration.configurations();
171        
172        // make sure there is at least one Configuration. there are none when
173        // running headless.
174        if(!configurations.isEmpty()) {
175            ptolemy.actor.gui.Configuration configuration = 
176                    (ptolemy.actor.gui.Configuration) configurations.iterator().next();
177
178        try {
179    
180                PtolemyEffigy effigy = new PtolemyEffigy(configuration.workspace());
181                effigy.setModel(model);
182                ModelDirectory directory = (ModelDirectory) configuration
183                        .getEntity("directory");
184                
185                effigy.setName(model.getName());
186                
187                effigy.identifier.setExpression(model.getName());
188                if (directory != null) {
189                    if (directory.getEntity(model.getName()) != null) {
190                        // Name is already taken.
191                        int count = 2;
192                        String newName = effigy.getName() + " " + count;
193                        while (directory.getEntity(newName) != null) {
194                            newName = effigy.getName() + " " + ++count;
195                        }
196                        effigy.setName(newName);
197                    }
198                }
199                effigy.setContainer(directory);
200    
201                // do not open a new window for the workflow
202                /*Tableau t = configuration.openModel(model);*/
203                
204                _effigiesForStubModels.add(effigy);
205                
206            } catch (Exception e) {
207                throw new RuntimeException("Error creating Effigy.", e);
208            }
209        }
210    }
211
212    /** Get the directory to redirect display related actors. */
213    public String getDisplayRedirectDir() throws IllegalActionException {
214        return _displayRedirectDir;
215    }
216
217    /** Get the model for a specific name. */
218    public static CompositeActor getModel(String name) {
219        return _subWorkflows.get(name);
220    }
221
222    /** Get the name of the engine. */
223    public final String getName() {
224        return _engineName;
225    }
226    
227    /** Valid types of servers that can be started. The first value is the
228     *  default for the startServerType parameter.
229     */
230    public String[] getServerTypes() {
231        return new String[] {"default", DDPDirector.SAME_JVM_STRING, DDPDirector.DISTRIBUTED_STRING};
232    }
233
234    /** Execute the engine. In this base class, does nothing. */
235    public void fire() throws IllegalActionException {
236
237        // iterate the source actors once
238        final List<DDPDataSource> sourceActors = _container.entityList(DDPDataSource.class);
239        for(DDPDataSource source : sourceActors) {
240            final int rc = source.iterate(1);
241            if(rc == Executable.NOT_READY) {
242                throw new IllegalActionException(_director, "Actor " + source.getName() +
243                    " is not ready to fire. (Maybe prefire() returned false?)");
244            }
245        }
246        
247        // call prefire for all the sink actors
248        final List<DDPDataSink> sinkActors = _container.entityList(DDPDataSink.class);
249        for(DDPDataSink sink : sinkActors) {
250            if(!sink.prefire()) {
251                throw new IllegalActionException(_director, "Actor " + sink.getName() +
252                    " is not ready to fire. (Prefire() returned false.)");
253            }
254        }
255
256        // run the DDP job
257        _executeDDPJob();
258        
259        if(!_stopRequested) {
260            // call fire and postfire for all the sink actors
261            for(Object object : sinkActors) {
262                final DDPDataSink sink = (DDPDataSink)object;
263                sink.fire();
264                sink.postfire();            
265            }
266        }
267    }
268    
269    /** Postfire the engine. In this base class, returns true. */
270        public boolean postfire() throws IllegalActionException {
271                return true;
272        }
273
274    /** Initialize fields from parameters. */
275    public void preinitialize() throws IllegalActionException {
276        _container = (CompositeActor) _director.getContainer();
277        _configDirStr = _director.configDir.stringValue();
278        _writeSubWorkflowsToFiles = ((BooleanToken)_director.writeSubWorkflowsToFiles.getToken()).booleanValue();
279        _displayRedirectDir = _director.displayRedirectDir.stringValue();
280                _classLoader = Thread.currentThread().getContextClassLoader();
281                _stopRequested = false;
282        _jobDir = null;
283
284        
285        IntToken parallelism = (IntToken)_director.degreeOfParallelism.getToken();
286        if(parallelism == DDPDirector.DEFAULT_INTTOKEN) {
287            _degreeOfParallelism = 1;
288        } else {
289            _degreeOfParallelism = parallelism.intValue();
290        }
291
292                // see if the we should use a server in the same jvm
293                String typeStr = ((StringToken)_director.startServerType.getToken()).stringValue();
294                if(typeStr != null && (typeStr.equals("default") ||
295                        typeStr.equals(DDPDirector.SAME_JVM_STRING))) {
296                    _sameJVM = true;
297                } else {
298                    _sameJVM = false;
299                }
300                
301                // close any effigies that were opened during the previous execution
302                // FIXME this closes all the effigies, not just the ones used by
303                // this engine.
304                closeAllEffigies();
305    }
306    
307    /** Stop any running DDP jobs. */
308    public void stop() throws IllegalActionException {
309        _stopRequested = true;
310    }
311    
312    /** Perform cleanup. */
313    public void wrapup() throws IllegalActionException {
314
315        _subWorkflows.clear();
316
317    }
318    
319    ///////////////////////////////////////////////////////////////////
320    ////                      protected methods                  //////
321
322    /** Add parameters to the containing director. In this base class, does nothing. */
323    protected void _addParameters() throws IllegalActionException, NameDuplicationException {
324        
325    }
326    
327    /** Check that the configuration directory is set and exists on the file system. */
328    protected void _checkConfigDir() throws IllegalActionException {
329    
330        if(_configDirStr.trim().isEmpty()) {
331                
332                String moduleName = _engineName.toLowerCase();
333                
334            // set the default location of the config directory
335            String workflowDirStr = System.getProperty(moduleName + ".workflowdir");
336            if(workflowDirStr == null) {
337                throw new IllegalActionException(_director, "System property " +
338                                moduleName + ".workflowdir not set.");
339            }
340            
341            _configDirStr = workflowDirStr + File.separator +
342                "tools" + File.separator + "conf";
343        }
344        
345        // make sure conf dir exists
346        final File configDirFile = new File(_configDirStr);
347        if(!configDirFile.exists()) {
348            throw new IllegalActionException(_director, _engineName + " configuration directory " +
349                    _configDirStr + " does not exist.");
350        }
351        
352    }
353    
354    /** Check the existence of required files before starting the DDP server.
355     *  The list of required files is found in configuration.xml for each
356     *  engine, in Engine.Server.Required.
357     */
358    protected boolean _checkFilesBeforeStartingServer() throws IllegalActionException {
359        
360        ConfigurationProperty engineProperty = Utilities.getEngineProperty(_engineName, _director);
361        
362        List<ConfigurationProperty> requiredProperties = 
363            engineProperty.getProperties("Server.Required.File");
364        if(requiredProperties != null && !requiredProperties.isEmpty()) {
365            final File configDirFile = new File(_configDirStr);
366            final String dirStr = configDirFile.getParentFile().getAbsolutePath();
367            for(ConfigurationProperty fileProperty : requiredProperties) {
368                File requiredFile = new File(dirStr, fileProperty.getValue());
369                System.out.println("Check that file exists before starting the server: " + requiredFile);
370                if(!requiredFile.exists()) {
371                    System.out.println("ERROR: required file not found: " + requiredFile);
372                    return false;
373                }
374            }
375        }
376        return true;
377    }
378
379    /** Check the container of this director for incorrect actors, etc. */
380    protected void _checkModel() throws IllegalActionException {
381                
382        // make sure container only has ddp actors
383        _checkModelForNonDDPActors(_container);
384        
385    }
386    
387    /** Make sure the container only contains DDP actors. */
388    protected void _checkModelForNonDDPActors(CompositeActor container)
389            throws IllegalActionException {
390        
391        // verify all contained actors are ddp pattern actors
392        for(Object object : container.entityList()) {
393            if(!(object instanceof DDPPatternActor)) {
394                throw new IllegalActionException(_director, ((NamedObj)object).getName() +
395                        " is not a DDP Pattern actor.");
396            } else {
397                // make sure composites have a director
398                // NOTE: composites must have a director even if the execution
399                // class is specified (and the sub-workflow is not used) since
400                // the output port type can only be set if the composite actor
401                // is opaque. 
402                // @see TypedIOPort.getType()
403                //
404                DDPPatternActor actor = (DDPPatternActor)object;
405                if(!actor.isOpaque()) {
406                    throw new IllegalActionException(_director, 
407                            ((NamedObj)object).getName() +  " must contain a director.");
408                }
409            }   
410        }
411    }
412    
413    /** Check if the DDP engine server is running. If not, try to start it.
414     *  @param socketAddress Host and port of the server to check.
415     *  @param startScriptStr The script to start the server if not running.
416     *  @return True if a server was started, false if could connect to already running server. 
417     */
418    protected boolean _checkServer(InetSocketAddress socketAddress, String startScriptStr)
419                throws IllegalActionException {
420        
421        boolean startedServer = false;
422        
423        synchronized(_serverStartStopLock) {
424                Socket socket = null;
425                try {
426                        socket = new Socket();
427                        boolean connected = false;
428                        try {
429                            socket.connect(socketAddress, _CONNECT_TIMEOUT);
430                            connected = true;
431                        } catch (IOException e) {
432                            
433                            System.out.println(_engineName + " server " + socketAddress +
434                                        " does not appear to be running. Starting...");
435                            
436                            // start the server
437                            
438                            if(!_checkFilesBeforeStartingServer()) {
439                                throw new IllegalActionException(_director,
440                                        "One or more files required to start the server were not found.");
441                            }
442                                            
443                            // see if the script is executable. kepler modules are zipped,
444                            // which does not preserve the permissions.
445                            File startScriptFile = new File(startScriptStr);
446                            if(!startScriptFile.canExecute()) {
447                                throw new IllegalActionException(_director, 
448                                        "The script " + startScriptFile + " is not executable.\n" +
449                                                        "You must change the permissions so that " +
450                                                        startScriptFile.getName() + 
451                                                        " and all the other scripts in \n" +
452                                                        startScriptFile.getParent() + " are executable.");
453                            }
454                            
455                            ProcessBuilder builder = new ProcessBuilder(startScriptStr);
456                            
457                            // make sure JAVA_HOME is set
458                            java.util.Map<String,String> env = builder.environment();
459                            if(env.get("JAVA_HOME") == null) {
460                                env.put("JAVA_HOME", System.getProperty("java.home"));
461                            }
462
463                            builder.redirectErrorStream(true);
464                            
465                            try {
466                                Process process = builder.start();
467                                InetSocketAddress newAddress = 
468                                        _parseOutputFromStartingServer(process.getInputStream());
469                                if(newAddress != null) {
470                                    socketAddress = newAddress;
471                                }
472                                process.waitFor();
473                                startedServer = true;
474                            } catch (Exception e1) {
475                                throw new IllegalActionException(_director, e1, "Unable to start " +
476                                                _engineName + " server.");
477                            }
478                                                        
479                            int tries = 0;
480                            while(tries < 5) {
481                                // wait for the server to start
482                                try {
483                                    Thread.sleep(5000);
484                                    tries++;
485                                    System.out.print("Connecting to " + _engineName + " server port try #" + tries + ": ");
486                                    try {
487                                        socket.close();
488                                        socket = new Socket();
489                                        socket.connect(socketAddress, _CONNECT_TIMEOUT);
490                                        connected = true;
491                                        System.out.println("connected.");
492                                        break;
493                                    } catch (IOException e1) {
494                                        // do nothing
495                                        System.out.println(e1);
496                                    }
497                                } catch (InterruptedException e2) {
498                                    throw new IllegalActionException(_director, e2, "Error while sleeping.");
499                                }
500                            }
501                            
502                            
503                            // if we get here, we were able to connect to the master/job manager port.
504                            // however, the server may not be completely initialized, so wait a few more seconds
505                            System.out.println("Waiting 15 seconds for " + _engineName + " server to initialize.");
506                            try {
507                                Thread.sleep(15000);
508                            } catch (InterruptedException e2) {
509                                throw new IllegalActionException(_director, e2, "Error while waiting " +
510                                        " for " + _engineName + " server to initialize.");
511                            }
512                
513                        }
514                        
515                        if(connected) {
516                            try {
517                                socket.close();
518                                socket = null;
519                            } catch (IOException e) {
520                                throw new IllegalActionException(_director, e, "Error closing socket.");
521                            }
522                        } else {
523                            throw new IllegalActionException(_director, 
524                                        "Could not connect to " + _engineName + " server: " + socketAddress);
525                        }
526                } finally {
527                        if(socket != null) {
528                                try {
529                                                socket.close();
530                                        } catch (IOException e) {
531                                throw new IllegalActionException(_director, e, "Error closing socket.");
532                                        }
533                        }
534                }
535        }
536        
537        return startedServer;
538    }
539
540    /** Copy the workflow parameters from one sub-workflow to another including
541     *  the parameters in all the containers of the source.
542     *
543     *  @param sourceSubWorkflow the source sub-workflow
544     *  @param destSubWorkflow the destination sub-workflow
545     */
546        protected void _copyParameters(DDPPatternActor sourceSubWorkflow, DDPPatternActor destSubWorkflow)
547                throws IllegalActionException {
548
549                // clone the parameters into the same workspace as the destination
550                // subworkflow
551                final Workspace workspace = ((NamedObj) destSubWorkflow).workspace();
552                
553        // get the parameters up the hierarchy
554        final java.util.Map<String,Variable> parameters = 
555            _getParametersInHierarchy(sourceSubWorkflow.getContainer());
556        // copy the parameters into the pactor
557        // TODO: only need to get the parameters once per run, instead of once
558        // per pactor
559        for(Variable p : parameters.values()) {
560            
561            //System.out.println("copying parameter: " + p.getFullName());
562            
563            // make sure the cloned actor does not already have a parameter
564            // with the same name. this can happen if the parameter is a SharedParameter
565            if(destSubWorkflow.getAttribute(p.getName()) == null) {   
566                //System.out.println("parameter " + p);
567                try {
568                    // if the parameter is a PortParameter, create a new parameter
569                    // instead of clone it, since we do not want the associated port
570
571                    // we also need to set persistence so the parameter appears when serialized
572                    if(p instanceof PortParameter) {
573                        final Parameter copiedParameter = new Parameter((NamedObj) destSubWorkflow, p.getName()); 
574                        String value = p.getExpression();
575                        if(p.isStringMode()) {
576                                copiedParameter.setExpression("\"" + value + "\"");
577                        } else {
578                                copiedParameter.setExpression(value);
579                        }
580                        copiedParameter.setPersistent(true);
581                    } else {
582                        final Variable cloneParameter;
583                        
584                        // NOTE: Variable values are not written during exportMoML(),
585                        // so we need to put the value in a new Parameter.
586                        if(p instanceof Parameter) {
587                            cloneParameter = (Variable) p.clone(workspace);
588                        } else {
589                            cloneParameter = new Parameter(workspace);
590                        }
591                        cloneParameter.setContainer((NamedObj) destSubWorkflow);
592                        cloneParameter.setPersistent(true);
593
594                        if(!(p instanceof Parameter)) {
595                            cloneParameter.setName(p.getName());
596                            ((Parameter)cloneParameter).setExpression(p.getExpression());
597                        }
598                    } 
599                } catch(Exception e) {
600                    throw new IllegalActionException(_director, e, "Unable to add " +
601                        " parameter " + p.getFullName() + " to " + sourceSubWorkflow.getFullName());
602                }
603            }
604        }
605        }
606        
607        /** Create a new directory for this job. */
608        protected void _createJobDirectory() throws IllegalActionException {
609
610                int number = _random.nextInt(Integer.MAX_VALUE);
611                File directory = new File(System.getProperty("user.home")
612                                + File.separator + number);
613                while (directory.exists()) {
614                        number = _random.nextInt(Integer.MAX_VALUE);
615                        directory = new File(System.getProperty("user.home")
616                                        + File.separator + number);
617                }
618                if (!directory.mkdir()) {
619                        throw new IllegalActionException("Could not create directory "
620                                        + directory.getPath());
621                }
622                _jobDir = directory.getPath() + File.separator;
623                _log.debug("Job directory is " + _jobDir);
624        }
625
626    /** Get the parameters for a NamedObj and all its containers. */
627    protected static Map<String,Variable> _getParametersInHierarchy(NamedObj namedObj)
628    {
629        java.util.Map<String,Variable> retval = new HashMap<String,Variable>();
630        final List<?> attributes = namedObj.attributeList();
631        for(Object object : attributes) {
632            if(object instanceof Variable) {
633                retval.put(((Variable)object).getName(), (Variable)object);
634            }
635            
636            
637            if(object instanceof ScopeExtender) {
638                try {
639                    ((ScopeExtender)object).expand();
640                } catch (IllegalActionException e) {
641                    // TODO Auto-generated catch block
642                    e.printStackTrace();
643                }
644                for(Object subAttribute : ((ScopeExtender)object).attributeList()) {
645                    if(subAttribute instanceof Variable) {
646                        retval.put(((Variable)subAttribute).getName(), (Variable)subAttribute);                        
647                    }
648                }
649            }
650        }
651               
652        // get the parameters above 
653        final NamedObj container = namedObj.getContainer();
654        if(container != null) {
655            final java.util.Map<String,Variable> aboveParameters = _getParametersInHierarchy(container);
656            for(java.util.Map.Entry<String,Variable> entry : aboveParameters.entrySet()) {
657                final String name = entry.getKey();
658                // do not add parameters with the same name since they are overridden
659                if(!retval.containsKey(name)) {
660                    retval.put(name, entry.getValue());
661                }
662            }
663        }
664        
665        // remove certain parameters
666        java.util.Map<String,Variable> copy = new HashMap<String,Variable>(retval);
667        for(java.util.Map.Entry<String,Variable> entry : copy.entrySet()) {
668            final String name = entry.getKey();
669            final Variable parameter = entry.getValue();
670            // remove parameters whose name begins with "_"
671            // remove semantic type parameters
672            if(name.startsWith("_") || (parameter instanceof SemanticType)) {
673                retval.remove(name);
674            }
675        }
676        
677        return retval;
678    }
679
680    /** Get a list of jars required for director to start. 
681     *  It also set _classLoader value based on the jars.
682     */
683    protected List<URI> _getJarList() throws IllegalActionException{
684        final List<URI> jarPaths = new LinkedList<URI>();
685        final List<File> jarsWithRelativePaths = new LinkedList<File>();
686        
687        for(String additionalJar : _additionalJars) {
688            jarsWithRelativePaths.add(new File(additionalJar));
689        }
690        
691        // get the jars in the director's includeJars parameter
692        String includeJarsStr = _director.includeJars.stringValue();
693        if(includeJarsStr != null && !includeJarsStr.isEmpty()) {
694            for(String jarPath : includeJarsStr.split(",")) {
695                File jarFile = new File(jarPath);
696                // see if jar is an absolute path
697                if(jarFile.isAbsolute()) {
698                    if(!jarFile.exists() || !jarFile.canRead()) {
699                        throw new IllegalActionException(_director,
700                                "Jar does not exist or cannot be read: " + jarFile);
701                    }
702                    // jars with absolute paths are added directly
703                    System.out.println("Adding jar: " + jarFile.getAbsolutePath());
704                    jarPaths.add(jarFile.toURI());
705                } else {
706                    jarsWithRelativePaths.add(jarFile);
707                }
708            }
709        }
710                    
711        // add the module jars, e.g., actors.jar, ptolemy.jar, etc.
712        // also add any jars in includeJars with a relative path - these jars
713        // are assumed to be module/lib/jar.
714        final ModuleTree moduleTree = ModuleTree.instance();
715        for(Module module : moduleTree) {
716            
717            final File moduleJar = module.getTargetJar();
718            
719            // add the module jar if it exists.
720            // some modules, e.g., outreach, do not have jars.
721            if(moduleJar.exists()) {
722                jarPaths.add(moduleJar.toURI());                
723            }
724
725            final List<File> moduleJars = module.getJars();
726            for(File jar : moduleJars) {
727                // include kepler-tasks.jar since we need classes
728                // in org.kepler.build to initialize kepler in the
729                // stub. see StubUtilities.initializeKepler()
730                if(jar.getName().equals("kepler-tasks.jar")) {
731                    //System.out.println("adding jar " + jar);
732                        jarPaths.add(jar.toURI());                
733                } else if(jar.getName().matches("^log4j.*jar$") || //add log4j jar since it is used for display-redirect function in DDP.
734                        jar.getName().equals("ant.jar")) {
735                        jarPaths.add(jar.toURI());
736                } else if(!jarsWithRelativePaths.isEmpty()) {
737                    for(File jarFile : jarsWithRelativePaths) {
738                        if(jar.getName().equals(jarFile.getName())) {
739                            System.out.println("Adding jar in module " + module.getName() +
740                                    ": " + jar);
741                            jarPaths.add(jar.toURI());
742                        }
743                    }
744                }
745            }                        
746        }
747        
748        // add any jars specified by the actors.
749        final List<DDPPatternActor> actors = _container.entityList(DDPPatternActor.class);
750        for(DDPPatternActor actor : actors) {
751            final String jarsStr = actor.getJars();
752            if(!jarsStr.isEmpty()) {
753                final String[] jars = jarsStr.split(",");
754                for(String jar : jars) {
755                    final File jarFile = new File(jar);
756                    if(!jarFile.exists() || !jarFile.canRead()) {
757                        throw new IllegalActionException(actor,
758                                "Jar does not exist or cannot be read: " + jarFile.getAbsolutePath());
759                    }
760                    System.out.println("Adding jar for " + actor.getFullName() + ": " +
761                            jarFile.getAbsolutePath());
762                    jarPaths.add(jarFile.toURI());
763                }
764            }
765        }
766        
767                URL[] jarArray;
768                try{
769                List<URL> jarURLs = new LinkedList<URL>();
770                for (URI jarURI : jarPaths) {
771                        jarURLs.add(jarURI.toURL());
772                }
773        
774                jarArray = jarURLs.toArray(new URL[jarURLs.size()]);    
775
776                        if (jarArray != null && jarArray.length > 0) {
777                                _classLoader = new URLClassLoader(jarArray, Thread.currentThread()
778                                                .getContextClassLoader());
779                                Thread.currentThread().setContextClassLoader(_classLoader);
780                        }                       
781                        
782                } catch (MalformedURLException e) {
783                        e.printStackTrace();
784                        throw new IllegalActionException(_director, e.getMessage());
785                }
786                
787        return jarPaths;
788    }
789    
790    /** Execute the DDP job. */
791    protected abstract void _executeDDPJob() throws IllegalActionException;
792
793    /** Parse the output from the script that starts the server. In this
794     *  class, does nothing and returns null.
795     *  @return If the start script specifies a server URL, returns the
796     *  socket address for that URL. Otherwise, returns null.  
797     */
798    protected InetSocketAddress _parseOutputFromStartingServer(InputStream input) throws IOException, IllegalActionException {
799        return null;
800    }
801    
802    /** Remove engine-specific parameters from the director.
803     *  Does nothing in this base class.
804     */
805    protected void _removeParameters() throws IllegalActionException, NameDuplicationException {
806                
807    }
808
809    /** Set the port types inside a cloned pattern actor.
810     *  @param actor the cloned actor
811     */
812        protected Map<String, Type> _setPortTypes(DDPPatternActor actor)
813                throws IllegalActionException {
814                
815                final Map<String,Type> typeMap = new HashMap<String,Type>();
816                
817        // set the input ports of the stub source actors. the output ports will be
818        // set by the actor's PortFunction class, see _createPortFunction().
819        // the stub sink actor ports do not need to be set.       
820        final List<?> pactorPorts = actor.inputPortList();
821        for(Object object : pactorPorts) {
822            final TypedIOPort pactorPort = (TypedIOPort) object; 
823            
824            // get the connected ports - the input ports of the source actors
825            final List<?> connectedPorts = pactorPort.insidePortList();
826            for(Object object2 : connectedPorts) {
827                final TypedIOPort connectedPort = (TypedIOPort) object2;
828                
829                // set the types of ports connected to the pactor port so their
830                // types can be used to set the pact types in the stub
831                TypeAttribute typeAttribute;
832                try {
833                    typeAttribute = new TypeAttribute(connectedPort, connectedPort.getName() + "Type");
834                } catch(NameDuplicationException e) {
835                    throw new IllegalActionException(_director, e,
836                        "Error creating type attribute for " + connectedPort.getFullName());
837                }
838                typeMap.put(connectedPort.getName(), connectedPort.getType());
839                typeAttribute.setExpression(connectedPort.getType().toString());
840                
841                // call TypedIOPort.attributeChanged() with the type attribute so that
842                // the port type will be set. this is necessary if the cloned subworkflow
843                // is not serialized.
844                connectedPort.attributeChanged(typeAttribute);
845            }
846        }   
847        return typeMap;
848        }    
849    
850    ///////////////////////////////////////////////////////////////////
851    ////                      protected fields                   //////
852
853    /** DDP Engine configuration directory. */
854    protected String _configDirStr;
855
856    /** Directory for the current job. */
857    protected String _jobDir;
858
859        /** The ClassLoader class for loading implementation class for DDP actors. */
860    protected ClassLoader _classLoader;
861
862    /** Timeout when seeing if server is running. */
863    protected final static int _CONNECT_TIMEOUT = 5*1000;
864
865    /** Directory for display redirect. */
866    protected String _displayRedirectDir;
867
868    /** If true, write the sub-workflows to files, otherwise pass them as
869     *  to in the configuration object.
870     */
871    protected boolean _writeSubWorkflowsToFiles = false;
872
873        /** The container of this director. */
874    protected CompositeActor _container;
875
876    /** The default degree of parallelism for ddp pattern actors. */
877        protected int _degreeOfParallelism = 1;
878        
879    /** A collection of job arguments as key-values. */
880    protected Map<String,String> _jobArgumentsMap = new HashMap<String,String>();
881
882    /** The containing director. */
883        protected DDPDirector _director;
884        
885         /** The name of the engine. */
886        protected String _engineName = "unknown";
887        
888    /** A mapping of model name to model. FIXME: change to full name */
889    protected static final java.util.Map<String,SingleInputPatternActor> _subWorkflows =
890            Collections.synchronizedMap(new HashMap<String,SingleInputPatternActor>());
891
892    /** If true, run and use server in the same JVM as Kepler. */
893    protected boolean _sameJVM = true;
894    
895    /** A set of additional jar names to send to the server. */
896    protected Set<String> _additionalJars = new HashSet<String>();
897    
898    /** A lock for DDP jobs. */
899    protected Object _jobLock = new Object();
900    
901    /** A static object used for synchronization. */
902    protected final static Object _serverStartStopLock = new Object();  
903
904    
905    ///////////////////////////////////////////////////////////////////
906    ////                      private methods                    //////
907
908    /** Set an effigy and any contained effigies to be not modified. */
909    private static void _setEffigiesToNotModified(Effigy effigy) {
910        //System.out.println("setting not modified for : " + effigy.getFullName());
911        effigy.setModified(false);
912        for(Effigy containedEffigy : effigy.entityList(Effigy.class)) {
913            _setEffigiesToNotModified(containedEffigy);
914        }
915    }
916
917    ///////////////////////////////////////////////////////////////////
918    ////                      private fields                     //////
919
920        /** Random number generator for job directories. */
921        private Random _random = new Random();
922
923        /** Logging. */
924    private final static Log _log = LogFactory.getLog(DDPEngine.class);
925    
926    /** A list of Effigies opened for models. */
927    private static List<Effigy> _effigiesForStubModels = 
928        Collections.synchronizedList(new LinkedList<Effigy>());
929    
930    /** If true, the user requested the workflow to stop. */
931    private boolean _stopRequested = false;
932}