/* Data input for DDP.
 *
 * Copyright (c) 2011-2012 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: crawl $'
 * '$Date: 2015-12-03 18:04:07 +0000 (Thu, 03 Dec 2015) $'
 * '$Revision: 34291 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */
package org.kepler.ddp.actor.pattern;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.kepler.configuration.ConfigurationProperty;

import ptolemy.actor.TypedIOPort;
import ptolemy.data.ArrayToken;
import ptolemy.data.IntToken;
import ptolemy.data.RecordToken;
import ptolemy.data.StringToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.ArrayType;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.MonotonicFunction;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.graph.Inequality;
import ptolemy.graph.InequalityTerm;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.SingletonAttribute;

/** This actor reads data from the storage system into downstream DDP
 *  pattern actors. Based on the data format in formatType, the data
 *  is partitioned or split and output as a set of key-value pairs.
 *
 *  <p>Instead of a path, the actor can take a token on the <i>data</i>
 *  port. That token is stored in a static map keyed by the actor's full
 *  name and can be retrieved with {@link #getToken(String)}. When
 *  <i>chunkSize</i> is greater than 1, the elements of the input array are
 *  grouped into records with a "data" field (an array of at most
 *  <i>chunkSize</i> elements) and an "id" field (the chunk index).</p>
 *
 *  @author Daniel Crawl
 *  @version $Id: DDPDataSource.java 34291 2015-12-03 18:04:07Z crawl $
 */
public class DDPDataSource extends AtomicPathActor {

    /** Construct a new DDPDataSource in a container with a given name.
     *  @param container The container.
     *  @param name The name of this actor.
     *  @exception IllegalActionException Thrown by the superclass
     *   constructor or while creating the ports and parameters.
     *  @exception NameDuplicationException Thrown by the superclass
     *   constructor or while creating the ports and parameters.
     */
    public DDPDataSource(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);

        out = new TypedIOPort(this, "out", false, true);

        // set the default types for the out port
        _keyType = BaseType.STRING;
        _valueType = BaseType.STRING;

        // add the input formats in the config file as choices
        _addFormats("Input");

        _FORMAT_TYPE_CATEGORY = "InputFormats.Format";

        // set default format
        _setFormat("LineInputFormat");

        data = new TypedIOPort(this, "data");
        data.setInput(true);
        new SingletonAttribute(data, "_showName");

