/* A DDP pattern composite actor with a single input.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-09-08 23:47:26 +0000 (Tue, 08 Sep 2015) $'
 * '$Revision: 33877 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.ddp.actor.pattern;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;

import org.kepler.ddp.Utilities;
import org.kepler.ddp.director.DDPDirector;

import ptolemy.actor.Director;
import ptolemy.actor.Manager;
import ptolemy.actor.TypedCompositeActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.gui.style.TextStyle;
import ptolemy.data.BooleanToken;
import ptolemy.data.IntToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.expr.StringParameter;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.StringAttribute;
import ptolemy.kernel.util.Workspace;

/** A DDP pattern composite actor with a single input.
 *
 *  <p>The actor either executes its contained sub-workflow, or, when
 *  {@link #executionClass} or {@link #executionCode} is set, delegates to
 *  that class or code. In the latter case both {@link #inKeyValueTypes}
 *  and {@link #outKeyValueTypes} must be set; this is checked in
 *  {@link #preinitialize()}.</p>
 *
 *  @author Daniel Crawl
 *  @version $Id: SingleInputPatternActor.java 33877 2015-09-08 23:47:26Z crawl $
 */
public class SingleInputPatternActor extends TypedCompositeActor implements DDPPatternActor {

    /** Construct a new SingleInputPatternActor for a workspace.
     *  Note that this constructor does not create the ports or
     *  parameters; the public fields are left unset.
     *  @param workspace The workspace to list the actor in.
     */
    public SingleInputPatternActor(Workspace workspace) {
        super(workspace);
    }

    /** Construct a new SingleInputPatternActor in a container with a given
     *  name, and create its ports and parameters.
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException If a port or parameter cannot be
     *   created in this actor.
     *  @exception NameDuplicationException If the name coincides with an
     *   existing object in the container, or a port or parameter name is
     *   already used.
     */
    public SingleInputPatternActor(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);

        in = new TypedIOPort(this, "in", true, false);
        in.setTypeAtMost(Types.keyValueArrayType);

        out = new TypedIOPort(this, "out", false, true);
        // NOTE: do not set this, since the value type may change
        // within the model (and possibly the key type)
        //out.setTypeAtLeast(in);

        degreeOfParallelism = new Parameter(this, "degreeOfParallelism");
        degreeOfParallelism.setExpression("0");

        executionClass = new StringParameter(this, "executionClass");

        // The choices are the empty string (no code) plus every known
        // execution language name.
        executionCode = new StringParameter(this, "executionCode");
        executionCode.addChoice("");
        String[] languages = Utilities.getExecutionLanguageNames();
        for (String languageName : languages) {
            executionCode.addChoice(languageName);
        }
        executionCode.setExpression("");

        // The code text itself lives in an attribute nested inside
        // executionCode; see getExecutionCode().
        StringAttribute code = new StringAttribute(executionCode, "code");
        new TextStyle(code, "_style");

        inKeyValueTypes = new StringParameter(this, "inKeyValueTypes");
        for (String type : _commonKeyValueTypes) {
            inKeyValueTypes.addChoice(type);
        }

        outKeyValueTypes = new StringParameter(this, "outKeyValueTypes");
        for (String type : _commonKeyValueTypes) {
            outKeyValueTypes.addChoice(type);
        }

        jars = new StringParameter(this, "jars");

        printExeSummary = new Parameter(this,
                "printExeSummary", BooleanToken.FALSE);
        printExeSummary.setTypeEquals(BaseType.BOOLEAN);

        // NOTE(review): catching Throwable here also converts Errors into
        // InternalErrorException; kept as in the original since the reason
        // for the broad catch is not visible from this file.
        try {
            displayRedirectDir = new StringParameter(this, "displayRedirectDir");
        } catch (Throwable t) {
            throw new InternalErrorException(this, t,
                    "Cannot create displayRedirectDir parameter.");
        }

