001/* The base class for directors for the process oriented domains. 002 003 Copyright (c) 1998-2015 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 Semantics of initialize(Actor) have changed. 028 */ 029package ptolemy.actor.process; 030 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedList; 034 035import ptolemy.actor.Actor; 036import ptolemy.actor.CompositeActor; 037import ptolemy.actor.Director; 038import ptolemy.actor.IOPort; 039import ptolemy.actor.Initializable; 040import ptolemy.actor.Manager; 041import ptolemy.actor.Receiver; 042import ptolemy.kernel.CompositeEntity; 043import ptolemy.kernel.util.IllegalActionException; 044import ptolemy.kernel.util.NameDuplicationException; 045import ptolemy.kernel.util.NamedObj; 046import ptolemy.kernel.util.Workspace; 047 048/////////////////////////////////////////////////////////////////// 049//// ProcessDirector 050 051/** 052 The base class for directors for the process oriented domains. It provides 053 default implementations for methods that are common across such domains. 054 <P> 055 In the process oriented domains, the director controlling a model 056 needs to keep track of the state of the model. In particular it needs 057 to maintain an accurate count of the number of active processes under 058 its control and any processes that are blocked for whatever reason (trying 059 to read from an empty channel as in PN). 060 These counts, and perhaps other counts, are needed by the 061 director to control and respond when deadlock is detected (no processes 062 can make progress), or to respond to requests from higher in the hierarchy. 063 <P> 064 The methods that control how the director detects and responds to deadlocks 065 are _areActorsDeadlocked() and _resolveDeadlock(). These methods should be 066 overridden in derived classes to get domain-specific behaviour. The 067 implementations given here are trivial and suffice only to illustrate 068 the approach that should be followed. 069 <P> 070 This base class is not sufficient for executing hierarchical, heterogeneous 071 models. In order to accommodate hierarchical, heterogeneity the subclass 072 CompositeProcessDirector must be used. 073 <P> 074 <P> 075 @author Mudit Goel, Neil Smyth, John S. Davis II 076 @version $Id$ 077 @since Ptolemy II 0.2 078 @Pt.ProposedRating Green (mudit) 079 @Pt.AcceptedRating Yellow (mudit) 080 @see Director 081 */ 082public class ProcessDirector extends Director { 083 /** Construct a director in the default workspace with an empty string 084 * as its name. The director is added to the list of objects in 085 * the workspace. Increment the version number of the workspace. 086 * @exception NameDuplicationException If construction of Time objects fails. 087 * @exception IllegalActionException If construction of Time objects fails. 088 */ 089 public ProcessDirector() 090 throws IllegalActionException, NameDuplicationException { 091 super(); 092 } 093 094 /** Construct a director in the workspace with an empty name. 095 * The director is added to the list of objects in the workspace. 096 * Increment the version number of the workspace. 097 * @param workspace The workspace of this object. 098 * @exception NameDuplicationException If construction of Time objects fails. 099 * @exception IllegalActionException If construction of Time objects fails. 100 */ 101 public ProcessDirector(Workspace workspace) 102 throws IllegalActionException, NameDuplicationException { 103 super(workspace); 104 } 105 106 /** Construct a director in the given container with the given name. 107 * If the container argument must not be null, or a 108 * NullPointerException will be thrown. 109 * If the name argument is null, then the name is set to the 110 * empty string. Increment the version number of the workspace. 111 * 112 * @param container The container 113 * @param name Name of this director. 114 * @exception IllegalActionException If the name contains a period, 115 * or if the director is not compatible with the specified container. 116 * @exception NameDuplicationException If the container not a 117 * CompositeActor and the name collides with an entity in the container. 118 */ 119 public ProcessDirector(CompositeEntity container, String name) 120 throws IllegalActionException, NameDuplicationException { 121 super(container, name); 122 } 123 124 /////////////////////////////////////////////////////////////////// 125 //// public methods //// 126 127 /** Notify this director that the specified thread is part of 128 * the execution of this model. This is used 129 * to keep track of whether the model is deadlocked, and also to 130 * terminate threads if necessary. It is important that the thread 131 * call _removeThread() upon exiting. Note further that this 132 * should be called before the thread is started to avoid race 133 * conditions where some threads have been started and others 134 * have not been started and deadlock is falsely detected because 135 * the not-yet-started threads are not counted. 136 * @param thread The thread. 137 * @see #removeThread(Thread) 138 */ 139 public synchronized void addThread(Thread thread) { 140 assert !_activeThreads.contains(thread); 141 _activeThreads.add(thread); 142 assert _activeThreads.contains(thread); 143 144 if (_debugging) { 145 _debug("Adding a thread: " + thread.getName()); 146 } 147 148 notifyAll(); 149 } 150 151 /** Clone the director into the specified workspace. The new object is 152 * <i>not</i> added to the directory of that workspace (It must be added 153 * by the user if he wants it to be there). 154 * The result is a new director with no container, no pending mutations, 155 * and no topology listeners. The count of active processes is zero. 156 * 157 * @param workspace The workspace for the cloned object. 158 * @exception CloneNotSupportedException If one of the attributes 159 * cannot be cloned. 160 * @return The new ProcessDirector. 161 */ 162 @Override 163 public Object clone(Workspace workspace) throws CloneNotSupportedException { 164 ProcessDirector newObject = (ProcessDirector) super.clone(workspace); 165 166 // Is it really necessary to do this? 167 168 // Findbugs: 169 // [M M IS] Inconsistent synchronization [IS2_INCONSISTENT_SYNC] 170 // Actually this is not a problem since the object is 171 // being created and hence nobody else has access to it. 172 173 newObject._blockedThreads = new HashSet(); 174 newObject._pausedThreads = new HashSet(); 175 newObject._activeThreads = new HashSet(); 176 newObject._notDone = true; 177 return newObject; 178 } 179 180 /** Request that the current iteration finishes and postfire() returns 181 * false, indicating to the environment that no more iterations should 182 * be invoked. To support domains where actor firings do not necessarily 183 * terminate, such as PN, you may wish to call stopFire() as well to request 184 * that those actors complete their firings. 185 */ 186 @Override 187 public void finish() { 188 super.finish(); 189 stop(); 190 } 191 192 /** Wait until a deadlock is detected. Then deal with the deadlock 193 * by calling the protected method _resolveDeadlock() and return. 194 * This method is synchronized on the director. 195 * @exception IllegalActionException If a derived class throws it. 196 */ 197 @Override 198 public void fire() throws IllegalActionException { 199 // Don't call "Director.super.fire();" here, do the work instead. 200 Workspace workspace = workspace(); 201 202 // In case we have an enclosing process director, 203 // we identify it so that we can notify it when we are blocked. 204 CompositeActor container = (CompositeActor) getContainer(); 205 Director outsideDirector = container.getExecutiveDirector(); 206 207 if (!(outsideDirector instanceof ProcessDirector)) { 208 outsideDirector = null; 209 } 210 211 int depth = 0; 212 try { 213 synchronized (this) { 214 if (_debugging) { 215 _debug("Called fire()."); 216 } 217 218 while (!_areThreadsDeadlocked() && !_areAllThreadsStopped() 219 && !_stopRequested) { 220 // Added to get thread to stop reliably on pushing stop button. 221 // EAL 8/05 222 if (_stopRequested) { 223 return; 224 } 225 226 if (_debugging) { 227 _debug("Waiting for actors to stop."); 228 } 229 230 try { 231 if (outsideDirector != null) { 232 ((ProcessDirector) outsideDirector).threadBlocked( 233 Thread.currentThread(), null); 234 } 235 // NOTE: We cannot use workspace.wait(Object) here without 236 // introducing a race condition, because we have to release 237 // the lock on the _director before calling workspace.wait(_director). 238 if (depth == 0) { 239 depth = workspace.releaseReadPermission(); 240 } 241 wait(); 242 } catch (InterruptedException e) { 243 if (_debugging) { 244 _debug("Director thread interrupted."); 245 } 246 // stop all threads 247 stop(); 248 return; 249 } finally { 250 if (outsideDirector != null) { 251 ((ProcessDirector) outsideDirector).threadUnblocked( 252 Thread.currentThread(), null); 253 } 254 } 255 } 256 257 if (_debugging) { 258 _debug("Actors have stopped."); 259 } 260 261 // Don't resolve deadlock if we are just pausing 262 // or if a stop has been requested. 263 // NOTE: Added !_stopRequested. EAL 3/12/03. 264 if (_areThreadsDeadlocked() && !_stopRequested) { 265 if (_debugging) { 266 _debug("Deadlock detected."); 267 } 268 269 try { 270 _notDone = _resolveDeadlock(); 271 } catch (IllegalActionException e) { 272 // stop all threads. 273 stop(); 274 throw e; 275 } 276 } 277 } 278 } finally { 279 if (depth > 0) { 280 workspace.reacquireReadPermission(depth); 281 } 282 } 283 } 284 285 /** Initialize the given actor. This class overrides the base 286 * class to reset the flags for all of the receivers, and to 287 * create a new ProcessThread for each actor being controlled. 288 * This class does *NOT* directly call the initialize method of the 289 * actor. That method is instead called by the actor's thread itself. 290 * This allows actors in process domains to create tokens during 291 * initialization, since sending data in a process-based domain 292 * requires threads for each actor. 293 * @exception IllegalActionException If the actor is not 294 * acceptable to the domain. Not thrown in this base class. 295 */ 296 @Override 297 public synchronized void initialize(Actor actor) 298 throws IllegalActionException { 299 // FIXME: Note that ProcessDirector does *not* invoke 300 // super.initialize(actor), so changes made to 301 // Director.initialize(Actor) apply to 302 // ProcessDirector.initialize(Actor). 303 304 // FIXME: This method does not set _resourceScheduling like 305 // the parent method. 306 307 if (_debugging) { 308 _debug("Initializing actor: " + ((NamedObj) actor).getFullName()); 309 } 310 311 // Reset the receivers. 312 Iterator ports = actor.inputPortList().iterator(); 313 314 while (ports.hasNext()) { 315 IOPort port = (IOPort) ports.next(); 316 Receiver[][] receivers = port.getReceivers(); 317 318 for (Receiver[] receiver : receivers) { 319 for (int j = 0; j < receiver.length; j++) { 320 receiver[j].reset(); 321 } 322 } 323 } 324 325 // Create threads. 326 ProcessThread processThread = _newProcessThread(actor, this); 327 _activeThreads.add(processThread); 328 assert _activeThreads.contains(processThread); 329 330 _newActorThreadList.addFirst(processThread); 331 } 332 333 /** Return true if a stop has been requested on the director. 334 * This is used by the ProcessThread to tell the difference 335 * between a request to pause and a request to stop. 336 * @return True if stop() has been called. 337 */ 338 public boolean isStopFireRequested() { 339 return _stopFireRequested; 340 } 341 342 /** Return true if the specified thread has been registered 343 * with addThread() and has not been removed with removeThread(). 344 * @return True if the specified thread is active. 345 * @param thread The thread. 346 * @see #addThread(Thread) 347 * @see #removeThread(Thread) 348 */ 349 public synchronized boolean isThreadActive(Thread thread) { 350 return _activeThreads.contains(thread); 351 } 352 353 /** Return false if a stop has been requested or if 354 * the model has reached deadlock. Return true otherwise. 355 * @return False if the director has detected a deadlock or 356 * a stop has been requested. 357 * @exception IllegalActionException If a derived class throws it. 358 */ 359 @Override 360 public boolean postfire() throws IllegalActionException { 361 _notDone = _notDone && super.postfire(); 362 363 if (_debugging) { 364 synchronized (this) { 365 _debug("Called postfire()."); 366 _debug("_stopRequested = " + _stopRequested); 367 _debug("_stopFireRequested = " + _stopFireRequested); 368 _debug("Returning from postfire(): " + _notDone); 369 } 370 } 371 372 return _notDone; 373 } 374 375 /** Start threads for all actors that have not had threads started 376 * already (this might include actors initialized since the last 377 * invocation of prefire). This starts the threads, corresponding 378 * to all the actors, that were created in a mutation. 379 * @return True. 380 * @exception IllegalActionException If a derived class throws it. 381 */ 382 @Override 383 public boolean prefire() throws IllegalActionException { 384 // FIXME: Note that ProcessDirector does *not* invoke 385 // super.prefire(), so changes made to Director.prefire() 386 // should also be made to ProcessDirector.prefire(). 387 388 // FIXME: this method does nothing about model time. 389 390 synchronized (this) { 391 // Clear the stopFire flag and trigger all of the actor threads. 392 _stopFireRequested = false; 393 394 notifyAll(); 395 } 396 397 // Start threads for actors created since the last invocation 398 // of this prefire() method. 399 Iterator threads = _newActorThreadList.iterator(); 400 401 while (threads.hasNext()) { 402 ProcessThread procThread = (ProcessThread) threads.next(); 403 procThread.start(); 404 } 405 406 _newActorThreadList.clear(); 407 408 return true; 409 } 410 411 /** Preinitialize the model controlled by this director. This 412 * subclass overrides the base class to initialize the number of 413 * running threads before proceeding with preinitialization of 414 * the model. 415 * 416 * @exception IllegalActionException If creating an actor thread 417 * throws it. 418 */ 419 @Override 420 public void preinitialize() throws IllegalActionException { 421 // This method calls super.preinitialize() at the end. 422 423 _notDone = true; 424 synchronized (this) { 425 _activeThreads.clear(); 426 _blockedThreads.clear(); 427 _pausedThreads.clear(); 428 } 429 _newActorThreadList = new LinkedList(); 430 super.preinitialize(); 431 } 432 433 /** Notify this director that the specified thread has finished 434 * executing. This is used to keep track of whether the model 435 * is deadlocked, and also to terminate threads if necessary. 436 * @param thread The thread. 437 * @see #addThread(Thread) 438 */ 439 public synchronized void removeThread(Thread thread) { 440 if (_debugging) { 441 _debug("Thread " + thread.getName() + " is exiting."); 442 } 443 444 //assert _activeThreads.contains(thread); 445 446 _activeThreads.remove(thread); 447 448 assert !_activeThreads.contains(thread); 449 _pausedThreads.remove(thread); 450 _blockedThreads.remove(thread); 451 notifyAll(); 452 } 453 454 /** Request that the director cease execution altogether. 455 * This causes a call to stop() on all actors contained by 456 * the container of this director, and a call to stopThread() 457 * on each of the process threads that contain actors 458 * controlled by this director. This also sets a flag 459 * so that the next call to postfire() returns false. 460 */ 461 @Override 462 public void stop() { 463 // This method does not call super.stop() by design. 464 465 // Set this before calling stopThread(), in case the thread 466 // needs to distinguish between stopFire() and this method. 467 if (_debugging) { 468 _debug("Requesting stop of all threads."); 469 } 470 471 _stopRequested = true; 472 _stopFireRequested = true; 473 474 // We used to copy the active thread set because 475 // when stop() is called on each thread, the 476 // set itself is modified. We could get a 477 // ConcurrentModificationException. 478 479 // However, we were getting a ConcurrentModificationException anyway 480 // on terra, to replicate: (cd $PTII/ptolemy/domains/sysml/test/junit; make) 481 // Also, Coverity scan pointed out that we were accessing 482 // _activeThreads without getting a lock here. So, we lock and 483 // to make our copy. If this does not work, then we could 484 // put the loop inside the synchronized block. 485 LinkedList threadsCopy = null; 486 synchronized (this) { 487 threadsCopy = new LinkedList(_activeThreads); 488 } 489 Iterator threads = threadsCopy.iterator(); 490 while (threads.hasNext()) { 491 Thread thread = (Thread) threads.next(); 492 493 if (thread instanceof ProcessThread) { 494 // NOTE: We used to catch and ignore all exceptions 495 // here, but that doesn't look right to me. EAL 8/05. 496 ((ProcessThread) thread).getActor().stop(); 497 } 498 499 // NOTE: Used to call thread.interrupt() here, with a comment 500 // about how it probably wasn't necessary. But 501 // in applets, this gives a security violation. 502 // If threads fail to stop, the probably the call 503 // below to _requestFinishOnReceivers() isn't doing its 504 // job. 505 } 506 507 // Added to get stop button to work consistently the first time. 508 // EAL 8/05 509 _requestFinishOnReceivers(); 510 511 // Create a notification thread so that this returns immediately 512 // (doesn't have to get a synchronized lock). 513 new NotifyThread(this).start(); 514 } 515 516 /** Request that execution stop at the conclusion of the current 517 * iteration. Call stopThread() on each of the process threads that 518 * contain actors controlled by this director and call stopFire() on 519 * the actors that are contained by these threads. This method is 520 * non-blocking. 521 */ 522 @Override 523 public void stopFire() { 524 // This method does not call super.stopFire() by design. 525 526 if (_debugging) { 527 _debug("stopFire() has been called."); 528 } 529 530 _stopFireRequested = true; 531 532 HashSet actors = new HashSet(); 533 synchronized (this) { 534 Iterator threads = _activeThreads.iterator(); 535 536 while (threads.hasNext()) { 537 Thread thread = (Thread) threads.next(); 538 539 if (thread instanceof ProcessThread) { 540 actors.add(((ProcessThread) thread).getActor()); 541 } 542 } 543 } 544 545 Iterator actorsIterator = actors.iterator(); 546 while (actorsIterator.hasNext()) { 547 ((Actor) actorsIterator.next()).stopFire(); 548 } 549 } 550 551 /** Terminate all threads under control of this director immediately. 552 * This abrupt termination will not allow normal cleanup actions 553 * to be performed, and the model should be recreated after calling 554 * this method. This method uses Thread.stop(), a deprecated method 555 * in Java. 556 */ 557 @Override 558 public void terminate() { 559 // First need to invoke terminate on all actors under the 560 // control of this director. 561 super.terminate(); 562 563 // Now stop any threads created by this director. 564 LinkedList list = new LinkedList(); 565 // FIXME: Coverity Scan points out that we are accessing 566 // _activeThreads without getting a lock here. 567 list.addAll(_activeThreads); 568 _activeThreads.clear(); 569 570 Iterator threads = list.iterator(); 571 572 while (threads.hasNext()) { 573 ((Thread) threads.next()).stop(); 574 } 575 } 576 577 /** Notify the director that the specified thread is blocked 578 * on an I/O operation. If the thread has 579 * not been registered with addThread(), then this call is 580 * ignored. 581 * @param thread The thread. 582 * @param receiver The receiver handling the I/O operation, 583 * or null if it is not a specific receiver. 584 * @see #addThread(Thread) 585 */ 586 public synchronized void threadBlocked(Thread thread, 587 ProcessReceiver receiver) { 588 if (_activeThreads.contains(thread) 589 && !_blockedThreads.contains(thread)) { 590 _blockedThreads.add(thread); 591 notifyAll(); 592 } 593 } 594 595 /** Notify the director that the specified thread has paused 596 * in response to a call to stopFire(). If the thread has 597 * not been registered with addThread(), then this call is 598 * ignored. If the thread has been identified as blocked, 599 * it is removed from the set of blocked threads (so it 600 * doesn't get counted twice). 601 * @param thread The thread. 602 * @see #addThread(Thread) 603 */ 604 public synchronized void threadHasPaused(Thread thread) { 605 if (_activeThreads.contains(thread) 606 && !_pausedThreads.contains(thread)) { 607 _pausedThreads.add(thread); 608 _blockedThreads.remove(thread); 609 notifyAll(); 610 } 611 } 612 613 /** Notify the director that the specified thread has resumed. 614 * If the director has not previously been notified that it was 615 * paused, then this call is ignored. 616 * @param thread The thread. 617 * @see #threadHasPaused(Thread) 618 */ 619 public synchronized void threadHasResumed(Thread thread) { 620 if (_pausedThreads.remove(thread)) { 621 notifyAll(); 622 } 623 } 624 625 /** Notify the director that the specified thread is unblocked 626 * on an I/O operation. If the thread has 627 * not been registered with threadBlocked(), then this call is 628 * ignored. 629 * @param thread The thread. 630 * @param receiver The receiver handling the I/O operation, 631 * or null if it is not a specific receiver. 632 * @see #threadBlocked(Thread, ProcessReceiver) * 633 */ 634 public synchronized void threadUnblocked(Thread thread, 635 ProcessReceiver receiver) { 636 if (_blockedThreads.remove(thread)) { 637 notifyAll(); 638 } 639 } 640 641 /** Do nothing. Input transfers in process domains are handled by 642 * branches, which transfer inputs in a separate thread. 643 * @param port The port. 644 * @return False, to indicate that no tokens were transferred. 645 * @exception IllegalActionException Not thrown in this base class. 646 */ 647 @Override 648 public boolean transferInputs(IOPort port) throws IllegalActionException { 649 return false; 650 } 651 652 /** Do nothing. Output transfers in process domains are handled by 653 * branches, which transfer inputs in a separate thread. 654 * @param port The port. 655 * @return False, to indicate that no tokens were transferred. 656 * @exception IllegalActionException Not thrown in this base class. 657 */ 658 @Override 659 public boolean transferOutputs(IOPort port) throws IllegalActionException { 660 return false; 661 } 662 663 /** End the execution of the model under the control of this 664 * director. A flag is set in all the receivers that causes 665 * each process to terminate at the earliest communication point. 666 * Prior to setting receiver flags, this method wakes up the 667 * threads if they all are stopped. If the container is not 668 * an instance of CompositeActor, then this method does nothing. 669 * <P> 670 * This method is not synchronized on the workspace, so the caller 671 * should be. 672 * 673 * @exception IllegalActionException If an error occurs while 674 * accessing the receivers of all actors under the control of 675 * this director. 676 */ 677 @Override 678 public void wrapup() throws IllegalActionException { 679 // FIXME: Note that ProcessDirector does *not* 680 // invoke super.wrapup(), so changes made to Director.wrapup() 681 // should also be made to ProcessDirector.wrapup(). 682 683 if (_debugging) { 684 _debug("Called wrapup()."); 685 } 686 687 // First invoke initializable methods. 688 if (_initializables != null) { 689 for (Initializable initializable : _initializables) { 690 initializable.wrapup(); 691 } 692 } 693 694 CompositeActor container = (CompositeActor) getContainer(); 695 696 // To ensure that we don't miss the notification from 697 // the processes that are ending, put this in a synchronized 698 // block. 699 int depth = 0; 700 try { 701 synchronized (this) { 702 _requestFinishOnReceivers(); 703 704 // Now wake up threads that depend on the manager. 705 Manager manager = container.getManager(); 706 707 // Do the notification in a new thread so as not 708 // to deadlock with this synchronized block. 709 new NotifyThread(manager).start(); 710 711 // Wait until all threads stop. 712 while (_activeThreads.size() > 0) { 713 if (depth == 0) { 714 depth = workspace().releaseReadPermission(); 715 } 716 try { 717 wait(); 718 } catch (InterruptedException ex) { 719 // ignore, wait until all process threads stop 720 } 721 } 722 } 723 } finally { 724 if (depth > 0) { 725 workspace().reacquireReadPermission(depth); 726 } 727 } 728 } 729 730 /////////////////////////////////////////////////////////////////// 731 //// protected methods //// 732 733 /** Return true if the count of active processes equals the number 734 * of paused and blocked threads. Otherwise return false. 735 * @return True if there are no active processes in the container. 736 */ 737 protected synchronized boolean _areAllThreadsStopped() { 738 return _getActiveThreadsCount() == _getStoppedThreadsCount() 739 + _getBlockedThreadsCount(); 740 } 741 742 /** Return true if the count of active processes in the container is 0. 743 * Otherwise return false. Derived classes must override this method to 744 * return true to any other forms of deadlocks that they might introduce. 745 * @return True if there are no active processes in the container. 746 */ 747 protected synchronized boolean _areThreadsDeadlocked() { 748 return _activeThreads.size() == 0; 749 } 750 751 /** Return the number of active threads under the control of this 752 * director. 753 * @return The number of active threads. 754 */ 755 protected final synchronized int _getActiveThreadsCount() { 756 return _activeThreads.size(); 757 } 758 759 /** Return the number of threads that are currently blocked. 760 * @return Return the number of threads that are currently blocked. 761 */ 762 protected final synchronized int _getBlockedThreadsCount() { 763 return _blockedThreads.size(); 764 } 765 766 /** Return the number of threads that are currently stopped. 767 * @return Return the number of threads that are currently stopped. 768 */ 769 protected final synchronized int _getStoppedThreadsCount() { 770 return _pausedThreads.size(); 771 } 772 773 /** Create a new ProcessThread for controlling the actor that 774 * is passed as a parameter of this method. Subclasses are 775 * encouraged to override this method as necessary for domain 776 * specific functionality. 777 * @param actor The actor that the created ProcessThread will 778 * control. 779 * @param director The director that manages the model that the 780 * created thread is associated with. 781 * @return Return a new ProcessThread that will control the 782 * actor passed as a parameter for this method. 783 * @exception IllegalActionException If creating an new ProcessThread 784 * throws it. 785 */ 786 protected ProcessThread _newProcessThread(Actor actor, 787 ProcessDirector director) throws IllegalActionException { 788 return new ProcessThread(actor, director); 789 } 790 791 /** Return false indicating that deadlock has not been resolved 792 * and that execution will be discontinued. In derived classes, 793 * override this method to obtain domain specific handling of 794 * deadlocks. Return false if a real deadlock has occurred and 795 * the simulation can be ended. Return true if the simulation 796 * can proceed given additional data and need not be terminated. 797 * @return False. 798 * @exception IllegalActionException Not thrown in this base class. 799 */ 800 protected boolean _resolveDeadlock() throws IllegalActionException { 801 return false; 802 } 803 804 /** Call requestFinish() on all receivers. 805 */ 806 protected void _requestFinishOnReceivers() { 807 CompositeActor container = (CompositeActor) getContainer(); 808 Iterator actors = container.deepEntityList().iterator(); 809 Iterator actorPorts; 810 811 while (actors.hasNext()) { 812 Actor actor = (Actor) actors.next(); 813 actorPorts = actor.inputPortList().iterator(); 814 815 while (actorPorts.hasNext()) { 816 IOPort port = (IOPort) actorPorts.next(); 817 818 // Setting finished flag in the receivers. 819 Receiver[][] receivers = port.getReceivers(); 820 821 for (Receiver[] receiver : receivers) { 822 for (int j = 0; j < receiver.length; j++) { 823 if (receiver[j] instanceof ProcessReceiver) { 824 ((ProcessReceiver) receiver[j]).requestFinish(); 825 } 826 } 827 } 828 } 829 } 830 831 // FIXME: Should this also set a flag on inside receivers 832 // of the ports of the composite actor? 833 } 834 835 /////////////////////////////////////////////////////////////////// 836 //// protected variables //// 837 838 /** A list of threads created but not started. */ 839 protected LinkedList _newActorThreadList; 840 841 /** A flag for determining whether successive iterations will be 842 * permitted. 843 */ 844 protected boolean _notDone = true; 845 846 /////////////////////////////////////////////////////////////////// 847 //// private methods //// 848 849 /////////////////////////////////////////////////////////////////// 850 //// protected variables //// 851 852 /** The threads created by this director. */ 853 private HashSet _activeThreads = new HashSet(); 854 855 /** The set of threads that are blocked on an IO operation. */ 856 private HashSet _blockedThreads = new HashSet(); 857 858 /** The set of threads that have been paused in response to stopFire(). */ 859 private HashSet _pausedThreads = new HashSet(); 860 861 /** Indicator that a stopFire has been requested by a call to 862 * stopFire(). 863 */ 864 private boolean _stopFireRequested = false; 865}