001/* An actor that implements a queue of events.
002
003 Copyright (c) 1998-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.domains.de.lib;
029
030import ptolemy.actor.TypedIOPort;
031import ptolemy.actor.lib.Transformer;
032import ptolemy.actor.util.FIFOQueue;
033import ptolemy.data.BooleanToken;
034import ptolemy.data.IntToken;
035import ptolemy.data.Token;
036import ptolemy.data.expr.Parameter;
037import ptolemy.data.type.BaseType;
038import ptolemy.kernel.CompositeEntity;
039import ptolemy.kernel.util.Attribute;
040import ptolemy.kernel.util.IllegalActionException;
041import ptolemy.kernel.util.NameDuplicationException;
042import ptolemy.kernel.util.StringAttribute;
043import ptolemy.kernel.util.Workspace;
044
045///////////////////////////////////////////////////////////////////
046//// Queue
047
048/**
049 This actor implements a queue.  When a token is received on the
050 <i>input</i> port, it is stored in the queue.
051 When the <i>trigger</i> port receives a token, the oldest element in the
052 queue is produced on the output.  If there is no element in the queue when a
053 token is received on the <i>trigger</i> port, then no output is
054 produced. In this circumstance, if <i>persistentTrigger</i> is true
055 then the next time an input is received, it is sent immediately to
056 the output.
057 <p>
058 The inputs can be of any token type, and the output
059 is constrained to be of a type at least that of the input. If
060 the <i>capacity</i> parameter is negative or zero (the default),
061 then the capacity is infinite. Otherwise, the capacity is
062 given by that parameter, and inputs received when the queue
063 is full are discarded. Whenever the size of the queue changes,
064 the new size is produced on the <i>size</i> output port.
065 If an input arrives at the same time that an output is
066 produced, then the <i>size</i> port gets two events at
067 the same time.
068
069 @author Steve Neuendorffer and Edward A. Lee
070 @version $Id$
071 @since Ptolemy II 2.0
072 @Pt.ProposedRating Yellow (eal)
073 @Pt.AcceptedRating Yellow (eal)
074 */
075public class Queue extends Transformer {
076    /** Construct an actor with the given container and name.
077     *  @param container The container.
078     *  @param name The name of this actor.
079     *  @exception IllegalActionException If the actor cannot be contained
080     *   by the proposed container.
081     *  @exception NameDuplicationException If the container already has an
082     *   actor with this name.
083     */
084    public Queue(CompositeEntity container, String name)
085            throws NameDuplicationException, IllegalActionException {
086        super(container, name);
087        output.setTypeAtLeast(input);
088
089        trigger = new TypedIOPort(this, "trigger", true, false);
090        trigger.setMultiport(true);
091        // Leave trigger type undeclared.
092        // Put it at the bottom of the icon by default.
093        StringAttribute cardinality = new StringAttribute(trigger, "_cardinal");
094        cardinality.setExpression("SOUTH");
095
096        size = new TypedIOPort(this, "size", false, true);
097        size.setTypeEquals(BaseType.INT);
098        // Put it at the bottom of the icon by default.
099        cardinality = new StringAttribute(size, "_cardinal");
100        cardinality.setExpression("SOUTH");
101
102        _queue = new FIFOQueue();
103
104        capacity = new Parameter(this, "capacity");
105        capacity.setTypeEquals(BaseType.INT);
106        capacity.setExpression("0");
107
108        persistentTrigger = new Parameter(this, "persistentTrigger");
109        persistentTrigger.setTypeEquals(BaseType.BOOLEAN);
110        persistentTrigger.setExpression("false");
111    }
112
113    ///////////////////////////////////////////////////////////////////
114    ////                     ports and parameters                  ////
115
116    /** The capacity of the queue. If the value is positive, then
117     *  it specifies the capacity of the queue. If it is negative
118     *  or 0, then it specifies that the capacity is infinite.
119     *  This is an integer with default 0.
120     */
121    public Parameter capacity;
122
123    /** If set to true, then if a <i>trigger</i> arrives when the
124     *  queue is empty, it is remembered, and the next time an
125     *  <i>input</i> arrives, it is sent immediately to the output.
126     *  This is a boolean with default false.
127     */
128    public Parameter persistentTrigger;
129
130    /** The current size of the queue. This port produces an output
131     *  whenever the size changes. It has type int.
132     */
133    public TypedIOPort size;
134
135    /** The trigger port, which has undeclared type. If this port
136     *  receives a token, then the oldest token in the queue
137     *  will be emitted on the <i>output</i> port.
138     */
139    public TypedIOPort trigger;
140
141    ///////////////////////////////////////////////////////////////////
142    ////                         public methods                    ////
143
144    /** React to a change in an attribute.  If the attribute is
145     *  <i>capacity</i>, then change the capacity of the queue.
146     *  If the size of the queue currently exceeds the specified
147     *  capacity, then throw an exception.
148     *  @param attribute The attribute that changed.
149     *  @exception IllegalActionException If the current size
150     *   of the queue exceeds the specified capacity.
151     */
152    @Override
153    public void attributeChanged(Attribute attribute)
154            throws IllegalActionException {
155        if (attribute == capacity) {
156            int newCapacity = ((IntToken) capacity.getToken()).intValue();
157            if (newCapacity <= 0) {
158                if (_queue.getCapacity() != FIFOQueue.INFINITE_CAPACITY) {
159                    _queue.setCapacity(FIFOQueue.INFINITE_CAPACITY);
160                }
161            } else {
162                if (newCapacity < _queue.size()) {
163                    throw new IllegalActionException(this,
164                            "Queue size (" + _queue.size()
165                                    + ") exceed requested capacity "
166                                    + newCapacity + ").");
167                }
168                _queue.setCapacity(newCapacity);
169            }
170        } else {
171            super.attributeChanged(attribute);
172        }
173    }
174
175    /** Clone the actor into the specified workspace. This calls the
176     *  base class and then sets the ports.
177     *  @param workspace The workspace for the new object.
178     *  @return A new actor.
179     *  @exception CloneNotSupportedException If a derived class has
180     *   has an attribute that cannot be cloned.
181     */
182    @Override
183    public Object clone(Workspace workspace) throws CloneNotSupportedException {
184        Queue newObject = (Queue) super.clone(workspace);
185        newObject._queue = new FIFOQueue();
186        newObject.output.setTypeAtLeast(newObject.input);
187        return newObject;
188    }
189
190    /** Declare that the <i>output</i>
191     *  does not depend on the <i>input</i> in a firing.
192     *  @exception IllegalActionException If the causality interface
193     *  cannot be computed.
194     *  @see #getCausalityInterface()
195     */
196    @Override
197    public void declareDelayDependency() throws IllegalActionException {
198        _declareDelayDependency(input, output, 0.0);
199    }
200
201    /** Put a new input token on the queue and/or produce output
202     *  data from the queue.  Specifically, if there is a new token
203     *  on the <i>input</i> port, then put it on the input queue.
204     *  Then, if there is a token in the <i>trigger</i> port
205     *  and the queue is not empty, then
206     *  send the oldest token on the queue to the <i>output</i> port.
207     *  Send the resulting queue size to the output <i>size</i> output port.
208     *  @exception IllegalActionException If getting tokens from input and
209     *   trigger ports or sending token to output throws it.
210     */
211    @Override
212    public void fire() throws IllegalActionException {
213        super.fire();
214        int sizeOutput = _queue.size();
215        boolean gotTrigger = false;
216        for (int i = 0; i < trigger.getWidth(); i++) {
217            if (trigger.hasToken(i)) {
218                // Consume the trigger token.
219                trigger.get(i);
220                gotTrigger = true;
221            }
222        }
223        // Increment the size only either the queue has infinite capacity,
224        // the capacity is greater than the current size, or a trigger
225        // input was received (which will reduce the queue size by one,
226        // making room for a new token).
227        if (input.hasToken(0)) {
228            _token = input.get(0);
229            if (_queue.getCapacity() == FIFOQueue.INFINITE_CAPACITY
230                    || _queue.getCapacity() > _queue.size() || gotTrigger) {
231                sizeOutput++;
232            }
233        } else {
234            _token = null;
235        }
236        if (gotTrigger) {
237            if (sizeOutput > 0) {
238                // If there is no token on the queue,
239                // then send out the currently read token.
240                if (_queue.size() == 0) {
241                    output.send(0, _token);
242                    _token = null;
243                } else {
244                    output.send(0, (Token) _queue.get(0));
245                    _removeTokens = 1;
246                }
247                sizeOutput--;
248                _persistentTrigger = false;
249            } else {
250                if (((BooleanToken) persistentTrigger.getToken())
251                        .booleanValue()) {
252                    _persistentTrigger = true;
253                }
254            }
255        } else {
256            // If the queue was previously empty and
257            // persistent trigger is set, and there is an
258            // input, then produce the current input as output.
259            if (_persistentTrigger && _token != null) {
260                output.send(0, _token);
261                sizeOutput--;
262                _token = null;
263                _persistentTrigger = false;
264            }
265        }
266        size.send(0, new IntToken(sizeOutput));
267    }
268
269    /** Clear the cached input tokens.
270     *  @exception IllegalActionException If there is no director.
271     */
272    @Override
273    public void initialize() throws IllegalActionException {
274        _queue.clear();
275        _persistentTrigger = false;
276        _token = null;
277        _removeTokens = 0;
278        super.initialize();
279    }
280
281    /** Commit additions or removals from the queue.
282     *  @return True.
283     *  @exception IllegalActionException If the superclass throws it.
284     */
285    @Override
286    public boolean postfire() throws IllegalActionException {
287        if (_token != null) {
288            _queue.put(_token);
289        }
290        for (int i = 0; i < _removeTokens; i++) {
291            _queue.take();
292        }
293        _token = null;
294        _removeTokens = 0;
295
296        return super.postfire();
297    }
298
299    /** If there is no input on the <i>trigger</i> port, return
300     *  false, indicating that this actor does not want to fire.
301     *  This has the effect of leaving input values in the input
302     *  ports, if there are any.
303     *  @exception IllegalActionException If there is no director.
304     */
305    @Override
306    public boolean prefire() throws IllegalActionException {
307        // If the trigger input is not connected, never fire.
308        boolean hasInput = false;
309        boolean hasTrigger = false;
310
311        if (input.isOutsideConnected()) {
312            hasInput = input.hasToken(0);
313        }
314
315        if (trigger.isOutsideConnected()) {
316            hasTrigger = trigger.hasToken(0);
317        }
318
319        return hasInput || hasTrigger;
320    }
321
322    /** Clear the queue tokens.
323     *  @exception IllegalActionException If the superclass throws it.
324     */
325    @Override
326    public void wrapup() throws IllegalActionException {
327        // If we don't clear the queue, then you can't set the capacity
328        // to smaller than the final size on the last run.  So we
329        // need to either clear the queue or somehow allow that change
330        // in capacity.
331        _queue.clear();
332        super.wrapup();
333    }
334
335    ///////////////////////////////////////////////////////////////////
336    ////                         protected variables               ////
337
338    /** The FIFOQueue. */
339    protected FIFOQueue _queue;
340
341    /** The number of tokens that should be removed from the queue in
342     *  postfire().
343     */
344    protected int _removeTokens;
345
346    /** Token received in the fire() method for inclusion in
347     *  the queue in the postfire() method.
348     */
349    protected Token _token;
350
351    ///////////////////////////////////////////////////////////////////
352    ////                         private variables                 ////
353
354    /** An indicator of whether a trigger token was received in
355     *  the last fire() method invocation.
356     */
357    private boolean _persistentTrigger;
358}