001/* A process receiver that stores tokens via a mailbox and can be used by 002 composite actors. 003 004 Copyright (c) 1997-2014 The Regents of the University of California. 005 All rights reserved. 006 Permission is hereby granted, without written agreement and without 007 license or royalty fees, to use, copy, modify, and distribute this 008 software and its documentation for any purpose, provided that the above 009 copyright notice and the following two paragraphs appear in all copies 010 of this software. 011 012 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 013 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 014 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 015 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 016 SUCH DAMAGE. 017 018 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 019 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 020 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 021 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 022 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 023 ENHANCEMENTS, OR MODIFICATIONS. 024 025 PT_COPYRIGHT_VERSION_2 026 COPYRIGHTENDKEY 027 028 */ 029package ptolemy.actor.process; 030 031import ptolemy.actor.Actor; 032import ptolemy.actor.Director; 033import ptolemy.actor.IOPort; 034import ptolemy.actor.Mailbox; 035import ptolemy.data.Token; 036import ptolemy.kernel.util.IllegalActionException; 037import ptolemy.kernel.util.InvalidStateException; 038import ptolemy.kernel.util.Workspace; 039 040/////////////////////////////////////////////////////////////////// 041//// MailboxBoundaryReceiver 042 043/** 044 A process receiver that stores tokens via a mailbox and can be used by 045 composite actors. This receiver extends the functionality of the mailbox 046 receiver found in the actor package in two key ways. First it facilitates 047 blocking reads and writes. If a read (a call to get()) is attempted when 048 this mailbox is empty then the call will block until a token is placed in 049 the mailbox. Similarly if a write (a call to put()) is attempted when 050 this mailbox is full (has a token) then the call will block until the 051 token is removed from the mailbox. 052 <P> 053 The second key feature of this mailbox receiver is that it can be used 054 by opaque composite actors operating in process-oriented models of 055 computation. Indeed the name "MailboxBoundaryReceiver" is used to 056 indicate that this receiver can be contained on the boundary of an 057 opaque composite actor. The get() and put() methods of mailbox boundary 058 receiver can be invoked by a Branch object. In such cases any blocks that 059 occur are registered with the calling branch. The branch will then serve 060 as a proxy by communicating to the director through the branch controller. 061 <P> 062 Note that it is not necessary for a mailbox boundary receiver to be used 063 in the ports of an opaque composite actor. It is perfectly fine for a 064 mailbox boundary receiver to be used in the ports of an atomic actor. In 065 such cases the get() and put() methods are called without the use of a 066 branch object. If blocking reads or writes occur they are registered with 067 the controlling director without the need for a branch or branch 068 controller. 069 070 071 @author John S. Davis II 072 @version $Id$ 073 @since Ptolemy II 1.0 074 @Pt.ProposedRating Green (mudit) 075 @Pt.AcceptedRating Yellow (davisj) 076 @see ptolemy.actor.process.Branch 077 @see ptolemy.actor.process.BranchController 078 079 */ 080public class MailboxBoundaryReceiver extends Mailbox 081 implements ProcessReceiver { 082 /** Construct an empty MailboxBoundaryReceiver with no container. 083 */ 084 public MailboxBoundaryReceiver() { 085 super(); 086 _boundaryDetector = new BoundaryDetector(this); 087 } 088 089 /** Construct an empty MailboxBoundaryReceiver with the specified 090 * container. 091 * @param container The container. 092 * @exception IllegalActionException If the container cannot contain 093 * this receiver. 094 */ 095 public MailboxBoundaryReceiver(IOPort container) 096 throws IllegalActionException { 097 super(container); 098 _boundaryDetector = new BoundaryDetector(this); 099 } 100 101 /////////////////////////////////////////////////////////////////// 102 //// public methods //// 103 104 /** Get a token from this receiver. If the receiver is empty then 105 * block until a token becomes available. Use the local director 106 * to manage blocking reads that occur. If this receiver is 107 * terminated during the execution of this method, then throw a 108 * TerminateProcessException. 109 * 110 * @return The token contained by this receiver. 111 */ 112 @Override 113 public Token get() { 114 Workspace workspace = getContainer().workspace(); 115 Token result = null; 116 117 int depth = 0; 118 try { 119 // NOTE: This used to synchronize on this, but since it calls 120 // director methods that are synchronized on the director, 121 // this can cause deadlock. 122 synchronized (_director) { 123 while (!_terminate) { 124 // Try to read. 125 if (super.hasToken()) { 126 result = super.get(); 127 128 // Need to mark any thread that is write blocked 129 // on this receiver unblocked now, before any 130 // notification, or we will detect deadlock and 131 // increase the buffer sizes. Note that there is 132 // no need to clear the _readPending reference 133 // because that will have been cleared by the 134 // write. 135 if (_writePending != null) { 136 _director.threadUnblocked(_writePending, this); 137 _writePending = null; 138 } 139 140 break; 141 } 142 143 // Wait to try again. 144 try { 145 _readPending = Thread.currentThread(); 146 _director.threadBlocked(Thread.currentThread(), this); 147 // NOTE: We cannot use workspace.wait(Object) here without 148 // introducing a race condition, because we have to release 149 // the lock on the _director before calling workspace.wait(_director). 150 if (depth == 0) { 151 depth = workspace.releaseReadPermission(); 152 } 153 _director.wait(); 154 } catch (InterruptedException e) { 155 _terminate = true; 156 } 157 } 158 159 if (_terminate) { 160 throw new TerminateProcessException(""); 161 } 162 } 163 } finally { 164 if (depth > 0) { 165 workspace.reacquireReadPermission(depth); 166 } 167 } 168 return result; 169 } 170 171 /** Return the director in charge of this receiver, or null 172 * if there is none. 173 * @return The director in charge of this receiver. 174 */ 175 public ProcessDirector getDirector() { 176 return _director; 177 } 178 179 /** Return true if this receiver is connected to a boundary port. 180 * A boundary port is an opaque port that is contained by a 181 * composite actor. If this receiver is connected to a boundary 182 * port, then return true; otherwise return false. 183 * This method is not synchronized so the caller should be. 184 * 185 * @return True if this receiver is connected to boundary port; 186 * return false otherwise. 187 * @exception IllegalActionException 188 */ 189 @Override 190 public boolean isConnectedToBoundary() throws IllegalActionException { 191 return _boundaryDetector.isConnectedToBoundary(); 192 } 193 194 /** Return true if this receiver is connected to the inside of an 195 * input boundary port; return false otherwise. A boundary port is 196 * an opaque port that is contained by a composite actor. This 197 * method is not synchronized so the caller should be. 198 * 199 * @return True if this receiver is connected to the inside of a 200 * boundary port; return false otherwise. 201 * @exception IllegalActionException 202 * @exception InvalidStateException 203 */ 204 @Override 205 public boolean isConnectedToBoundaryInside() 206 throws InvalidStateException, IllegalActionException { 207 return _boundaryDetector.isConnectedToBoundaryInside(); 208 } 209 210 /** Return true if this receiver is connected to the outside of an 211 * output boundary port; return false otherwise. A boundary port is 212 * an opaque port that is contained by a composite actor. If this 213 * receiver is contained on the inside of a boundary port, then return 214 * false. This method is not synchronized so the caller should be. 215 * 216 * @return True if this receiver is connected to the outside of a 217 * boundary port; return false otherwise. 218 * @exception IllegalActionException 219 */ 220 @Override 221 public boolean isConnectedToBoundaryOutside() 222 throws IllegalActionException { 223 return _boundaryDetector.isConnectedToBoundaryOutside(); 224 } 225 226 /** Return true if this is a consumer receiver; return false otherwise. 227 * A consumer receiver is defined as a receiver that is connected to 228 * a boundary port. 229 * 230 * @return True if this is a consumer receiver; return false otherwise. 231 * @exception IllegalActionException 232 */ 233 @Override 234 public boolean isConsumerReceiver() throws IllegalActionException { 235 if (isConnectedToBoundary()) { 236 return true; 237 } 238 239 return false; 240 } 241 242 /** Return true if this receiver is contained on the inside of a 243 * boundary port. A boundary port is an opaque port that is 244 * contained by a composite actor. If this receiver is contained 245 * on the inside of a boundary port then return true; otherwise 246 * return false. This method is not synchronized so the caller 247 * should be. 248 * 249 * @return True if this receiver is contained on the inside of 250 * a boundary port; return false otherwise. 251 */ 252 @Override 253 public boolean isInsideBoundary() { 254 return _boundaryDetector.isInsideBoundary(); 255 } 256 257 /** Return true if this receiver is contained on the outside of a 258 * boundary port. A boundary port is an opaque port that is 259 * contained by a composite actor. If this receiver is contained 260 * on the outside of a boundary port then return true; otherwise 261 * return false. This method is not synchronized so the caller 262 * should be. 263 * 264 * @return True if this receiver is contained on the outside of a 265 * boundary port; return false otherwise. 266 */ 267 @Override 268 public boolean isOutsideBoundary() { 269 return _boundaryDetector.isOutsideBoundary(); 270 } 271 272 /** Return true if this is a producer receiver; return false otherwise. 273 * A producer receiver is defined as a receiver that is connected to 274 * a boundary port. 275 * 276 * @return True if this is a producer receiver; return false otherwise. 277 */ 278 @Override 279 public boolean isProducerReceiver() { 280 if (isOutsideBoundary() || isInsideBoundary()) { 281 return true; 282 } 283 284 return false; 285 } 286 287 /** Return a true or false to indicate whether there is a read block 288 * on this receiver or not, respectively. 289 * @return a boolean indicating whether a read is blocked on this 290 * receiver or not. 291 */ 292 @Override 293 public boolean isReadBlocked() { 294 // NOTE: This method used to be synchronized on this 295 // receiver, but since it is called by synchronized methods in 296 // the director, that can cause deadlock. 297 synchronized (_director) { 298 return _readPending != null; 299 } 300 } 301 302 /** Return a true or false to indicate whether there is a write block 303 * on this receiver or not. 304 * @return A boolean indicating whether a write is blocked on this 305 * receiver or not. 306 */ 307 @Override 308 public boolean isWriteBlocked() { 309 // NOTE: This method used to be synchronized on this 310 // receiver, but since it is called by synchronized methods in 311 // the director, that can cause deadlock. 312 synchronized (_director) { 313 return _writePending != null; 314 } 315 } 316 317 /** Put a token into this receiver. If the receiver is full 318 * (contains a token) then block until room becomes available. 319 * Use the local director to manage blocking writes that occur. 320 * If this receiver is terminated during the execution of this 321 * method, then throw a TerminateProcessException. 322 * If the specified token is null, this method does nothing. 323 * 324 * @param token The token being placed in this receiver, or null 325 * to do nothing. 326 */ 327 @Override 328 public void put(Token token) { 329 if (token == null) { 330 return; 331 } 332 Workspace workspace = getContainer().workspace(); 333 int depth = 0; 334 try { 335 // NOTE: This used to synchronize on this, but since it calls 336 // director methods that are synchronized on the director, 337 // this can cause deadlock. 338 synchronized (_director) { 339 while (!_terminate) { 340 // Try to write. 341 if (super.hasRoom()) { 342 super.put(token); 343 344 // If any thread is blocked on a get(), then it will become 345 // unblocked. Notify the director now so that there isn't a 346 // spurious deadlock detection. 347 if (_readPending != null) { 348 _director.threadUnblocked(_readPending, this); 349 _readPending = null; 350 } 351 352 // Normally, the _writePending reference will have 353 // been cleared by the read that unblocked this 354 // write. However, it might be that the director 355 // increased the buffer size, which would also 356 // have the affect of unblocking this 357 // write. Hence, we clear it here if it is set. 358 if (_writePending != null) { 359 _director.threadUnblocked(_writePending, this); 360 _writePending = null; 361 } 362 363 break; 364 } 365 366 // Wait to try again. 367 try { 368 _writePending = Thread.currentThread(); 369 _director.threadBlocked(_writePending, this); 370 371 // NOTE: We cannot use workspace.wait(Object) here without 372 // introducing a race condition, because we have to release 373 // the lock on the _director before calling workspace.wait(_director). 374 if (depth == 0) { 375 depth = workspace.releaseReadPermission(); 376 } 377 _director.wait(); 378 } catch (InterruptedException e) { 379 _terminate = true; 380 } 381 } 382 383 if (_terminate) { 384 throw new TerminateProcessException("Process terminated."); 385 } 386 } 387 } finally { 388 if (depth > 0) { 389 workspace.reacquireReadPermission(depth); 390 } 391 } 392 } 393 394 /** Set a local flag requesting that execution of the actor 395 * containing this receiver discontinue. 396 */ 397 @Override 398 public synchronized void requestFinish() { 399 _terminate = true; 400 notifyAll(); 401 } 402 403 /** Reset the local flags of this receiver. Use this method when 404 * restarting execution. 405 */ 406 @Override 407 public void reset() { 408 if (_readPending != null) { 409 _director.threadUnblocked(_readPending, this); 410 } 411 412 if (_writePending != null) { 413 _director.threadUnblocked(_writePending, this); 414 } 415 416 _terminate = false; 417 _boundaryDetector.reset(); 418 } 419 420 /** Set the container. This overrides the base class to record 421 * the director. 422 * @param port The container. 423 * @exception IllegalActionException If the container is not of 424 * an appropriate subclass of IOPort, or if the container's director 425 * is not an instance of ProcessDirector. 426 */ 427 @Override 428 public void setContainer(IOPort port) throws IllegalActionException { 429 super.setContainer(port); 430 431 if (port == null) { 432 _director = null; 433 } else { 434 Actor actor = (Actor) port.getContainer(); 435 Director director; 436 437 // For a composite actor, 438 // the receiver type of an input port is decided by 439 // the executive director. 440 // While the receiver type of an output is decided by the director. 441 // NOTE: getExecutiveDirector() and getDirector() yield the same 442 // result for actors that do not contain directors. 443 if (port.isInput()) { 444 director = actor.getExecutiveDirector(); 445 } else { 446 director = actor.getDirector(); 447 } 448 449 if (!(director instanceof ProcessDirector)) { 450 throw new IllegalActionException(port, 451 "Cannot use an instance of PNQueueReceiver " 452 + "since the director is not a PNDirector."); 453 } 454 455 _director = (ProcessDirector) director; 456 } 457 } 458 459 /////////////////////////////////////////////////////////////////// 460 //// private methods //// 461 462 /** The boundary detector. */ 463 private BoundaryDetector _boundaryDetector; 464 465 /** The director in charge of this receiver. */ 466 private ProcessDirector _director; 467 468 /** Reference to a thread that is read blocked on this receiver. */ 469 private Thread _readPending = null; 470 471 /** Flag indicating that termination has been requested. */ 472 private boolean _terminate = false; 473 474 /** Reference to a thread that is write blocked on this receiver. */ 475 private Thread _writePending = null; 476}