001/* A DDP pattern composite actor with a single input.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-08 23:47:26 +0000 (Tue, 08 Sep 2015) $' 
008 * '$Revision: 33877 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern;
031
032import java.util.HashMap;
033import java.util.LinkedList;
034import java.util.List;
035
036import org.kepler.ddp.Utilities;
037import org.kepler.ddp.director.DDPDirector;
038
039import ptolemy.actor.Director;
040import ptolemy.actor.Manager;
041import ptolemy.actor.TypedCompositeActor;
042import ptolemy.actor.TypedIOPort;
043import ptolemy.actor.gui.style.TextStyle;
044import ptolemy.data.BooleanToken;
045import ptolemy.data.IntToken;
046import ptolemy.data.expr.Parameter;
047import ptolemy.data.expr.StringParameter;
048import ptolemy.data.type.BaseType;
049import ptolemy.kernel.CompositeEntity;
050import ptolemy.kernel.util.Attribute;
051import ptolemy.kernel.util.IllegalActionException;
052import ptolemy.kernel.util.InternalErrorException;
053import ptolemy.kernel.util.NameDuplicationException;
054import ptolemy.kernel.util.StringAttribute;
055import ptolemy.kernel.util.Workspace;
056
057/** A DDP pattern composite actor with a single input.
058 * 
059 *  @author Daniel Crawl
060 *  @version $Id: SingleInputPatternActor.java 33877 2015-09-08 23:47:26Z crawl $
061 */
062public class SingleInputPatternActor extends TypedCompositeActor implements DDPPatternActor {
063
064
065    /** Construct a new SingleInputPatternActor for a workspace. */
066    public SingleInputPatternActor(Workspace workspace) {
067        super(workspace);
068    }
069
070    /** Construct a new SingleInputPatternActor in a container with a given name. */
071    public SingleInputPatternActor(CompositeEntity container, String name)
072            throws IllegalActionException, NameDuplicationException {
073        super(container, name);
074        
075        in = new TypedIOPort(this, "in", true, false);
076        in.setTypeAtMost(Types.keyValueArrayType);
077        
078        out = new TypedIOPort(this, "out", false, true);
079        // NOTE: do not set this, since the value type may change
080        // within the model (and possibly the key type)
081        //out.setTypeAtLeast(in);
082        
083        degreeOfParallelism = new Parameter(this, "degreeOfParallelism");
084        degreeOfParallelism.setExpression("0");
085                
086        executionClass = new StringParameter(this, "executionClass");
087        
088        executionCode = new StringParameter(this, "executionCode");
089        executionCode.addChoice("");
090        String[] languages = Utilities.getExecutionLanguageNames();
091        for(String languageName : languages) {
092            executionCode.addChoice(languageName);
093        }
094        executionCode.setExpression("");
095        
096        StringAttribute code = new StringAttribute(executionCode, "code");
097        new TextStyle(code, "_style");
098
099        inKeyValueTypes = new StringParameter(this, "inKeyValueTypes");
100        for(String type : _commonKeyValueTypes) {
101            inKeyValueTypes.addChoice(type);
102        }
103        
104        outKeyValueTypes = new StringParameter(this, "outKeyValueTypes");
105        for(String type : _commonKeyValueTypes) {
106            outKeyValueTypes.addChoice(type);
107        }
108        
109        jars = new StringParameter(this, "jars");
110        
111        printExeSummary = new Parameter(this,
112                "printExeSummary", BooleanToken.FALSE);
113        printExeSummary.setTypeEquals(BaseType.BOOLEAN);
114        
115                try {
116                        displayRedirectDir = new StringParameter(this, "displayRedirectDir");
117                } catch (Throwable t) {
118                        throw new InternalErrorException(this, t,
119                                        "Cannot create displayRedirectDir parameter.");
120                }
121                
122                runWorkflowLifecyclePerInput = new Parameter(this, "runWorkflowLifecyclePerInput",
123                        BooleanToken.FALSE);
124                runWorkflowLifecyclePerInput.setTypeEquals(BaseType.BOOLEAN);           
125    }
126
127    /** React to a change in an attribute. */
128    @Override
129    public void attributeChanged(Attribute attribute) throws IllegalActionException {
130        
131        if(attribute == inKeyValueTypes) {
132            in.typeConstraints().clear();
133            String typesStr = inKeyValueTypes.stringValue();
134            if(typesStr.isEmpty()) {
135                in.setTypeAtMost(Types.keyValueArrayType);
136            } else {
137                in.setTypeAtMost(Types.getKeyValueType(inKeyValueTypes, typesStr));
138            }
139        } else if(attribute == outKeyValueTypes) {
140            out.typeConstraints().clear();
141            String typesStr = outKeyValueTypes.stringValue();
142            if(!typesStr.isEmpty()) {
143                out.setTypeEquals(Types.getKeyValueType(outKeyValueTypes, typesStr));
144            }
145        } else if(attribute == executionClass) {
146            _executionClassName = executionClass.stringValue();
147        } else if(attribute == executionCode) {
148            _executionCodeType = executionCode.stringValue();
149            if(!_executionCodeType.isEmpty()) {
150                String[] names = Utilities.getExecutionLanguageNames();
151                boolean found = false;
152                for(String name : names) {
153                    if(name.equals(_executionCodeType)) {
154                        found = true;
155                        break;
156                    }
157                }
158                if(!found) {
159                    throw new IllegalActionException(this,
160                            "Invalid execution language: " + _executionCodeType);
161                }
162            }
163        } else {
164            super.attributeChanged(attribute);
165        }
166    }
167    
168    /** Clone the actor into the specified workspace. */
169    /*
170    @Override
171    public Object clone(Workspace workspace) throws CloneNotSupportedException {
172        SingleInputPatternActor newObject = (SingleInputPatternActor) super.clone(workspace);
173        return newObject;
174    }
175    */
176    
177    /** List the opaque entities that are directly or indirectly contained by this entity. 
178     *  During model preinitializing and resolving types, if using a class or code for
179     *  execution, there will be no such entities.
180     */
181    @Override
182    public List<?> deepEntityList() {
183        
184        Manager manager = getManager();
185        if(manager != null && (!_executionClassName.isEmpty() || 
186                (_executionCodeType != null && !_executionCodeType.isEmpty()))) {
187            Manager.State state = manager.getState();
188            //System.out.println("manager state = " + state);
189            if(state == Manager.PREINITIALIZING ||
190                    state == Manager.RESOLVING_TYPES) {
191                return new LinkedList<Object>();
192            }
193        }
194        
195        return super.deepEntityList();
196    }
197   
198    /** List the opaque entities that are directly or indirectly contained by this entity. 
199     *  During model preinitializing and resolving types, if using a class or code for
200     *  execution, there will be no such entities.
201     */
202    @Override
203    public List<?> entityList() {
204
205        Manager manager = getManager();
206        if(manager != null && (!_executionClassName.isEmpty() || 
207                (_executionCodeType != null && !_executionCodeType.isEmpty()))) {
208            Manager.State state = manager.getState();
209            //System.out.println("manager state = " + state);
210            if(state == Manager.PREINITIALIZING ||
211                    state == Manager.RESOLVING_TYPES) {
212                return new LinkedList<Object>();
213            }
214        }
215
216        return super.entityList();
217    }
218
219    /** Get the number of parallel instances to execute. */
220    @Override
221    public int getDegreeOfParallelism() throws IllegalActionException {
222        return ((IntToken)degreeOfParallelism.getToken()).intValue();
223    }
224
225    /** Get the dir to redirect display related actors. */
226        @Override
227    public String getDisplayRedirectDir() throws IllegalActionException {
228        return displayRedirectDir.stringValue();
229    }
230    
231    /** Get the name of the execution class. If no execution class is set,
232     *  i.e., for this class the sub-workflow is to be executed, returns
233     *  the empty string.
234     */
235    @Override
236    public String getExecutionClassName() {
237        return _executionClassName;
238    }
239
240    /** Get the execution code type. Returns null if not set. */
241    @Override
242    public String getExecutionCodeType() throws IllegalActionException {
243        if(_executionCodeType == null || _executionCodeType.isEmpty()) {
244            return null;
245        }
246        return _executionCodeType;
247    }
248    
249    /** Get the execution code. Returns null if not set. */
250    @Override
251    public String getExecutionCode() throws IllegalActionException {
252        return ((StringAttribute) executionCode.getAttribute("code")).getExpression();
253    }
254
255    /** Get a comma-separated list of jars to use with this actor. */
256    @Override
257    public String getJars() throws IllegalActionException {
258        return jars.stringValue();
259    }
260
261    /** Get whether print execution summary when running workflow inside of Hadoop/Stratosphere job. */
262    @Override
263    public boolean getPrintExeInfo() throws IllegalActionException {
264        return ((BooleanToken) printExeSummary
265                .getToken()).booleanValue();
266    }
267    
268    /** Get a set of name-value pairs of input/output format parameters for the execution engine. */
269    @Override
270    public java.util.Map<String,String> getParameters() throws IllegalActionException {
271        // return an empty map.
272        return new HashMap<String,String>();
273    }
274    
275    /** Get a set of (kepler name, implementation name) pairs of input/output format parameters for the execution engine. */
276    @Override
277    public java.util.Map<String, String> getParaImplNames(String engineName) throws IllegalActionException {
278        // return an empty map.
279        return new HashMap<String,String>();
280    }
281    
282    /** Check if the full lifecycle of the sub-workflow should be executed for each input.*/
283    public boolean getRunWorkflowLifecyclePerInput() throws IllegalActionException {
284        return ((BooleanToken) runWorkflowLifecyclePerInput
285                .getToken()).booleanValue();
286    }
287    
288    /** Override the parent class to check that parameter values are correctly set. */
289    @Override
290    public void preinitialize() throws IllegalActionException {
291        
292        super.preinitialize();
293        
294        // verify that execution class is set iff the key value types are set
295        
296        // NOTE: only perform these checks if we are not the top level model,
297        // i.e., not the model running as a task by the DDP engine.
298        if(toplevel() != this) {
299        
300            Director director = getExecutiveDirector();
301            if (!(director instanceof DDPDirector)) {
302                throw new IllegalActionException(this,
303                    "This actor only executes with the DDP director.");
304            }
305
306            final boolean executionClassSet = !_executionClassName.isEmpty();
307            final boolean executionCodeSet = (_executionCodeType != null && !_executionCodeType.isEmpty());
308            
309            String inStr = inKeyValueTypes.stringValue();
310            boolean inSet = false;
311            if(inStr != null && !inStr.isEmpty()) {
312                inSet = true;
313            }
314            
315            String outStr = outKeyValueTypes.stringValue();
316            boolean outSet = false;
317            if(outStr != null && !outStr.isEmpty()) {
318                outSet = true;
319            }
320            
321            if((executionClassSet || executionCodeSet) && (!inSet || !outSet)) {
322                throw new IllegalActionException(this, "The execution class or code is set, " +
323                        "but not both the inKeyValueTypes and outKeyValueTypes. If you " +
324                        "specify the execution class or code, set both inKeyValueTypes " +
325                        "and outKeyValueTypes.");
326            }
327            
328            if(!executionClassSet && !executionCodeSet && (inSet || outSet)) {
329                throw new IllegalActionException(this, "The execution class and code is not set, " +
330                        "but inKeyValueTypes or outKeyValueTypes are set. If you do not specify " +
331                        "the execution class or code, clear both inKeyValueTypes, and outKeyValueTypes.");
332            }
333
334        }        
335    }
336        
337    /** Data input. */
338    public TypedIOPort in;
339    
340    /** Data output. */
341    public TypedIOPort out;
342    
343    /** The number of parallel instances to execute. */
344    public Parameter degreeOfParallelism;
345    
346    /** The name of the execution class. If not set, the sub-workflow is executed.
347     *  To use a class not included with Kepler, add the class's jar to the jars
348     *  parameter.
349     */
350    public StringParameter executionClass;
351    
352    /** The directory where the display related actors in DDP pattern sub-workflows will save their outputs.
353     *  If it is empty, the display actors will be discarded before execution.
354     *  More information about display redirect can be found at display-redirect module.
355     */
356    public StringParameter displayRedirectDir;
357        
358    /** The input key value types. This parameter should only be set
359     *  when executionClass is specified.
360     */
361    public StringParameter inKeyValueTypes;
362
363    /** The output key value types. This parameter should only be set
364     *  when executionClass is specified.
365     */
366    public StringParameter outKeyValueTypes;
367    
368    /** A comma-separated list of jars to use with this actor. The 
369     *  full path to each jar must be specified. 
370     */
371    public StringParameter jars;
372    
373    /** If true, print an execution summary to the log file each time the
374     *  sub-workflow executes in the Hadoop or Stratosphere job. NOTE:
375     *  this can severely impact performance and should only be used for
376     *  debugging.
377     */
378    public Parameter printExeSummary;
379        
380    /** If true, the full lifecycle of the sub-workflow will be executed
381     *  for each input. By setting this to true, multiple outputs can
382     *  be collected for each input. If false, a single iteration of
383     *  the workflow is performed for each input and only a single output
384     *  may be generated. Performance is very slow if this is set to true.
385     */
386    public Parameter runWorkflowLifecyclePerInput;
387    
388    /** The type of the execution code. */
389    public StringParameter executionCode;
390    
391    ///////////////////////////////////////////////////////////////////
392    ////                         protected fields                  ////
393
394    /** The execution class name. */
395    protected String _executionClassName = "";
396    
397    /** The execution code type. */
398    protected String _executionCodeType;
399     
400    ///////////////////////////////////////////////////////////////////
401    ////                         private fields                    ////
402    
403}