/* Director for Kahn-MacQueen process network semantics.

 Copyright (c) 1998-2014 The Regents of the University of California.
 All rights reserved.
 Permission is hereby granted, without written agreement and without
 license or royalty fees, to use, copy, modify, and distribute this
 software and its documentation for any purpose, provided that the above
 copyright notice and the following two paragraphs appear in all copies
 of this software.

 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.

 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 ENHANCEMENTS, OR MODIFICATIONS.

 PT_COPYRIGHT_VERSION_2
 COPYRIGHTENDKEY

 */
package ptolemy.domains.pn.kernel;

import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;

import ptolemy.actor.CompositeActor;
import ptolemy.actor.IORelation;
import ptolemy.actor.Receiver;
import ptolemy.actor.process.CompositeProcessDirector;
import ptolemy.actor.process.ProcessReceiver;
import ptolemy.data.IntToken;
import ptolemy.data.expr.Parameter;
import ptolemy.data.type.BaseType;
import ptolemy.domains.pn.kernel.event.PNProcessListener;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;

///////////////////////////////////////////////////////////////////
//// PNDirector

/**
 <p>A PNDirector governs the execution of a CompositeActor with extended
 Kahn-MacQueen process networks (PN) semantics. This model of computation has
 been extended to support mutations of graphs in a non-deterministic way.
 </p><p>
 The thread that calls the various execution methods (initialize, prefire, fire
 and postfire) on the director is referred to as the <i>directing thread</i>.
 This directing thread might be the main thread responsible for the execution
 of the entire simulation or might be the thread created by the executive
 director of the containing composite actor.
 </p><p>
 In the PN domain, the director creates a thread (an instance of
 ProcessThread), representing a Kahn process, for each actor in the model.
 The threads are created in initialize() and started in the prefire() method
 of the ProcessDirector. A process is considered <i>active</i> from its
 creation until its termination. An active process can block when trying to
 read from a channel (read-blocked), when trying to write to a channel
 (write-blocked) or when waiting for a queued topology change request to be
 processed (mutation-blocked).
 </p><p>
 A <i>deadlock</i> is when all the active processes are blocked.
 The director is responsible for handling deadlocks during execution.
 This director handles two different sorts of deadlocks, <i>real deadlock</i>
 and <i>artificial deadlock</i>.
 </p><p>
 A real deadlock is when all the processes are blocked on a read, meaning that
 no process can proceed until it receives new data. The execution can be
 terminated, if desired, in such a situation. If the container of this director
 does not have any input ports (as is the case for a top-level composite
 actor), then the executive director or manager terminates the execution.
 If the container has input ports, then it is up to the
 executive director of the container to decide on the termination of the
 execution. To terminate the execution after detection of a real deadlock, the
 manager or the executive director calls wrapup() on the director.
 </p><p>
 An artificial deadlock is when all processes are blocked and at least one
 process is blocked on a write. In this case the director increases the
 capacity of the receiver with the smallest capacity amongst all the
 receivers on which a process is blocked on a write.
 This breaks the deadlock and the execution can resume.
 If the increase results in a capacity that exceeds the value of
 <i>maximumQueueCapacity</i>, then instead of breaking the deadlock,
 an exception is thrown. This can be used to detect erroneous models
 that require unbounded queues.</p>

 <p>There are at least three ways for a PN model to terminate itself:
 <ol>
 <li>Have the model starve itself. Typically, a boolean switch is used.
 See the PN OrderedMerge demo at
 <a href="ptolemy/domains/pn/demo/OrderedMerge/OrderedMerge.xml"><code>ptolemy/domains/pn/demo/OrderedMerge/OrderedMerge.xml</code></a>

 <li>Have the model call the Stop actor. See the PN RemoveNilTokens demo at
 <a href="ptolemy/domains/pn/demo/RemoveNilTokens/RemoveNilTokens.xml"><code>ptolemy/domains/pn/demo/RemoveNilTokens/RemoveNilTokens.xml</code></a>

 <li>Set the <i>firingCountLimit</i>
 ({@link ptolemy.actor.lib.LimitedFiringSource#_firingCountLimit}) actor
 parameter to the number of iterations desired. Actors such as Ramp
 extend LimitedFiringSource and have the <i>firingCountLimit</i> parameter.
 </ol>


 @author Mudit Goel, Edward A. Lee, Xiaowen Xin
 @version $Id$
 @since Ptolemy II 0.2
 @Pt.ProposedRating Green (mudit)
 @Pt.AcceptedRating Green (davisj)
 */
