/* An atomic actor for DDP patterns.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-09-08 23:43:03 +0000 (Tue, 08 Sep 2015) $'
 * '$Revision: 33875 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.ddp.actor.pattern;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.kepler.build.modules.Module;
import org.kepler.configuration.ConfigurationManager;
import org.kepler.configuration.ConfigurationProperty;
import org.kepler.configuration.ConfigurationUtilities;
import org.kepler.ddp.director.DDPDirector;

import ptolemy.actor.Director;
import ptolemy.actor.TypedAtomicActor;
import ptolemy.data.IntToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.Type;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Settable;
import ptolemy.kernel.util.Workspace;
import ptolemy.util.MessageHandler;

/**
 * An atomic actor for DDP patterns.
 *
 * <p>The actor carries a format type, which is either the name of a format
 * listed in the ddp-common configuration or a class name. When the format
 * type changes, the parameters that the configuration associates with the
 * format are added to this actor and the parameters of the previous format
 * are removed. The key and value types are read from the configuration.</p>
 *
 * @author Daniel Crawl
 * @version $Id: AtomicPatternActor.java 33875 2015-09-08 23:43:03Z crawl $
 */
public class AtomicPatternActor extends TypedAtomicActor implements DDPPatternActor {

    /** Construct a new AtomicPatternActor in a container with a given name.
     *
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException If thrown while constructing the
     *   superclass or creating the parameters.
     *  @exception NameDuplicationException If thrown while constructing the
     *   superclass or creating the parameters.
     */
    public AtomicPatternActor(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);

        degreeOfParallelism = new Parameter(this, "degreeOfParallelism");
        degreeOfParallelism.setExpression("0");

        formatType = new StringParameter(this, "formatType");

        keyValueTypes = new StringParameter(this, "keyValueTypes");
        for (String type : _commonKeyValueTypes) {
            keyValueTypes.addChoice(type);
        }

        jars = new StringParameter(this, "jars");

