001/* A receiver with a FIFO queue and performing blocking reads 002 and blocking writes. 003 004 Copyright (c) 1997-2015 The Regents of the University of California. 005 All rights reserved. 006 Permission is hereby granted, without written agreement and without 007 license or royalty fees, to use, copy, modify, and distribute this 008 software and its documentation for any purpose, provided that the above 009 copyright notice and the following two paragraphs appear in all copies 010 of this software. 011 012 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 013 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 014 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 015 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 016 SUCH DAMAGE. 017 018 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 019 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 020 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 021 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 022 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 023 ENHANCEMENTS, OR MODIFICATIONS. 024 025 PT_COPYRIGHT_VERSION_2 026 COPYRIGHTENDKEY 027 028 */ 029package ptolemy.domains.pn.kernel; 030 031import ptolemy.actor.Actor; 032import ptolemy.actor.Director; 033import ptolemy.actor.IOPort; 034import ptolemy.actor.Manager; 035import ptolemy.actor.NoRoomException; 036import ptolemy.actor.QueueReceiver; 037import ptolemy.actor.process.BoundaryDetector; 038import ptolemy.actor.process.ProcessReceiver; 039import ptolemy.actor.process.TerminateProcessException; 040import ptolemy.data.Token; 041import ptolemy.kernel.util.IllegalActionException; 042import ptolemy.kernel.util.InvalidStateException; 043import ptolemy.kernel.util.Nameable; 044import ptolemy.kernel.util.Workspace; 045 046/////////////////////////////////////////////////////////////////// 047//// PNQueueReceiver 048 049/** 050 051 A receiver with a FIFO queue that blocks the calling process on a read if the 052 FIFO queue is empty and on a write if the queue is full. Blocking read provides 053 the basic functionality of a FIFO channel in the process networks model of 054 computation. Blocking write supports the implementation suggested by Parks for 055 bounded memory execution of process networks. 056 <p> 057 Tokens are appended to the queue with the put() method, which blocks on a write 058 if the queue is full. Tokens are removed from the queue with the get() method, 059 which blocks on a read if the queue is empty. 060 In case a process blocks on a read or a write, the receiver informs the 061 director about the same. 062 The receiver also unblocks processes blocked on a read or a write. In case 063 a process is blocked on a read (read-blocked), it is unblocked on availability 064 of a token. If a process is blocked on a write (write-blocked), it 065 is unblocked on the availability of room in the queue and informs the director 066 of the same. 067 <p> 068 This class is also responsible for pausing or terminating a process that tries 069 to read from or write to the receiver. In case of termination, the receiver 070 throws a TerminateProcessException when a process tries to read from or write 071 to the receiver. This terminates the process. 072 In case of pausing, the receiver suspends the process when it tries to read 073 from or write to the receiver and resumes it only after a request to resume the 074 process has been received.g 075 <p> 076 077 @author Mudit Goel, John S. Davis II, Edward A. Lee, Xiaowen Xin 078 @version $Id$ 079 @since Ptolemy II 0.2 080 @Pt.ProposedRating Yellow (eal) 081 @Pt.AcceptedRating Red (hyzheng) 082 @see QueueReceiver 083 @see ptolemy.actor.QueueReceiver 084 */ 085public class PNQueueReceiver extends QueueReceiver implements ProcessReceiver { 086 /** Construct an empty receiver with no container. 087 */ 088 public PNQueueReceiver() { 089 super(); 090 _boundaryDetector = new BoundaryDetector(this); 091 } 092 093 /** Construct an empty receiver with the specified container. 094 * @param container The container of this receiver. 095 * @exception IllegalActionException If the container does 096 * not accept this receiver. 097 */ 098 public PNQueueReceiver(IOPort container) throws IllegalActionException { 099 super(container); 100 _boundaryDetector = new BoundaryDetector(this); 101 } 102 103 /////////////////////////////////////////////////////////////////// 104 //// public methods //// 105 106 /** Set the container. This overrides the base class to record 107 * the director. 108 * @param port The container. 109 * @exception IllegalActionException If the container is not of 110 * an appropriate subclass of IOPort, or if the container's director 111 * is not an instance of PNDirector. 112 */ 113 @Override 114 public void setContainer(IOPort port) throws IllegalActionException { 115 super.setContainer(port); 116 if (port == null) { 117 _director = null; 118 } else { 119 Actor actor = (Actor) port.getContainer(); 120 Director director; 121 122 // For a composite actor, 123 // the receiver type of an input port is decided by 124 // the executive director. 125 // While the receiver type of an output is decided by the director. 126 // NOTE: getExecutiveDirector() and getDirector() yield the same 127 // result for actors that do not contain directors. 128 if (port.isInput()) { 129 director = actor.getExecutiveDirector(); 130 } else { 131 director = actor.getDirector(); 132 } 133 134 if (!(director instanceof PNDirector)) { 135 throw new IllegalActionException(port, 136 "Cannot use an instance of PNQueueReceiver " 137 + "since the director is not a PNDirector."); 138 } 139 140 _director = (PNDirector) director; 141 } 142 } 143 144 /** Get a token from this receiver. If the receiver is empty then 145 * block until a token becomes available. If this receiver is 146 * terminated during the execution of this method, then throw a 147 * TerminateProcessException. 148 * @return The token contained by this receiver. 149 */ 150 @Override 151 public Token get() { 152 Token result = null; 153 Workspace workspace = getContainer().workspace(); 154 while (!_terminate) { 155 int depth = 0; 156 try { 157 // NOTE: This used to synchronize on this, but since it calls 158 // director methods that are synchronized on the director, 159 // this can cause deadlock. 160 synchronized (_director) { 161 // Need to check this again after acquiring the lock. 162 // Otherwise, could end up calling wait() below _after_ 163 // notification has occurred. 164 if (_terminate) { 165 break; 166 } 167 168 // Try to read. 169 if (super.hasToken()) { 170 result = super.get(); 171 172 // Need to mark any thread that is write blocked on 173 // this receiver unblocked now, before any notification, 174 // or we will detect deadlock and increase the buffer sizes. 175 // Note that there is no need to clear the _readPending 176 // reference because that will have been cleared by the write. 177 if (_writePending != null) { 178 _director.threadUnblocked(_writePending, this, 179 PNDirector.WRITE_BLOCKED); 180 _writePending = null; 181 } 182 183 break; 184 } 185 _readPending = Thread.currentThread(); 186 _director.threadBlocked(Thread.currentThread(), this, 187 PNDirector.READ_BLOCKED); 188 189 // NOTE: We cannot use workspace.wait(Object) here without 190 // introducing a race condition, because we have to release 191 // the lock on the _director before calling workspace.wait(_director). 192 depth = workspace.releaseReadPermission(); 193 _director.wait(); 194 } // release lock on _director before reacquiring read permissions. 195 } catch (InterruptedException e) { 196 _terminate = true; 197 } finally { 198 if (depth > 0) { 199 workspace.reacquireReadPermission(depth); 200 } 201 } 202 } 203 if (_terminate) { 204 throw new TerminateProcessException(""); 205 } 206 return result; 207 } 208 209 /** Return the director in charge of this receiver, or null 210 * if there is none. 211 * @return The director in charge of this receiver. 212 */ 213 public PNDirector getDirector() { 214 return _director; 215 } 216 217 /** Return true, since a channel in the Kahn process networks 218 * model of computation is of infinite capacity and always has room. 219 * @return True. 220 */ 221 @Override 222 public boolean hasRoom() { 223 return true; 224 } 225 226 /** Return true, since a channel in the Kahn process networks 227 * model of computation is of infinite capacity and always has room. 228 * @param tokens The number of tokens, which is ignored in this method. 229 * @return True. 230 */ 231 @Override 232 public boolean hasRoom(int tokens) { 233 return true; 234 } 235 236 /** Return true, since a call to the get() method of the receiver will 237 * always return a token if the call to get() ever returns. 238 * @return True. 239 */ 240 @Override 241 public boolean hasToken() { 242 return true; 243 } 244 245 /** Return true, since a call to the get() method of the receiver will 246 * always return a token if the call to get() ever returns. 247 * @param tokens The number of tokens, which is ignored in this method. 248 * @return True. 249 */ 250 @Override 251 public boolean hasToken(int tokens) { 252 return true; 253 } 254 255 /** Return true if this receiver is connected to the inside of a 256 * boundary port. A boundary port is an opaque port that is 257 * contained by a composite actor. If this receiver is connected 258 * to the inside of a boundary port, then return true; otherwise 259 * return false. 260 * @return True if this receiver is connected to the inside of 261 * a boundary port; return false otherwise. 262 * @exception IllegalActionException If thrown by the boundary 263 * detector. 264 * @see ptolemy.actor.process.BoundaryDetector 265 */ 266 @Override 267 public boolean isConnectedToBoundary() throws IllegalActionException { 268 return _boundaryDetector.isConnectedToBoundary(); 269 } 270 271 /** Return true if this receiver is connected to the inside of a 272 * boundary port. A boundary port is an opaque port that is 273 * contained by a composite actor. If this receiver is connected 274 * to the inside of a boundary port, then return true; otherwise 275 * return false. 276 * @return True if this receiver is connected to the inside of 277 * a boundary port; return false otherwise. 278 * @exception IllegalActionException If thrown by the boundary 279 * detector. 280 * @exception InvalidStateException If thrown by the boundary 281 * detector. 282 * @see ptolemy.actor.process.BoundaryDetector 283 */ 284 @Override 285 public boolean isConnectedToBoundaryInside() 286 throws InvalidStateException, IllegalActionException { 287 return _boundaryDetector.isConnectedToBoundaryInside(); 288 } 289 290 /** Return true if this receiver is connected to the outside of a 291 * boundary port. A boundary port is an opaque port that is 292 * contained by a composite actor. If this receiver is connected 293 * to the outside of a boundary port, then return true; otherwise 294 * return false. 295 * @return True if this receiver is connected to the outside of 296 * a boundary port; return false otherwise. 297 * @exception IllegalActionException If thrown by the boundary 298 * detector. 299 * @see ptolemy.actor.process.BoundaryDetector 300 */ 301 @Override 302 public boolean isConnectedToBoundaryOutside() 303 throws IllegalActionException { 304 return _boundaryDetector.isConnectedToBoundaryOutside(); 305 } 306 307 /** Return true if this receiver is connected to the boundary. 308 * That is, it is in an input port that is connected on 309 * the outside to the inside of an input port, or it is on 310 * the inside of an output port that is connected on the 311 * outside to an input port higher in the hierarchy. 312 * @see #isConnectedToBoundary() 313 * @return True if this is connected to the boundary. 314 * @exception IllegalActionException If thrown by the boundary 315 * detector. 316 */ 317 @Override 318 public boolean isConsumerReceiver() throws IllegalActionException { 319 if (isConnectedToBoundary()) { 320 return true; 321 } 322 323 return false; 324 } 325 326 /** Return true if this receiver is contained on the inside of a 327 * boundary port. A boundary port is an opaque port that is 328 * contained by a composite actor. If this receiver is contained 329 * on the inside of a boundary port then return true; otherwise 330 * return false. 331 * @return True if this receiver is contained on the inside of 332 * a boundary port; return false otherwise. 333 * @see ptolemy.actor.process.BoundaryDetector 334 */ 335 @Override 336 public boolean isInsideBoundary() { 337 return _boundaryDetector.isInsideBoundary(); 338 } 339 340 /** Return true if this receiver is contained on the outside of a 341 * boundary port. A boundary port is an opaque port that is 342 * contained by a composite actor. If this receiver is contained 343 * on the outside of a boundary port then return true; otherwise 344 * return false. 345 * @return True if this receiver is contained on the outside of 346 * a boundary port; return false otherwise. 347 * @see BoundaryDetector 348 */ 349 @Override 350 public boolean isOutsideBoundary() { 351 return _boundaryDetector.isOutsideBoundary(); 352 } 353 354 /** Return true if this receiver is at a boundary. 355 * @return True if this receiver is at a boundary. 356 */ 357 @Override 358 public boolean isProducerReceiver() { 359 if (isOutsideBoundary() || isInsideBoundary()) { 360 return true; 361 } 362 363 return false; 364 } 365 366 /** Return a true or false to indicate whether there is a read block 367 * on this receiver or not, respectively. 368 * @return a boolean indicating whether a read is blocked on this 369 * receiver or not. 370 */ 371 @Override 372 public boolean isReadBlocked() { 373 // NOTE: This method used to be synchronized on this 374 // receiver, but since it is called by synchronized methods in 375 // the director, that can cause deadlock. 376 synchronized (_director) { 377 return _readPending != null; 378 } 379 } 380 381 /** Return a true or false to indicate whether there is a write block 382 * on this receiver or not. 383 * @return A boolean indicating whether a write is blocked on this 384 * receiver or not. 385 */ 386 @Override 387 public boolean isWriteBlocked() { 388 // NOTE: This method used to be synchronized on this 389 // receiver, but since it is called by synchronized methods in 390 // the director, that can cause deadlock. 391 synchronized (_director) { 392 return _writePending != null; 393 } 394 } 395 396 /** Put a token on the queue contained in this receiver. 397 * If the queue is full, then suspend the calling thread (blocking 398 * write) and inform the director of the same. Resume the process on 399 * detecting room in the queue. 400 * If a termination is requested, then initiate the termination of the 401 * calling process by throwing a TerminateProcessException. 402 * On detecting a room in the queue, put a token in the queue. 403 * Check whether any process is blocked 404 * on a read from this receiver. If a process is indeed blocked, then 405 * unblock the process, and inform the director of the same. 406 * @param token The token to be put in the receiver, or null to not put anything. 407 * @exception NoRoomException If during initialization, capacity cannot be increased 408 * enough to accommodate initial tokens. 409 */ 410 @Override 411 public void put(Token token) throws NoRoomException { 412 IOPort port = getContainer(); 413 if (port == null || token == null) { 414 return; // Nothing to do. 415 } 416 Workspace workspace = port.workspace(); 417 while (!_terminate) { 418 int depth = 0; 419 try { 420 // NOTE: Avoid acquiring read access on the workspace 421 // while holding the lock on the director because if 422 // some other process is trying to acquire write access, 423 // the request for read access will be deferred. 424 Nameable container = getContainer().getContainer(); 425 Manager manager = ((Actor) container).getManager(); 426 // NOTE: This used to synchronize on this, but since it calls 427 // director methods that are synchronized on the director, 428 // this can cause deadlock. 429 synchronized (_director) { 430 // Need to check this again after acquiring the lock. 431 // Otherwise, could end up calling wait() below _after_ 432 // notification has occurred. 433 if (_terminate) { 434 break; 435 } 436 437 // If we are in the initialization phase, then we may have 438 // to increase the queue capacity before proceeding. This 439 // may be needed to support PublisherPorts that produce 440 // initial tokens (or, I suppose, any actor that produces 441 // initial tokens during initialize()?). 442 if (!super.hasRoom()) { 443 if (container instanceof Actor) { 444 if (manager.getState() 445 .equals(Manager.INITIALIZING)) { 446 try { 447 _queue.setCapacity( 448 _queue.getCapacity() + 1); 449 } catch (IllegalActionException e) { 450 throw new NoRoomException(getContainer(), 451 "Failed to increase queue capacity enough to accommodate initial tokens"); 452 } 453 } 454 } 455 } 456 // Try to write. 457 if (super.hasRoom()) { 458 super.put(token); 459 460 // If any thread is blocked on a get(), then it will become 461 // unblocked. Notify the director now so that there isn't a 462 // spurious deadlock detection. 463 if (_readPending != null) { 464 _director.threadUnblocked(_readPending, this, 465 PNDirector.READ_BLOCKED); 466 _readPending = null; 467 } 468 469 // Normally, the _writePending reference will have 470 // been cleared by the read that unblocked this write. 471 // However, it might be that the director increased the 472 // buffer size, which would also have the affect of unblocking 473 // this write. Hence, we clear it here if it is set. 474 if (_writePending != null) { 475 _director.threadUnblocked(_writePending, this, 476 PNDirector.WRITE_BLOCKED); 477 _writePending = null; 478 } 479 480 break; 481 } 482 483 // Wait to try again. 484 _writePending = Thread.currentThread(); 485 _director.threadBlocked(_writePending, this, 486 PNDirector.WRITE_BLOCKED); 487 488 // NOTE: We cannot use workspace.wait(Object) here without 489 // introducing a race condition, because we have to release 490 // the lock on the _director before calling workspace.wait(_director). 491 depth = workspace.releaseReadPermission(); 492 _director.wait(); 493 } // release lock on _director before reacquiring read permissions. 494 } catch (InterruptedException e) { 495 _terminate = true; 496 } finally { 497 if (depth > 0) { 498 workspace.reacquireReadPermission(depth); 499 } 500 } 501 } 502 503 if (_terminate) { 504 throw new TerminateProcessException("Process terminated."); 505 } 506 } 507 508 /** Reset the state variables in the receiver. 509 */ 510 @Override 511 public void reset() { 512 if (_readPending != null) { 513 _director.threadUnblocked(_readPending, this, 514 PNDirector.READ_BLOCKED); 515 } 516 517 if (_writePending != null) { 518 _director.threadUnblocked(_writePending, this, 519 PNDirector.WRITE_BLOCKED); 520 } 521 522 _readPending = null; 523 _writePending = null; 524 _terminate = false; 525 _boundaryDetector.reset(); 526 } 527 528 /** Set a flag in the receiver to indicate the onset of termination. 529 * This will result in termination of any process that is either blocked 530 * on the receiver or is trying to read from or write to it. 531 */ 532 @Override 533 public void requestFinish() { 534 // NOTE: This method used to be synchronized on this 535 // receiver, but since it calls synchronized methods in 536 // the director, that can cause deadlock. 537 synchronized (_director) { 538 _terminate = true; 539 _director.notifyAll(); 540 } 541 } 542 543 /////////////////////////////////////////////////////////////////// 544 //// protected variables //// 545 546 /** The director in charge of this receiver. */ 547 protected PNDirector _director; 548 549 /** Reference to a thread that is read blocked on this receiver. */ 550 protected Thread _readPending = null; 551 552 /** Reference to a thread that is write blocked on this receiver. */ 553 protected Thread _writePending = null; 554 555 /////////////////////////////////////////////////////////////////// 556 //// private variables //// 557 558 /** Flag indicating whether finish has been requested. */ 559 protected boolean _terminate = false; 560 561 /** A BoundaryDetector determines the topological relationship of 562 * a Receiver with respect to boundary ports. 563 */ 564 protected BoundaryDetector _boundaryDetector; 565}