001/* Data input for DDP.
002 * 
003 * Copyright (c) 2011-2012 The Regents of the University of California.
004 * All rights reserved.
005 *
006 * '$Author: crawl $'
007 * '$Date: 2015-12-03 18:04:07 +0000 (Thu, 03 Dec 2015) $' 
008 * '$Revision: 34291 $'
009 * 
010 * Permission is hereby granted, without written agreement and without
011 * license or royalty fees, to use, copy, modify, and distribute this
012 * software and its documentation for any purpose, provided that the above
013 * copyright notice and the following two paragraphs appear in all copies
014 * of this software.
015 *
016 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
017 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
018 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
019 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
020 * SUCH DAMAGE.
021 *
022 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
023 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
024 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
025 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
026 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
027 * ENHANCEMENTS, OR MODIFICATIONS.
028 *
029 */
030package org.kepler.ddp.actor.pattern;
031
032import java.util.ArrayList;
033import java.util.Collections;
034import java.util.HashMap;
035import java.util.HashSet;
036import java.util.List;
037import java.util.Set;
038
039import org.kepler.configuration.ConfigurationProperty;
040
041import ptolemy.actor.TypedIOPort;
042import ptolemy.data.ArrayToken;
043import ptolemy.data.IntToken;
044import ptolemy.data.RecordToken;
045import ptolemy.data.StringToken;
046import ptolemy.data.Token;
047import ptolemy.data.expr.Parameter;
048import ptolemy.data.type.ArrayType;
049import ptolemy.data.type.BaseType;
050import ptolemy.data.type.MonotonicFunction;
051import ptolemy.data.type.RecordType;
052import ptolemy.data.type.Type;
053import ptolemy.graph.Inequality;
054import ptolemy.graph.InequalityTerm;
055import ptolemy.kernel.CompositeEntity;
056import ptolemy.kernel.util.Attribute;
057import ptolemy.kernel.util.IllegalActionException;
058import ptolemy.kernel.util.NameDuplicationException;
059import ptolemy.kernel.util.SingletonAttribute;
060
061/** This actor reads data from the storage system into downstream DDP 
062 *  pattern actors. Based on the data format in formatType, the data
063 *  is partitioned or split and output as a set of key-value pairs.
064 * 
065 *  @author Daniel Crawl
066 *  @version $Id: DDPDataSource.java 34291 2015-12-03 18:04:07Z crawl $
067 */
068public class DDPDataSource extends AtomicPathActor {
069
070    /** Construct a new FileDataSource in a container with a given name. */
071    public DDPDataSource(CompositeEntity container, String name)
072            throws IllegalActionException, NameDuplicationException {
073        super(container, name);
074       
075        out = new TypedIOPort(this, "out", false, true);
076        
077        // set the default types for the out port
078        _keyType = BaseType.STRING;
079        _valueType = BaseType.STRING;
080                
081        // add the input formats in the config file as choices 
082        _addFormats("Input");
083
084        _FORMAT_TYPE_CATEGORY = "InputFormats.Format";
085
086        // set default format
087        _setFormat("LineInputFormat");
088        
089        data = new TypedIOPort(this, "data");
090        data.setInput(true);
091        new SingletonAttribute(data, "_showName");
092        
093        chunkSize = new Parameter(this, "chunkSize");
094        chunkSize.setTypeEquals(BaseType.INT);
095        chunkSize.setToken(IntToken.ONE);
096    }
097    
098    /** React to a parameter change. */
099    @Override
100    public void attributeChanged(Attribute attribute) throws IllegalActionException {
101        
102        if(attribute == chunkSize) {
103            Token token = chunkSize.getToken();
104            if(token != null) {
105                int val = ((IntToken)token).intValue();
106                if(val < 1) {
107                    throw new IllegalActionException(this, "Chunk size must be at least 1.");
108                }
109                _chunkSize = val;
110            }
111        } else {
112            super.attributeChanged(attribute);
113        }
114    }
115    
116    
117    /** Get the token for a DDPDataSource actor.
118     *  @param name the full name of the DDPDataSource actor.
119     */
120    public static ArrayToken getToken(String name) {
121        return _tokenMap.get(name);
122    }
123    
124    /** Read the data input path and make sure it exists. */
125    @Override
126    public void fire() throws IllegalActionException {
127        
128        super.fire();
129        
130        if(data.numberOfSources() > 0) {
131            Token token = data.get(0);
132            ArrayToken arrayToken;
133            if(token instanceof ArrayToken) {
134                arrayToken = (ArrayToken) token;
135            } else {
136                arrayToken = new ArrayToken(new Token[] {token});
137            }
138            
139            if(_chunkSize > 1) {
140                
141                List<ArrayToken> list = new ArrayList<ArrayToken>();
142                List<Token> curChunk = new ArrayList<Token>();
143                for(int i = 0 ; i < arrayToken.length(); i++) {
144                    Token element = arrayToken.getElement(i);
145                    curChunk.add(element);
146                    if(curChunk.size() >= _chunkSize) {
147                        ArrayToken curChunkArray = new ArrayToken(curChunk.toArray(new Token[curChunk.size()]));
148                        list.add(curChunkArray);
149                        curChunk = new ArrayList<Token>();
150                    }
151                }
152                
153                // add any remaining elements
154                if(!curChunk.isEmpty()) {
155                    ArrayToken curChunkArray = new ArrayToken(curChunk.toArray(new Token[curChunk.size()]));
156                    list.add(curChunkArray);
157                }
158                
159                RecordToken[] elements = new RecordToken[list.size()];
160                for(int i = 0; i < list.size(); i++) {
161                    elements[i] = new RecordToken(new String[] {"data", "id"},
162                            new Token[] { list.get(i),
163                            new IntToken(i) });
164                }
165                
166                arrayToken = new ArrayToken(elements);
167            }
168            
169            //System.out.println("array token = " + arrayToken);
170            
171            _tokenMap.put(getFullName(), arrayToken);
172        }
173        
174        // make sure path exists
175        
176        // use asFile() so that relative paths are treated relative to
177        // directory containing workflow file.      
178//        final File file = path.asFile();
179//        if(!file.exists()) {
180//            throw new IllegalActionException(this, "Path does not exist: " + file);
181//        }
182        
183    }
184    
185    /** Make sure input is either data or file, but not both. */
186    @Override
187    public void preinitialize() throws IllegalActionException {
188        
189        super.preinitialize();
190        
191        boolean dataIsConnected = (data.numberOfSources() > 0);
192        boolean pathIsConnected = (path.getPort().numberOfSources() > 0);
193        
194        Token pathToken = path.getToken();
195        
196        if(dataIsConnected && (pathIsConnected || 
197                        (pathToken != null && !((StringToken)pathToken).stringValue().isEmpty()))) {
198                throw new IllegalActionException(this, 
199                                "The data port and path port/parameter cannot be used at the same time.\n" +
200                                "Either disconnect the data port, or disconnect the path port and clear the\n" +
201                                "path parameter.");
202        }
203        
204        if(dataIsConnected) {
205                formatType.setToken("TokenInputFormat");
206        } else if(_formatTypeStr.equals("TokenInputFormat")) {
207                throw new IllegalActionException(this, "TokenInputFormat can only be used " +
208                                "if the data port is connected.");
209        }
210
211        if(_chunkSize > 1 && pathIsConnected) {
212            throw new IllegalActionException(this, "The chunk size must be 1 " +
213                    " when using the path port.");
214        }
215    }
216    
217    /** Remove any token stored for this actor. */
218    @Override
219    public void wrapup() throws IllegalActionException {
220        
221        super.wrapup();
222        
223        _tokenMap.remove(getFullName());
224    }
225       
226    ///////////////////////////////////////////////////////////////////
227    ////                         public fields                     ////
228
229    /** Data output. */
230    public TypedIOPort out;
231        
232    /** An input token. */
233    public TypedIOPort data;
234
235    /** The chunk size for token data. If greater than 1, then the input
236     *  data array is split into arrays equal to or smaller than this size.
237     */
238    public Parameter chunkSize;
239    
240    ///////////////////////////////////////////////////////////////////
241    ////                         protected methods                 ////
242
243    /** Update the key and value types. */
244    @Override
245    protected void _updateKeyValueTypes() {
246        super._updateKeyValueTypes();
247        Type type = Types.createKeyValueArrayType(_keyType, _valueType);
248        out.setTypeEquals(type);
249    }   
250    
251    /** Set the key and value types from the types in the configuration property. */
252    @Override
253    protected void _setTypesFromConfiguration(ConfigurationProperty formatProperty)
254            throws IllegalActionException {
255        
256        //there is no formatProperty found for the key/value type, try to use it directly
257        if (formatProperty == null) {
258            String typesStr = keyValueTypes.stringValue();
259            if(typesStr.isEmpty()) {
260                throw new IllegalActionException(this, "Parameter keyValueTypes has to be set if third party class is set for parameter formatType.");
261            } else {
262                out.setTypeEquals(Types.getKeyValueType(keyValueTypes, typesStr));
263            }
264        }
265        else if(formatProperty.getProperty("Name").getValue().equals("TokenInputFormat")) {
266            _useDefaultTypeConstraints = false;
267            out.setTypeEquals(BaseType.UNKNOWN);
268        } else {
269                super._setTypesFromConfiguration(formatProperty);
270        }
271    }
272    
273    /** If using the default type constraints, return the custom
274     *  type constraints from the parent class. Otherwise, return
275     *  a constraint on the out port using the DataPortFunction class.
276     */
277    @Override
278    protected Set<Inequality> _customTypeConstraints() {
279
280        if(_useDefaultTypeConstraints) {
281            return super._customTypeConstraints();
282        }
283        
284        // set the constraints between record fields and output ports
285        Set<Inequality> constraints = new HashSet<Inequality>();
286
287        Inequality inequality = new Inequality(new DataPortFunction(),
288                    out.getTypeTerm());
289        constraints.add(inequality);
290
291        return constraints;
292    }
293
294    /** If using the default type constraints, return the default type
295     *  constraints from the parent class. Otherwise, return null.
296     */
297    @Override
298    protected Set<Inequality> _defaultTypeConstraints() {
299        if(_useDefaultTypeConstraints) {
300            return super._defaultTypeConstraints();
301        } 
302        return null;
303    }
304
305    ///////////////////////////////////////////////////////////////////
306    ////                         inner classes                     ////
307
308    /** A MonotonicFunction for setting the output port types. */
309    private class DataPortFunction extends MonotonicFunction {
310
311        ///////////////////////////////////////////////////////////////
312        ////                       public inner methods            ////
313
314        /** Return the function result.
315         *  @return A Type.
316         */
317        @Override
318        public Object getValue() throws IllegalActionException {
319            
320            final Type dataPortType = data.getType();
321            Type retval = null;
322            
323            if (dataPortType == BaseType.UNKNOWN) {
324                retval = BaseType.UNKNOWN;
325            } else {
326                Type valueType;
327                
328                if (dataPortType instanceof ArrayType) {
329                    valueType = ((ArrayType)dataPortType).getElementType();
330                } else {
331                    valueType = dataPortType;
332                }
333                
334                // if the chunk size is greater than one, the construct
335                // the record type containing the array of chunks and id.
336                if(_chunkSize > 1) {
337                    valueType = new RecordType(
338                            new String[] {"data", "id"},
339                            new Type[] {new ArrayType(valueType), BaseType.INT});
340                }
341                
342                retval = Types.createKeyValueArrayType(BaseType.NIL, valueType);                
343            }
344            
345            return retval;
346        }
347
348        /** Return an additional string describing the current value
349         *  of this function.
350         */
351        @Override
352        public String getVerboseString() {
353            if (!(data.getType() instanceof ArrayType)) {
354                return "Data is not an array";
355            }
356            return null;
357        }
358
359        /** Return the type variable in this inequality term. If the
360         *  type of the input port is not declared, return an one
361         *  element array containing the inequality term representing
362         *  the type of the port; otherwise, return an empty array.
363         *  @return An array of InequalityTerm.
364         */
365        @Override
366        public InequalityTerm[] getVariables() {
367            InequalityTerm portTerm = data.getTypeTerm();
368
369            if (portTerm.isSettable()) {
370                InequalityTerm[] variable = new InequalityTerm[1];
371                variable[0] = portTerm;
372                return variable;
373            }
374
375            return (new InequalityTerm[0]);
376        }
377    }
378        
379    ///////////////////////////////////////////////////////////////////
380    ////                         private fields                    ////
381
382    /** A mapping of DDPDataSource actor name to token. */
383    private static java.util.Map<String,ArrayToken> _tokenMap = 
384                Collections.synchronizedMap(new HashMap<String,ArrayToken>());
385    
386    /** If true, use the default type constraints set in _updateKeyValueTypes().
387     *  Otherwise, use custom type constraints defined in _customTypeConstraints().
388     */
389    private boolean _useDefaultTypeConstraints;
390
391    /** The chunk size. */
392    private int _chunkSize = 1;
393}