public class PNDirector extends CompositeProcessDirector {
    /** Construct a director in the default workspace with an empty string
     *  as its name. The director is added to the list of objects in
     *  the workspace. Increment the version number of the workspace.
     *  Create a director parameter "initialQueueCapacity" with the default
     *  value 1. This sets the initial capacities of the queues in all
     *  the receivers created in the PN domain. Also create a director
     *  parameter "maximumQueueCapacity" with the default value 65536.
     *  @exception IllegalActionException If the name has a period in it, or
     *   the director is not compatible with the specified container.
     *  @exception NameDuplicationException If the container already contains
     *   an entity with the specified name.
     */
    public PNDirector()
            throws IllegalActionException, NameDuplicationException {
        super();
        _init();
    }

    /** Construct a director in the workspace with an empty name.
     *  The director is added to the list of objects in the workspace.
     *  Increment the version number of the workspace.
     *  Create a director parameter "initialQueueCapacity" with the default
     *  value 1. This sets the initial capacities of the queues in all
     *  the receivers created in the PN domain. Also create a director
     *  parameter "maximumQueueCapacity" with the default value 65536.
     *  @param workspace The workspace of this object.
     *  @exception IllegalActionException If the name has a period in it, or
     *   the director is not compatible with the specified container.
     *  @exception NameDuplicationException If the container already contains
     *   an entity with the specified name.
     */
    public PNDirector(Workspace workspace)
            throws IllegalActionException, NameDuplicationException {
        super(workspace);
        _init();
    }