        /* This is not supported for the DDP I/O actors.
        printExeInfo = new Parameter(this,
                "printExeSummary", BooleanToken.FALSE);
        printExeInfo.setTypeEquals(BaseType.BOOLEAN);
        */
    }

    /** See if the formatType parameter changed. If so, apply the new format;
     *  any other attribute is passed to the superclass.
     *
     *  @param attribute The attribute that changed.
     *  @exception IllegalActionException If the new format cannot be applied
     *   or the superclass throws it.
     */
    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if (attribute == formatType) {
            _setFormat(formatType.stringValue());
        } else {
            super.attributeChanged(attribute);
        }
    }

    /** Clone this actor into the specified workspace. The key type, the
     *  value type and the configuration properties of the clone are reset
     *  to null; the format type string and the format category are copied.
     *
     *  @param workspace The workspace for the cloned object.
     *  @return The cloned actor.
     *  @exception CloneNotSupportedException If thrown by the superclass.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        AtomicPatternActor newObject = (AtomicPatternActor) super.clone(workspace);
        newObject._keyType = null;
        newObject._valueType = null;
        newObject._formatTypeStr = _formatTypeStr;
        newObject._formatProperty = null;
        newObject._formatProperties = null;
        newObject._FORMAT_TYPE_CATEGORY = _FORMAT_TYPE_CATEGORY;
        return newObject;
    }

    /** Get the number of parallel instances to execute.
     *
     *  @return The integer value of the degreeOfParallelism parameter.
     *  @exception IllegalActionException If the parameter cannot be read.
     */
    @Override
    public int getDegreeOfParallelism() throws IllegalActionException {
        return ((IntToken) degreeOfParallelism.getToken()).intValue();
    }

    /** Get the display redirect directory. In this class, always returns
     *  the empty string.
     */
    @Override
    public String getDisplayRedirectDir() throws IllegalActionException {
        return "";
    }

    /** Get the name of the execution class. In this class, always
     *  returns the empty string.
     */
    @Override
    public String getExecutionClassName() {
        return "";
    }

    /** Get the execution code type. In this class, always returns null. */
    @Override
    public String getExecutionCodeType() throws IllegalActionException {
        return null;
    }

    /** Get the execution code. In this class, always returns null. */
    @Override
    public String getExecutionCode() throws IllegalActionException {
        return null;
    }

    /** Get the name of the class that provides the format type implementation
     *  for a specific engine.
     *
     *  @param engineName The name of the execution engine.
     *  @return The implementation class listed in the configuration for the
     *   engine, or the format type string itself when the format is not in
     *   the configuration (the format type may then be a class name).
     *  @exception IllegalActionException If the format name is missing, the
     *   configuration is malformed, or the format found in the configuration
     *   has no implementation class for the engine.
     */
    public String getFormatClassName(String engineName) throws IllegalActionException {

        final ConfigurationProperty formatProperty = _getFormatProperty(engineName);
        if (formatProperty == null) {
            // if we didn't find the format in the config file, the format
            // type could be a class name.
            return _formatTypeStr;
        }

        // we found the format with the same name as _formatTypeStr; now get
        // the class name. Every step is null-checked so that a missing entry
        // is reported with the exception below instead of a
        // NullPointerException.
        final ConfigurationProperty implementations =
                formatProperty.getProperty("ImplementationClass");
        final ConfigurationProperty engineImplementation =
                implementations == null ? null : implementations.getProperty(engineName);
        final String className =
                engineImplementation == null ? null : engineImplementation.getValue();

        // make sure class is specified for the engine
        if (className == null) {
            throw new IllegalActionException(this, "Format "
                    + _formatTypeStr
                    + " does not have an implementation class for "
                    + engineName);
        }

        return className;
    }

    /** Get a comma-separated list of jars to use with this actor.
     *
     *  @return The string value of the jars parameter.
     */
    @Override
    public String getJars() throws IllegalActionException {
        return jars.stringValue();
    }

    /** Get whether print execution summary when running workflow
     *  inside of Hadoop/Stratosphere job. In this class, returns
     *  false since printing execution information is not supported
     *  in the DDP I/O actors.
     */
    @Override
    public boolean getPrintExeInfo() throws IllegalActionException {
        return false;
        /*
        return ((BooleanToken) printExeInfo
                .getToken()).booleanValue();
        */
    }

    /** Get a set of name-value pairs of input/output format parameters for
     *  the execution engine.
     *
     *  @return A new map from the name of each parameter associated with
     *   the format type to its value. String tokens contribute their string
     *   value; other tokens contribute the result of toString().
     *  @exception IllegalActionException If a parameter cannot be evaluated.
     */
    @Override
    public java.util.Map<String, String> getParameters() throws IllegalActionException {

        final java.util.Map<String, String> retval = new HashMap<String, String>();

        // find all the parameters containing an attribute named
        // _associatedWithFormatType
        for (Parameter parameter : attributeList(Parameter.class)) {
            if (parameter.getAttribute(FORMAT_TYPE_PARAMETER_NAME) == null) {
                continue;
            }

            // add the name and value to the return map.
            // NOTE(review): getToken() may return null for a parameter whose
            // expression is empty - confirm whether that can occur here.
            final Token token = parameter.getToken();
            final String value;
            if (token instanceof StringToken) {
                value = ((StringToken) token).stringValue();
            } else {
                value = token.toString();
            }
            retval.put(parameter.getName(), value);
        }

        return retval;
    }

    /** Get a set of (kepler name, implementation name) pairs of input/output
     *  format parameters for the execution engine.
     *
     *  @param engineName The name of the execution engine; the
     *   implementation name is read from the property called
     *   engineName + "Name" of each pair.
     *  @return A new map from the Kepler parameter name to the name used by
     *   the engine, or to null if the pair has no name for the engine. The
     *   map is empty if the format property has not been set (it is set in
     *   preinitialize()) or has no parameter pairs. Pairs without a "name"
     *   property are skipped.
     */
    @Override
    public java.util.Map<String, String> getParaImplNames(String engineName) throws IllegalActionException {

        final java.util.Map<String, String> retval = new HashMap<String, String>();
        if (_formatProperty == null) {
            return retval;
        }

        final List<ConfigurationProperty> parameterPairs =
                _formatProperty.getProperties("Parameters.pair");
        if (parameterPairs == null) {
            return retval;
        }

        for (ConfigurationProperty parameterPair : parameterPairs) {
            final ConfigurationProperty keplerName = parameterPair.getProperty("name");
            if (keplerName == null) {
                // a pair without a name cannot be used as a map key
                continue;
            }
            final ConfigurationProperty nameInEngine =
                    parameterPair.getProperty(engineName + "Name");
            retval.put(keplerName.getValue(),
                    nameInEngine == null ? null : nameInEngine.getValue());
        }
        return retval;
    }

    /** Update the port types based on the format type. The format is looked
     *  up in the configuration for the engine of the DDP director; if it is
     *  not found there (the format type may be a class name), the types are
     *  left unchanged by this class.
     *
     *  @exception IllegalActionException If the director is not a
     *   DDPDirector, the format name is missing, or the configuration for
     *   the format is malformed.
     */
    @Override
    public void preinitialize() throws IllegalActionException {

        super.preinitialize();

        Director director = getDirector();
        if (!(director instanceof DDPDirector)) {
            throw new IllegalActionException(this,
                    "This actor only executes with the DDP director.");
        }

        // set the types based on the format type
        _formatProperty = _getFormatProperty(((DDPDirector) director)
                .getEngineName());
        //formatProperty.prettyPrint();
        _setTypesFromConfiguration(_formatProperty);

    }

    ///////////////////////////////////////////////////////////////////
    ////                         public fields                     ////

    /** The number of parallel instances to execute. */
    public Parameter degreeOfParallelism;

    /** The name of the data format or a fully-qualified class name to
     *  parse the data.
     */
    public StringParameter formatType;

    /** A comma-separated list of jars to use with this actor. The
     *  full path to each jar must be specified.
     */
    public StringParameter jars;

    /** A boolean parameter for whether print execution summary when
     *  running as Hadoop/Stratosphere/etc jobs.
     *  NOTE: this is not supported for DDP I/O actors.
     */
    //public Parameter printExeInfo;

    /** The key value types of the atomic pattern actor. This parameter should
     *  only be set when a class name is specified for formatType parameter.
     */
    public StringParameter keyValueTypes;

    ///////////////////////////////////////////////////////////////////
    ////                         protected methods                 ////

    /** Add the formats in the configuration file as choices to formatType.
     *  The names are de-duplicated and added in sorted order. If a format
     *  has no name, an error is reported through MessageHandler and no
     *  choices are added.
     *
     *  @param inputOutputName The prefix of the configuration property that
     *   lists the formats; the formats are read from
     *   inputOutputName + "Formats.Format".
     */
    protected void _addFormats(String inputOutputName) {
        final ConfigurationManager manager = ConfigurationManager.getInstance();
        final Module module = ConfigurationManager.getModule("ddp-common");
        final ConfigurationProperty propertyRoot = manager.getProperty(module);
        _formatProperties = propertyRoot.getProperties(inputOutputName
                + "Formats.Format");
        if (_formatProperties == null) {
            // no formats listed for this category, so there are no choices
            return;
        }

        // use a set to store the names to remove duplicates
        Set<String> names = new HashSet<String>();
        for (ConfigurationProperty formatProperty : _formatProperties) {
            ConfigurationProperty nameProperty = formatProperty.getProperty("Name");
            if (nameProperty == null) {
                MessageHandler.error("Configuration format does not have a name. Try deleting\n" +
                        "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
                        "and restarting Kepler.");
                return;
            }
            names.add(nameProperty.getValue());
        }

        // sort the names alphabetically
        String[] namesArray = names.toArray(new String[names.size()]);
        Arrays.sort(namesArray);
        for (String name : namesArray) {
            formatType.addChoice(name);
        }
    }

    /** Set the format, and add any associated parameters. This method also sets
     *  the port types for the keys and values specified in the configuration.
     *  Nothing is done if the format type is the same as the current one.
     *
     *  @param newFormatTypeStr the format name or a class name.
     *  @exception IllegalActionException If the format name is empty, the
     *   configuration for the format is malformed, or an associated
     *   parameter cannot be added or removed.
     */
    protected void _setFormat(String newFormatTypeStr) throws IllegalActionException {

        // see if the type has changed
        if (_formatTypeStr != null && newFormatTypeStr.equals(_formatTypeStr)) {
            return;
        }

        _formatTypeStr = newFormatTypeStr;

        java.util.Map<String, String> associatedParameterPairs = null;

        final ConfigurationProperty formatProperty = _getFormatProperty(null);

        if (formatProperty != null) {

            //formatProperty.prettyPrint();

            // see if it has associated parameters
            final ConfigurationProperty parametersProperty =
                    formatProperty.getProperty("Parameters");
            if (parametersProperty != null) {
                // get a map of names and values
                associatedParameterPairs = ConfigurationUtilities
                        .getPairsMap(parametersProperty);
            }

            _setTypesFromConfiguration(formatProperty);
        } else {
            // we could not find the type in the configuration file, so the
            // type may be a class name. Store the resolved name so that a
            // replacement for a legacy class actually takes effect.
            _formatTypeStr = _resolveFormatClassName(newFormatTypeStr);
        }

        _removeStaleFormatParameters(associatedParameterPairs);
        _addFormatParameters(associatedParameterPairs);
    }

    /** Set the key and value types from the types in the configuration property.
     *
     *  @param formatProperty The configuration property of the format. If
     *   null, the format is not in the configuration (the format type may be
     *   a class name) and this method does nothing.
     *  @exception IllegalActionException If the property has no Types, Key
     *   or Value entry, or a type string has no Ptolemy type.
     */
    protected void _setTypesFromConfiguration(ConfigurationProperty formatProperty)
            throws IllegalActionException {

        // without a configuration entry there are no types to read
        if (formatProperty == null) {
            return;
        }

        // make sure the Types property was found
        final ConfigurationProperty typesProperty = formatProperty
                .getProperty("Types");
        if (typesProperty == null) {
            throw new IllegalActionException(this,
                    "No Types specified for format type " + _formatTypeStr);
        }

        _keyType = _readConfiguredType(typesProperty, "Key", "key");
        //System.out.println(getName() + " change key type to " + _keyType);

        _valueType = _readConfiguredType(typesProperty, "Value", "value");
        //System.out.println(getName() + " change value type to " + _valueType);

        // update the port type
        _updateKeyValueTypes();
    }

    /** Update the key and value types. In this class, do nothing. */
    protected void _updateKeyValueTypes() {

    }

    ///////////////////////////////////////////////////////////////////
    ////                         protected fields                  ////

    /** The key type. */
    protected Type _keyType;

    /** The value type. */
    protected Type _valueType;

    /** The string value of the format type. */
    protected String _formatTypeStr;

    /** A list of properties of format types from the config file. */
    protected List<ConfigurationProperty> _formatProperties;

    /** The configuration property name for the format type. */
    protected String _FORMAT_TYPE_CATEGORY;

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Add a parameter for each associated name that is not already an
     *  attribute of this actor. Each new parameter contains a hidden
     *  marker parameter so that it can be removed when the format type
     *  changes. Attributes that already exist keep their current value.
     *
     *  @param associatedParameterPairs The names and initial values of the
     *   parameters of the format, or null if there are none.
     */
    private void _addFormatParameters(java.util.Map<String, String> associatedParameterPairs)
            throws IllegalActionException {

        if (associatedParameterPairs == null) {
            return;
        }

        for (java.util.Map.Entry<String, String> entry : associatedParameterPairs.entrySet()) {
            final String name = entry.getKey();

            // see if the parameter is already there
            if (getAttribute(name) != null) {
                continue;
            }

            final Parameter associatedParameter;
            try {
                // create a parameter for the name and value
                associatedParameter = new Parameter(this, name);

                // add a parameter inside the new parameter so we can
                // remove it when the format type changes
                final Parameter hiddenParameter = new Parameter(
                        associatedParameter, FORMAT_TYPE_PARAMETER_NAME);
                hiddenParameter.setVisibility(Settable.NONE);
            } catch (NameDuplicationException e) {
                throw new IllegalActionException(this, e,
                        "Error adding parameter " + name);
            }
            associatedParameter.setExpression(entry.getValue());
        }
    }

    /** Get the configuration property with the same name as the format type.
     *
     *  @param engineName If not null, returns the property whose implementation
     *   class is for the specified engine. Otherwise, returns the first property
     *   with the same name as the format type.
     *  @return The matching property, or null if there is none.
     *  @exception IllegalActionException If the format type is not set or is
     *   blank, no formats are configured for the format category, or a
     *   configured format has no name.
     */
    private ConfigurationProperty _getFormatProperty(String engineName)
            throws IllegalActionException {

        // make sure it's not empty.
        if (_formatTypeStr == null || _formatTypeStr.trim().isEmpty()) {
            throw new IllegalActionException(this, "Missing format name.");
        }

        // try to find the format name in the config properties

        final List<ConfigurationProperty> formatList = ConfigurationManager
                .getInstance()
                .getProperty(ConfigurationManager.getModule("ddp-common"))
                .getProperties(_FORMAT_TYPE_CATEGORY);

        if (formatList == null || formatList.isEmpty()) {
            throw new IllegalActionException(this,
                    "No formats found in configuration.xml for type "
                            + _FORMAT_TYPE_CATEGORY);
        }

        for (ConfigurationProperty format : formatList) {
            ConfigurationProperty nameProperty = format.getProperty("Name");
            if (nameProperty == null) {
                throw new IllegalActionException(this,
                        "Configuration format does not have a name. Try deleting\n" +
                        "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
                        "and restarting Kepler.");
            }

            // called on _formatTypeStr, which is known to be non-null, so a
            // name property without a value simply does not match
            if (!_formatTypeStr.equalsIgnoreCase(nameProperty.getValue())) {
                continue;
            }

            if (engineName == null) {
                return format;
            }

            // a format without any implementation classes cannot match a
            // specific engine
            final ConfigurationProperty implementations =
                    format.getProperty("ImplementationClass");
            if (implementations != null
                    && implementations.getProperty(engineName) != null) {
                return format;
            }
        }

        return null;
    }

    /** Read one type entry under the Types property and convert it to a
     *  Ptolemy type.
     *
     *  @param typesProperty The Types property of the format.
     *  @param propertyName The name of the entry, "Key" or "Value".
     *  @param label The lower-case word used for the entry in the error
     *   message for an unknown type.
     *  @return The Ptolemy type for the entry.
     *  @exception IllegalActionException If the entry is missing or its
     *   type string has no Ptolemy type.
     */
    private Type _readConfiguredType(ConfigurationProperty typesProperty,
            String propertyName, String label) throws IllegalActionException {

        final ConfigurationProperty typeProperty = typesProperty
                .getProperty(propertyName);
        if (typeProperty == null) {
            throw new IllegalActionException(this,
                    "No " + propertyName + " type specified for format type "
                            + _formatTypeStr);
        }

        final String typeStr = typeProperty.getValue();
        final Type type = Types.getTypeFromString(typeStr);
        if (type == null) {
            throw new IllegalActionException(this,
                    "Could not find Ptolemy type for " + label + " type " + typeStr
                            + " for format " + _formatTypeStr);
        }
        return type;
    }

    /** Remove the parameters associated with the old format type whose
     *  names are not among those associated with the new type.
     *
     *  @param associatedParameterPairs The names and values of the
     *   parameters of the new format, or null if there are none, in which
     *   case every format-associated parameter is removed.
     */
    private void _removeStaleFormatParameters(java.util.Map<String, String> associatedParameterPairs)
            throws IllegalActionException {

        for (Parameter parameter : attributeList(Parameter.class)) {
            final boolean belongsToFormat =
                    parameter.getAttribute(FORMAT_TYPE_PARAMETER_NAME) != null;
            final boolean stillNeeded = associatedParameterPairs != null
                    && associatedParameterPairs.containsKey(parameter.getName());
            if (belongsToFormat && !stillNeeded) {
                try {
                    parameter.setContainer(null);
                } catch (NameDuplicationException e) {
                    throw new IllegalActionException(this, e,
                            "Error removing parameter "
                                    + parameter.getName());
                }
            }
        }
    }

    /** Resolve a format type that was not found in the configuration and is
     *  therefore treated as a class name. The two format classes from the
     *  0.1 WordCount example are replaced by their successors; any other
     *  name is returned unchanged after an attempt to load it.
     *
     *  @param className The format type string.
     *  @return The class name to use for the format.
     */
    private String _resolveFormatClassName(String className) {

        // first check for classes that were in 0.1 WordCount
        if (className.equals("eu.stratosphere.pact.example.wordcount.WordCount$LineInFormat")) {
            System.out.println("class WordCount$LineInFormat no longer exists; using TextInputFormat");
            return "eu.stratosphere.pact.common.io.TextInputFormat";
        }
        if (className.equals("eu.stratosphere.pact.example.wordcount.WordCount$WordCountOutFormat")) {
            System.out.println("class WordCount$WordCountOutFormat no longer exists; using RecordOutputFormat");
            return "eu.stratosphere.pact.common.io.RecordOutputFormat";
        }

        // try to load the class
        try {
            Class.forName(className);
        } catch (ClassNotFoundException ignored) {
            // the class might come from an outside jar, so not finding it
            // here is deliberately not treated as an error.
        }
        return className;
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private fields                    ////

    /** Name of attribute contained in each parameter associated with the format
     *  type.
     */
    private final static String FORMAT_TYPE_PARAMETER_NAME = "_associatedWithFormatType";

    /** The configuration property for the format type. Set in
     *  preinitialize() and reset to null in clones.
     */
    private ConfigurationProperty _formatProperty;

}