        runWorkflowLifecyclePerInput = new Parameter(this, "runWorkflowLifecyclePerInput",
                BooleanToken.FALSE);
        runWorkflowLifecyclePerInput.setTypeEquals(BaseType.BOOLEAN);
    }

    /** React to a change in an attribute.
     *  <ul>
     *  <li>inKeyValueTypes: reset the type constraints of the input port
     *  to either the generic key-value array type or the given types.</li>
     *  <li>outKeyValueTypes: reset the type constraints of the output
     *  port and, if types are given, fix the port to that type.</li>
     *  <li>executionClass: cache the class name.</li>
     *  <li>executionCode: validate the language name and cache it.</li>
     *  </ul>
     *  All other attributes are passed to the parent class.
     *  @param attribute The attribute that changed.
     *  @exception IllegalActionException If the execution language is not
     *   one of the known execution languages, or a parameter value cannot
     *   be read, or the parent class throws it.
     */
    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if (attribute == inKeyValueTypes) {
            in.typeConstraints().clear();
            String typesStr = inKeyValueTypes.stringValue();
            if (typesStr.isEmpty()) {
                in.setTypeAtMost(Types.keyValueArrayType);
            } else {
                in.setTypeAtMost(Types.getKeyValueType(inKeyValueTypes, typesStr));
            }
        } else if (attribute == outKeyValueTypes) {
            out.typeConstraints().clear();
            String typesStr = outKeyValueTypes.stringValue();
            if (!typesStr.isEmpty()) {
                out.setTypeEquals(Types.getKeyValueType(outKeyValueTypes, typesStr));
            }
        } else if (attribute == executionClass) {
            _executionClassName = executionClass.stringValue();
        } else if (attribute == executionCode) {
            // Validate before caching so that a rejected value does not
            // linger in _executionCodeType after the exception is thrown.
            String codeType = executionCode.stringValue();
            if (!codeType.isEmpty() && !_isKnownExecutionLanguage(codeType)) {
                throw new IllegalActionException(this,
                        "Invalid execution language: " + codeType);
            }
            _executionCodeType = codeType;
        } else {
            super.attributeChanged(attribute);
        }
    }

    /** Clone the actor into the specified workspace. */
    /*
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        SingleInputPatternActor newObject = (SingleInputPatternActor) super.clone(workspace);
        return newObject;
    }
    */

    /** List the opaque entities that are directly or indirectly contained by this entity.
     *  During model preinitializing and resolving types, if using a class or code for
     *  execution, there will be no such entities.
     *  @return An empty list while the contained entities are hidden,
     *   otherwise the list returned by the parent class.
     */
    @Override
    public List<?> deepEntityList() {
        if (_hideContainedEntities()) {
            return new LinkedList<Object>();
        }
        return super.deepEntityList();
    }

    /** List the entities that are directly contained by this entity.
     *  During model preinitializing and resolving types, if using a class or code for
     *  execution, there will be no such entities.
     *  @return An empty list while the contained entities are hidden,
     *   otherwise the list returned by the parent class.
     */
    @Override
    public List<?> entityList() {
        if (_hideContainedEntities()) {
            return new LinkedList<Object>();
        }
        return super.entityList();
    }

    /** Get the number of parallel instances to execute.
     *  @return The value of the degreeOfParallelism parameter.
     *  @exception IllegalActionException If the parameter cannot be evaluated.
     */
    @Override
    public int getDegreeOfParallelism() throws IllegalActionException {
        return ((IntToken) degreeOfParallelism.getToken()).intValue();
    }

    /** Get the dir to redirect display related actors.
     *  @return The value of the displayRedirectDir parameter.
     *  @exception IllegalActionException If the parameter cannot be evaluated.
     */
    @Override
    public String getDisplayRedirectDir() throws IllegalActionException {
        return displayRedirectDir.stringValue();
    }

    /** Get the name of the execution class. If no execution class is set,
     *  i.e., for this class the sub-workflow is to be executed, returns
     *  the empty string.
     *  @return The cached execution class name.
     */
    @Override
    public String getExecutionClassName() {
        return _executionClassName;
    }

    /** Get the execution code type. Returns null if not set.
     *  @return The execution language name, or null if it is unset or empty.
     *  @exception IllegalActionException Not thrown in this class.
     */
    @Override
    public String getExecutionCodeType() throws IllegalActionException {
        if (_executionCodeType == null || _executionCodeType.isEmpty()) {
            return null;
        }
        return _executionCodeType;
    }

    /** Get the execution code. Returns null if not set.
     *  The code is read from the attribute named "code" contained by
     *  the executionCode parameter.
     *  @return The expression of the code attribute, or null if that
     *   attribute does not exist or is not a StringAttribute.
     *  @exception IllegalActionException Not thrown in this class.
     */
    @Override
    public String getExecutionCode() throws IllegalActionException {
        // The nested attribute is only created by the (container, name)
        // constructor, so guard against it being absent instead of failing
        // with a NullPointerException or ClassCastException.
        if (executionCode == null) {
            return null;
        }
        Attribute code = executionCode.getAttribute("code");
        if (!(code instanceof StringAttribute)) {
            return null;
        }
        return ((StringAttribute) code).getExpression();
    }

    /** Get a comma-separated list of jars to use with this actor.
     *  @return The value of the jars parameter.
     *  @exception IllegalActionException If the parameter cannot be evaluated.
     */
    @Override
    public String getJars() throws IllegalActionException {
        return jars.stringValue();
    }

    /** Get whether to print an execution summary when running the workflow
     *  inside of a Hadoop/Stratosphere job.
     *  @return The value of the printExeSummary parameter.
     *  @exception IllegalActionException If the parameter cannot be evaluated.
     */
    @Override
    public boolean getPrintExeInfo() throws IllegalActionException {
        return ((BooleanToken) printExeSummary.getToken()).booleanValue();
    }

    /** Get a set of name-value pairs of input/output format parameters for the execution engine.
     *  @return A new, empty map.
     *  @exception IllegalActionException Not thrown in this class.
     */
    @Override
    public java.util.Map<String, String> getParameters() throws IllegalActionException {
        // return an empty map.
        return new HashMap<String, String>();
    }

    /** Get a set of (kepler name, implementation name) pairs of input/output format parameters for the execution engine.
     *  @param engineName The name of the execution engine; ignored in this class.
     *  @return A new, empty map.
     *  @exception IllegalActionException Not thrown in this class.
     */
    @Override
    public java.util.Map<String, String> getParaImplNames(String engineName) throws IllegalActionException {
        // return an empty map.
        return new HashMap<String, String>();
    }

    /** Check if the full lifecycle of the sub-workflow should be executed for each input.
     *  @return The value of the runWorkflowLifecyclePerInput parameter.
     *  @exception IllegalActionException If the parameter cannot be evaluated.
     */
    public boolean getRunWorkflowLifecyclePerInput() throws IllegalActionException {
        return ((BooleanToken) runWorkflowLifecyclePerInput.getToken()).booleanValue();
    }

    /** Override the parent class to check that parameter values are correctly set.
     *  Unless this actor is the top level of the model, verify that the
     *  executive director is a DDPDirector and that the execution
     *  class or code is set if and only if the key value types are set.
     *  @exception IllegalActionException If the executive director is not
     *   a DDPDirector, if the parameters are inconsistent, or if the parent
     *   class throws it.
     */
    @Override
    public void preinitialize() throws IllegalActionException {

        super.preinitialize();

        // verify that execution class is set iff the key value types are set

        // NOTE: only perform these checks if we are not the top level model,
        // i.e., not the model running as a task by the DDP engine.
        if (toplevel() == this) {
            return;
        }

        Director director = getExecutiveDirector();
        if (!(director instanceof DDPDirector)) {
            throw new IllegalActionException(this,
                    "This actor only executes with the DDP director.");
        }

        final boolean classOrCodeSet = _isExecutionClassOrCodeSet();

        final String inStr = inKeyValueTypes.stringValue();
        final boolean inSet = inStr != null && !inStr.isEmpty();

        final String outStr = outKeyValueTypes.stringValue();
        final boolean outSet = outStr != null && !outStr.isEmpty();

        if (classOrCodeSet && (!inSet || !outSet)) {
            throw new IllegalActionException(this, "The execution class or code is set, " +
                    "but not both the inKeyValueTypes and outKeyValueTypes. If you " +
                    "specify the execution class or code, set both inKeyValueTypes " +
                    "and outKeyValueTypes.");
        }

        if (!classOrCodeSet && (inSet || outSet)) {
            throw new IllegalActionException(this, "The execution class and code is not set, " +
                    "but inKeyValueTypes or outKeyValueTypes are set. If you do not specify " +
                    "the execution class or code, clear both inKeyValueTypes, and outKeyValueTypes.");
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                     ports and parameters                  ////

    /** Data input. */
    public TypedIOPort in;

    /** Data output. */
    public TypedIOPort out;

    /** The number of parallel instances to execute. */
    public Parameter degreeOfParallelism;

    /** The name of the execution class. If not set, the sub-workflow is executed.
     *  To use a class not included with Kepler, add the class's jar to the jars
     *  parameter.
     */
    public StringParameter executionClass;

    /** The directory where the display related actors in DDP pattern sub-workflows will save their outputs.
     *  If it is empty, the display actors will be discarded before execution.
     *  More information about display redirect can be found at display-redirect module.
     */
    public StringParameter displayRedirectDir;

    /** The input key value types. This parameter should only be set
     *  when executionClass or executionCode is specified.
     */
    public StringParameter inKeyValueTypes;

    /** The output key value types. This parameter should only be set
     *  when executionClass or executionCode is specified.
     */
    public StringParameter outKeyValueTypes;

    /** A comma-separated list of jars to use with this actor. The
     *  full path to each jar must be specified.
     */
    public StringParameter jars;

    /** If true, print an execution summary to the log file each time the
     *  sub-workflow executes in the Hadoop or Stratosphere job. NOTE:
     *  this can severely impact performance and should only be used for
     *  debugging.
     */
    public Parameter printExeSummary;

    /** If true, the full lifecycle of the sub-workflow will be executed
     *  for each input. By setting this to true, multiple outputs can
     *  be collected for each input. If false, a single iteration of
     *  the workflow is performed for each input and only a single output
     *  may be generated. Performance is very slow if this is set to true.
     */
    public Parameter runWorkflowLifecyclePerInput;

    /** The type of the execution code, i.e., the execution language name.
     *  The code itself is stored in the attribute named "code" contained
     *  by this parameter.
     */
    public StringParameter executionCode;

    ///////////////////////////////////////////////////////////////////
    ////                      protected fields                     ////

    /** The execution class name. */
    protected String _executionClassName = "";

    /** The execution code type. */
    protected String _executionCodeType;

    ///////////////////////////////////////////////////////////////////
    ////                      private methods                      ////

    /** Return true if the contained entities should be hidden, i.e., an
     *  execution class or code is set and the manager is currently
     *  preinitializing or resolving types.
     */
    private boolean _hideContainedEntities() {
        Manager manager = getManager();
        if (manager == null || !_isExecutionClassOrCodeSet()) {
            return false;
        }
        Manager.State state = manager.getState();
        return state == Manager.PREINITIALIZING
                || state == Manager.RESOLVING_TYPES;
    }

    /** Return true if either the execution class name or the execution
     *  code type is non-empty.
     */
    private boolean _isExecutionClassOrCodeSet() {
        final boolean classSet = _executionClassName != null
                && !_executionClassName.isEmpty();
        final boolean codeSet = _executionCodeType != null
                && !_executionCodeType.isEmpty();
        return classSet || codeSet;
    }

    /** Return true if the given name is one of the execution language
     *  names reported by Utilities.getExecutionLanguageNames().
     */
    private static boolean _isKnownExecutionLanguage(String languageName) {
        for (String name : Utilities.getExecutionLanguageNames()) {
            if (name.equals(languageName)) {
                return true;
            }
        }
        return false;
    }
}