/*
 * Copyright (c) 2010 The Regents of the University of California.
 * All rights reserved.
 *
 * '$Author: welker $'
 * '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
 * '$Revision: 24234 $'
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the above
 * copyright notice and the following two paragraphs appear in all copies
 * of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 * ENHANCEMENTS, OR MODIFICATIONS.
 *
 */

/* CPES Actor for processing a stream of strings,
   waiting for a specific termination element Nth occurence
   and emit the termination element only that time.
   Do not use in SDF!
 */
/**
 *    '$RCSfile$'
 *
 *     '$Author: welker $'
 *       '$Date: 2010-05-06 05:21:26 +0000 (Thu, 06 May 2010) $'
 *   '$Revision: 24234 $'
 *
 *  For Details: http://www.kepler-project.org
 *
 * Copyright (c) 2004 The Regents of the University of California.
 * All rights reserved.
 *
 * Permission is hereby granted, without written agreement and without
 * license or royalty fees, to use, copy, modify, and distribute this
 * software and its documentation for any purpose, provided that the
 * above copyright notice and the following two paragraphs appear in
 * all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN
 * IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY
 * OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY
 * OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT,
 * UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 */

package org.sdm.spa;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.data.BooleanToken;
import ptolemy.data.IntToken;
import ptolemy.data.RecordToken;
import ptolemy.data.Token;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.data.type.MonotonicFunction;
import ptolemy.data.type.RecordType;
import ptolemy.data.type.Type;
import ptolemy.graph.Inequality;
import ptolemy.graph.InequalityTerm;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.Attribute;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.NameDuplicationException;

//////////////////////////////////////////////////////////////////////////
//// SyncOnTerminator

/**
 * <p>
 * Pass on a stream of tokens except for a specific element (the terminator),
 * and emit the terminator only when it has been found in the stream
 * <i>numberOfOccurences</i> times.<br/>
 * The input should be a stream of tokens of a given type that matches the type
 * of the terminator parameter of this actor.
 * </p>
 *
 * <p>
 * Input tokens are passed on until the terminator has been found as many
 * times as specified. The terminator itself is not passed on until its
 * <i>numberOfOccurences</i>th occurrence. After the terminator has been
 * emitted, any further incoming token raises an exception.
 * </p>
 *
 * <p>
 * This actor can be used in the following stream processing scenario. A stream,
 * which has a termination token, is split and elements are processed in
 * parallel and then merged non-deterministically again. In such a case, if the
 * termination token is routed on only one branch, it can overtake other tokens
 * at the merge. So the termination token should be routed on ALL branches and
 * then after the merge this actor can help to wait for the last termination
 * token, thus ensuring that the termination token is the very last token in
 * the stream.
 * </p>
 *
 * <p>
 * Note that for record types, you do not need to specify all fields within the
 * terminator. E.g. if you have a stream with {name=string, value=int}, you can
 * define the terminator, for instance, as {name="T"}.
 * </p>
 *
 * <p>
 * If the flag <i>discardOthers</i> is set, the non-terminator input tokens are
 * NOT emitted on output. That is, there is only one output, the terminator on
 * its <i>numberOfOccurences</i>th occurrence.
 * </p>
 *
 * <p>
 * This actor does not produce a token for every input (namely not for the
 * swallowed terminators), thus it cannot be used in SDF. Since you do not have
 * parallel stream processing under SDF, this can hardly be a problem.
 * </p>
 *
 * @author Norbert Podhorszki
 * @version $Id: SyncOnTerminator.java 24234 2010-05-06 05:21:26Z welker $
 * @since Ptolemy II 6.0.2
 */
public class SyncOnTerminator extends TypedAtomicActor {
    /**
     * Construct an actor with the given container and name.
     *
     * @param container
     *            The container.
     * @param name
     *            The name of this actor.
     * @exception IllegalActionException
     *                If the actor cannot be contained by the proposed
     *                container.
     * @exception NameDuplicationException
     *                If the container already has an actor with this name.
     */
    public SyncOnTerminator(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);

        /*
         * Input ports and port parameters
         */

        // input
        input = new TypedIOPort(this, "input", true, false);
        new Parameter(input, "_showName", BooleanToken.FALSE);

        // The terminator element to wait for
        terminator = new Parameter(this, "terminator");

