001/* The base class for directors for the process oriented domains.
002
003 Copyright (c) 1998-2015 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 Semantics of initialize(Actor) have changed.
028 */
029package ptolemy.actor.process;
030
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedList;
034
035import ptolemy.actor.Actor;
036import ptolemy.actor.CompositeActor;
037import ptolemy.actor.Director;
038import ptolemy.actor.IOPort;
039import ptolemy.actor.Initializable;
040import ptolemy.actor.Manager;
041import ptolemy.actor.Receiver;
042import ptolemy.kernel.CompositeEntity;
043import ptolemy.kernel.util.IllegalActionException;
044import ptolemy.kernel.util.NameDuplicationException;
045import ptolemy.kernel.util.NamedObj;
046import ptolemy.kernel.util.Workspace;
047
048///////////////////////////////////////////////////////////////////
049//// ProcessDirector
050
051/**
052 The base class for directors for the process oriented domains. It provides
053 default implementations for methods that are common across such domains.
054 <P>
055 In the process oriented domains, the director controlling a model
056 needs to keep track of the state of the model. In particular it needs
057 to maintain an accurate count of the number of active processes under
058 its control and any processes that are blocked for whatever reason (trying
059 to read from an empty channel as in PN).
060 These counts, and perhaps other counts, are needed by the
061 director to control and respond when deadlock is detected (no processes
062 can make progress), or to respond to requests from higher in the hierarchy.
063 <P>
064 The methods that control how the director detects and responds to deadlocks
065 are _areActorsDeadlocked() and _resolveDeadlock(). These methods should be
066 overridden in derived classes to get domain-specific behaviour. The
067 implementations given here are trivial and suffice only to illustrate
068 the approach that should be followed.
069 <P>
 This base class is not sufficient for executing hierarchical, heterogeneous
 models. In order to accommodate hierarchical heterogeneity, the subclass
 CompositeProcessDirector must be used.
073 <P>
074 <P>
075 @author Mudit Goel, Neil Smyth, John S. Davis II
076 @version $Id$
077 @since Ptolemy II 0.2
078 @Pt.ProposedRating Green (mudit)
079 @Pt.AcceptedRating Yellow (mudit)
080 @see Director
081 */
082public class ProcessDirector extends Director {
083    /** Construct a director in the default workspace with an empty string
084     *  as its name. The director is added to the list of objects in
085     *  the workspace. Increment the version number of the workspace.
086     *  @exception NameDuplicationException If construction of Time objects fails.
087     *  @exception IllegalActionException If construction of Time objects fails.
088     */
089    public ProcessDirector()
090            throws IllegalActionException, NameDuplicationException {
091        super();
092    }
093
094    /** Construct a director in the workspace with an empty name.
095     *  The director is added to the list of objects in the workspace.
096     *  Increment the version number of the workspace.
097     *  @param workspace The workspace of this object.
098     *  @exception NameDuplicationException If construction of Time objects fails.
099     *  @exception IllegalActionException If construction of Time objects fails.
100     */
101    public ProcessDirector(Workspace workspace)
102            throws IllegalActionException, NameDuplicationException {
103        super(workspace);
104    }
105
106    /** Construct a director in the given container with the given name.
107     *  If the container argument must not be null, or a
108     *  NullPointerException will be thrown.
109     *  If the name argument is null, then the name is set to the
110     *  empty string. Increment the version number of the workspace.
111     *
112     *  @param container The container
113     *  @param name Name of this director.
114     *  @exception IllegalActionException If the name contains a period,
115     *   or if the director is not compatible with the specified container.
116     *  @exception NameDuplicationException If the container not a
117     *   CompositeActor and the name collides with an entity in the container.
118     */
119    public ProcessDirector(CompositeEntity container, String name)
120            throws IllegalActionException, NameDuplicationException {
121        super(container, name);
122    }
123
124    ///////////////////////////////////////////////////////////////////
125    ////                         public methods                    ////
126
127    /** Notify this director that the specified thread is part of
128     *  the execution of this model. This is used
129     *  to keep track of whether the model is deadlocked, and also to
130     *  terminate threads if necessary. It is important that the thread
131     *  call _removeThread() upon exiting. Note further that this
132     *  should be called before the thread is started to avoid race
133     *  conditions where some threads have been started and others
134     *  have not been started and deadlock is falsely detected because
135     *  the not-yet-started threads are not counted.
136     *  @param thread The thread.
137     *  @see #removeThread(Thread)
138     */
139    public synchronized void addThread(Thread thread) {
140        assert !_activeThreads.contains(thread);
141        _activeThreads.add(thread);
142        assert _activeThreads.contains(thread);
143
144        if (_debugging) {
145            _debug("Adding a thread: " + thread.getName());
146        }
147
148        notifyAll();
149    }
150
151    /** Clone the director into the specified workspace. The new object is
152     *  <i>not</i> added to the directory of that workspace (It must be added
153     *  by the user if he wants it to be there).
154     *  The result is a new director with no container, no pending mutations,
155     *  and no topology listeners. The count of active processes is zero.
156     *
157     *  @param workspace The workspace for the cloned object.
158     *  @exception CloneNotSupportedException If one of the attributes
159     *   cannot be cloned.
160     *  @return The new ProcessDirector.
161     */
162    @Override
163    public Object clone(Workspace workspace) throws CloneNotSupportedException {
164        ProcessDirector newObject = (ProcessDirector) super.clone(workspace);
165
166        // Is it really necessary to do this?
167
168        // Findbugs:
169        //  [M M IS] Inconsistent synchronization [IS2_INCONSISTENT_SYNC]
170        // Actually this is not a problem since the object is
171        // being created and hence nobody else has access to it.
172
173        newObject._blockedThreads = new HashSet();
174        newObject._pausedThreads = new HashSet();
175        newObject._activeThreads = new HashSet();
176        newObject._notDone = true;
177        return newObject;
178    }
179
180    /** Request that the current iteration finishes and postfire() returns
181     *  false, indicating to the environment that no more iterations should
182     *  be invoked. To support domains where actor firings do not necessarily
183     *  terminate, such as PN, you may wish to call stopFire() as well to request
184     *  that those actors complete their firings.
185     */
186    @Override
187    public void finish() {
188        super.finish();
189        stop();
190    }
191
192    /** Wait until a deadlock is detected. Then deal with the deadlock
193     *  by calling the protected method _resolveDeadlock() and return.
194     *  This method is synchronized on the director.
195     *  @exception IllegalActionException If a derived class throws it.
196     */
197    @Override
198    public void fire() throws IllegalActionException {
199        // Don't call "Director.super.fire();" here, do the work instead.
200        Workspace workspace = workspace();
201
202        // In case we have an enclosing process director,
203        // we identify it so that we can notify it when we are blocked.
204        CompositeActor container = (CompositeActor) getContainer();
205        Director outsideDirector = container.getExecutiveDirector();
206
207        if (!(outsideDirector instanceof ProcessDirector)) {
208            outsideDirector = null;
209        }
210
211        int depth = 0;
212        try {
213            synchronized (this) {
214                if (_debugging) {
215                    _debug("Called fire().");
216                }
217
218                while (!_areThreadsDeadlocked() && !_areAllThreadsStopped()
219                        && !_stopRequested) {
220                    // Added to get thread to stop reliably on pushing stop button.
221                    // EAL 8/05
222                    if (_stopRequested) {
223                        return;
224                    }
225
226                    if (_debugging) {
227                        _debug("Waiting for actors to stop.");
228                    }
229
230                    try {
231                        if (outsideDirector != null) {
232                            ((ProcessDirector) outsideDirector).threadBlocked(
233                                    Thread.currentThread(), null);
234                        }
235                        // NOTE: We cannot use workspace.wait(Object) here without
236                        // introducing a race condition, because we have to release
237                        // the lock on the _director before calling workspace.wait(_director).
238                        if (depth == 0) {
239                            depth = workspace.releaseReadPermission();
240                        }
241                        wait();
242                    } catch (InterruptedException e) {
243                        if (_debugging) {
244                            _debug("Director thread interrupted.");
245                        }
246                        // stop all threads
247                        stop();
248                        return;
249                    } finally {
250                        if (outsideDirector != null) {
251                            ((ProcessDirector) outsideDirector).threadUnblocked(
252                                    Thread.currentThread(), null);
253                        }
254                    }
255                }
256
257                if (_debugging) {
258                    _debug("Actors have stopped.");
259                }
260
261                // Don't resolve deadlock if we are just pausing
262                // or if a stop has been requested.
263                // NOTE: Added !_stopRequested.  EAL 3/12/03.
264                if (_areThreadsDeadlocked() && !_stopRequested) {
265                    if (_debugging) {
266                        _debug("Deadlock detected.");
267                    }
268
269                    try {
270                        _notDone = _resolveDeadlock();
271                    } catch (IllegalActionException e) {
272                        // stop all threads.
273                        stop();
274                        throw e;
275                    }
276                }
277            }
278        } finally {
279            if (depth > 0) {
280                workspace.reacquireReadPermission(depth);
281            }
282        }
283    }
284
285    /** Initialize the given actor.  This class overrides the base
286     *  class to reset the flags for all of the receivers, and to
287     *  create a new ProcessThread for each actor being controlled.
288     *  This class does *NOT* directly call the initialize method of the
289     *  actor. That method is instead called by the actor's thread itself.
290     *  This allows actors in process domains to create tokens during
291     *  initialization, since sending data in a process-based domain
292     *  requires threads for each actor.
293     *  @exception IllegalActionException If the actor is not
294     *  acceptable to the domain.  Not thrown in this base class.
295     */
296    @Override
297    public synchronized void initialize(Actor actor)
298            throws IllegalActionException {
299        // FIXME: Note that ProcessDirector does *not* invoke
300        // super.initialize(actor), so changes made to
301        // Director.initialize(Actor) apply to
302        // ProcessDirector.initialize(Actor).
303
304        // FIXME: This method does not set _resourceScheduling like
305        // the parent method.
306
307        if (_debugging) {
308            _debug("Initializing actor: " + ((NamedObj) actor).getFullName());
309        }
310
311        // Reset the receivers.
312        Iterator ports = actor.inputPortList().iterator();
313
314        while (ports.hasNext()) {
315            IOPort port = (IOPort) ports.next();
316            Receiver[][] receivers = port.getReceivers();
317
318            for (Receiver[] receiver : receivers) {
319                for (int j = 0; j < receiver.length; j++) {
320                    receiver[j].reset();
321                }
322            }
323        }
324
325        // Create threads.
326        ProcessThread processThread = _newProcessThread(actor, this);
327        _activeThreads.add(processThread);
328        assert _activeThreads.contains(processThread);
329
330        _newActorThreadList.addFirst(processThread);
331    }
332
333    /** Return true if a stop has been requested on the director.
334     *  This is used by the ProcessThread to tell the difference
335     *  between a request to pause and a request to stop.
336     *  @return True if stop() has been called.
337     */
338    public boolean isStopFireRequested() {
339        return _stopFireRequested;
340    }
341
342    /** Return true if the specified thread has been registered
343     *  with addThread() and has not been removed with removeThread().
344     *  @return True if the specified thread is active.
345     *  @param thread The thread.
346     *  @see #addThread(Thread)
347     *  @see #removeThread(Thread)
348     */
349    public synchronized boolean isThreadActive(Thread thread) {
350        return _activeThreads.contains(thread);
351    }
352
353    /** Return false if a stop has been requested or if
354     *  the model has reached deadlock. Return true otherwise.
355     *  @return False if the director has detected a deadlock or
356     *   a stop has been requested.
357     *  @exception IllegalActionException If a derived class throws it.
358     */
359    @Override
360    public boolean postfire() throws IllegalActionException {
361        _notDone = _notDone && super.postfire();
362
363        if (_debugging) {
364            synchronized (this) {
365                _debug("Called postfire().");
366                _debug("_stopRequested = " + _stopRequested);
367                _debug("_stopFireRequested = " + _stopFireRequested);
368                _debug("Returning from postfire(): " + _notDone);
369            }
370        }
371
372        return _notDone;
373    }
374
375    /** Start threads for all actors that have not had threads started
376     *  already (this might include actors initialized since the last
377     *  invocation of prefire). This starts the threads, corresponding
378     *  to all the actors, that were created in a mutation.
379     *  @return True.
380     *  @exception IllegalActionException If a derived class throws it.
381     */
382    @Override
383    public boolean prefire() throws IllegalActionException {
384        // FIXME: Note that ProcessDirector does *not* invoke
385        // super.prefire(), so changes made to Director.prefire()
386        // should also be made to ProcessDirector.prefire().
387
388        // FIXME: this method does nothing about model time.
389
390        synchronized (this) {
391            // Clear the stopFire flag and trigger all of the actor threads.
392            _stopFireRequested = false;
393
394            notifyAll();
395        }
396
397        // Start threads for actors created since the last invocation
398        // of this prefire() method.
399        Iterator threads = _newActorThreadList.iterator();
400
401        while (threads.hasNext()) {
402            ProcessThread procThread = (ProcessThread) threads.next();
403            procThread.start();
404        }
405
406        _newActorThreadList.clear();
407
408        return true;
409    }
410
411    /** Preinitialize the model controlled by this director.  This
412     *  subclass overrides the base class to initialize the number of
413     *  running threads before proceeding with preinitialization of
414     *  the model.
415     *
416     *  @exception IllegalActionException If creating an actor thread
417     *  throws it.
418     */
419    @Override
420    public void preinitialize() throws IllegalActionException {
421        // This method calls super.preinitialize() at the end.
422
423        _notDone = true;
424        synchronized (this) {
425            _activeThreads.clear();
426            _blockedThreads.clear();
427            _pausedThreads.clear();
428        }
429        _newActorThreadList = new LinkedList();
430        super.preinitialize();
431    }
432
433    /** Notify this director that the specified thread has finished
434     *  executing. This is used to keep track of whether the model
435     *  is deadlocked, and also to terminate threads if necessary.
436     *  @param thread The thread.
437     *  @see #addThread(Thread)
438     */
439    public synchronized void removeThread(Thread thread) {
440        if (_debugging) {
441            _debug("Thread " + thread.getName() + " is exiting.");
442        }
443
444        //assert _activeThreads.contains(thread);
445
446        _activeThreads.remove(thread);
447
448        assert !_activeThreads.contains(thread);
449        _pausedThreads.remove(thread);
450        _blockedThreads.remove(thread);
451        notifyAll();
452    }
453
454    /** Request that the director cease execution altogether.
455     *  This causes a call to stop() on all actors contained by
456     *  the container of this director, and a call to stopThread()
457     *  on each of the process threads that contain actors
458     *  controlled by this director. This also sets a flag
459     *  so that the next call to postfire() returns false.
460     */
461    @Override
462    public void stop() {
463        // This method does not call super.stop() by design.
464
465        // Set this before calling stopThread(), in case the thread
466        // needs to distinguish between stopFire() and this method.
467        if (_debugging) {
468            _debug("Requesting stop of all threads.");
469        }
470
471        _stopRequested = true;
472        _stopFireRequested = true;
473
474        // We used to copy the active thread set because
475        // when stop() is called on each thread, the
476        // set itself is modified. We could get a
477        // ConcurrentModificationException.
478
479        // However, we were getting a ConcurrentModificationException anyway
480        // on terra, to replicate: (cd $PTII/ptolemy/domains/sysml/test/junit; make)
481        // Also, Coverity scan pointed out that we were accessing
482        // _activeThreads without getting a lock here.  So, we lock and
483        // to make our copy.  If this does not work, then we could
484        // put the loop inside the synchronized block.
485        LinkedList threadsCopy = null;
486        synchronized (this) {
487            threadsCopy = new LinkedList(_activeThreads);
488        }
489        Iterator threads = threadsCopy.iterator();
490        while (threads.hasNext()) {
491            Thread thread = (Thread) threads.next();
492
493            if (thread instanceof ProcessThread) {
494                // NOTE: We used to catch and ignore all exceptions
495                // here, but that doesn't look right to me. EAL 8/05.
496                ((ProcessThread) thread).getActor().stop();
497            }
498
499            // NOTE: Used to call thread.interrupt() here, with a comment
500            // about how it probably wasn't necessary.  But
501            // in applets, this gives a security violation.
502            // If threads fail to stop, the probably the call
503            // below to _requestFinishOnReceivers() isn't doing its
504            // job.
505        }
506
507        // Added to get stop button to work consistently the first time.
508        // EAL 8/05
509        _requestFinishOnReceivers();
510
511        // Create a notification thread so that this returns immediately
512        // (doesn't have to get a synchronized lock).
513        new NotifyThread(this).start();
514    }
515
516    /** Request that execution stop at the conclusion of the current
517     *  iteration. Call stopThread() on each of the process threads that
518     *  contain actors controlled by this director and call stopFire() on
519     *  the actors that are contained by these threads. This method is
520     *  non-blocking.
521     */
522    @Override
523    public void stopFire() {
524        // This method does not call super.stopFire() by design.
525
526        if (_debugging) {
527            _debug("stopFire() has been called.");
528        }
529
530        _stopFireRequested = true;
531
532        HashSet actors = new HashSet();
533        synchronized (this) {
534            Iterator threads = _activeThreads.iterator();
535
536            while (threads.hasNext()) {
537                Thread thread = (Thread) threads.next();
538
539                if (thread instanceof ProcessThread) {
540                    actors.add(((ProcessThread) thread).getActor());
541                }
542            }
543        }
544
545        Iterator actorsIterator = actors.iterator();
546        while (actorsIterator.hasNext()) {
547            ((Actor) actorsIterator.next()).stopFire();
548        }
549    }
550
551    /** Terminate all threads under control of this director immediately.
552     *  This abrupt termination will not allow normal cleanup actions
553     *  to be performed, and the model should be recreated after calling
554     *  this method. This method uses Thread.stop(), a deprecated method
555     *  in Java.
556     */
557    @Override
558    public void terminate() {
559        // First need to invoke terminate on all actors under the
560        // control of this director.
561        super.terminate();
562
563        // Now stop any threads created by this director.
564        LinkedList list = new LinkedList();
565        // FIXME: Coverity Scan points out that we are accessing
566        // _activeThreads without getting a lock here.
567        list.addAll(_activeThreads);
568        _activeThreads.clear();
569
570        Iterator threads = list.iterator();
571
572        while (threads.hasNext()) {
573            ((Thread) threads.next()).stop();
574        }
575    }
576
577    /** Notify the director that the specified thread is blocked
578     *  on an I/O operation.  If the thread has
579     *  not been registered with addThread(), then this call is
580     *  ignored.
581     *  @param thread The thread.
582     *  @param receiver The receiver handling the I/O operation,
583     *   or null if it is not a specific receiver.
584     *  @see #addThread(Thread)
585     */
586    public synchronized void threadBlocked(Thread thread,
587            ProcessReceiver receiver) {
588        if (_activeThreads.contains(thread)
589                && !_blockedThreads.contains(thread)) {
590            _blockedThreads.add(thread);
591            notifyAll();
592        }
593    }
594
595    /** Notify the director that the specified thread has paused
596     *  in response to a call to stopFire().  If the thread has
597     *  not been registered with addThread(), then this call is
598     *  ignored. If the thread has been identified as blocked,
599     *  it is removed from the set of blocked threads (so it
600     *  doesn't get counted twice).
601     *  @param thread The thread.
602     *  @see #addThread(Thread)
603     */
604    public synchronized void threadHasPaused(Thread thread) {
605        if (_activeThreads.contains(thread)
606                && !_pausedThreads.contains(thread)) {
607            _pausedThreads.add(thread);
608            _blockedThreads.remove(thread);
609            notifyAll();
610        }
611    }
612
613    /** Notify the director that the specified thread has resumed.
614     *  If the director has not previously been notified that it was
615     *  paused, then this call is ignored.
616     *  @param thread The thread.
617     *  @see #threadHasPaused(Thread)
618     */
619    public synchronized void threadHasResumed(Thread thread) {
620        if (_pausedThreads.remove(thread)) {
621            notifyAll();
622        }
623    }
624
625    /** Notify the director that the specified thread is unblocked
626     *  on an I/O operation.  If the thread has
627     *  not been registered with threadBlocked(), then this call is
628     *  ignored.
629     *  @param thread The thread.
630     *  @param receiver The receiver handling the I/O operation,
631     *   or null if it is not a specific receiver.
632     *  @see #threadBlocked(Thread, ProcessReceiver)     *
633     */
634    public synchronized void threadUnblocked(Thread thread,
635            ProcessReceiver receiver) {
636        if (_blockedThreads.remove(thread)) {
637            notifyAll();
638        }
639    }
640
641    /** Do nothing.  Input transfers in process domains are handled by
642     *  branches, which transfer inputs in a separate thread.
643     *  @param port The port.
644     *  @return False, to indicate that no tokens were transferred.
645     *  @exception IllegalActionException Not thrown in this base class.
646     */
647    @Override
648    public boolean transferInputs(IOPort port) throws IllegalActionException {
649        return false;
650    }
651
652    /** Do nothing.  Output transfers in process domains are handled by
653     *  branches, which transfer inputs in a separate thread.
654     *  @param port The port.
655     *  @return False, to indicate that no tokens were transferred.
656     *  @exception IllegalActionException Not thrown in this base class.
657     */
658    @Override
659    public boolean transferOutputs(IOPort port) throws IllegalActionException {
660        return false;
661    }
662
663    /** End the execution of the model under the control of this
664     *  director. A flag is set in all the receivers that causes
665     *  each process to terminate at the earliest communication point.
666     *  Prior to setting receiver flags, this method wakes up the
667     *  threads if they all are stopped.  If the container is not
668     *  an instance of CompositeActor, then this method does nothing.
669     *  <P>
670     *  This method is not synchronized on the workspace, so the caller
671     *  should be.
672     *
673     *  @exception IllegalActionException If an error occurs while
674     *   accessing the receivers of all actors under the control of
675     *   this director.
676     */
677    @Override
678    public void wrapup() throws IllegalActionException {
679        // FIXME: Note that ProcessDirector does *not*
680        // invoke super.wrapup(), so changes made to Director.wrapup()
681        // should also be made to ProcessDirector.wrapup().
682
683        if (_debugging) {
684            _debug("Called wrapup().");
685        }
686
687        // First invoke initializable methods.
688        if (_initializables != null) {
689            for (Initializable initializable : _initializables) {
690                initializable.wrapup();
691            }
692        }
693
694        CompositeActor container = (CompositeActor) getContainer();
695
696        // To ensure that we don't miss the notification from
697        // the processes that are ending, put this in a synchronized
698        // block.
699        int depth = 0;
700        try {
701            synchronized (this) {
702                _requestFinishOnReceivers();
703
704                // Now wake up threads that depend on the manager.
705                Manager manager = container.getManager();
706
707                // Do the notification in a new thread so as not
708                // to deadlock with this synchronized block.
709                new NotifyThread(manager).start();
710
711                // Wait until all threads stop.
712                while (_activeThreads.size() > 0) {
713                    if (depth == 0) {
714                        depth = workspace().releaseReadPermission();
715                    }
716                    try {
717                        wait();
718                    } catch (InterruptedException ex) {
719                        // ignore, wait until all process threads stop
720                    }
721                }
722            }
723        } finally {
724            if (depth > 0) {
725                workspace().reacquireReadPermission(depth);
726            }
727        }
728    }
729
730    ///////////////////////////////////////////////////////////////////
731    ////                         protected methods                 ////
732
733    /** Return true if the count of active processes equals the number
734     *  of paused and blocked threads.  Otherwise return false.
735     *  @return True if there are no active processes in the container.
736     */
737    protected synchronized boolean _areAllThreadsStopped() {
738        return _getActiveThreadsCount() == _getStoppedThreadsCount()
739                + _getBlockedThreadsCount();
740    }
741
742    /** Return true if the count of active processes in the container is 0.
743     *  Otherwise return false. Derived classes must override this method to
744     *  return true to any other forms of deadlocks that they might introduce.
745     *  @return True if there are no active processes in the container.
746     */
747    protected synchronized boolean _areThreadsDeadlocked() {
748        return _activeThreads.size() == 0;
749    }
750
751    /** Return the number of active threads under the control of this
752     *  director.
753     *  @return The number of active threads.
754     */
755    protected final synchronized int _getActiveThreadsCount() {
756        return _activeThreads.size();
757    }
758
759    /** Return the number of threads that are currently blocked.
760     *  @return Return the number of threads that are currently blocked.
761     */
762    protected final synchronized int _getBlockedThreadsCount() {
763        return _blockedThreads.size();
764    }
765
766    /** Return the number of threads that are currently stopped.
767     *  @return Return the number of threads that are currently stopped.
768     */
769    protected final synchronized int _getStoppedThreadsCount() {
770        return _pausedThreads.size();
771    }
772
773    /** Create a new ProcessThread for controlling the actor that
774     *  is passed as a parameter of this method. Subclasses are
775     *  encouraged to override this method as necessary for domain
776     *  specific functionality.
777     *  @param actor The actor that the created ProcessThread will
778     *   control.
779     *  @param director The director that manages the model that the
780     *   created thread is associated with.
781     *  @return Return a new ProcessThread that will control the
782     *   actor passed as a parameter for this method.
783     *  @exception IllegalActionException If creating an new ProcessThread
784     *  throws it.
785     */
786    protected ProcessThread _newProcessThread(Actor actor,
787            ProcessDirector director) throws IllegalActionException {
788        return new ProcessThread(actor, director);
789    }
790
791    /** Return false indicating that deadlock has not been resolved
792     *  and that execution will be discontinued. In derived classes,
793     *  override this method to obtain domain specific handling of
794     *  deadlocks. Return false if a real deadlock has occurred and
795     *  the simulation can be ended. Return true if the simulation
796     *  can proceed given additional data and need not be terminated.
797     *  @return False.
798     *  @exception IllegalActionException Not thrown in this base class.
799     */
800    protected boolean _resolveDeadlock() throws IllegalActionException {
801        return false;
802    }
803
804    /** Call requestFinish() on all receivers.
805     */
806    protected void _requestFinishOnReceivers() {
807        CompositeActor container = (CompositeActor) getContainer();
808        Iterator actors = container.deepEntityList().iterator();
809        Iterator actorPorts;
810
811        while (actors.hasNext()) {
812            Actor actor = (Actor) actors.next();
813            actorPorts = actor.inputPortList().iterator();
814
815            while (actorPorts.hasNext()) {
816                IOPort port = (IOPort) actorPorts.next();
817
818                // Setting finished flag in the receivers.
819                Receiver[][] receivers = port.getReceivers();
820
821                for (Receiver[] receiver : receivers) {
822                    for (int j = 0; j < receiver.length; j++) {
823                        if (receiver[j] instanceof ProcessReceiver) {
824                            ((ProcessReceiver) receiver[j]).requestFinish();
825                        }
826                    }
827                }
828            }
829        }
830
831        // FIXME: Should this also set a flag on inside receivers
832        // of the ports of the composite actor?
833    }
834
835    ///////////////////////////////////////////////////////////////////
836    ////                         protected variables               ////
837
    /** A list of threads created but not started.
     *  This field is declared without an initializer, so it is null
     *  until some other code assigns it.
     *  NOTE(review): this is a raw LinkedList; the elements are
     *  presumably ProcessThread instances -- confirm against the code
     *  that populates it.
     */
    protected LinkedList _newActorThreadList;

    /** A flag for determining whether successive iterations will be
     *  permitted. The initial value is true.
     */
    protected boolean _notDone = true;
845
846    ///////////////////////////////////////////////////////////////////
847    ////                         private methods                   ////
848
849    ///////////////////////////////////////////////////////////////////
    ////                         private variables                 ////
851
    /** The threads created by this director. The size of this set is
     *  the value reported by _getActiveThreadsCount(), and
     *  _areThreadsDeadlocked() returns true when this set is empty.
     */
    private HashSet _activeThreads = new HashSet();

    /** The set of threads that are blocked on an IO operation. The size
     *  of this set is the value reported by _getBlockedThreadsCount().
     */
    private HashSet _blockedThreads = new HashSet();

    /** The set of threads that have been paused in response to stopFire().
     *  The size of this set is the value reported by
     *  _getStoppedThreadsCount().
     */
    private HashSet _pausedThreads = new HashSet();

    /** Indicator that a stopFire has been requested by a call to
     *  stopFire(). The initial value is false.
     */
    private boolean _stopFireRequested = false;
865}