001/* An atomic actor for DDP patterns.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-09-08 23:43:03 +0000 (Tue, 08 Sep 2015) $' 
008 * '$Revision: 33875 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern;
031
032import java.util.Arrays;
033import java.util.HashMap;
034import java.util.HashSet;
035import java.util.List;
036import java.util.Set;
037
038import org.kepler.build.modules.Module;
039import org.kepler.configuration.ConfigurationManager;
040import org.kepler.configuration.ConfigurationProperty;
041import org.kepler.configuration.ConfigurationUtilities;
042import org.kepler.ddp.director.DDPDirector;
043
044import ptolemy.actor.Director;
045import ptolemy.actor.TypedAtomicActor;
046import ptolemy.data.IntToken;
047import ptolemy.data.StringToken;
048import ptolemy.data.Token;
049import ptolemy.data.expr.Parameter;
050import ptolemy.data.expr.StringParameter;
051import ptolemy.data.type.Type;
052import ptolemy.kernel.CompositeEntity;
053import ptolemy.kernel.util.Attribute;
054import ptolemy.kernel.util.IllegalActionException;
055import ptolemy.kernel.util.NameDuplicationException;
056import ptolemy.kernel.util.Settable;
057import ptolemy.kernel.util.Workspace;
058import ptolemy.util.MessageHandler;
059
060/**
061 * An atomic actor for DDP patterns.
062 * 
063 *  @author Daniel Crawl
064 *  @version $Id: AtomicPatternActor.java 33875 2015-09-08 23:43:03Z crawl $
065 */
066public class AtomicPatternActor extends TypedAtomicActor implements DDPPatternActor {
067
068    /** Construct a new AtomicPatternActor in a container with a given name. */
069    public AtomicPatternActor(CompositeEntity container, String name)
070            throws IllegalActionException, NameDuplicationException {
071        super(container, name);
072
073        degreeOfParallelism = new Parameter(this, "degreeOfParallelism");
074        degreeOfParallelism.setExpression("0");
075
076        formatType = new StringParameter(this, "formatType");
077        
078        keyValueTypes = new StringParameter(this, "keyValueTypes");
079        for(String type : _commonKeyValueTypes) {
080           keyValueTypes.addChoice(type);
081        }
082        
083        jars = new StringParameter(this, "jars");
084        
085        /* This is not supported for the DDP I/O actors.
086        printExeInfo = new Parameter(this,
087                "printExeSummary", BooleanToken.FALSE);
088        printExeInfo.setTypeEquals(BaseType.BOOLEAN);
089        */
090    }
091
092    /** See if the formatType parameter changed. */
093    @Override
094    public void attributeChanged(Attribute attribute) throws IllegalActionException {
095
096        if (attribute == formatType) {
097            String newFormatStr = formatType.stringValue();
098            _setFormat(newFormatStr);
099        } else {
100            super.attributeChanged(attribute);
101        }
102    }
103
104    /** Clone this actor into the specified workspace. */
105    @Override
106    public Object clone(Workspace workspace) throws CloneNotSupportedException {
107        AtomicPatternActor newObject = (AtomicPatternActor) super.clone(workspace);
108        newObject._keyType = null;
109        newObject._valueType = null;
110        newObject._formatTypeStr = _formatTypeStr;
111        newObject._formatProperty = null;
112        newObject._formatProperties = null;
113        newObject._FORMAT_TYPE_CATEGORY = _FORMAT_TYPE_CATEGORY;
114        return newObject;
115    }
116    
117    /** Get the number of parallel instances to execute. */
118    @Override
119    public int getDegreeOfParallelism() throws IllegalActionException {
120        return ((IntToken)degreeOfParallelism.getToken()).intValue();
121    }
122    
123        @Override
124        public String getDisplayRedirectDir() throws IllegalActionException {
125                return "";
126        }
127
128    /** Get the name of the execution class. In this class, always
129     *  returns the empty string.
130     */
131    @Override
132    public String getExecutionClassName() {
133        return "";
134    }
135
136    /** Get the execution code type. Returns null if not set. */
137    @Override
138    public String getExecutionCodeType() throws IllegalActionException {
139        return null;
140    }
141    
142    /** Get the execution code. Returns null if not set. */
143    @Override
144    public String getExecutionCode() throws IllegalActionException {
145        return null;
146    }
147
148    /** Get the name of the class that provides the format type implementation
149     *  for a specific engine.
150     */
151    public String getFormatClassName(String engineName) throws IllegalActionException {
152
153        String className = null;
154
155        final ConfigurationProperty formatProperty = _getFormatProperty(engineName);
156        if (formatProperty != null) {
157
158            // we found the format with the same name as _formatTypeStr,
159            // and has an implementation class for the same engine
160            
161            // now get the class name.
162            className = 
163                    formatProperty.getProperty("ImplementationClass").getProperty(engineName).getValue();
164            
165            // make sure class is specified for the engine
166            if (className == null) {
167                throw new IllegalActionException(this, "Format "
168                        + _formatTypeStr
169                        + " does not have an implementation class for "
170                        + engineName);
171            }
172        }
173
174        // if we didn't find the format in the config file, the format type
175        // could be a class name.
176        if (className == null) {
177            className = _formatTypeStr;
178        }
179
180        return className;
181    }
182
183    /** Get a comma-separated list of jars to use with this actor. */
184    @Override
185    public String getJars() throws IllegalActionException {
186        return jars.stringValue();
187    }
188    
189    /** Get whether print execution summary when running workflow
190     *  inside of Hadoop/Stratosphere job. In this class, returns
191     *  false since printing execution information is not supported
192     *  in the DDP I/O actors.
193     */
194    @Override
195    public boolean getPrintExeInfo() throws IllegalActionException {
196        return false;
197        /*
198        return ((BooleanToken) printExeInfo
199                .getToken()).booleanValue();
200        */
201    }
202
203    /** Get a set of name-value pairs of input/output format parameters for the execution engine. */
204    @Override
205    public java.util.Map<String, String> getParameters() throws IllegalActionException {
206
207        final java.util.Map<String, String> retval = new HashMap<String, String>();
208
209        // find all the parameters containing an attribute named
210        // _associatedWithFormatType
211        for (Parameter parameter : attributeList(Parameter.class)) {
212            if (parameter.getAttribute(FORMAT_TYPE_PARAMETER_NAME) != null) {
213                // add the name and value to the return map.
214                final Token token = parameter.getToken();
215                String value;
216                if (token instanceof StringToken) {
217                    value = ((StringToken)token).stringValue();
218                } else {
219                    value = token.toString();
220                }
221                retval.put(parameter.getName(), value);
222            }
223        }
224
225        return retval;
226    }
227    
228    /** Get a set of (kepler name, implementation name) pairs of input/output format parameters for the execution engine. */
229    @Override
230    public java.util.Map<String, String> getParaImplNames(String engineName) throws IllegalActionException {  
231        
232        final java.util.Map<String, String> retval = new HashMap<String, String>();
233        List<ConfigurationProperty> parameterPairs = null;
234        if (_formatProperty != null)
235                parameterPairs = _formatProperty.getProperties("Parameters.pair");
236        if (parameterPairs != null) {
237                ConfigurationProperty nameInEngine;
238                for (ConfigurationProperty parameterPair: parameterPairs) {
239                        nameInEngine = parameterPair.getProperty(engineName + "Name");
240                        retval.put(parameterPair.getProperty("name").getValue(), nameInEngine == null ? null : nameInEngine.getValue());
241                }
242        }
243        return retval;
244    }
245
246    /** Update the port types based on the format type. */
247    @Override
248    public void preinitialize() throws IllegalActionException {
249
250        super.preinitialize();
251
252        Director director = getDirector();
253        if (!(director instanceof DDPDirector)) {
254            throw new IllegalActionException(this,
255                "This actor only executes with the DDP director.");
256        }
257
258        // set the types based on the format type
259        _formatProperty = _getFormatProperty(((DDPDirector) director)
260                .getEngineName());
261        //formatProperty.prettyPrint();
262        _setTypesFromConfiguration(_formatProperty);
263
264    }
265    
266    ///////////////////////////////////////////////////////////////////
267    ////                         public fields                     ////
268
269    /** The number of parallel instances to execute. */
270    public Parameter degreeOfParallelism;
271
272    /** The name of the data format or a fully-qualified class name to
273     *  parse the data.
274     */
275    public StringParameter formatType;
276
277    /** A comma-separated list of jars to use with this actor. The 
278     *  full path to each jar must be specified.
279     */
280    public StringParameter jars;  
281    
282    /** A boolean parameter for whether print execution summary when
283     *  running as Hadoop/Stratosphere/etc jobs.
284     *  NOTE: this is not support for DDP I/O actors.
285     */
286    //public Parameter printExeInfo;
287    
288    /** The key value types of the atomic patter actor. This parameter should only be set
289     *  when a class name is specified for formatType parameter.
290     */
291    public StringParameter keyValueTypes;
292
293    ///////////////////////////////////////////////////////////////////
294    ////                         protected methods                 ////
295
296    /** Add the formats in the configuration file as choices to formatType. */
297    protected void _addFormats(String inputOutputName) {
298        final ConfigurationManager manager = ConfigurationManager.getInstance();
299        final Module module = ConfigurationManager.getModule("ddp-common");
300        final ConfigurationProperty propertyRoot = manager.getProperty(module);
301        _formatProperties = propertyRoot.getProperties(inputOutputName
302                + "Formats.Format");
303
304        // use a set to store the names to remove duplicates
305        Set<String> names = new HashSet<String>();
306        for (ConfigurationProperty formatProperty : _formatProperties) {
307            ConfigurationProperty nameProperty = formatProperty.getProperty("Name");
308            if(nameProperty == null) {
309                MessageHandler.error("Configuration format does not have a name. Try deleting\n" +
310                        "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
311                        "and restarting Kepler.");
312                return;
313            } else {
314                names.add(nameProperty.getValue());
315            }
316        }
317
318        // sort the names alphabetically
319        String[] namesArray = names.toArray(new String[names.size()]);
320        Arrays.sort(namesArray);
321        for (String name : namesArray) {
322            formatType.addChoice(name);
323        }
324    }
325    
326    /** Set the format, and add any associated parameters. This method also sets
327     *  the port types for the keys and values specified in the configuration.
328     * 
329     * @param newFormatTypeStr the format name or a class name.
330     */
331    protected void _setFormat(String newFormatTypeStr) throws IllegalActionException {
332
333        // see if the type has changed
334        if (_formatTypeStr == null || !newFormatTypeStr.equals(_formatTypeStr)) {
335
336            _formatTypeStr = newFormatTypeStr;
337
338            java.util.Map<String, String> associatedParameterPairs = null;
339
340            final ConfigurationProperty formatProperty = _getFormatProperty(null);
341
342            if (formatProperty != null) {
343
344                //formatProperty.prettyPrint();
345                
346                // see if it has associated parameters
347                final ConfigurationProperty parametersProperty = 
348                        formatProperty.getProperty("Parameters");
349                if (parametersProperty != null) {
350                    // get a map of names and values
351                    associatedParameterPairs = ConfigurationUtilities
352                            .getPairsMap(parametersProperty);
353                }
354
355                _setTypesFromConfiguration(formatProperty);
356            }
357
358            // see if we could not find the type in the configuration file
359            // the type may be a class name, so try to instantiate it.
360            if (formatProperty == null) {
361
362                // first check for classes that were in 0.1 WordCount
363                if (newFormatTypeStr.equals("eu.stratosphere.pact.example.wordcount.WordCount$LineInFormat")) {
364                    System.out.println("class WordCount$LineInFormat no longer exists; using TextInputFormat");
365                    newFormatTypeStr = "eu.stratosphere.pact.common.io.TextInputFormat";
366                } else if (newFormatTypeStr.equals("eu.stratosphere.pact.example.wordcount.WordCount$WordCountOutFormat")) {
367                    System.out.println("class WordCount$WordCountOutFormat no longer exists; using RecordOutputFormat");
368                    newFormatTypeStr = "eu.stratosphere.pact.common.io.RecordOutputFormat";
369                } else {
370                    // try to load the class
371                    try {
372                        /*final Class<?> clazz =*/ Class.forName(newFormatTypeStr);
373                    } catch (ClassNotFoundException e) {
374                        //the exception might be that the class is from outside jar, do not throw error here.
375//                        throw new IllegalActionException(
376//                                this,
377//                                "Format type "
378//                                        + newFormatTypeStr
379//                                        + " was not found in the configuration file and class "
380//                                        + newFormatTypeStr
381//                                        + " was not found on the classpath.");
382                    }
383                }
384            }
385
386            // remove parameters associated with the old format type that
387            // have different names than those associated with the new type
388            for (Parameter parameter : attributeList(Parameter.class)) {
389                if (parameter.getAttribute(FORMAT_TYPE_PARAMETER_NAME) != null
390                        && (associatedParameterPairs == null || !associatedParameterPairs
391                                .containsKey(parameter.getName()))) {
392                    try {
393                        // System.out.println("removing " +
394                        // parameter.getName());
395                        parameter.setContainer(null);
396                    } catch (NameDuplicationException e) {
397                        throw new IllegalActionException(this, e,
398                                "Error removing parameter "
399                                        + parameter.getName());
400                    }
401                }
402            }
403
404            // add any associated parameters
405            if (associatedParameterPairs != null) {
406                for (java.util.Map.Entry<String, String> entry : associatedParameterPairs.entrySet()) {
407                    final String name = entry.getKey();
408
409                    // see if the parameter is already there
410                    if (getAttribute(name) == null) {
411                        Parameter associatedParameter;
412                        try {
413                            // System.out.println("adding " + name);
414                            // create a parameter for the name and value
415                            associatedParameter = new Parameter(this, name);
416
417                            // add a parameter inside the new parameter so we
418                            // can remove it when
419                            // the format type changes
420                            final Parameter hiddenParameter = new Parameter(
421                                    associatedParameter,
422                                    FORMAT_TYPE_PARAMETER_NAME);
423                            hiddenParameter.setVisibility(Settable.NONE);
424                        } catch (NameDuplicationException e) {
425                            throw new IllegalActionException(this, e,
426                                    "Error adding parameter " + name);
427                        }
428                        associatedParameter.setExpression(entry.getValue());
429                    }
430                }
431            }
432        }
433
434    }
435
436    /** Set the key and value types from the types in the configuration property. */
437    protected void _setTypesFromConfiguration(ConfigurationProperty formatProperty)
438            throws IllegalActionException {
439
440        // see if it has the Types properties
441        final ConfigurationProperty typesProperty = formatProperty
442                .getProperty("Types");
443
444        // make sure the Types property was found
445        if (typesProperty == null) {
446            throw new IllegalActionException(
447                    "No Types specified for format type " + _formatTypeStr);
448        }
449
450        // make sure the Key property under Types was found
451        final ConfigurationProperty keyProperty = typesProperty
452                .getProperty("Key");
453        if (keyProperty == null) {
454            throw new IllegalActionException(
455                    "No Key type specified for format type " + _formatTypeStr);
456        }
457        // read the key type
458        String typeStr = keyProperty.getValue();
459
460        final Type keyType = Types.getTypeFromString(typeStr);
461        if (keyType == null) {
462            throw new IllegalActionException(this,
463                    "Could not find Ptolemy type for key type " + typeStr
464                            + " for format " + _formatTypeStr);
465        }
466        _keyType = keyType;
467        //System.out.println(getName() + " change key type to " + _keyType);
468
469        // make sure the Value property under Types was found
470        final ConfigurationProperty valueProperty = typesProperty
471                .getProperty("Value");
472        if (valueProperty == null) {
473            throw new IllegalActionException(
474                    "No Value type specified for format type " + _formatTypeStr);
475        }
476        // read the value type
477        typeStr = valueProperty.getValue();
478        
479        final Type valueType = Types.getTypeFromString(typeStr);
480        
481        if (valueType == null) {
482            throw new IllegalActionException(this,
483                    "Could not find Ptolemy type for value type " + typeStr
484                            + " for format " + _formatTypeStr);
485        }
486        _valueType = valueType;
487        //System.out.println(getName() + " change value type to " + _valueType);
488
489        // update the port type
490        _updateKeyValueTypes();
491    }
492
493    /** Update the key and value types. In this class, do nothing. */
494    protected void _updateKeyValueTypes() {
495
496    }
497    
498    ///////////////////////////////////////////////////////////////////
499    ////                         protected fields                  ////
500
501    /** The key type. */
502    protected Type _keyType;
503
504    /** The value type. */
505    protected Type _valueType;
506
507    /** The string value of the format type. */
508    protected String _formatTypeStr;
509    
510    /** A list of properties of format types from the config file. */
511    protected List<ConfigurationProperty> _formatProperties;
512
513    /** The configuration property name for the format type. */
514    protected String _FORMAT_TYPE_CATEGORY;
515    
516    ///////////////////////////////////////////////////////////////////
517    ////                         private methods                   ////
518
519    /** Get the configuration property with the same name as the format type.
520     * 
521     * @param engineName If not null, returns the property whose implementation
522     * class is for the specified engine. Otherwise, returns the first property
523     * with the same name as the format type.
524     */
525    private ConfigurationProperty _getFormatProperty(String engineName)
526            throws IllegalActionException {
527
528        // make sure it's not empty.
529        if (_formatTypeStr.trim().isEmpty()) {
530            throw new IllegalActionException(this, "Missing format name.");
531        }
532
533        // try to find the format name in the config properties
534
535        final List<ConfigurationProperty> formatList = ConfigurationManager
536                .getInstance()
537                .getProperty(ConfigurationManager.getModule("ddp-common"))
538                .getProperties(_FORMAT_TYPE_CATEGORY);
539
540        if (formatList == null || formatList.isEmpty()) {
541            throw new IllegalActionException(this,
542                    "No formats found in configuration.xml for type "
543                            + _FORMAT_TYPE_CATEGORY);
544        }
545
546        for (ConfigurationProperty format : formatList) {
547            ConfigurationProperty nameProperty = format.getProperty("Name");
548            if(nameProperty == null) {
549                throw new IllegalActionException(this,
550                        "Configuration format does not have a name. Try deleting\n" +
551                        "$HOME/KeplerData/modules/ddp-common/configuration/configuration.xml\n" +
552                        "and restarting Kepler.");
553            } 
554            
555            if (nameProperty.getValue().equalsIgnoreCase(_formatTypeStr) &&
556                    ((engineName != null &&
557                    format.getProperty("ImplementationClass").getProperty(engineName) != null) ||
558                    (engineName == null))) {
559                return format;
560            }
561        }
562        
563        return null;
564    }
565
566    ///////////////////////////////////////////////////////////////////
567    ////                         private fields                    ////
568
569    /** Name of attribute contained in each parameter associated with the format
570     * type.
571     */
572    private final static String FORMAT_TYPE_PARAMETER_NAME = "_associatedWithFormatType";
573    
574    /** The configuration property for the format type. */
575    private ConfigurationProperty _formatProperty;
576
577}