001/* A director to run DDP models.
002 * 
003 * Copyright (c) 2013 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-11-05 00:26:32 +0000 (Thu, 05 Nov 2015) $' 
008 * '$Revision: 34215 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.director;
031
032import java.lang.reflect.Constructor;
033import java.util.List;
034
035import org.kepler.configuration.ConfigurationManager;
036import org.kepler.configuration.ConfigurationProperty;
037
038import ptolemy.actor.Director;
039import ptolemy.data.BooleanToken;
040import ptolemy.data.IntToken;
041import ptolemy.data.expr.Parameter;
042import ptolemy.data.expr.StringParameter;
043import ptolemy.data.type.BaseType;
044import ptolemy.kernel.CompositeEntity;
045import ptolemy.kernel.util.Attribute;
046import ptolemy.kernel.util.IllegalActionException;
047import ptolemy.kernel.util.InternalErrorException;
048import ptolemy.kernel.util.NameDuplicationException;
049import ptolemy.kernel.util.Settable;
050import ptolemy.kernel.util.Workspace;
051import ptolemy.util.MessageHandler;
052
053/** A director that converts DDP pattern actors (Map, Reduce, Cross, CoGroup,
054 *  and Match) and I/O actors (DDPDataSink and DDPDataSource) into a job that
055 *  is executed on a DDP engine such as Hadoop or Stratosphere.
056 *  <p>
057 *  <b>NOTE:</b> Only DDP pattern and I/O actors may be present in the
058 *  workflow. Other actors must placed inside the composite pattern actors
059 *  or in a different sub-workflow.
060 *  </p>
061 * 
062 * 
063 * @author Daniel Crawl
064 * @version $Id: DDPDirector.java 34215 2015-11-05 00:26:32Z crawl $
065 */
066public class DDPDirector extends Director {
067        
068    /** Construct a new DDPBaseDirector in a container with a given name. */
069        public DDPDirector(CompositeEntity container, String name)
070                        throws IllegalActionException, NameDuplicationException {
071                super(container, name);
072                _initializeParameters();
073        }
074
075    /** Construct a new DDPBaseDirector for a workspace. */
076        public DDPDirector(Workspace workspace) throws IllegalActionException,
077                        NameDuplicationException {
078                super(workspace);
079                _initializeParameters();
080        }
081        
082        /** React to a change in an attribute. This overrides the base class to
083         *  handles changes to local parameters.
084         */
085        @Override
086        public void attributeChanged(Attribute attribute) throws IllegalActionException {
087
088            if(attribute == engine) {
089                // see if engine actually changed
090                String newEngineStr = engine.stringValue();
091
092            if(newEngineStr.equals("default")) {
093                newEngineStr = _defaultEngine;
094            }
095
096                if(_engine == null || !_engine.getName().equals(newEngineStr)) {
097                DDPEngine newEngine = _getEngine(newEngineStr);
098                if(newEngine == null) {
099                    throw new IllegalActionException(this,
100                            "Unable to load DDP engine " + newEngineStr + ".");
101                } else {
102                    
103                    // remove engine-specific parameters from the director if
104                    // there was a previous engine.
105                    if(_engine != null) {
106                            try {
107                            _engine._removeParameters();
108                        } catch (NameDuplicationException e) {
109                            throw new IllegalActionException(this, e,
110                                    "Error removing parameters for " + _engine.getName());
111                        }
112                    }
113                    _engine = newEngine;
114                   
115                    // update the values in the startServerType parameter
116                    
117                    boolean found = false;
118                    final String oldServerType = startServerType.stringValue();
119                    
120                    startServerType.removeAllChoices();
121                    
122                    String[] serverTypes = _engine.getServerTypes();
123                    for(String typeStr : serverTypes) {
124                        startServerType.addChoice(typeStr);
125                        if(oldServerType.equals(typeStr)) {
126                            found = true;
127                        }
128                    }
129                    
130                    if(found) {
131                        startServerType.setToken(oldServerType);
132                    } else {
133                        startServerType.setToken(serverTypes[0]);
134                    }
135
136                }
137                }
138            } else if(attribute == startServerType) {
139            String newTypeStr = startServerType.stringValue();
140            if(!newTypeStr.isEmpty() && !newTypeStr.equals("default")) {
141                boolean found = false;
142                boolean haveDistributed = false;
143                // see if the start type is found
144                for(String typeStr : startServerType.getChoices()) {
145                    if(typeStr.equals(newTypeStr)) {
146                        found = true;
147                        break;
148                    }
149                    // see if it supports distributed
150                    if(typeStr.equals(DISTRIBUTED_STRING)) {
151                        haveDistributed = true;
152                    }
153                }
154                // backwards compatibility: if the type was set to local or cluster,
155                // change to distributed
156                if(haveDistributed && 
157                        (newTypeStr.equals("local") || newTypeStr.equals("cluster"))) {
158                    startServerType.setExpression(DISTRIBUTED_STRING);
159                    found = true;
160                }
161                
162                if(!found) {
163                    throw new IllegalActionException(this, "Invalid type: " + newTypeStr);
164                }
165            }
166            } else {    
167                if(_engine != null) {
168                        _engine.attributeChanged(attribute);
169                }
170            super.attributeChanged(attribute);
171            }
172        }
173        
174    /** Clone the object into the specified workspace.
175     *  @param workspace The workspace for the cloned object.
176     *  @exception CloneNotSupportedException Not thrown in this base class
177     *  @return The new Attribute.
178     */
179    @Override
180    public Object clone(Workspace workspace) throws CloneNotSupportedException {
181        DDPDirector newObject = (DDPDirector) super.clone(workspace);
182        if(_engine != null) {
183            newObject._engine = (DDPEngine)_engine.clone(workspace);
184            // set the director of the cloned engine to be the cloned director.
185            newObject._engine._director = newObject;
186        }
187        return newObject;
188    }
189
190    /** Run the engine. */
191    @Override
192    public void fire() throws IllegalActionException {
193        _engine.fire();
194    }
195
196    /** Get the dir to redirect display related actors. */
197    public String getDisplayRedirectDir() throws IllegalActionException {
198        return _engine.getDisplayRedirectDir();
199    }
200    
201    /** Get the name of the DDP engine. */
202    public String getEngineName() throws IllegalActionException {
203        if(_engine == null) {
204                return null;
205        } else {
206                return _engine.getName();
207        }
208    }
209    
210    /** Postfire the engine.
211     *  @return If true, engine can be fired again. If false, do not run again.
212     */
213    @Override
214        public boolean postfire() throws IllegalActionException {
215                return _engine.postfire();
216        }
217
218    /** Preinitialize the engine. */
219    @Override
220    public void preinitialize() throws IllegalActionException {
221        super.preinitialize();
222        _engine.preinitialize();
223    }
224
225    /** Stop any running DDP jobs. */
226    @Override
227    public void stop() {
228        super.stop();
229        try {
230            _engine.stop();
231        } catch (IllegalActionException e) {
232            MessageHandler.error("Error stopping DDP job.", e);
233        }
234    }
235    
236    /** Perform any cleanup in the engine. */
237    @Override
238    public void wrapup() throws IllegalActionException {
239        super.wrapup();
240        _engine.wrapup();
241    }
242
243    ///////////////////////////////////////////////////////////////////
244    ////                         parameters                        ////
245
246    /** DDP engine configuration directory. */
247    public StringParameter configDir;
248    
249    /** The default degree of parallelism for ddp pattern actors. 
250     *  This value is used if the actor's degreeOfParallelism <= 0.
251     */
252    public Parameter degreeOfParallelism;
253
254    /** A comma-separated list of jar files to include. If the absolute
255     *  path to the jar is not specified, then the jar should be
256     *  located in a Kepler module. 
257     */
258    public StringParameter includeJars;
259    
260    /** A comma-separated list of arguments for the job. It is only useful
261     *  when DDP actors' logics are described by java classes, not
262     *  sub-workflows.
263     */
264    public StringParameter jobArguments;
265    
266    /** The directory where the display related actors in DDP pattern sub-workflows will save their outputs.
267     *  If it is empty, the display actors will be discarded before execution.
268     *  More information about display redirect can be found at display-redirect module.
269     */
270    public StringParameter displayRedirectDir;
271
272    /** If true, sub-workflows are written to files in a directory. 
273     *  The directory path is printed to stdout when the workflow
274     *  executes.
275     */
276    public Parameter writeSubWorkflowsToFiles;
277
278    /** The execution engine. */
279    public StringParameter engine;
280    
281    /** The type of DDP server to start (if one is not running). */
282    public StringParameter startServerType;
283
284    /** String for serverType parameter for running DDP Engine in the same JVM. */
285    public final static String SAME_JVM_STRING = "sameJVM";
286
287    /** String for serverType parameter for running DDP Engine in a distributed environment. */
288    public final static String DISTRIBUTED_STRING = "distributed";
289
290    /** The value used to signify the default degree of parallelism for
291     *  the degreeOfParallelism parameter.
292     */
293    public static final IntToken DEFAULT_INTTOKEN = new IntToken(-1);
294
295    /** The name of the DEFAULT degree of parallelism parameter. */
296    public static final String DEFAULT_NAME = "DEFAULT";
297    
298    ///////////////////////////////////////////////////////////////////
299    ////                      private methods                    //////
300        
301    /** Get a DDP engine.
302     *  @param name The DDP engine name.
303     *  @return The DDP engine or null if the engine name could not be
304     *  found in the configuration file. 
305     */
306    private DDPEngine _getEngine(String name) {
307        
308        // try to find the engine name in the configuration file
309        List<ConfigurationProperty> engineProperties = ConfigurationManager
310                .getInstance()
311                .getProperty(ConfigurationManager.getModule("ddp-common"))
312                .getProperties("Engines.Engine");   
313        
314        if(engineProperties == null || engineProperties.isEmpty()) {
315            throw new InternalErrorException("No DDP engines found.");
316        }
317        
318        String engineClassName = null;
319        
320        for(ConfigurationProperty engineProperty : engineProperties) {
321            ConfigurationProperty nameProperty = engineProperty.getProperty("Name");
322            if(nameProperty == null) {
323                throw new InternalErrorException("DDP Engine does not have a name. Try deleting\n" +
324                        "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
325                        "and restarting Kepler.");
326            }
327            
328            if (nameProperty.getValue().equalsIgnoreCase(name)) {
329                ConfigurationProperty classProperty = engineProperty.getProperty("ImplementationClass");
330                if(classProperty == null) {
331                    throw new InternalErrorException("DDP Engine does not have an implementation class.\n" +
332                            "Try deleting " +
333                            "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
334                            "and restarting Kepler.");
335                }
336                engineClassName = classProperty.getValue();
337                break;
338            }
339        }
340        
341        // try to instantiate the engine        
342        DDPEngine newEngine = null;
343        if(engineClassName != null) {
344            try {
345                Class<?> clazz = Class.forName(engineClassName);
346                Constructor<?> constructor = clazz.getConstructor(DDPDirector.class);
347                newEngine = (DDPEngine) constructor.newInstance(new Object[] {this});
348            } catch(Exception e) {
349                throw new InternalErrorException(this, e, "Could not instantiate " + name + " engine.");
350            }
351        }
352        
353        return newEngine;
354    }
355    
356    /** Create parameters and the default engine. */
357        private void _initializeParameters() {
358            
359            try {                       
360                        jobArguments = new StringParameter(this, "jobArguments");
361                } catch (Throwable t) {
362                        throw new InternalErrorException(this, t,
363                                        "Cannot create jobArguments parameter.");
364                }
365                
366                try {
367                        configDir = new StringParameter(this, "configDir");
368                } catch (Throwable t) {
369                        throw new InternalErrorException(this, t,
370                                        "Cannot create configDir parameter.");
371                }
372
373                try {
374                        writeSubWorkflowsToFiles = new Parameter(this,
375                                        "writeSubWorkflowsToFiles");
376                        writeSubWorkflowsToFiles.setTypeEquals(BaseType.BOOLEAN);
377                        writeSubWorkflowsToFiles.setToken(BooleanToken.FALSE);
378                } catch (Throwable t) {
379                        throw new InternalErrorException(this, t,
380                                        "Cannot create writeModelsToFiles parameter.");
381                }
382
383                try {
384                        includeJars = new StringParameter(this, "includeJars");
385                } catch (Throwable t) {
386                        throw new InternalErrorException(this, t,
387                                        "Cannot create includeJars parameter.");
388                }
389                
390                try {
391                        displayRedirectDir = new StringParameter(this, "displayRedirectDir");
392                } catch (Throwable t) {
393                        throw new InternalErrorException(this, t,
394                                        "Cannot create displayRedirectDir parameter.");
395                }
396                
397                try {
398            Parameter DEFAULT = new Parameter(this, DEFAULT_NAME);
399            DEFAULT.setToken(DEFAULT_INTTOKEN);
400            DEFAULT.setVisibility(Settable.EXPERT);
401            DEFAULT.setPersistent(false);
402                } catch(Throwable t) {
403                    throw new InternalErrorException(this, t,
404                            "Cannot create DEFAULT parameter.");
405                }
406                
407        try {
408            degreeOfParallelism = new Parameter(this, "degreeOfParallelism");
409        } catch(Throwable t) {
410            throw new InternalErrorException(this, t, "Cannot create degreeOfParallelism parameter.");
411        }
412        degreeOfParallelism.setExpression(DEFAULT_NAME);
413        degreeOfParallelism.addChoice(DEFAULT_NAME);
414        
415        // hide these parameters since they are not used.
416        startTime.setVisibility(Settable.NONE);
417        stopTime.setVisibility(Settable.NONE);
418        localClock.setVisibility(Settable.NONE);
419
420        try {
421            startServerType = new StringParameter(this, "startServerType");
422            startServerType.setToken("default");
423        } catch(Throwable t) {
424            throw new InternalErrorException(this, t, "Cannot create startServerType parameter.");
425        }
426        
427        try {
428            engine = new StringParameter(this, "engine");
429        } catch (Throwable t) {
430            throw new InternalErrorException(this, t,
431                    "Cannot create engine parameter.");
432        }
433
434        // NOTE: all the parameters must be created before setting the engine,
435        // since the engine may reference the parameters.
436
437        // get the default engine name
438        ConfigurationProperty defaultProperty = ConfigurationManager
439                .getInstance()
440                .getProperty(ConfigurationManager.getModule("ddp-common"))
441                .getProperty("Engines.default");
442        if(defaultProperty == null || defaultProperty.getValue().trim().isEmpty()) {
443            System.err.println("WARNING: default DDP engine not found in configuration files; using Spark.");
444            _defaultEngine = "Spark";
445        } else {
446            _defaultEngine = defaultProperty.getValue().trim();
447        }
448        
449        // load the default engine
450        try {
451            engine.setToken("default");
452        } catch (IllegalActionException e) {
453            throw new InternalErrorException(this, e, "Error getting default DDP engine.");
454        }
455        
456        // add choices for all engines
457        List<ConfigurationProperty> engineProperties = ConfigurationManager
458                .getInstance()
459                .getProperty(ConfigurationManager.getModule("ddp-common"))
460                .getProperties("Engines.Engine");   
461        
462        if(engineProperties == null || engineProperties.isEmpty()) {
463            throw new InternalErrorException("No DDP engines found.");
464        }
465         
466        engine.addChoice("default");
467        for(ConfigurationProperty engineProperty : engineProperties) {
468            ConfigurationProperty nameProperty = engineProperty.getProperty("Name");
469            if(nameProperty == null) {
470                throw new InternalErrorException("DDP Engine does not have a name. Try deleting\n" +
471                        "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
472                        "and restarting Kepler.");
473            }
474            engine.addChoice(nameProperty.getValue());
475        }
476        }
477            
478    ///////////////////////////////////////////////////////////////////
479    ////                      private fields                     //////
480
481        /** The engine object. */
482        private DDPEngine _engine;
483        
484        private String _defaultEngine;
485}