        chunkSize = new Parameter(this, "chunkSize");
        chunkSize.setTypeEquals(BaseType.INT);
        chunkSize.setToken(IntToken.ONE);
    }

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /** React to a parameter change. If the attribute is <i>chunkSize</i>,
     *  validate the new value and cache it; every other attribute is
     *  passed to the superclass.
     *  @param attribute The attribute that changed.
     *  @exception IllegalActionException If the chunk size is less than 1,
     *   or if thrown by the superclass.
     */
    @Override
    public void attributeChanged(Attribute attribute) throws IllegalActionException {

        if (attribute == chunkSize) {
            Token token = chunkSize.getToken();
            if (token != null) {
                int val = ((IntToken) token).intValue();
                if (val < 1) {
                    throw new IllegalActionException(this, "Chunk size must be at least 1.");
                }
                _chunkSize = val;
            }
        } else {
            super.attributeChanged(attribute);
        }
    }

    /** Get the token for a DDPDataSource actor.
     *  @param name the full name of the DDPDataSource actor.
     *  @return The token stored by the last firing of that actor, or null
     *   if no token is currently stored under that name.
     */
    public static ArrayToken getToken(String name) {
        return _tokenMap.get(name);
    }

    /** If the data port is connected, read one token from it and store it
     *  under this actor's full name so that it can be retrieved with
     *  {@link #getToken(String)}. A token that is not an array is wrapped
     *  in a one-element array. If the chunk size is greater than 1, the
     *  array elements are first grouped into chunk records.
     *  @exception IllegalActionException If thrown by the superclass, while
     *   reading the data port, or while constructing the tokens.
     */
    @Override
    public void fire() throws IllegalActionException {

        super.fire();

        if (data.numberOfSources() > 0) {
            Token token = data.get(0);
            ArrayToken arrayToken;
            if (token instanceof ArrayToken) {
                arrayToken = (ArrayToken) token;
            } else {
                arrayToken = new ArrayToken(new Token[] { token });
            }

            if (_chunkSize > 1) {
                arrayToken = _splitIntoChunks(arrayToken);
            }

            _tokenMap.put(getFullName(), arrayToken);
        }

        // NOTE: a check that the path exists used to follow here; it was
        // already disabled (commented out) and is not performed.
    }

    /** Make sure input is either data or file, but not both. If the data
     *  port is connected, the format type is forced to TokenInputFormat.
     *  @exception IllegalActionException If both the data port and the
     *   path port/parameter are used, if TokenInputFormat is selected
     *   without a connected data port, if the chunk size is greater than 1
     *   while the path port is connected, or if thrown by the superclass.
     */
    @Override
    public void preinitialize() throws IllegalActionException {

        super.preinitialize();

        boolean dataIsConnected = (data.numberOfSources() > 0);
        boolean pathIsConnected = (path.getPort().numberOfSources() > 0);

        Token pathToken = path.getToken();
        boolean pathIsSet = pathToken != null
                && !((StringToken) pathToken).stringValue().isEmpty();

        if (dataIsConnected && (pathIsConnected || pathIsSet)) {
            throw new IllegalActionException(this,
                    "The data port and path port/parameter cannot be used at the same time.\n" +
                    "Either disconnect the data port, or disconnect the path port and clear the\n" +
                    "path parameter.");
        }

        if (dataIsConnected) {
            formatType.setToken(_TOKEN_INPUT_FORMAT);
        } else if (_TOKEN_INPUT_FORMAT.equals(_formatTypeStr)) {
            // constant-first comparison so an unset format string cannot
            // cause a NullPointerException here.
            throw new IllegalActionException(this, "TokenInputFormat can only be used " +
                    "if the data port is connected.");
        }

        if (_chunkSize > 1 && pathIsConnected) {
            throw new IllegalActionException(this,
                    "The chunk size must be 1 when using the path port.");
        }
    }

    /** Remove any token stored for this actor.
     *  @exception IllegalActionException If thrown by the superclass.
     */
    @Override
    public void wrapup() throws IllegalActionException {

        super.wrapup();

        _tokenMap.remove(getFullName());
    }

    ///////////////////////////////////////////////////////////////////
    ////                         public fields                     ////

    /** Data output. */
    public TypedIOPort out;

    /** An input token. */
    public TypedIOPort data;

    /** The chunk size for token data. If greater than 1, then the input
     *  data array is split into arrays equal to or smaller than this size.
     *  The value is an integer of at least 1; the default is 1.
     */
    public Parameter chunkSize;

    ///////////////////////////////////////////////////////////////////
    ////                         protected methods                 ////

    /** Update the key and value types, and set the type of the out port
     *  to the key-value array type built from them.
     */
    @Override
    protected void _updateKeyValueTypes() {
        super._updateKeyValueTypes();
        Type type = Types.createKeyValueArrayType(_keyType, _valueType);
        out.setTypeEquals(type);
    }

    /** Set the key and value types from the types in the configuration
     *  property, and record whether the default type constraints apply.
     *  For TokenInputFormat the out port type is left unknown so that it is
     *  derived from the data port; for every other format the default
     *  type constraints are used.
     *  @param formatProperty The configuration property of the selected
     *   format, or null if the format is not found in the configuration
     *   (e.g., a third party class).
     *  @exception IllegalActionException If the format is not found in the
     *   configuration and keyValueTypes is empty, or if thrown by the
     *   superclass.
     */
    @Override
    protected void _setTypesFromConfiguration(ConfigurationProperty formatProperty)
            throws IllegalActionException {

        // there is no formatProperty found for the key/value type, try to use it directly
        if (formatProperty == null) {
            String typesStr = keyValueTypes.stringValue();
            if (typesStr.isEmpty()) {
                throw new IllegalActionException(this, "Parameter keyValueTypes has to be set if third party class is set for parameter formatType.");
            }
            // the flag was previously never reset, so switching away from
            // TokenInputFormat kept the token-specific constraints forever.
            _useDefaultTypeConstraints = true;
            out.setTypeEquals(Types.getKeyValueType(keyValueTypes, typesStr));
        } else if (_TOKEN_INPUT_FORMAT.equals(formatProperty.getProperty("Name").getValue())) {
            _useDefaultTypeConstraints = false;
            out.setTypeEquals(BaseType.UNKNOWN);
        } else {
            _useDefaultTypeConstraints = true;
            super._setTypesFromConfiguration(formatProperty);
        }
    }

    /** If using the default type constraints, return the custom
     *  type constraints from the parent class. Otherwise, return
     *  a constraint on the out port using the DataPortFunction class.
     *  @return A set of inequalities, or whatever the parent class returns.
     */
    @Override
    protected Set<Inequality> _customTypeConstraints() {

        if (_useDefaultTypeConstraints) {
            return super._customTypeConstraints();
        }

        // relate the type computed from the data port to the type term
        // of the out port.
        Set<Inequality> constraints = new HashSet<Inequality>();
        constraints.add(new Inequality(new DataPortFunction(), out.getTypeTerm()));
        return constraints;
    }

    /** If using the default type constraints, return the default type
     *  constraints from the parent class. Otherwise, return null.
     *  @return The parent's default constraints, or null.
     */
    @Override
    protected Set<Inequality> _defaultTypeConstraints() {
        if (_useDefaultTypeConstraints) {
            return super._defaultTypeConstraints();
        }
        return null;
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Group the elements of an array into chunks of at most the current
     *  chunk size, preserving order. Each chunk becomes a record with a
     *  "data" field holding the chunk and an "id" field holding the
     *  zero-based chunk index.
     *  @param source The array whose elements are to be grouped.
     *  @return An array of chunk records.
     *  @exception IllegalActionException If a token cannot be constructed.
     */
    private ArrayToken _splitIntoChunks(ArrayToken source) throws IllegalActionException {
        final int limit = _chunkSize;
        List<RecordToken> records = new ArrayList<RecordToken>();
        List<Token> pending = new ArrayList<Token>();

        for (int i = 0; i < source.length(); i++) {
            pending.add(source.getElement(i));
            if (pending.size() >= limit) {
                records.add(_createChunkRecord(pending, records.size()));
                // safe to reuse: the record was built from a copy.
                pending.clear();
            }
        }

        // add any remaining elements
        if (!pending.isEmpty()) {
            records.add(_createChunkRecord(pending, records.size()));
        }

        return new ArrayToken(records.toArray(new RecordToken[records.size()]));
    }

    /** Create the record for one chunk.
     *  @param chunk The tokens in the chunk; copied into a new array.
     *  @param id The index of the chunk.
     *  @return A record with fields "data" and "id".
     *  @exception IllegalActionException If a token cannot be constructed.
     */
    private static RecordToken _createChunkRecord(List<Token> chunk, int id)
            throws IllegalActionException {
        ArrayToken chunkArray = new ArrayToken(chunk.toArray(new Token[chunk.size()]));
        return new RecordToken(
                new String[] { _DATA_LABEL, _ID_LABEL },
                new Token[] { chunkArray, new IntToken(id) });
    }

    ///////////////////////////////////////////////////////////////////
    ////                         inner classes                     ////

    /** A MonotonicFunction for setting the output port types. */
    private class DataPortFunction extends MonotonicFunction {

        ///////////////////////////////////////////////////////////////
        ////                   public inner methods                ////

        /** Return the function result: unknown while the data port type
         *  is unknown; otherwise a key-value array type whose key type is
         *  nil and whose value type is the element type of the data port
         *  (or the data port type itself if it is not an array), wrapped
         *  in the chunk record type when the chunk size is greater than 1.
         *  @return A Type.
         *  @exception IllegalActionException Declared by the overridden
         *   method; see MonotonicFunction.
         */
        @Override
        public Object getValue() throws IllegalActionException {

            final Type dataPortType = data.getType();

            if (dataPortType == BaseType.UNKNOWN) {
                return BaseType.UNKNOWN;
            }

            Type valueType;
            if (dataPortType instanceof ArrayType) {
                valueType = ((ArrayType) dataPortType).getElementType();
            } else {
                valueType = dataPortType;
            }

            // if the chunk size is greater than one, then construct
            // the record type containing the array of chunks and id.
            if (_chunkSize > 1) {
                valueType = new RecordType(
                        new String[] { _DATA_LABEL, _ID_LABEL },
                        new Type[] { new ArrayType(valueType), BaseType.INT });
            }

            return Types.createKeyValueArrayType(BaseType.NIL, valueType);
        }

        /** Return an additional string describing the current value
         *  of this function.
         *  @return A message if the data port type is not an array type,
         *   otherwise null.
         */
        @Override
        public String getVerboseString() {
            if (!(data.getType() instanceof ArrayType)) {
                return "Data is not an array";
            }
            return null;
        }

        /** Return the type variable in this inequality term. If the
         *  type of the input port is not declared, return a one
         *  element array containing the inequality term representing
         *  the type of the port; otherwise, return an empty array.
         *  @return An array of InequalityTerm.
         */
        @Override
        public InequalityTerm[] getVariables() {
            InequalityTerm portTerm = data.getTypeTerm();

            if (portTerm.isSettable()) {
                return new InequalityTerm[] { portTerm };
            }

            return new InequalityTerm[0];
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private fields                    ////

    /** The name of the format used when data arrives on the data port. */
    private static final String _TOKEN_INPUT_FORMAT = "TokenInputFormat";

    /** The label of the chunk record field holding the chunk's elements. */
    private static final String _DATA_LABEL = "data";

    /** The label of the chunk record field holding the chunk's index. */
    private static final String _ID_LABEL = "id";

    /** A mapping of DDPDataSource actor name to token. */
    private static final java.util.Map<String,ArrayToken> _tokenMap =
        Collections.synchronizedMap(new HashMap<String,ArrayToken>());

    /** If true, use the default type constraints set in _updateKeyValueTypes().
     *  Otherwise, use custom type constraints defined in _customTypeConstraints().
     */
    private boolean _useDefaultTypeConstraints;

    /** The chunk size. */
    private int _chunkSize = 1;
}