        // Number of occurrences of the terminator element to wait for
        numberOfOccurences = new Parameter(this, "numberOfOccurences",
                new IntToken(1));
        numberOfOccurences.setTypeEquals(BaseType.INT);

        // Flag indicating that all non-terminator inputs are discarded
        discardOthers = new Parameter(this, "discardOthers", BooleanToken.FALSE);
        discardOthers.setTypeEquals(BaseType.BOOLEAN);

        /*
         * Output ports
         */

        // output
        output = new TypedIOPort(this, "output", false, true);
        new Parameter(output, "_showName", BooleanToken.FALSE);
    }

    /***********************************************************
     * ports and parameters
     */

    /**
     * Input token. The type should match the terminator's type.
     */
    public TypedIOPort input;

    /**
     * The terminator element to wait for.
     */
    public Parameter terminator;

    /**
     * The number of occurrences of the terminator in the stream. The
     * terminator will be emitted only at the last occurrence.
     */
    public Parameter numberOfOccurences;

    /**
     * A flag to indicate whether non-terminator tokens should be passed on or
     * discarded. If set, only one token will ever be emitted, namely the
     * terminator at the time of its last occurrence.
     */
    public Parameter discardOthers;

    /**
     * The output token, which is always the input token.
     */
    public TypedIOPort output;

    /***********************************************************
     * public methods
     */

    /**
     * Reset the occurrence counter and refresh the cached parameter values
     * before the first firing.
     *
     * @exception IllegalActionException
     *                If the parent class throws it or a parameter cannot be
     *                evaluated.
     */
    public void initialize() throws IllegalActionException {
        super.initialize();

        // Re-read every parameter so that a run does not depend on
        // attributeChanged() having been invoked for each of them.
        _discard = ((BooleanToken) discardOthers.getToken()).booleanValue();
        _requiredOccurrences = ((IntToken) numberOfOccurences.getToken())
                .intValue();
        _terminator = terminator.getToken();

        _seenOccurrences = 0;
    }

    /**
     * Cache the new value of <i>discardOthers</i>, <i>numberOfOccurences</i>
     * or <i>terminator</i> when one of them changes; any other attribute is
     * handed to the base class.
     *
     * @param attribute
     *            The attribute that changed.
     * @exception IllegalActionException
     *                If the new value cannot be evaluated or the base class
     *                throws it.
     */
    public void attributeChanged(Attribute attribute)
            throws IllegalActionException {
        if (attribute == discardOthers) {
            _discard = ((BooleanToken) discardOthers.getToken()).booleanValue();
            if (isDebugging) {
                log.debug("Changed attribute discardOthers to: " + _discard);
            }
        } else if (attribute == numberOfOccurences) {
            _requiredOccurrences = ((IntToken) numberOfOccurences.getToken())
                    .intValue();
            if (isDebugging) {
                log.debug("Changed attribute numberOfOccurences to: "
                        + _requiredOccurrences);
            }
        } else if (attribute == terminator) {
            _terminator = terminator.getToken();
        } else {
            super.attributeChanged(attribute);
        }
    }

    /**
     * Read one token from the input. A non-terminator is forwarded unless
     * <i>discardOthers</i> is set; a terminator is counted and forwarded only
     * when the required number of occurrences is reached.
     *
     * @exception IllegalActionException
     *                If a token arrives after the last terminator has been
     *                emitted, if the terminator parameter has no value, or if
     *                reading or sending a token fails.
     */
    public void fire() throws IllegalActionException {
        super.fire();

        Token token = input.get(0);

        // error check: what if already terminated?
        if (_seenOccurrences >= _requiredOccurrences) {
            throw new IllegalActionException(
                    this.getName()
                            + " : A token has arrived on input after the last occurence of the terminator. Max occurence = "
                            + _requiredOccurrences + "; occured = "
                            + _seenOccurrences);
        }

        // Without a terminator value there is nothing to compare against;
        // report that explicitly instead of failing on a null dereference.
        if (_terminator == null) {
            throw new IllegalActionException(this.getName()
                    + " : The terminator parameter has no value.");
        }

        if (_isTerminator(token)) {
            _seenOccurrences++;
            if (_seenOccurrences >= _requiredOccurrences) {
                output.send(0, token); // emit the terminator finally
            }
        } else if (!_discard) {
            output.send(0, token); // pass on the non-terminator
        }
    }

    /**
     * Return the type constraints of this actor. The type constraints are (a)
     * the input port type is at most the terminator parameter's type; (b) if
     * the terminator is a record, the output port type is constrained by the
     * record type of the input port; (c) for all other types, the output port
     * type is set equal to the terminator parameter's type.
     *
     * This allows a terminator specification of {name="T"} and still passing
     * {name=string, date=long,...} records through the actor and the type of
     * the output port will be the longer record.
     *
     * @return a list of Inequality; empty for non-record terminators.
     */
    public List typeConstraintList() {
        List constraints = new LinkedList();
        Type terminatorType = terminator.getType();

        // For records this ensures that the input has the labels of the
        // parameter record, i.e. it can be a subtype with more labels.
        input.setTypeAtMost(terminatorType);

        if (terminatorType instanceof RecordType) {
            // Declare that output has all the fields of the input port.
            // NOTE(review): a type declared on the output by the non-record
            // branch in an earlier call is not cleared here -- confirm
            // whether a terminator may switch from a non-record to a record
            // value within one model.
            constraints.add(new Inequality(new FunctionTerm(), output
                    .getTypeTerm()));
        } else {
            output.setTypeEquals(terminatorType);
        }

        if (isDebugging) {
            log.debug("typeConstraintList(). Type = " + terminatorType
                    + " input type = " + input.getType() + " output type = "
                    + output.getType());
        }
        return constraints;
    }

    /**
     * Check if the incoming token is the terminator. For non-record
     * terminators the tokens must be equal by the Token.equals() method. For
     * record terminators, all labels of the terminator must exist in the
     * input, and their values must be equal (i.e. the input record fully
     * contains the terminator).
     *
     * The decision is based on the runtime class of the tokens rather than on
     * a separately cached type, so it cannot get out of step with the
     * terminator value. The caller must ensure the terminator is not null.
     *
     * @param token
     *            The token read from the input.
     * @return true if the token matches the terminator.
     */
    private boolean _isTerminator(Token token) {
        // all types except records
        if (!(_terminator instanceof RecordToken)) {
            return _terminator.equals(token);
        }

        // a record terminator can only be contained in another record
        if (!(token instanceof RecordToken)) {
            return false;
        }

        RecordToken expected = (RecordToken) _terminator;
        RecordToken actual = (RecordToken) token;
        Set actualLabels = actual.labelSet();
        Iterator labels = expected.labelSet().iterator();

        while (labels.hasNext()) {
            String label = (String) labels.next();

            // the label must be present in the input with an equal value
            if (!actualLabels.contains(label)
                    || !expected.get(label).equals(actual.get(label))) {
                return false;
            }
        }
        return true; // terminator record is fully contained in input
    }

    private int _requiredOccurrences; // the max # of occurrences of terminator
    private int _seenOccurrences = 0; // the # of occurrences seen so far
    private boolean _discard; // discard all non-terminator?
    private Token _terminator; // the terminator

    private static final Log log = LogFactory.getLog(SyncOnTerminator.class
            .getName());
    private static final boolean isDebugging = log.isDebugEnabled();

    // /////////////////////////////////////////////////////////////////
    // // inner classes ////
    // This class implements a monotonic function of the input port
    // type. The value of the function is the record type of the
    // input record. To ensure that this function is monotonic, the
    // value of the function is bottom (unknown) if the type of the
    // port with name "input" is not a record type. Otherwise the
    // value of the function is that record type.

    private class FunctionTerm extends MonotonicFunction {
        // /////////////////////////////////////////////////////////////
        // // public inner methods ////

        /**
         * Return the function result.
         *
         * @return The current type of the input port if it is a record type,
         *         BaseType.UNKNOWN otherwise.
         */
        public Object getValue() {
            Type inputType = input.getType();

            if (isDebugging) {
                log.debug("FunctionTerm.getValue called. input type="
                        + inputType);
            }

            if (inputType instanceof RecordType) {
                return inputType;
            }
            return BaseType.UNKNOWN;
        }

        /**
         * Return the type term of the input port, the only variable this
         * function depends on, in an array.
         *
         * @return An array of InequalityTerm with one element.
         */
        public InequalityTerm[] getVariables() {
            return new InequalityTerm[] { input.getTypeTerm() };
        }
    }

}