001/* An actor that merges two monotonically increasing streams into one.
002
003 Copyright (c) 1998-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.actor.lib;
029
030import java.util.HashSet;
031import java.util.Set;
032
033import ptolemy.actor.TypedAtomicActor;
034import ptolemy.actor.TypedIOPort;
035import ptolemy.data.BooleanToken;
036import ptolemy.data.IntToken;
037import ptolemy.data.ScalarToken;
038import ptolemy.data.expr.Parameter;
039import ptolemy.data.type.BaseType;
040import ptolemy.graph.Inequality;
041import ptolemy.kernel.CompositeEntity;
042import ptolemy.kernel.util.IllegalActionException;
043import ptolemy.kernel.util.NameDuplicationException;
044import ptolemy.kernel.util.Settable;
045import ptolemy.kernel.util.StringAttribute;
046import ptolemy.kernel.util.Workspace;
047
048///////////////////////////////////////////////////////////////////
049//// OrderedMerge
050
051/**
052 This actor merges two monotonically nondecreasing streams of tokens into
053 one monotonically nondecreasing stream. On each firing, it reads data from
054 one of the inputs.  On the first firing, it simply records that token.
055 On the second firing, it reads data from the other input and outputs
056 the smaller of the recorded token and the one it just read.  If they
057 are equal, then it outputs the recorded token. It then
058 records the larger token.  On each subsequent firing, it reads a token
059 from the input port that did not provide the recorded token, and produces
060 at the output the smaller of the recorded token and the one just read.
061 Each time it produces an output token, it also produces
062 <i>true</i> on the <i>selectedA</i> output
063 if the output token came from <i>inputA</i>, and <i>false</i>
064 if it came from <i>inputB</i>.
065 <p>
066 If both input sequences are nondecreasing, then the output sequence
067 will be nondecreasing.
068 Note that if the inputs are not nondecreasing, then the output is
069 rather complex.  The key is that in each firing, it produces the smaller
070 of the recorded token and the token it is currently reading.
071
072 @author Edward A. Lee
073 @version $Id$
074 @since Ptolemy II 2.0.1
075 @Pt.ProposedRating Red (eal)
076 @Pt.AcceptedRating Red (eal)
077 */
078public class OrderedMerge extends TypedAtomicActor {
079    /** Construct an actor with the given container and name.
080     *  @param container The container.
081     *  @param name The name of this actor.
082     *  @exception IllegalActionException If the actor cannot be contained
083     *   by the proposed container.
084     *  @exception NameDuplicationException If the container already has an
085     *   actor with this name.
086     */
087    public OrderedMerge(CompositeEntity container, String name)
088            throws NameDuplicationException, IllegalActionException {
089        super(container, name);
090
091        eliminateDuplicates = new Parameter(this, "eliminateDuplicates");
092        eliminateDuplicates.setTypeEquals(BaseType.BOOLEAN);
093        eliminateDuplicates.setExpression("false");
094
095        inputA = new TypedIOPort(this, "inputA", true, false);
096        inputB = new TypedIOPort(this, "inputB", true, false);
097        inputB.setTypeSameAs(inputA);
098        inputA.setTypeAtMost(BaseType.SCALAR);
099
100        // For the benefit of the DDF director, this actor sets
101        // consumption rate values.
102        inputA_tokenConsumptionRate = new Parameter(inputA,
103                "tokenConsumptionRate");
104        inputA_tokenConsumptionRate.setVisibility(Settable.NOT_EDITABLE);
105        inputA_tokenConsumptionRate.setTypeEquals(BaseType.INT);
106
107        inputB_tokenConsumptionRate = new Parameter(inputB,
108                "tokenConsumptionRate");
109        inputB_tokenConsumptionRate.setVisibility(Settable.NOT_EDITABLE);
110        inputB_tokenConsumptionRate.setTypeEquals(BaseType.INT);
111
112        output = new TypedIOPort(this, "output", false, true);
113
114        selectedA = new TypedIOPort(this, "selectedA", false, true);
115        selectedA.setTypeEquals(BaseType.BOOLEAN);
116
117        // Add an attribute to get the port placed on the bottom.
118        StringAttribute channelCardinal = new StringAttribute(selectedA,
119                "_cardinal");
120        channelCardinal.setExpression("SOUTH");
121
122        _attachText("_iconDescription",
123                "<svg>\n" + "<polygon points=\"-10,20 10,10 10,-10, -10,-20\" "
124                        + "style=\"fill:blue\"/>\n" + "</svg>\n");
125    }
126
127    ///////////////////////////////////////////////////////////////////
128    ////                     ports and parameters                  ////
129
130    /** If true, eliminate duplicate tokens in the output stream.
131     *  This is a boolean that defaults to false.
132     */
133    public Parameter eliminateDuplicates;
134
135    /** The first input port, which accepts any scalar token. */
136    public TypedIOPort inputA;
137
138    /** The token consumption rate for <i>inputA</i>. */
139    public Parameter inputA_tokenConsumptionRate;
140
141    /** The second input port, which accepts any scalar token with
142     *  the same type as the first input port.
143     */
144    public TypedIOPort inputB;
145
146    /** The token consumption rate for <i>inputB</i>. */
147    public Parameter inputB_tokenConsumptionRate;
148
149    /** The output port, which has the same type as the input ports. */
150    public TypedIOPort output;
151
152    /** Output port indicating whether the output token came from
153     *  <i>inputA</i>.
154     */
155    public TypedIOPort selectedA;
156
157    ///////////////////////////////////////////////////////////////////
158    ////                         public methods                    ////
159
160    /** Clone the actor into the specified workspace. This calls the
161     *  base class and then sets the type constraints.
162     *  @param workspace The workspace for the new object.
163     *  @return A new actor.
164     *  @exception CloneNotSupportedException If a derived class has
165     *   an attribute that cannot be cloned.
166     */
167    @Override
168    public Object clone(Workspace workspace) throws CloneNotSupportedException {
169        OrderedMerge newObject = (OrderedMerge) super.clone(workspace);
170        newObject.inputA.setTypeAtMost(BaseType.SCALAR);
171        newObject.inputB.setTypeSameAs(newObject.inputA);
172        return newObject;
173    }
174
175    /** Read one token from the port that did not provide the recorded
176     *  token (or <i>inputA</i>, on the first firing), and output the
177     *  smaller of the recorded token or the newly read token.
178     *  If there is no token on the port to be read, then do nothing
179     *  and return. If an output token is produced, then also produce
180     *  <i>true</i> on the <i>selectedA</i> output
181     *  if the output token came from <i>inputA</i>, and <i>false</i>
182     *  if it came from <i>inputB</i>.
183     *  @exception IllegalActionException If there is no director.
184     */
185    @Override
186    public void fire() throws IllegalActionException {
187        super.fire();
188        if (_nextPort.hasToken(0)) {
189            ScalarToken readToken = (ScalarToken) _nextPort.get(0);
190
191            if (_debugging) {
192                _debug("Read input token from " + _nextPort.getName()
193                        + " with value " + readToken);
194            }
195            // The strategy here is to keep reading from the same
196            // port until its value is greater than or equal to the
197            // recorded token. When that occurs, output the recorded
198            // token, record the just-read input token, and start
199            // reading from the other input port.
200
201            if (_recordedToken == null) {
202                // First firing or after a duplicate.  Just record the token.
203                _tentativeRecordedToken = readToken;
204                _tentativeReadFromA = true;
205                // Swap ports.
206                if (_nextPort == inputA) {
207                    _tentativeNextPort = inputB;
208                } else {
209                    _tentativeNextPort = inputA;
210                }
211            } else {
212                // Logic is different if we have to eliminate duplicates.
213                if (((BooleanToken) eliminateDuplicates.getToken())
214                        .booleanValue()) {
215                    // We are set to eliminate duplicates.
216                    if (readToken.equals(_recordedToken)) {
217                        // Input is a duplicate of the recorded token.
218                        // Produce the recorded token as output,
219                        // discard the input token and continue reading from the
220                        // same input port with no recorded token.
221                        output.send(0, _recordedToken);
222                        _tentativeLastProduced = _recordedToken;
223                        if (_debugging) {
224                            _debug("Sent output token with value "
225                                    + _recordedToken
226                                    + "\nDiscarded duplicate input.");
227                        }
228                        _tentativeRecordedToken = null;
229                        if (_readFromA) {
230                            selectedA.send(0, BooleanToken.TRUE);
231                        } else {
232                            selectedA.send(0, BooleanToken.FALSE);
233                        }
234                        // Read from the same port again, so leave
235                        // _tentativeNextPort alone.
236                    } else if (readToken.equals(_lastProduced)) {
237                        // Token is the same as last produced.
238                        // Do not send an output and leave everything the same
239                        // Except there is no longer a recorded token.
240                        if (_debugging) {
241                            _debug("Discarded duplicate input " + readToken);
242                        }
243                    } else {
244                        // Not a duplicate.
245                        if (readToken.isLessThan(_recordedToken)
246                                .booleanValue()) {
247                            // Produce the smaller output.
248                            output.send(0, readToken);
249                            _tentativeLastProduced = readToken;
250
251                            if (_debugging) {
252                                _debug("Sent output token with value "
253                                        + readToken);
254                            }
255
256                            // Token was just read from _nextPort.
257                            if (_nextPort == inputA) {
258                                selectedA.send(0, BooleanToken.TRUE);
259                            } else {
260                                selectedA.send(0, BooleanToken.FALSE);
261                            }
262                            // Read from the same port again next time.
263                        } else {
264                            // Produce the smaller output.
265                            output.send(0, _recordedToken);
266                            _tentativeLastProduced = _recordedToken;
267                            if (_debugging) {
268                                _debug("Sent output token with value "
269                                        + _recordedToken);
270                            }
271                            if (_readFromA) {
272                                // Recorded token was read from A.
273                                selectedA.send(0, BooleanToken.TRUE);
274                            } else {
275                                selectedA.send(0, BooleanToken.FALSE);
276                            }
277
278                            _tentativeRecordedToken = readToken;
279                            _tentativeReadFromA = _nextPort == inputA;
280
281                            // Swap ports.
282                            if (_nextPort == inputA) {
283                                _tentativeNextPort = inputB;
284                            } else {
285                                _tentativeNextPort = inputA;
286                            }
287                        }
288                    }
289                } else {
290                    // Not eliminating duplicates.
291                    if (readToken.isLessThan(_recordedToken).booleanValue()) {
292                        // Produce the smaller output.
293                        output.send(0, readToken);
294
295                        if (_debugging) {
296                            _debug("Sent output token with value " + readToken);
297                        }
298
299                        // Token was just read from _nextPort.
300                        if (_nextPort == inputA) {
301                            selectedA.send(0, BooleanToken.TRUE);
302                        } else {
303                            selectedA.send(0, BooleanToken.FALSE);
304                        }
305                    } else {
306                        // Produce the smaller output.
307                        output.send(0, _recordedToken);
308                        if (_debugging) {
309                            _debug("Sent output token with value "
310                                    + _recordedToken);
311                        }
312
313                        if (_readFromA) {
314                            selectedA.send(0, BooleanToken.TRUE);
315                        } else {
316                            selectedA.send(0, BooleanToken.FALSE);
317                        }
318
319                        _tentativeRecordedToken = readToken;
320                        _tentativeReadFromA = _nextPort == inputA;
321
322                        // Swap ports.
323                        if (_nextPort == inputA) {
324                            _tentativeNextPort = inputB;
325                        } else {
326                            _tentativeNextPort = inputA;
327                        }
328                    }
329                }
330            }
331        }
332    }
333
334    /** Initialize this actor to indicate that no token is recorded.
335     *  @exception IllegalActionException If a derived class throws it.
336     */
337    @Override
338    public void initialize() throws IllegalActionException {
339        super.initialize();
340        _nextPort = inputA;
341        _recordedToken = null;
342        _lastProduced = null;
343        _tentativeLastProduced = null;
344        inputA_tokenConsumptionRate.setToken(_one);
345        inputB_tokenConsumptionRate.setToken(_zero);
346    }
347
348    /** Commit the recorded token.
349     *  @return True.
350     *  @exception IllegalActionException Not thrown in this base class.
351     */
352    @Override
353    public boolean postfire() throws IllegalActionException {
354        _recordedToken = _tentativeRecordedToken;
355        _readFromA = _tentativeReadFromA;
356        _nextPort = _tentativeNextPort;
357        _lastProduced = _tentativeLastProduced;
358
359        if (_nextPort == inputA) {
360            inputA_tokenConsumptionRate.setToken(_one);
361            inputB_tokenConsumptionRate.setToken(_zero);
362        } else {
363            inputA_tokenConsumptionRate.setToken(_zero);
364            inputB_tokenConsumptionRate.setToken(_one);
365        }
366
367        if (_debugging) {
368            _debug("Next port to read input from is " + _nextPort.getName());
369        }
370
371        return super.postfire();
372    }
373
374    ///////////////////////////////////////////////////////////////////
375    ////                         protected methods                 ////
376
377    /** Return the port that this actor will read from on the next
378     *  invocation of the fire() method. This will be null before the
379     *  first invocation of initialize().
380     *  @return The next input port.
381     */
382    protected TypedIOPort _getNextPort() {
383        // This method is Added by Gang Zhou so that DDFOrderedMerge
384        // can extend this class.
385        return _nextPort;
386    }
387
388    /**
389     * The output must be greater than or equal to each of both inputs. Since
390     * inputA is set to be the same as inputB, the output is simply set to be
391     * greater than or equal to inputA.
392     * @return A set of type constraints
393     */
394    @Override
395    protected Set<Inequality> _defaultTypeConstraints() {
396        Set<Inequality> result = new HashSet<Inequality>();
397        result.add(new Inequality(inputA.getTypeTerm(), output.getTypeTerm()));
398        return result;
399    }
400
401    ///////////////////////////////////////////////////////////////////
402    ////                         private variables                 ////
403
404    /** The last produced token. Used to eliminate duplicates. */
405    private ScalarToken _lastProduced;
406
407    /** The port from which to read next. */
408    private TypedIOPort _nextPort = null;
409
410    /** A final static IntToken with value 1. */
411    private final static IntToken _one = new IntToken(1);
412
413    /** Indicator of whether the _recordedToken was read from A. */
414    private boolean _readFromA;
415
416    /** The recorded token. */
417    private ScalarToken _recordedToken = null;
418
419    /** The tentative last produced token. Used to eliminate duplicates. */
420    private ScalarToken _tentativeLastProduced;
421
422    /** Tentative indicator of having read from A. */
423    private boolean _tentativeReadFromA;
424
425    /** The tentative recorded token. */
426    private ScalarToken _tentativeRecordedToken = null;
427
428    /** The tentative port from which to read next. */
429    private TypedIOPort _tentativeNextPort = null;
430
431    /** A final static IntToken with value 0. */
432    private final static IntToken _zero = new IntToken(0);
433}