001/* An actor that implements a queue of events. 002 003 Copyright (c) 1998-2014 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.domains.de.lib; 029 030import ptolemy.actor.TypedIOPort; 031import ptolemy.actor.lib.Transformer; 032import ptolemy.actor.util.FIFOQueue; 033import ptolemy.data.BooleanToken; 034import ptolemy.data.IntToken; 035import ptolemy.data.Token; 036import ptolemy.data.expr.Parameter; 037import ptolemy.data.type.BaseType; 038import ptolemy.kernel.CompositeEntity; 039import ptolemy.kernel.util.Attribute; 040import ptolemy.kernel.util.IllegalActionException; 041import ptolemy.kernel.util.NameDuplicationException; 042import ptolemy.kernel.util.StringAttribute; 043import ptolemy.kernel.util.Workspace; 044 045/////////////////////////////////////////////////////////////////// 046//// Queue 047 048/** 049 This actor implements a queue. When a token is received on the 050 <i>input</i> port, it is stored in the queue. 051 When the <i>trigger</i> port receives a token, the oldest element in the 052 queue is produced on the output. If there is no element in the queue when a 053 token is received on the <i>trigger</i> port, then no output is 054 produced. In this circumstance, if <i>persistentTrigger</i> is true 055 then the next time an input is received, it is sent immediately to 056 the output. 057 <p> 058 The inputs can be of any token type, and the output 059 is constrained to be of a type at least that of the input. If 060 the <i>capacity</i> parameter is negative or zero (the default), 061 then the capacity is infinite. Otherwise, the capacity is 062 given by that parameter, and inputs received when the queue 063 is full are discarded. Whenever the size of the queue changes, 064 the new size is produced on the <i>size</i> output port. 065 If an input arrives at the same time that an output is 066 produced, then the <i>size</i> port gets two events at 067 the same time. 068 069 @author Steve Neuendorffer and Edward A. Lee 070 @version $Id$ 071 @since Ptolemy II 2.0 072 @Pt.ProposedRating Yellow (eal) 073 @Pt.AcceptedRating Yellow (eal) 074 */ 075public class Queue extends Transformer { 076 /** Construct an actor with the given container and name. 077 * @param container The container. 078 * @param name The name of this actor. 079 * @exception IllegalActionException If the actor cannot be contained 080 * by the proposed container. 081 * @exception NameDuplicationException If the container already has an 082 * actor with this name. 083 */ 084 public Queue(CompositeEntity container, String name) 085 throws NameDuplicationException, IllegalActionException { 086 super(container, name); 087 output.setTypeAtLeast(input); 088 089 trigger = new TypedIOPort(this, "trigger", true, false); 090 trigger.setMultiport(true); 091 // Leave trigger type undeclared. 092 // Put it at the bottom of the icon by default. 093 StringAttribute cardinality = new StringAttribute(trigger, "_cardinal"); 094 cardinality.setExpression("SOUTH"); 095 096 size = new TypedIOPort(this, "size", false, true); 097 size.setTypeEquals(BaseType.INT); 098 // Put it at the bottom of the icon by default. 099 cardinality = new StringAttribute(size, "_cardinal"); 100 cardinality.setExpression("SOUTH"); 101 102 _queue = new FIFOQueue(); 103 104 capacity = new Parameter(this, "capacity"); 105 capacity.setTypeEquals(BaseType.INT); 106 capacity.setExpression("0"); 107 108 persistentTrigger = new Parameter(this, "persistentTrigger"); 109 persistentTrigger.setTypeEquals(BaseType.BOOLEAN); 110 persistentTrigger.setExpression("false"); 111 } 112 113 /////////////////////////////////////////////////////////////////// 114 //// ports and parameters //// 115 116 /** The capacity of the queue. If the value is positive, then 117 * it specifies the capacity of the queue. If it is negative 118 * or 0, then it specifies that the capacity is infinite. 119 * This is an integer with default 0. 120 */ 121 public Parameter capacity; 122 123 /** If set to true, then if a <i>trigger</i> arrives when the 124 * queue is empty, it is remembered, and the next time an 125 * <i>input</i> arrives, it is sent immediately to the output. 126 * This is a boolean with default false. 127 */ 128 public Parameter persistentTrigger; 129 130 /** The current size of the queue. This port produces an output 131 * whenever the size changes. It has type int. 132 */ 133 public TypedIOPort size; 134 135 /** The trigger port, which has undeclared type. If this port 136 * receives a token, then the oldest token in the queue 137 * will be emitted on the <i>output</i> port. 138 */ 139 public TypedIOPort trigger; 140 141 /////////////////////////////////////////////////////////////////// 142 //// public methods //// 143 144 /** React to a change in an attribute. If the attribute is 145 * <i>capacity</i>, then change the capacity of the queue. 146 * If the size of the queue currently exceeds the specified 147 * capacity, then throw an exception. 148 * @param attribute The attribute that changed. 149 * @exception IllegalActionException If the current size 150 * of the queue exceeds the specified capacity. 151 */ 152 @Override 153 public void attributeChanged(Attribute attribute) 154 throws IllegalActionException { 155 if (attribute == capacity) { 156 int newCapacity = ((IntToken) capacity.getToken()).intValue(); 157 if (newCapacity <= 0) { 158 if (_queue.getCapacity() != FIFOQueue.INFINITE_CAPACITY) { 159 _queue.setCapacity(FIFOQueue.INFINITE_CAPACITY); 160 } 161 } else { 162 if (newCapacity < _queue.size()) { 163 throw new IllegalActionException(this, 164 "Queue size (" + _queue.size() 165 + ") exceed requested capacity " 166 + newCapacity + ")."); 167 } 168 _queue.setCapacity(newCapacity); 169 } 170 } else { 171 super.attributeChanged(attribute); 172 } 173 } 174 175 /** Clone the actor into the specified workspace. This calls the 176 * base class and then sets the ports. 177 * @param workspace The workspace for the new object. 178 * @return A new actor. 179 * @exception CloneNotSupportedException If a derived class has 180 * has an attribute that cannot be cloned. 181 */ 182 @Override 183 public Object clone(Workspace workspace) throws CloneNotSupportedException { 184 Queue newObject = (Queue) super.clone(workspace); 185 newObject._queue = new FIFOQueue(); 186 newObject.output.setTypeAtLeast(newObject.input); 187 return newObject; 188 } 189 190 /** Declare that the <i>output</i> 191 * does not depend on the <i>input</i> in a firing. 192 * @exception IllegalActionException If the causality interface 193 * cannot be computed. 194 * @see #getCausalityInterface() 195 */ 196 @Override 197 public void declareDelayDependency() throws IllegalActionException { 198 _declareDelayDependency(input, output, 0.0); 199 } 200 201 /** Put a new input token on the queue and/or produce output 202 * data from the queue. Specifically, if there is a new token 203 * on the <i>input</i> port, then put it on the input queue. 204 * Then, if there is a token in the <i>trigger</i> port 205 * and the queue is not empty, then 206 * send the oldest token on the queue to the <i>output</i> port. 207 * Send the resulting queue size to the output <i>size</i> output port. 208 * @exception IllegalActionException If getting tokens from input and 209 * trigger ports or sending token to output throws it. 210 */ 211 @Override 212 public void fire() throws IllegalActionException { 213 super.fire(); 214 int sizeOutput = _queue.size(); 215 boolean gotTrigger = false; 216 for (int i = 0; i < trigger.getWidth(); i++) { 217 if (trigger.hasToken(i)) { 218 // Consume the trigger token. 219 trigger.get(i); 220 gotTrigger = true; 221 } 222 } 223 // Increment the size only either the queue has infinite capacity, 224 // the capacity is greater than the current size, or a trigger 225 // input was received (which will reduce the queue size by one, 226 // making room for a new token). 227 if (input.hasToken(0)) { 228 _token = input.get(0); 229 if (_queue.getCapacity() == FIFOQueue.INFINITE_CAPACITY 230 || _queue.getCapacity() > _queue.size() || gotTrigger) { 231 sizeOutput++; 232 } 233 } else { 234 _token = null; 235 } 236 if (gotTrigger) { 237 if (sizeOutput > 0) { 238 // If there is no token on the queue, 239 // then send out the currently read token. 240 if (_queue.size() == 0) { 241 output.send(0, _token); 242 _token = null; 243 } else { 244 output.send(0, (Token) _queue.get(0)); 245 _removeTokens = 1; 246 } 247 sizeOutput--; 248 _persistentTrigger = false; 249 } else { 250 if (((BooleanToken) persistentTrigger.getToken()) 251 .booleanValue()) { 252 _persistentTrigger = true; 253 } 254 } 255 } else { 256 // If the queue was previously empty and 257 // persistent trigger is set, and there is an 258 // input, then produce the current input as output. 259 if (_persistentTrigger && _token != null) { 260 output.send(0, _token); 261 sizeOutput--; 262 _token = null; 263 _persistentTrigger = false; 264 } 265 } 266 size.send(0, new IntToken(sizeOutput)); 267 } 268 269 /** Clear the cached input tokens. 270 * @exception IllegalActionException If there is no director. 271 */ 272 @Override 273 public void initialize() throws IllegalActionException { 274 _queue.clear(); 275 _persistentTrigger = false; 276 _token = null; 277 _removeTokens = 0; 278 super.initialize(); 279 } 280 281 /** Commit additions or removals from the queue. 282 * @return True. 283 * @exception IllegalActionException If the superclass throws it. 284 */ 285 @Override 286 public boolean postfire() throws IllegalActionException { 287 if (_token != null) { 288 _queue.put(_token); 289 } 290 for (int i = 0; i < _removeTokens; i++) { 291 _queue.take(); 292 } 293 _token = null; 294 _removeTokens = 0; 295 296 return super.postfire(); 297 } 298 299 /** If there is no input on the <i>trigger</i> port, return 300 * false, indicating that this actor does not want to fire. 301 * This has the effect of leaving input values in the input 302 * ports, if there are any. 303 * @exception IllegalActionException If there is no director. 304 */ 305 @Override 306 public boolean prefire() throws IllegalActionException { 307 // If the trigger input is not connected, never fire. 308 boolean hasInput = false; 309 boolean hasTrigger = false; 310 311 if (input.isOutsideConnected()) { 312 hasInput = input.hasToken(0); 313 } 314 315 if (trigger.isOutsideConnected()) { 316 hasTrigger = trigger.hasToken(0); 317 } 318 319 return hasInput || hasTrigger; 320 } 321 322 /** Clear the queue tokens. 323 * @exception IllegalActionException If the superclass throws it. 324 */ 325 @Override 326 public void wrapup() throws IllegalActionException { 327 // If we don't clear the queue, then you can't set the capacity 328 // to smaller than the final size on the last run. So we 329 // need to either clear the queue or somehow allow that change 330 // in capacity. 331 _queue.clear(); 332 super.wrapup(); 333 } 334 335 /////////////////////////////////////////////////////////////////// 336 //// protected variables //// 337 338 /** The FIFOQueue. */ 339 protected FIFOQueue _queue; 340 341 /** The number of tokens that should be removed from the queue in 342 * postfire(). 343 */ 344 protected int _removeTokens; 345 346 /** Token received in the fire() method for inclusion in 347 * the queue in the postfire() method. 348 */ 349 protected Token _token; 350 351 /////////////////////////////////////////////////////////////////// 352 //// private variables //// 353 354 /** An indicator of whether a trigger token was received in 355 * the last fire() method invocation. 356 */ 357 private boolean _persistentTrigger; 358}