    /** Construct a director in the given container with the given name.
     *  The container argument must not be null, or a
     *  NullPointerException will be thrown.
     *  If the name argument is null, then the name is set to the
     *  empty string. Increment the version number of the workspace.
     *
     *  Create a director parameter "initialQueueCapacity" with the default
     *  value 1. This sets the initial capacities of the queues in all
     *  the receivers created in the PN domain. Also create a director
     *  parameter "maximumQueueCapacity" with the default value 65536.
     *  @param container Container of the director.
     *  @param name Name of this director.
     *  @exception IllegalActionException If the director is not compatible
     *   with the specified container. Thrown in derived classes.
     *  @exception NameDuplicationException If the container is not a
     *   CompositeActor and the name collides with an entity in the container.
     */
    public PNDirector(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);
        _init();
    }

    ///////////////////////////////////////////////////////////////////
    ////                         parameters                        ////

    /** The initial size of the queues for each communication channel.
     *  This is an integer that defaults to 1.
     */
    public Parameter initialQueueCapacity;

    /** The maximum size of the queues for each communication channel.
     *  This is an integer that defaults to 65536. To specify unbounded
     *  queues, set this to 0.
     */
    public Parameter maximumQueueCapacity;

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /** Add a process state change listener to this director. The listener
     *  is intended to be notified of each change to the state of a process.
     *  @param listener The PNProcessListener to add.
     *  @see #removeProcessListener(PNProcessListener)
     */
    public void addProcessListener(PNProcessListener listener) {
        _processListeners.add(listener);
    }

    /** Clone the director into the specified workspace. The new object is
     *  <i>not</i> added to the directory of that workspace (It must be added
     *  by the user if he wants it to be there).
     *  The result is a new director with no container, no pending mutations,
     *  and no topology listeners. The count of active processes is zero.
     *  The clone gets its own empty listener list, receiver list and
     *  blocked-queue maps so that it shares no mutable state with
     *  the original.
     *
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException If one of the attributes
     *   cannot be cloned.
     *  @return The new PNDirector.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        PNDirector newObject = (PNDirector) super.clone(workspace);
        newObject._processListeners = new LinkedList<PNProcessListener>();
        newObject._readBlockedQueues = new HashMap();
        newObject._receivers = new LinkedList();
        newObject._writeBlockedQueues = new HashMap();
        return newObject;
    }

    /** Invoke the initialize() method of ProcessDirector. Also set all the
     *  state variables to their initial values. The list of process
     *  listeners is not reset as the developer might want to reuse the
     *  list of listeners.
     *  @exception IllegalActionException If the initialize() method of one
     *   of the deeply contained actors throws it.
     */
    @Override
    public void initialize() throws IllegalActionException {
        // Forget any blocked receivers from a previous run BEFORE the
        // superclass initialization runs.
        _readBlockedQueues.clear();
        _writeBlockedQueues.clear();

        super.initialize();
    }

    /** Return a new receiver compatible with this director. The receiver
     *  is an instance of PNQueueReceiver. Set the initial capacity
     *  of the FIFO queue in the receiver to the value specified by the
     *  director parameter "initialQueueCapacity". The default value
     *  of the parameter is 1. The receiver is also recorded (through a
     *  weak reference) so that preinitialize() can reset it later.
     *  @return A new PNQueueReceiver.
     */
    @Override
    public Receiver newReceiver() {
        PNQueueReceiver receiver = new PNQueueReceiver();
        _receivers.add(new WeakReference<PNQueueReceiver>(receiver));

        // Set the capacity to the default. Note that it will also
        // be set in preinitialize().
        try {
            int capacity = ((IntToken) initialQueueCapacity.getToken())
                    .intValue();
            receiver.setCapacity(capacity);
        } catch (IllegalActionException e) {
            throw new InternalErrorException(e);
        }

        return receiver;
    }

    /** Return true if the containing composite actor contains active
     *  processes and the composite actor has input ports and if stop()
     *  has not been called. Return false otherwise. This method is
     *  normally called only after detecting a real deadlock, or if
     *  stopFire() is called. True is returned to indicate that the
     *  composite actor can start its execution again if it
     *  receives data on any of its input ports.
     *  @return true to indicate that the composite actor can continue
     *   executing on receiving additional input on its input ports.
     *  @exception IllegalActionException Not thrown in this base class. May be
     *   thrown by derived classes.
     */
    @Override
    public boolean postfire() throws IllegalActionException {
        _notDone = super.postfire();

        // If the container has input ports and there are active processes
        // in the container, then the execution might restart on receiving
        // additional data.
        if (!((CompositeActor) getContainer()).inputPortList().isEmpty()
                && _getActiveThreadsCount() != 0) {
            // Avoid returning false on detected deadlock.
            return !_stopRequested;
        } else {
            return _notDone;
        }
    }

    /** Override the base class to check that no relation has more than one
     *  source of data and to reset the capacities of all the receivers
     *  created by this director to the value of "initialQueueCapacity".
     *  Receivers that have been garbage collected, or that no longer
     *  report this director as their director, are dropped from the list
     *  of known receivers.
     *  @exception IllegalActionException If the superclass throws it, or
     *   if a relation has multiple sources of data.
     */
    @Override
    public void preinitialize() throws IllegalActionException {
        super.preinitialize();

        // Check that no relation has multiple sources of data connected to it.
        // FIXME: This only detects the error at this level of the hierarchy.
        // Probably need to recursively descend into composite actors.
        CompositeEntity container = (CompositeEntity) getContainer();
        Iterator relations = container.relationList().iterator();

        while (relations.hasNext()) {
            IORelation relation = (IORelation) relations.next();

            if (relation.linkedSourcePortList().size() > 1) {
                throw new IllegalActionException(relation,
                        "Relation has multiple sources of data,"
                                + " which is not allowed in PN."
                                + " If you want nondeterministic merge,"
                                + " use the NondeterministicMerge actor.");
            }
        }

        // Reset the capacities of all the receivers.
        Parameter parameter = (Parameter) getAttribute("initialQueueCapacity");
        int capacity = ((IntToken) parameter.getToken()).intValue();
        ListIterator receivers = _receivers.listIterator();

        while (receivers.hasNext()) {
            WeakReference reference = (WeakReference) receivers.next();

            // Dereference exactly once and hold a strong reference: the
            // referent could otherwise be garbage collected between a
            // null check and a second call to get().
            PNQueueReceiver receiver = (PNQueueReceiver) reference.get();

            if (receiver == null) {
                // Reference has been garbage collected.
                receivers.remove();
            } else if (receiver.getDirector() == this) {
                receiver.clear();
                receiver.setCapacity(capacity);
            } else {
                // If the director is not this, then
                // the receiver is no longer in use and
                // can be removed.
                receivers.remove();
            }
        }
    }

    /** Remove a process listener from this director.
     *  If the listener is not attached to this director, do nothing.
     *
     *  @param listener The PNProcessListener to be removed.
     *  @see #addProcessListener(PNProcessListener)
     */
    public void removeProcessListener(PNProcessListener listener) {
        _processListeners.remove(listener);
    }

    /** Return an array of suggested ModalModel directors to use with
     *  PNDirector. The first entry is MultirateFSMDirector; the
     *  alternatives are FSMDirector and NonStrictFSMDirector.
     *  @return An array of suggested directors to be used with ModalModel.
     *  @see ptolemy.actor.Director#suggestedModalModelDirectors()
     */
    @Override
    public String[] suggestedModalModelDirectors() {
        return new String[] {
                "ptolemy.domains.modal.kernel.MultirateFSMDirector",
                "ptolemy.domains.modal.kernel.FSMDirector",
                "ptolemy.domains.modal.kernel.NonStrictFSMDirector" };
    }

    /** Return true to indicate that a ModalModel under control
     *  of this director supports multirate firing.
     *  @return True indicating a ModalModel under control of this director
     *   supports multirate firing.
     */
    @Override
    public boolean supportMultirateFiring() {
        return true;
    }

    /** Notify the director that the specified thread is blocked
     *  on an I/O operation. The receiver is recorded as read blocked
     *  or write blocked, and the superclass is then notified.
     *  @param thread The thread.
     *  @param receiver The receiver handling the I/O operation,
     *   or null if it is not a specific receiver.
     *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
     *   to indicate whether the thread is blocked on read or write.
     *  @see CompositeProcessDirector#threadBlocked(Thread, ProcessReceiver)
     */
    public synchronized void threadBlocked(Thread thread,
            ProcessReceiver receiver, boolean readOrWrite) {
        if (readOrWrite == READ_BLOCKED) {
            _readBlockedQueues.put(receiver, thread);
        } else {
            _writeBlockedQueues.put(receiver, thread);
        }

        super.threadBlocked(thread, receiver);
    }

    /** Notify the director that the specified thread is unblocked
     *  on an I/O operation. The receiver is removed from the
     *  corresponding set of blocked receivers, and the superclass
     *  is then notified. If the thread has
     *  not been registered with threadBlocked(), then this call is
     *  ignored.
     *  @param thread The thread.
     *  @param receiver The receiver handling the I/O operation,
     *   or null if it is not a specific receiver.
     *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
     *   to indicate whether the thread is blocked on read or write.
     *  @see CompositeProcessDirector#threadUnblocked(Thread, ProcessReceiver)
     */
    public synchronized void threadUnblocked(Thread thread,
            ProcessReceiver receiver, boolean readOrWrite) {
        if (readOrWrite == READ_BLOCKED) {
            _readBlockedQueues.remove(receiver);
        } else {
            _writeBlockedQueues.remove(receiver);
        }

        super.threadUnblocked(thread, receiver);
    }

    ///////////////////////////////////////////////////////////////////
    ////                         public variables                  ////

    /** Indicator that a thread is read blocked. */
    public static final boolean READ_BLOCKED = true;

    /** Indicator that a thread is write blocked. */
    public static final boolean WRITE_BLOCKED = false;

    ///////////////////////////////////////////////////////////////////
    ////                         protected methods                 ////

    /** Double the capacity of one of the queues with the smallest
     *  capacity belonging to a receiver on which a process is blocked
     *  while attempting to write. <p>Traverse through the list of receivers
     *  on which a process is blocked on a write and choose the one containing
     *  the queue with the smallest capacity. Double the capacity
     *  if the capacity is positive (saturating at Integer.MAX_VALUE).
     *  In case the capacity is zero or negative, set the capacity to 1.
     *  Mark the process blocked on a write to the receiver containing this
     *  queue as unblocked. If no receiver is write blocked, do nothing.</p>
     *  @exception IllegalActionException If the resulting capacity would
     *   exceed the value of <i>maximumQueueCapacity</i>.
     */
    protected synchronized void _incrementLowestWriteCapacityPort()
            throws IllegalActionException {
        // NOTE: This is synchronized as a precaution, although in theory
        // it gets called only within a synchronized block of the fire()
        // method of the parent ProcessDirector. The threadUnblocked() call
        // at the end is itself synchronized on this director.
        PNQueueReceiver smallestCapacityQueue = null;
        int smallestCapacity = 0;

        // Use the queue reference (not a magic capacity value) to detect
        // "nothing selected yet", so that a queue whose capacity happens
        // to be -1 is not mistaken for the unset state.
        for (Object key : _writeBlockedQueues.keySet()) {
            PNQueueReceiver queue = (PNQueueReceiver) key;
            int queueCapacity = queue.getCapacity();

            if (smallestCapacityQueue == null
                    || queueCapacity < smallestCapacity) {
                smallestCapacityQueue = queue;
                smallestCapacity = queueCapacity;
            }
        }

        if (smallestCapacityQueue == null) {
            // No receiver is write blocked; nothing to enlarge.
            return;
        }

        int capacity = smallestCapacity;

        if (capacity <= 0) {
            smallestCapacityQueue.setCapacity(1);
        } else {
            int maximumCapacity = ((IntToken) maximumQueueCapacity.getToken())
                    .intValue();

            // Equivalent to (capacity * 2 > maximumCapacity) for positive
            // values, but cannot overflow int.
            if (maximumCapacity > 0 && capacity > maximumCapacity / 2) {
                int channel = smallestCapacityQueue.getContainer()
                        .getChannelForReceiver(smallestCapacityQueue);
                String msg = "Queue size " + 2L * capacity
                        + " exceeds the maximum capacity in port "
                        + smallestCapacityQueue.getContainer().getFullName()
                        + (channel > 0 ? " (channel " + channel + ")" : "")
                        + ". Perhaps you have an unbounded queue?";

                if (_debugging) {
                    _debug(msg);
                }

                throw new IllegalActionException(
                        smallestCapacityQueue.getContainer(), msg);
            }

            // Saturate instead of wrapping around to a negative capacity
            // when queues are unbounded (maximumQueueCapacity is 0).
            int doubledCapacity = capacity > Integer.MAX_VALUE / 2
                    ? Integer.MAX_VALUE
                    : capacity * 2;
            smallestCapacityQueue.setCapacity(doubledCapacity);
        }

        if (_debugging) {
            _debug("increasing the capacity of receiver "
                    + smallestCapacityQueue.getContainer() + " to "
                    + smallestCapacityQueue.getCapacity());
        }

        // Need to mark any thread that is blocked on
        // this receiver unblocked now, before the notification,
        // or we will detect deadlock all over again and
        // again increase the buffer sizes.
        threadUnblocked((Thread) _writeBlockedQueues.get(smallestCapacityQueue),
                smallestCapacityQueue, WRITE_BLOCKED);
    }

    /** Resolve an artificial deadlock and return true. If the
     *  deadlock is not an artificial deadlock (it is a real deadlock),
     *  then return false.
     *  If it is an artificial deadlock, select the
     *  receiver with the smallest queue capacity on which any process is
     *  blocked on a write and increase the capacity of the contained queue.
     *  If the capacity is positive, then double the capacity.
     *  Otherwise set the capacity to 1. Mark the process blocked on
     *  this receiver as unblocked and return true.
     *  <p>
     *  If derived classes introduce new forms of deadlocks, they should
     *  override this method to introduce mechanisms of handling those
     *  deadlocks. This method is called from the fire() method of the director
     *  alone.</p>
     *  @return True after handling an artificial deadlock. Otherwise return
     *   false.
     *  @exception IllegalActionException If the maximum queue capacity
     *   is exceeded.
     *   This might be thrown by derived classes.
     */
    @Override
    protected boolean _resolveInternalDeadlock() throws IllegalActionException {
        if (_writeBlockedQueues.isEmpty() && !_readBlockedQueues.isEmpty()) {
            // There is a real deadlock.
            if (_debugging) {
                _debug("Deadlock detected: no processes blocked on write, but some are blocked on read.");
            }

            return false;
        } else if (_getActiveThreadsCount() == 0) {
            // There is a real deadlock as no processes are active.
            if (_debugging) {
                _debug("No more active processes.");
            }

            return false;
        } else {
            // This is an artificial deadlock. Hence find the input port with
            // lowest capacity queue that is blocked on a write and increase
            // its capacity.
            if (_debugging) {
                _debug("Artificial Deadlock - increasing queue capacity.");
            }

            _incrementLowestWriteCapacityPort();
            return true;
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         protected variables               ////

    /** Map from each receiver on which a process is blocked on a read
     *  to the thread that is blocked.
     */
    protected HashMap _readBlockedQueues = new HashMap();

    /** Map from each receiver on which a process is blocked on a write
     *  to the thread that is blocked.
     */
    protected HashMap _writeBlockedQueues = new HashMap();

    /** The list of all receivers that this director has created,
     *  held as WeakReference instances.
     */
    protected LinkedList _receivers = new LinkedList();

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Create the "initialQueueCapacity" (default 1) and
     *  "maximumQueueCapacity" (default 65536) parameters, both
     *  constrained to type int.
     *  @exception IllegalActionException If a parameter cannot be created
     *   or its type cannot be set.
     *  @exception NameDuplicationException If an attribute with one of
     *   the parameter names already exists.
     */
    private void _init()
            throws IllegalActionException, NameDuplicationException {
        initialQueueCapacity = new Parameter(this, "initialQueueCapacity",
                new IntToken(1));
        initialQueueCapacity.setTypeEquals(BaseType.INT);

        maximumQueueCapacity = new Parameter(this, "maximumQueueCapacity",
                new IntToken(65536));
        maximumQueueCapacity.setTypeEquals(BaseType.INT);
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private variables                 ////

    /** List of process listeners. */
    private LinkedList<PNProcessListener> _processListeners = new LinkedList<PNProcessListener>();

}