001/* A director to run DDP models. 002 * 003 * Copyright (c) 2013 The Regents of the University of California. 004 * All rights reserved. 005 * 006 * '$Author: crawl $' 007 * '$Date: 2015-11-05 00:26:32 +0000 (Thu, 05 Nov 2015) $' 008 * '$Revision: 34215 $' 009 * 010 * Permission is hereby granted, without written agreement and without 011 * license or royalty fees, to use, copy, modify, and distribute this 012 * software and its documentation for any purpose, provided that the above 013 * copyright notice and the following two paragraphs appear in all copies 014 * of this software. 015 * 016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 020 * SUCH DAMAGE. 021 * 022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 027 * ENHANCEMENTS, OR MODIFICATIONS. 028 * 029 */ 030package org.kepler.ddp.director; 031 032import java.lang.reflect.Constructor; 033import java.util.List; 034 035import org.kepler.configuration.ConfigurationManager; 036import org.kepler.configuration.ConfigurationProperty; 037 038import ptolemy.actor.Director; 039import ptolemy.data.BooleanToken; 040import ptolemy.data.IntToken; 041import ptolemy.data.expr.Parameter; 042import ptolemy.data.expr.StringParameter; 043import ptolemy.data.type.BaseType; 044import ptolemy.kernel.CompositeEntity; 045import ptolemy.kernel.util.Attribute; 046import ptolemy.kernel.util.IllegalActionException; 047import ptolemy.kernel.util.InternalErrorException; 048import ptolemy.kernel.util.NameDuplicationException; 049import ptolemy.kernel.util.Settable; 050import ptolemy.kernel.util.Workspace; 051import ptolemy.util.MessageHandler; 052 053/** A director that converts DDP pattern actors (Map, Reduce, Cross, CoGroup, 054 * and Match) and I/O actors (DDPDataSink and DDPDataSource) into a job that 055 * is executed on a DDP engine such as Hadoop or Stratosphere. 056 * <p> 057 * <b>NOTE:</b> Only DDP pattern and I/O actors may be present in the 058 * workflow. Other actors must placed inside the composite pattern actors 059 * or in a different sub-workflow. 060 * </p> 061 * 062 * 063 * @author Daniel Crawl 064 * @version $Id: DDPDirector.java 34215 2015-11-05 00:26:32Z crawl $ 065 */ 066public class DDPDirector extends Director { 067 068 /** Construct a new DDPBaseDirector in a container with a given name. */ 069 public DDPDirector(CompositeEntity container, String name) 070 throws IllegalActionException, NameDuplicationException { 071 super(container, name); 072 _initializeParameters(); 073 } 074 075 /** Construct a new DDPBaseDirector for a workspace. */ 076 public DDPDirector(Workspace workspace) throws IllegalActionException, 077 NameDuplicationException { 078 super(workspace); 079 _initializeParameters(); 080 } 081 082 /** React to a change in an attribute. This overrides the base class to 083 * handles changes to local parameters. 084 */ 085 @Override 086 public void attributeChanged(Attribute attribute) throws IllegalActionException { 087 088 if(attribute == engine) { 089 // see if engine actually changed 090 String newEngineStr = engine.stringValue(); 091 092 if(newEngineStr.equals("default")) { 093 newEngineStr = _defaultEngine; 094 } 095 096 if(_engine == null || !_engine.getName().equals(newEngineStr)) { 097 DDPEngine newEngine = _getEngine(newEngineStr); 098 if(newEngine == null) { 099 throw new IllegalActionException(this, 100 "Unable to load DDP engine " + newEngineStr + "."); 101 } else { 102 103 // remove engine-specific parameters from the director if 104 // there was a previous engine. 105 if(_engine != null) { 106 try { 107 _engine._removeParameters(); 108 } catch (NameDuplicationException e) { 109 throw new IllegalActionException(this, e, 110 "Error removing parameters for " + _engine.getName()); 111 } 112 } 113 _engine = newEngine; 114 115 // update the values in the startServerType parameter 116 117 boolean found = false; 118 final String oldServerType = startServerType.stringValue(); 119 120 startServerType.removeAllChoices(); 121 122 String[] serverTypes = _engine.getServerTypes(); 123 for(String typeStr : serverTypes) { 124 startServerType.addChoice(typeStr); 125 if(oldServerType.equals(typeStr)) { 126 found = true; 127 } 128 } 129 130 if(found) { 131 startServerType.setToken(oldServerType); 132 } else { 133 startServerType.setToken(serverTypes[0]); 134 } 135 136 } 137 } 138 } else if(attribute == startServerType) { 139 String newTypeStr = startServerType.stringValue(); 140 if(!newTypeStr.isEmpty() && !newTypeStr.equals("default")) { 141 boolean found = false; 142 boolean haveDistributed = false; 143 // see if the start type is found 144 for(String typeStr : startServerType.getChoices()) { 145 if(typeStr.equals(newTypeStr)) { 146 found = true; 147 break; 148 } 149 // see if it supports distributed 150 if(typeStr.equals(DISTRIBUTED_STRING)) { 151 haveDistributed = true; 152 } 153 } 154 // backwards compatibility: if the type was set to local or cluster, 155 // change to distributed 156 if(haveDistributed && 157 (newTypeStr.equals("local") || newTypeStr.equals("cluster"))) { 158 startServerType.setExpression(DISTRIBUTED_STRING); 159 found = true; 160 } 161 162 if(!found) { 163 throw new IllegalActionException(this, "Invalid type: " + newTypeStr); 164 } 165 } 166 } else { 167 if(_engine != null) { 168 _engine.attributeChanged(attribute); 169 } 170 super.attributeChanged(attribute); 171 } 172 } 173 174 /** Clone the object into the specified workspace. 175 * @param workspace The workspace for the cloned object. 176 * @exception CloneNotSupportedException Not thrown in this base class 177 * @return The new Attribute. 178 */ 179 @Override 180 public Object clone(Workspace workspace) throws CloneNotSupportedException { 181 DDPDirector newObject = (DDPDirector) super.clone(workspace); 182 if(_engine != null) { 183 newObject._engine = (DDPEngine)_engine.clone(workspace); 184 // set the director of the cloned engine to be the cloned director. 185 newObject._engine._director = newObject; 186 } 187 return newObject; 188 } 189 190 /** Run the engine. */ 191 @Override 192 public void fire() throws IllegalActionException { 193 _engine.fire(); 194 } 195 196 /** Get the dir to redirect display related actors. */ 197 public String getDisplayRedirectDir() throws IllegalActionException { 198 return _engine.getDisplayRedirectDir(); 199 } 200 201 /** Get the name of the DDP engine. */ 202 public String getEngineName() throws IllegalActionException { 203 if(_engine == null) { 204 return null; 205 } else { 206 return _engine.getName(); 207 } 208 } 209 210 /** Postfire the engine. 211 * @return If true, engine can be fired again. If false, do not run again. 212 */ 213 @Override 214 public boolean postfire() throws IllegalActionException { 215 return _engine.postfire(); 216 } 217 218 /** Preinitialize the engine. */ 219 @Override 220 public void preinitialize() throws IllegalActionException { 221 super.preinitialize(); 222 _engine.preinitialize(); 223 } 224 225 /** Stop any running DDP jobs. */ 226 @Override 227 public void stop() { 228 super.stop(); 229 try { 230 _engine.stop(); 231 } catch (IllegalActionException e) { 232 MessageHandler.error("Error stopping DDP job.", e); 233 } 234 } 235 236 /** Perform any cleanup in the engine. */ 237 @Override 238 public void wrapup() throws IllegalActionException { 239 super.wrapup(); 240 _engine.wrapup(); 241 } 242 243 /////////////////////////////////////////////////////////////////// 244 //// parameters //// 245 246 /** DDP engine configuration directory. */ 247 public StringParameter configDir; 248 249 /** The default degree of parallelism for ddp pattern actors. 250 * This value is used if the actor's degreeOfParallelism <= 0. 251 */ 252 public Parameter degreeOfParallelism; 253 254 /** A comma-separated list of jar files to include. If the absolute 255 * path to the jar is not specified, then the jar should be 256 * located in a Kepler module. 257 */ 258 public StringParameter includeJars; 259 260 /** A comma-separated list of arguments for the job. It is only useful 261 * when DDP actors' logics are described by java classes, not 262 * sub-workflows. 263 */ 264 public StringParameter jobArguments; 265 266 /** The directory where the display related actors in DDP pattern sub-workflows will save their outputs. 267 * If it is empty, the display actors will be discarded before execution. 268 * More information about display redirect can be found at display-redirect module. 269 */ 270 public StringParameter displayRedirectDir; 271 272 /** If true, sub-workflows are written to files in a directory. 273 * The directory path is printed to stdout when the workflow 274 * executes. 275 */ 276 public Parameter writeSubWorkflowsToFiles; 277 278 /** The execution engine. */ 279 public StringParameter engine; 280 281 /** The type of DDP server to start (if one is not running). */ 282 public StringParameter startServerType; 283 284 /** String for serverType parameter for running DDP Engine in the same JVM. */ 285 public final static String SAME_JVM_STRING = "sameJVM"; 286 287 /** String for serverType parameter for running DDP Engine in a distributed environment. */ 288 public final static String DISTRIBUTED_STRING = "distributed"; 289 290 /** The value used to signify the default degree of parallelism for 291 * the degreeOfParallelism parameter. 292 */ 293 public static final IntToken DEFAULT_INTTOKEN = new IntToken(-1); 294 295 /** The name of the DEFAULT degree of parallelism parameter. */ 296 public static final String DEFAULT_NAME = "DEFAULT"; 297 298 /////////////////////////////////////////////////////////////////// 299 //// private methods ////// 300 301 /** Get a DDP engine. 302 * @param name The DDP engine name. 303 * @return The DDP engine or null if the engine name could not be 304 * found in the configuration file. 305 */ 306 private DDPEngine _getEngine(String name) { 307 308 // try to find the engine name in the configuration file 309 List<ConfigurationProperty> engineProperties = ConfigurationManager 310 .getInstance() 311 .getProperty(ConfigurationManager.getModule("ddp-common")) 312 .getProperties("Engines.Engine"); 313 314 if(engineProperties == null || engineProperties.isEmpty()) { 315 throw new InternalErrorException("No DDP engines found."); 316 } 317 318 String engineClassName = null; 319 320 for(ConfigurationProperty engineProperty : engineProperties) { 321 ConfigurationProperty nameProperty = engineProperty.getProperty("Name"); 322 if(nameProperty == null) { 323 throw new InternalErrorException("DDP Engine does not have a name. Try deleting\n" + 324 "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" + 325 "and restarting Kepler."); 326 } 327 328 if (nameProperty.getValue().equalsIgnoreCase(name)) { 329 ConfigurationProperty classProperty = engineProperty.getProperty("ImplementationClass"); 330 if(classProperty == null) { 331 throw new InternalErrorException("DDP Engine does not have an implementation class.\n" + 332 "Try deleting " + 333 "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" + 334 "and restarting Kepler."); 335 } 336 engineClassName = classProperty.getValue(); 337 break; 338 } 339 } 340 341 // try to instantiate the engine 342 DDPEngine newEngine = null; 343 if(engineClassName != null) { 344 try { 345 Class<?> clazz = Class.forName(engineClassName); 346 Constructor<?> constructor = clazz.getConstructor(DDPDirector.class); 347 newEngine = (DDPEngine) constructor.newInstance(new Object[] {this}); 348 } catch(Exception e) { 349 throw new InternalErrorException(this, e, "Could not instantiate " + name + " engine."); 350 } 351 } 352 353 return newEngine; 354 } 355 356 /** Create parameters and the default engine. */ 357 private void _initializeParameters() { 358 359 try { 360 jobArguments = new StringParameter(this, "jobArguments"); 361 } catch (Throwable t) { 362 throw new InternalErrorException(this, t, 363 "Cannot create jobArguments parameter."); 364 } 365 366 try { 367 configDir = new StringParameter(this, "configDir"); 368 } catch (Throwable t) { 369 throw new InternalErrorException(this, t, 370 "Cannot create configDir parameter."); 371 } 372 373 try { 374 writeSubWorkflowsToFiles = new Parameter(this, 375 "writeSubWorkflowsToFiles"); 376 writeSubWorkflowsToFiles.setTypeEquals(BaseType.BOOLEAN); 377 writeSubWorkflowsToFiles.setToken(BooleanToken.FALSE); 378 } catch (Throwable t) { 379 throw new InternalErrorException(this, t, 380 "Cannot create writeModelsToFiles parameter."); 381 } 382 383 try { 384 includeJars = new StringParameter(this, "includeJars"); 385 } catch (Throwable t) { 386 throw new InternalErrorException(this, t, 387 "Cannot create includeJars parameter."); 388 } 389 390 try { 391 displayRedirectDir = new StringParameter(this, "displayRedirectDir"); 392 } catch (Throwable t) { 393 throw new InternalErrorException(this, t, 394 "Cannot create displayRedirectDir parameter."); 395 } 396 397 try { 398 Parameter DEFAULT = new Parameter(this, DEFAULT_NAME); 399 DEFAULT.setToken(DEFAULT_INTTOKEN); 400 DEFAULT.setVisibility(Settable.EXPERT); 401 DEFAULT.setPersistent(false); 402 } catch(Throwable t) { 403 throw new InternalErrorException(this, t, 404 "Cannot create DEFAULT parameter."); 405 } 406 407 try { 408 degreeOfParallelism = new Parameter(this, "degreeOfParallelism"); 409 } catch(Throwable t) { 410 throw new InternalErrorException(this, t, "Cannot create degreeOfParallelism parameter."); 411 } 412 degreeOfParallelism.setExpression(DEFAULT_NAME); 413 degreeOfParallelism.addChoice(DEFAULT_NAME); 414 415 // hide these parameters since they are not used. 416 startTime.setVisibility(Settable.NONE); 417 stopTime.setVisibility(Settable.NONE); 418 localClock.setVisibility(Settable.NONE); 419 420 try { 421 startServerType = new StringParameter(this, "startServerType"); 422 startServerType.setToken("default"); 423 } catch(Throwable t) { 424 throw new InternalErrorException(this, t, "Cannot create startServerType parameter."); 425 } 426 427 try { 428 engine = new StringParameter(this, "engine"); 429 } catch (Throwable t) { 430 throw new InternalErrorException(this, t, 431 "Cannot create engine parameter."); 432 } 433 434 // NOTE: all the parameters must be created before setting the engine, 435 // since the engine may reference the parameters. 436 437 // get the default engine name 438 ConfigurationProperty defaultProperty = ConfigurationManager 439 .getInstance() 440 .getProperty(ConfigurationManager.getModule("ddp-common")) 441 .getProperty("Engines.default"); 442 if(defaultProperty == null || defaultProperty.getValue().trim().isEmpty()) { 443 System.err.println("WARNING: default DDP engine not found in configuration files; using Spark."); 444 _defaultEngine = "Spark"; 445 } else { 446 _defaultEngine = defaultProperty.getValue().trim(); 447 } 448 449 // load the default engine 450 try { 451 engine.setToken("default"); 452 } catch (IllegalActionException e) { 453 throw new InternalErrorException(this, e, "Error getting default DDP engine."); 454 } 455 456 // add choices for all engines 457 List<ConfigurationProperty> engineProperties = ConfigurationManager 458 .getInstance() 459 .getProperty(ConfigurationManager.getModule("ddp-common")) 460 .getProperties("Engines.Engine"); 461 462 if(engineProperties == null || engineProperties.isEmpty()) { 463 throw new InternalErrorException("No DDP engines found."); 464 } 465 466 engine.addChoice("default"); 467 for(ConfigurationProperty engineProperty : engineProperties) { 468 ConfigurationProperty nameProperty = engineProperty.getProperty("Name"); 469 if(nameProperty == null) { 470 throw new InternalErrorException("DDP Engine does not have a name. Try deleting\n" + 471 "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" + 472 "and restarting Kepler."); 473 } 474 engine.addChoice(nameProperty.getValue()); 475 } 476 } 477 478 /////////////////////////////////////////////////////////////////// 479 //// private fields ////// 480 481 /** The engine object. */ 482 private DDPEngine _engine; 483 484 private String _defaultEngine; 485}