001/* An actor that merges two monotonically increasing streams into one. 002 003 Copyright (c) 1998-2014 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.actor.lib; 029 030import java.util.HashSet; 031import java.util.Set; 032 033import ptolemy.actor.TypedAtomicActor; 034import ptolemy.actor.TypedIOPort; 035import ptolemy.data.BooleanToken; 036import ptolemy.data.IntToken; 037import ptolemy.data.ScalarToken; 038import ptolemy.data.expr.Parameter; 039import ptolemy.data.type.BaseType; 040import ptolemy.graph.Inequality; 041import ptolemy.kernel.CompositeEntity; 042import ptolemy.kernel.util.IllegalActionException; 043import ptolemy.kernel.util.NameDuplicationException; 044import ptolemy.kernel.util.Settable; 045import ptolemy.kernel.util.StringAttribute; 046import ptolemy.kernel.util.Workspace; 047 048/////////////////////////////////////////////////////////////////// 049//// OrderedMerge 050 051/** 052 This actor merges two monotonically nondecreasing streams of tokens into 053 one monotonically nondecreasing stream. On each firing, it reads data from 054 one of the inputs. On the first firing, it simply records that token. 055 On the second firing, it reads data from the other input and outputs 056 the smaller of the recorded token and the one it just read. If they 057 are equal, then it outputs the recorded token. It then 058 records the larger token. On each subsequent firing, it reads a token 059 from the input port that did not provide the recorded token, and produces 060 at the output the smaller of the recorded token and the one just read. 061 Each time it produces an output token, it also produces 062 <i>true</i> on the <i>selectedA</i> output 063 if the output token came from <i>inputA</i>, and <i>false</i> 064 if it came from <i>inputB</i>. 065 <p> 066 If both input sequences are nondecreasing, then the output sequence 067 will be nondecreasing. 068 Note that if the inputs are not nondecreasing, then the output is 069 rather complex. The key is that in each firing, it produces the smaller 070 of the recorded token and the token it is currently reading. 071 072 @author Edward A. Lee 073 @version $Id$ 074 @since Ptolemy II 2.0.1 075 @Pt.ProposedRating Red (eal) 076 @Pt.AcceptedRating Red (eal) 077 */ 078public class OrderedMerge extends TypedAtomicActor { 079 /** Construct an actor with the given container and name. 080 * @param container The container. 081 * @param name The name of this actor. 082 * @exception IllegalActionException If the actor cannot be contained 083 * by the proposed container. 084 * @exception NameDuplicationException If the container already has an 085 * actor with this name. 086 */ 087 public OrderedMerge(CompositeEntity container, String name) 088 throws NameDuplicationException, IllegalActionException { 089 super(container, name); 090 091 eliminateDuplicates = new Parameter(this, "eliminateDuplicates"); 092 eliminateDuplicates.setTypeEquals(BaseType.BOOLEAN); 093 eliminateDuplicates.setExpression("false"); 094 095 inputA = new TypedIOPort(this, "inputA", true, false); 096 inputB = new TypedIOPort(this, "inputB", true, false); 097 inputB.setTypeSameAs(inputA); 098 inputA.setTypeAtMost(BaseType.SCALAR); 099 100 // For the benefit of the DDF director, this actor sets 101 // consumption rate values. 102 inputA_tokenConsumptionRate = new Parameter(inputA, 103 "tokenConsumptionRate"); 104 inputA_tokenConsumptionRate.setVisibility(Settable.NOT_EDITABLE); 105 inputA_tokenConsumptionRate.setTypeEquals(BaseType.INT); 106 107 inputB_tokenConsumptionRate = new Parameter(inputB, 108 "tokenConsumptionRate"); 109 inputB_tokenConsumptionRate.setVisibility(Settable.NOT_EDITABLE); 110 inputB_tokenConsumptionRate.setTypeEquals(BaseType.INT); 111 112 output = new TypedIOPort(this, "output", false, true); 113 114 selectedA = new TypedIOPort(this, "selectedA", false, true); 115 selectedA.setTypeEquals(BaseType.BOOLEAN); 116 117 // Add an attribute to get the port placed on the bottom. 118 StringAttribute channelCardinal = new StringAttribute(selectedA, 119 "_cardinal"); 120 channelCardinal.setExpression("SOUTH"); 121 122 _attachText("_iconDescription", 123 "<svg>\n" + "<polygon points=\"-10,20 10,10 10,-10, -10,-20\" " 124 + "style=\"fill:blue\"/>\n" + "</svg>\n"); 125 } 126 127 /////////////////////////////////////////////////////////////////// 128 //// ports and parameters //// 129 130 /** If true, eliminate duplicate tokens in the output stream. 131 * This is a boolean that defaults to false. 132 */ 133 public Parameter eliminateDuplicates; 134 135 /** The first input port, which accepts any scalar token. */ 136 public TypedIOPort inputA; 137 138 /** The token consumption rate for <i>inputA</i>. */ 139 public Parameter inputA_tokenConsumptionRate; 140 141 /** The second input port, which accepts any scalar token with 142 * the same type as the first input port. 143 */ 144 public TypedIOPort inputB; 145 146 /** The token consumption rate for <i>inputB</i>. */ 147 public Parameter inputB_tokenConsumptionRate; 148 149 /** The output port, which has the same type as the input ports. */ 150 public TypedIOPort output; 151 152 /** Output port indicating whether the output token came from 153 * <i>inputA</i>. 154 */ 155 public TypedIOPort selectedA; 156 157 /////////////////////////////////////////////////////////////////// 158 //// public methods //// 159 160 /** Clone the actor into the specified workspace. This calls the 161 * base class and then sets the type constraints. 162 * @param workspace The workspace for the new object. 163 * @return A new actor. 164 * @exception CloneNotSupportedException If a derived class has 165 * an attribute that cannot be cloned. 166 */ 167 @Override 168 public Object clone(Workspace workspace) throws CloneNotSupportedException { 169 OrderedMerge newObject = (OrderedMerge) super.clone(workspace); 170 newObject.inputA.setTypeAtMost(BaseType.SCALAR); 171 newObject.inputB.setTypeSameAs(newObject.inputA); 172 return newObject; 173 } 174 175 /** Read one token from the port that did not provide the recorded 176 * token (or <i>inputA</i>, on the first firing), and output the 177 * smaller of the recorded token or the newly read token. 178 * If there is no token on the port to be read, then do nothing 179 * and return. If an output token is produced, then also produce 180 * <i>true</i> on the <i>selectedA</i> output 181 * if the output token came from <i>inputA</i>, and <i>false</i> 182 * if it came from <i>inputB</i>. 183 * @exception IllegalActionException If there is no director. 184 */ 185 @Override 186 public void fire() throws IllegalActionException { 187 super.fire(); 188 if (_nextPort.hasToken(0)) { 189 ScalarToken readToken = (ScalarToken) _nextPort.get(0); 190 191 if (_debugging) { 192 _debug("Read input token from " + _nextPort.getName() 193 + " with value " + readToken); 194 } 195 // The strategy here is to keep reading from the same 196 // port until its value is greater than or equal to the 197 // recorded token. When that occurs, output the recorded 198 // token, record the just-read input token, and start 199 // reading from the other input port. 200 201 if (_recordedToken == null) { 202 // First firing or after a duplicate. Just record the token. 203 _tentativeRecordedToken = readToken; 204 _tentativeReadFromA = true; 205 // Swap ports. 206 if (_nextPort == inputA) { 207 _tentativeNextPort = inputB; 208 } else { 209 _tentativeNextPort = inputA; 210 } 211 } else { 212 // Logic is different if we have to eliminate duplicates. 213 if (((BooleanToken) eliminateDuplicates.getToken()) 214 .booleanValue()) { 215 // We are set to eliminate duplicates. 216 if (readToken.equals(_recordedToken)) { 217 // Input is a duplicate of the recorded token. 218 // Produce the recorded token as output, 219 // discard the input token and continue reading from the 220 // same input port with no recorded token. 221 output.send(0, _recordedToken); 222 _tentativeLastProduced = _recordedToken; 223 if (_debugging) { 224 _debug("Sent output token with value " 225 + _recordedToken 226 + "\nDiscarded duplicate input."); 227 } 228 _tentativeRecordedToken = null; 229 if (_readFromA) { 230 selectedA.send(0, BooleanToken.TRUE); 231 } else { 232 selectedA.send(0, BooleanToken.FALSE); 233 } 234 // Read from the same port again, so leave 235 // _tentativeNextPort alone. 236 } else if (readToken.equals(_lastProduced)) { 237 // Token is the same as last produced. 238 // Do not send an output and leave everything the same 239 // Except there is no longer a recorded token. 240 if (_debugging) { 241 _debug("Discarded duplicate input " + readToken); 242 } 243 } else { 244 // Not a duplicate. 245 if (readToken.isLessThan(_recordedToken) 246 .booleanValue()) { 247 // Produce the smaller output. 248 output.send(0, readToken); 249 _tentativeLastProduced = readToken; 250 251 if (_debugging) { 252 _debug("Sent output token with value " 253 + readToken); 254 } 255 256 // Token was just read from _nextPort. 257 if (_nextPort == inputA) { 258 selectedA.send(0, BooleanToken.TRUE); 259 } else { 260 selectedA.send(0, BooleanToken.FALSE); 261 } 262 // Read from the same port again next time. 263 } else { 264 // Produce the smaller output. 265 output.send(0, _recordedToken); 266 _tentativeLastProduced = _recordedToken; 267 if (_debugging) { 268 _debug("Sent output token with value " 269 + _recordedToken); 270 } 271 if (_readFromA) { 272 // Recorded token was read from A. 273 selectedA.send(0, BooleanToken.TRUE); 274 } else { 275 selectedA.send(0, BooleanToken.FALSE); 276 } 277 278 _tentativeRecordedToken = readToken; 279 _tentativeReadFromA = _nextPort == inputA; 280 281 // Swap ports. 282 if (_nextPort == inputA) { 283 _tentativeNextPort = inputB; 284 } else { 285 _tentativeNextPort = inputA; 286 } 287 } 288 } 289 } else { 290 // Not eliminating duplicates. 291 if (readToken.isLessThan(_recordedToken).booleanValue()) { 292 // Produce the smaller output. 293 output.send(0, readToken); 294 295 if (_debugging) { 296 _debug("Sent output token with value " + readToken); 297 } 298 299 // Token was just read from _nextPort. 300 if (_nextPort == inputA) { 301 selectedA.send(0, BooleanToken.TRUE); 302 } else { 303 selectedA.send(0, BooleanToken.FALSE); 304 } 305 } else { 306 // Produce the smaller output. 307 output.send(0, _recordedToken); 308 if (_debugging) { 309 _debug("Sent output token with value " 310 + _recordedToken); 311 } 312 313 if (_readFromA) { 314 selectedA.send(0, BooleanToken.TRUE); 315 } else { 316 selectedA.send(0, BooleanToken.FALSE); 317 } 318 319 _tentativeRecordedToken = readToken; 320 _tentativeReadFromA = _nextPort == inputA; 321 322 // Swap ports. 323 if (_nextPort == inputA) { 324 _tentativeNextPort = inputB; 325 } else { 326 _tentativeNextPort = inputA; 327 } 328 } 329 } 330 } 331 } 332 } 333 334 /** Initialize this actor to indicate that no token is recorded. 335 * @exception IllegalActionException If a derived class throws it. 336 */ 337 @Override 338 public void initialize() throws IllegalActionException { 339 super.initialize(); 340 _nextPort = inputA; 341 _recordedToken = null; 342 _lastProduced = null; 343 _tentativeLastProduced = null; 344 inputA_tokenConsumptionRate.setToken(_one); 345 inputB_tokenConsumptionRate.setToken(_zero); 346 } 347 348 /** Commit the recorded token. 349 * @return True. 350 * @exception IllegalActionException Not thrown in this base class. 351 */ 352 @Override 353 public boolean postfire() throws IllegalActionException { 354 _recordedToken = _tentativeRecordedToken; 355 _readFromA = _tentativeReadFromA; 356 _nextPort = _tentativeNextPort; 357 _lastProduced = _tentativeLastProduced; 358 359 if (_nextPort == inputA) { 360 inputA_tokenConsumptionRate.setToken(_one); 361 inputB_tokenConsumptionRate.setToken(_zero); 362 } else { 363 inputA_tokenConsumptionRate.setToken(_zero); 364 inputB_tokenConsumptionRate.setToken(_one); 365 } 366 367 if (_debugging) { 368 _debug("Next port to read input from is " + _nextPort.getName()); 369 } 370 371 return super.postfire(); 372 } 373 374 /////////////////////////////////////////////////////////////////// 375 //// protected methods //// 376 377 /** Return the port that this actor will read from on the next 378 * invocation of the fire() method. This will be null before the 379 * first invocation of initialize(). 380 * @return The next input port. 381 */ 382 protected TypedIOPort _getNextPort() { 383 // This method is Added by Gang Zhou so that DDFOrderedMerge 384 // can extend this class. 385 return _nextPort; 386 } 387 388 /** 389 * The output must be greater than or equal to each of both inputs. Since 390 * inputA is set to be the same as inputB, the output is simply set to be 391 * greater than or equal to inputA. 392 * @return A set of type constraints 393 */ 394 @Override 395 protected Set<Inequality> _defaultTypeConstraints() { 396 Set<Inequality> result = new HashSet<Inequality>(); 397 result.add(new Inequality(inputA.getTypeTerm(), output.getTypeTerm())); 398 return result; 399 } 400 401 /////////////////////////////////////////////////////////////////// 402 //// private variables //// 403 404 /** The last produced token. Used to eliminate duplicates. */ 405 private ScalarToken _lastProduced; 406 407 /** The port from which to read next. */ 408 private TypedIOPort _nextPort = null; 409 410 /** A final static IntToken with value 1. */ 411 private final static IntToken _one = new IntToken(1); 412 413 /** Indicator of whether the _recordedToken was read from A. */ 414 private boolean _readFromA; 415 416 /** The recorded token. */ 417 private ScalarToken _recordedToken = null; 418 419 /** The tentative last produced token. Used to eliminate duplicates. */ 420 private ScalarToken _tentativeLastProduced; 421 422 /** Tentative indicator of having read from A. */ 423 private boolean _tentativeReadFromA; 424 425 /** The tentative recorded token. */ 426 private ScalarToken _tentativeRecordedToken = null; 427 428 /** The tentative port from which to read next. */ 429 private TypedIOPort _tentativeNextPort = null; 430 431 /** A final static IntToken with value 0. */ 432 private final static IntToken _zero = new IntToken(0); 433}