001/* An actor that executes a contained actor in separate thread.
002
003 Copyright (c) 2007-2018 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.actor.lib.hoc;
029
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Queue;
036import java.util.Set;
037
038import ptolemy.actor.Actor;
039import ptolemy.actor.Director;
040import ptolemy.actor.Executable;
041import ptolemy.actor.IOPort;
042import ptolemy.actor.NoTokenException;
043import ptolemy.actor.QueueReceiver;
044import ptolemy.actor.Receiver;
045import ptolemy.actor.util.BreakCausalityInterface;
046import ptolemy.actor.util.CausalityInterface;
047import ptolemy.actor.util.Time;
048import ptolemy.data.BooleanToken;
049import ptolemy.data.DoubleToken;
050import ptolemy.data.Token;
051import ptolemy.data.expr.Parameter;
052import ptolemy.data.type.BaseType;
053import ptolemy.kernel.ComponentEntity;
054import ptolemy.kernel.CompositeEntity;
055import ptolemy.kernel.util.Attribute;
056import ptolemy.kernel.util.IllegalActionException;
057import ptolemy.kernel.util.InternalErrorException;
058import ptolemy.kernel.util.NameDuplicationException;
059import ptolemy.kernel.util.Settable;
060import ptolemy.kernel.util.Workspace;
061
062///////////////////////////////////////////////////////////////////
063//// ThreadedComposite
064
065/**
066 A container for another actor that executes that other actor
067 in a separate thread called the <i>inside thread</i>.
068 This actor starts that thread in its initialize()
069 method, which is invoked by its executive director (the director
070 in charge of firing this actor). The thread that invokes the
071 action methods of this actor
072 (initialize(), prefire(), fire(), postfire(), and wrapup())
073 is called the <i>director thread</i>.
074
075 <p> A paper describing the use of this actor is found at
076 <a href="http://www.eecs.berkeley.edu/Pubs/TechRpts/2008/EECS-2008-151.html#in_browser">http://www.eecs.berkeley.edu/Pubs/TechRpts/2008/EECS-2008-151.html</a>.</p>
077
078 <p> To use this actor in Vergil, drag a ThreadedComposite on to the
079 canvas and then drag the actor to be contained on to the
080 ThreadedComposite actor.</p>
081
082 <p> This actor automatically creates input and output ports to
083 match those of the inside actor. Input events provided at those
084 input ports are provided as input events to the contained actor.
085 Outputs provided by the contained actor become output events
086 of this actor. The time stamp of the input events is provided
087 by the container of this actor. The time stamp of the output events
088 depends on the <i>delay</i> parameter, as explained below.</p>
089
090 <p>
091 The inside thread blocks waiting for inputs or pure events.
092 Inputs are provided to that thread when the fire() method of
093 this actor is invoked by the director thread.
094 Pure events are provided after fireAt(),
095 fireAtCurrentTime(), or fireAtFirstValidTimeAfter() are called
096 by either the inside thread or the director thread.
097 When the time of those firing requests becomes current time,
098 the container will (presumably) fire this actor, and
099 this actor will provide a pure event to the inside thread,
100 causing it to fire the contained actor.
101 <p>
102 If the <i>synchronizeToRealTime</i> parameter is true, then
103 when the inside thread encounters an input or pure event
104 with time stamp <i>t</i>, it stalls until real time matches
105 or exceeds <i>t</i> (measured in seconds since the start of
106 execution of the inside thread). In contrast for example
107 to the <i>synchronizeToRealTime</i> parameter of the DEDirector,
108 this enables construction of models where only a portion of the
109 model synchronizes to real time.
110 <p>
 When the wrapup() method of this actor is called, the inside thread is
 provided with a signal to terminate rather than to process additional
 inputs. The inside thread will also exit if stop() is called on this
 actor; however, in this case, which iterations are completed
 is nondeterminate (there may be inputs left unprocessed).
 If any inside actor returns false from postfire(), then the
 inside thread will also terminate and this actor will return false
 from postfire().
119 <p>
120 The parameters of this actor include all the parameters of the
121 contained actor, and setting those parameters automatically
122 sets the parameters of the contained actor.
123 <p>
 In addition to the parameters of the contained actor, this actor
 has a <i>delay</i> parameter. This parameter is a double that can
 be any nonnegative value or the special value <i>UNDEFINED</i>.
127 If it is given a nonnegative value, then the value specifies
128 the model-time delay between input events and the output
129 events that result from reacting to those input events.
130 That is, if this actor is given an input event with time
131 stamp <i>t</i>, then if the contained actor produces any output
132 events in reaction to that event, those output events will be
133 produced by this actor with time stamp <i>t</i> + <i>delay</i>.
134 <p>
135 If <i>delay</i> has value <i>UNDEFINED</i>, then
136 outputs are produced at the current model time of the executive
137 director when the inside thread happens to produce those events,
138 or if <i>synchronizeToRealTime</i>, at the greater of current
139 model time and current real time (measured in seconds since
140 the start of execution).
141 This is accomplished by the inside thread calling
142 fireAtFirstValidTimeAfter() of the enclosing director, and
143 then producing the outputs when the requested firing occurs
144 in the director thread. Note that with this value of the
145 <i>delay</i>, it is possible for the inside thread to
146 continue to execute and respond to input events after
147 the wrapup phase of the director thread has been entered.
148 The wrapup phase will stall until the inside thread has
149 completed its processing of its inputs, but any outputs
150 it produces after the wrapup phase has started will be
151 discarded.
152 <p>
153 The most common use of this actor is in the DE domain,
154 although it can also be used in CT, SR, SDF, and other domains,
155 with some care. See the above referenced memo.
156 Regardless of the value of <i>delay</i>, this actor is treated
157 by DE as introducing a delay, much like the TimedDelay actor.
158 In fact, if <i>delay</i> is 0.0, there will be a one tick delay
159 in superdense time, just as with the TimedDelay actor.
160 If the inside model also has a time delay (e.g. if you
161 put a TimedDelay actor inside a ThreadedComposite), then
162 the total delay is the sum of the two delays.
163 <p>
164 <b>Discussion:</b>
165 <p>
166 There are several useful things you can do with this model.
167 We describe some use cases here:
168 <p>
169 <i>Background execution.</i> When <i>delay</i> is greater than
170 or equal to 0.0,
171 then when this actor is fired in response to input events
172 with time stamp <i>t</i>, the actual
173 processing of those events occurs later in a separate thread. The
174 director thread is not blocked, and can continue to process events
175 with time stamps less than or equal to <i>t</i> + <i>delay</i>.
176 The director thread is blocked from processing events with larger
177 time stamps than that because this is necessary to preserve DE
178 semantics. To implement this, this actor uses fireAt() to
179 request a firing at time  <i>t</i> + <i>delay</i>, and when that
180 firing occurs, it blocks the director thread until the reaction
181 is complete.
182 <p>
183 <i>Parallel firing.</i> Note that if <i>delay</i> is set to 0.0,
184 it may seem that there is no point in using this actor, since
185 model time will not be allowed to increase past <i>t</i> until
186 the contained actor has reacted to events with time stamp <i>t</i>.
187 However, there is actually exploitable concurrency if there
188 are other actors in the model that also have pending input
 events with time stamp <i>t</i>. Those events can be processed
190 concurrently with this actor reacting to its input event.
191 A typical use case will broadcast an event to several instances
192 of ThreadedComposite, in which case each of those several
193 inside threads can execute concurrently in reaction to those
194 input events.
195 <p>
196 <i>Real-time source.</i> If the contained actor (and hence this
197 actor) has no inputs and <i>synchronizeToRealTime</i> is true, then
198 the contained actor must call fireAt() or one of its variants so that
199 the inside thread will be provided with pure events.
200 The behavior depends on which variant of the fireAt() methods is used
201 by the inside actor.  There are three cases:
 FIXME: Describe these. In particular, delay needs to specify the
203 minimum increment between these or fireAt() could result in an
204 exception.  Do we want a parameter to relax that?
205 <p>
 One subtlety of this actor is that it cannot expose instances of ParameterPort
207 without introducing nondeterminacy in the execution. A ParameterPort
208 is an input port that sets the value of a parameter with the same name. Upon receiving
209 a token at such a port, if this actor were to set a parameter visible by the
210 inside thread, there is no assurance that the inside thread is not still
211 executing an earlier iteration. Thus, it could appear to be sending a message
212 backward in time, which would be truly bizarre. To prevent this error,
213 this actor does not mirror such ports, and hence they appear on the outside
214 only as parameters.
215
216 @author Edward A. Lee
217 @version $Id$
218 @since Ptolemy II 8.0
219 @Pt.ProposedRating Yellow (eal)
220 @Pt.AcceptedRating Red (eal)
221 */
222public class ThreadedComposite extends MirrorComposite {
223    /** Create an actor with a name and a container.
224     *  The container argument must not be null, or a
225     *  NullPointerException will be thrown.  This actor will use the
226     *  workspace of the container for synchronization and version counts.
227     *  If the name argument is null, then the name is set to the empty string.
228     *  Increment the version of the workspace.
229     *  @param container The container actor.
230     *  @param name The name of this actor.
231     *  @exception IllegalActionException If the container is incompatible
232     *   with this actor.
233     *  @exception NameDuplicationException If the name coincides with
234     *   an actor already in the container.
235     */
236    public ThreadedComposite(CompositeEntity container, String name)
237            throws IllegalActionException, NameDuplicationException {
238        // The false argument specifies that instances of ParameterPort
239        // should not be mirrored. This would make the behavior nondeterminate,
240        // so we expose these only as parameters.
241        super(container, name, false);
242        setClassName("ptolemy.actor.lib.hoc.ThreadedComposite");
243
244        // Create the ThreadedDirector in the proper workspace.
245        ThreadedDirector threadedDirector = new ThreadedDirector(workspace());
246        threadedDirector.setContainer(this);
247        threadedDirector.setName(uniqueName("ThreadedDirector"));
248
249        // Hidden parameter defining "UNDEFINED".
250        Parameter UNDEFINED = new Parameter(this, "UNDEFINED");
251        UNDEFINED.setVisibility(Settable.EXPERT);
252        UNDEFINED.setPersistent(false);
253        UNDEFINED.setExpression("-1.0");
254
255        delay = new Parameter(this, "delay");
256        delay.setTypeEquals(BaseType.DOUBLE);
257        delay.setExpression("0.0");
258
259        synchronizeToRealTime = new Parameter(this, "synchronizeToRealTime");
260        synchronizeToRealTime.setTypeEquals(BaseType.BOOLEAN);
261        synchronizeToRealTime.setExpression("false");
262    }
263
264    ///////////////////////////////////////////////////////////////////
265    ////                         parameters                        ////
266
267    /** The model-time delay between the input events and the
268     *  output events. This is a double that defaults to 0.0,
269     *  indicating that outputs should have the same time stamps
270     *  as the inputs that trigger them. If it has a value greater
271     *  than zero, then the outputs will have larger time stamps
272     *  by that amount. If it has the value <i>UNDEFINED</i>
273     *  (or any negative number), then the output time stamp
274     *  will be nondeterminate, and will depend on the current
275     *  model time of the outside director when the output is
276     *  produced or on current real time.
277     */
278    public Parameter delay;
279
280    /** If set to true, the inside thread stalls until real time matches
281     *  the time stamps of input events or pure events for each firing.
282     *  In addition, if <i>delay</i> is set to undefined and this is set
283     *  to true, then output events are assigned a time stamp that is the
284     *  greater of current model time and real time.
285     *  Time is measured since the start of the execution of the inside
286     *  thread.  This is a boolean that defaults to false. Changing
287     *  the value of this parameter has no effect until the next
288     *  execution of the model.
289     */
290    public Parameter synchronizeToRealTime;
291
292    ///////////////////////////////////////////////////////////////////
293    ////                         public methods                    ////
294
295    /** React to a change in an attribute.  This method is called by
296     *  a contained attribute when its value changes.  In this base class,
297     *  the method does nothing.  In derived classes, this method may
298     *  throw an exception, indicating that the new attribute value
299     *  is invalid.  It is up to the caller to restore the attribute
300     *  to a valid value if an exception is thrown.
301     *  @param attribute The attribute that changed.
302     *  @exception IllegalActionException If the change is not acceptable
303     *   to this container (not thrown in this base class).
304     */
305    @Override
306    public void attributeChanged(Attribute attribute)
307            throws IllegalActionException {
308        if (attribute == delay) {
309            _delayValue = ((DoubleToken) delay.getToken()).doubleValue();
310        } else {
311            super.attributeChanged(attribute);
312        }
313    }
314
315    /** Clone the actor into the specified workspace.
316     *  @param workspace The workspace for the new object.
317     *  @return A new actor.
318     *  @exception CloneNotSupportedException If a derived class has
319     *   has an attribute that cannot be cloned.
320     */
321    @Override
322    public Object clone(Workspace workspace) throws CloneNotSupportedException {
323        ThreadedComposite newObject = (ThreadedComposite) super.clone(
324                workspace);
325        try {
326            // Remove the old inner ThreadedDirector that is in the wrong workspace.
327            List iterateDirectors = newObject
328                    .attributeList(ThreadedDirector.class);
329            ThreadedDirector oldThreadedDirector = (ThreadedDirector) iterateDirectors
330                    .get(0);
331            String threadedDirectorName = oldThreadedDirector.getName();
332            oldThreadedDirector.setContainer(null);
333
334            // Create a new ThreadedDirector that is in the right workspace.
335            ThreadedDirector iterateDirector = newObject.new ThreadedDirector(
336                    workspace);
337            iterateDirector.setContainer(newObject);
338            iterateDirector.setName(threadedDirectorName);
339        } catch (Throwable throwable) {
340            throw new CloneNotSupportedException(
341                    "Could not clone: " + throwable);
342        }
343
344        newObject._causalityInterface = null;
345        newObject._realStartTime = 0L;
346        return newObject;
347    }
348
349    /** Override the base class to return a causality interface that
350     *  indicates that the output does not depend (immediately) on
351     *  the input. This method assumes that the director deals with BooleanDependencies
352     *  and returns an instance of BreakCausalityInterface.
353     *  @return A representation of the dependencies between input ports
354     *  and output ports.
355     */
356    @Override
357    public CausalityInterface getCausalityInterface() {
358        // FIXME: This will not work property with Ptides because it will effectively
359        // declare that the delay from input to output is infinite, which it is not.
360        // What we want is for the delay from input to output to be a superdense time
361        // delay of (0.0, 1).  This could be implemented by a class similar to
362        // BreakCausalityInterface that does the right thing when the director
363        // provides a Dependency that is a SuperdenseTimeIdentity.
364        if (_causalityInterface == null) {
365            _causalityInterface = new BreakCausalityInterface(this,
366                    getExecutiveDirector().defaultDependency());
367        }
368        return _causalityInterface;
369    }
370
371    /** Iterate the contained actors of the
372     *  container of this director.
373     *  @return False if any contained actor returns false in postfire.
374     *  @exception IllegalActionException If any called method of
375     *   of the contained actor throws it, or if the contained
376     *   actor is not opaque.
377     */
378    public boolean iterateContainedActors() throws IllegalActionException {
379        // Don't call "super.fire();" here, this actor contains its
380        // own director.
381        boolean result = true;
382        List<Actor> actors = entityList();
383        for (Actor actor : actors) {
384            if (_stopRequested) {
385                break;
386            }
387            if (!((ComponentEntity) actor).isOpaque()) {
388                throw new IllegalActionException(this,
389                        "Inside actor is not opaque "
390                                + "(perhaps it needs a director).");
391            }
392            if (_debugging) {
393                _debug("---- Iterating actor in inside thread: "
394                        + actor.getFullName());
395            }
396            if (actor.iterate(1) == Executable.STOP_ITERATING) {
397                result = false;
398                _debug("---- Postfire returned false: " + actor.getFullName());
399            }
400        }
401        return result;
402    }
403
404    ///////////////////////////////////////////////////////////////////
405    ////                         private variables                 ////
406
407    /** The cached value of the <i>delay</i> parameter. */
408    private double _delayValue = 0.0;
409
410    /** The real time at which the model begins executing, in milliseconds. */
411    private long _realStartTime = 0;
412
413    ///////////////////////////////////////////////////////////////////
414    ////                         inner classes                     ////
415
416    ///////////////////////////////////////////////////////////////////
417    //// TokenFrame
418
419    /** Bundle data associated with ports and a time stamp.
420     *  There are three types of frames:
421     *  EVENT is a (possibly empty) bundle of data and a time
422     *  stamp that is either provided to the inside thread from
423     *  the inputs of a ThreadedComposite or provided by the
424     *  inside thread to form the outputs of a ThreadedComposite.
425     *  POSTFIRE is a frame indicating that the inside actor
426     *  can be postfired. No tokens are provided (they are assumed
427     *  to have been consumed in the firing). STOP is a frame
428     *  provided to the inside thread to indicate that it should
429     *  stop executing.
430     */
431    protected static class TokenFrame {
432        // FindBugs suggests making this class static so as to decrease
433        // the size of instances and avoid dangling references.
434
435        /** Construct a  TokenFrame.
436         *  @param theTime The time of this token frame.
437         *  @param theTokens a list of QueueTokens.
438         *  @param theType The FrameType.
439         */
440        public TokenFrame(Time theTime, List<QueuedToken> theTokens,
441                FrameType theType) {
442            tokens = theTokens;
443            time = theTime;
444            type = theType;
445        }
446
447        /** The time. */
448        public final Time time;
449        /** A list of tokens. */
450        public final List<QueuedToken> tokens;
451        /** The type of the frame. */
452        public final FrameType type;
453
454        // Final fields (FindBugs suggestion)
455        /**  A (possibly empty) bundle of data and a time
456         *  stamp that is either provided to the inside thread from
457         *  the inputs of a ThreadedComposite or provided by the
458         *  inside thread to form the outputs of a ThreadedComposite.
459         */
460        public final static FrameType EVENT = new FrameType();
461
462        /**  POSTFIRE is a frame indicating that the inside actor
463         *  can be postfired. No tokens are provided (they are assumed
464         *  to have been consumed in the firing).
465         */
466        public final static FrameType POSTFIRE = new FrameType();
467
468        /** STOP is a frame provided to the inside thread to indicate
469         *  that it should stop executing.
470         */
471        public final static FrameType STOP = new FrameType();
472
473        private static class FrameType {
474            private FrameType() {
475            };
476        }
477    }
478
479    ///////////////////////////////////////////////////////////////////
480    //// QueuedToken
481
482    /** Bundle of a token and the input port and channel
483     *  at which it arrived.
484     */
485    private static class QueuedToken {
486
487        // FindBugs suggests making this class static so as to decrease
488        // the size of instances and avoid dangling references.
489
490        public QueuedToken(IOPort thePort, int theChannel, Token theToken) {
491            token = theToken;
492            channel = theChannel;
493            port = thePort;
494        }
495
496        public final int channel;
497
498        public final Token token;
499
500        public final IOPort port;
501
502        @Override
503        public String toString() {
504            return "token " + token + " for port " + port.getFullName() + "("
505                    + channel + ")";
506        }
507    }
508
509    ///////////////////////////////////////////////////////////////////
510    //// ThreadedDirector
511
512    /** A specialized director that fires a contained actor
513     *  in a separate thread. The prefire() method returns true
514     *  if the inside thread is alive. The fire() method posts
515     *  input events, if any, for the current firing on a queue for
516     *  the inside thread to consume. If the firing is in response
517     *  to a prior refiring request by this director, then the fire()
518     *  method will also wait for the inside thread to complete
519     *  its firing, and will then produce outputs from that firing.
520     *  The postfire() method posts
521     *  a request to postfire the contained actor and also requests
522     *  a refiring of this director at the current time plus the delay
523     *  value (unless the delay value is UNDEFINED). The wrapup() method
524     *  requests termination of the inside thread. If postfire()
525     *  of the contained actor returns false, then postfire() of this director
526     *  will return false, requesting a halt to execution of the model.
527     */
528    private class ThreadedDirector extends Director {
529
530        /** Construct a new instance of the director for ThreadedComposite.
531         *  The director is created in the specified workspace with
532         *  no container and an empty string as a name. You can then change
533         *  the name with setName(). If the workspace argument is null, then
534         *  use the default workspace.  You should set the local director or
535         *  executive director before attempting to send data to the actor
536         *  or to execute it. Add the actor to the workspace directory.
537         *  Increment the version number of the workspace.
538         *  @param workspace The workspace that will list the actor.
539         *  @exception IllegalActionException If the container is incompatible
540         *   with this actor.
541         *  @exception NameDuplicationException If the name coincides with
542         *   an actor already in the container.
543         */
544        public ThreadedDirector(Workspace workspace)
545                throws IllegalActionException, NameDuplicationException {
546            super(workspace);
547            setPersistent(false);
548        }
549
550        /** Clone the director into the specified workspace.
551         *  @param workspace The workspace for the new object.
552         *  @return A new director.
553         *  @exception CloneNotSupportedException If a derived class has
554         *   has an attribute that cannot be cloned.
555         */
556        @Override
557        public Object clone(Workspace workspace)
558                throws CloneNotSupportedException {
559            ThreadedDirector newObject = (ThreadedDirector) super.clone(
560                    workspace);
561            newObject._exception = null;
562            newObject._inputTokens = null;
563            newObject._thread = null;
564            newObject._outputTimes = new LinkedList<Time>();
565            newObject._fireAtTimes = Collections
566                    .synchronizedSet(new HashSet<Time>());
567            newObject._inputFrames = new LinkedList<TokenFrame>();
568            newObject._outputFrames = new LinkedList<TokenFrame>();
569            return newObject;
570        }
571
572        /** Produce outputs (if appropriate).
573         *  @exception IllegalActionException If production of an output
574         *   fails (e.g. type error), or if this thread is interrupted
575         *   while we are waiting for output to produce.
576         */
577        @Override
578        public synchronized void fire() throws IllegalActionException {
579            // NOTE: This method is synchronized to ensure that when
580            // delay is UNDEFINED and the inside thread calls
581            // fireAtFirstTimeAfter(), that the firing does not
582            // occur before _outputFrames and _outputTimes have
583            // been updated.
584
585            if (_exception != null) {
586                throw new IllegalActionException(ThreadedComposite.this,
587                        _exception, "Error in inside thread of actor.");
588            }
589
590            Time environmentTime = ThreadedComposite.this.getExecutiveDirector()
591                    .getModelTime();
592
593            if (ThreadedComposite.this._debugging) {
594                ThreadedComposite.this
595                        ._debug("Firing at time " + environmentTime);
596            }
597
598            // If there is an output to be produced at this
599            // time, produce it.
600            Time nextOutputTime = _outputTimes.peek();
601            if (environmentTime.equals(nextOutputTime)) {
602                // There is an output to be produced.
603                // First, remove that time from the pending outputs queue.
604
605                // FIXME: FindBugs "RV: Base use of return value from method,
606                // Method ignores return value." java.util.Queue.poll() returns
607                // the value, which is ignored.
608                _outputTimes.poll();
609                // First, wait (if necessary) for output
610                // to be produced.
611                // We already know that the environment time matches
612                // the expected output time, so we can ignore the time
613                // stamp of the output frame.
614                try {
615                    // NOTE: Cannot use LinkedBlockingQueue for _outputFrames
616                    // because we have to release the lock on this director
617                    // while we are waiting or we get a deadlock.
618                    while (_outputFrames.isEmpty() && !_stopRequested) {
619                        if (ThreadedComposite.this._debugging) {
620                            ThreadedComposite.this._debug(
621                                    "Waiting for outputs from inside thread.");
622                        }
623                        // The timeout allows this to respond to stop()
624                        // even if we have a deadlock for some reason.
625                        // However, if the deadlock involves the Swing thread, then
626                        // no stopping will be possible.
627                        wait(1000L);
628                    }
629                    if (_outputFrames.isEmpty()) {
630                        // A stop has been requested and there is no data to produce.
631                        return;
632                    }
633                    TokenFrame frame = _outputFrames.poll();
634
635                    // There is now an output frame to be produced.
636                    if (ThreadedComposite.this._debugging) {
637                        ThreadedComposite.this._debug("Done waiting.");
638                    }
639
640                    // Produce the outputs on the frame, if there are any
641                    // outputs. Note that frame.tokens can only be null
642                    // if the inside thread was interrupted while executing
643                    // or if an exception occurred in the inside thread.
644                    if (frame.tokens == null) {
645                        if (_exception != null) {
646                            throw new IllegalActionException(this, _exception,
647                                    "Inside thread had an exception.");
648                        }
649                        throw new IllegalActionException(this,
650                                "Inside thread was interrupted.");
651                    }
652                    for (QueuedToken token : frame.tokens) {
653                        if (token.channel < token.port.getWidth()) {
654                            // There is now an output frame to be produced.
655                            if (ThreadedComposite.this._debugging) {
656                                ThreadedComposite.this._debug(
657                                        "Sending output token ",
658                                        token + " to port "
659                                                + token.port.getName());
660                            }
661
662                            token.port.send(token.channel, token.token);
663                        }
664                    }
665                } catch (InterruptedException ex) {
666                    throw new IllegalActionException(ThreadedComposite.this, ex,
667                            "Director thread interrupted.");
668                }
669            }
670        }
671
672        /** Delegate by calling fireAt() on the director of the container's
673         *  container (the executive director), and make a local record that
674         *  a refiring request has been made for the specified time. Note that the
675         *  executive director may modify the requested time. If it does, the
676         *  modified value is returned. It is up to the calling actor to
677         *  throw an exception if the modified time is not acceptable.
678         *  @param actor The actor requesting firing.
679         *  @param time The time at which to fire.
680         *  @param microstep The microstep.
681         *  @return The time at which the actor passed as an argument
682         *   will be fired.
683         *  @exception IllegalActionException If the executive director throws it.
684         */
685        @Override
686        public Time fireAt(Actor actor, Time time, int microstep)
687                throws IllegalActionException {
688            Time result = time;
689            Director director = ThreadedComposite.this.getExecutiveDirector();
690            if (director != null) {
691                if (ThreadedComposite.this._debugging) {
692                    ThreadedComposite.this
693                            ._debug("---- Request refiring at time " + time
694                                    + " for actor: " + actor.getFullName());
695                }
696                try {
697                    result = director.fireAt(ThreadedComposite.this, time,
698                            microstep);
699                } catch (IllegalActionException ex) {
700                    throw new IllegalActionException(this, ex, "Actor "
701                            + actor.getFullName()
702                            + " requests refiring at time " + time
703                            + ", which fails.\n"
704                            + "Perhaps the delay parameter is too large?\n"
705                            + "Try setting it to 0.");
706                }
707            }
708            if (actor != ThreadedComposite.this) {
709                // The fireAt() request is coming from the inside, so
710                // when the firing occurs, we want to post an input
711                // frame (even if there are no input events) for
712                // the inside thread.
713                _fireAtTimes.add(result);
714            }
715            return result;
716        }
717
718        /** Start the inside thread.
719         *  @exception IllegalActionException If the initialize() method of
720         *   one of the inside actors throws it.
721         */
722        @Override
723        public synchronized void initialize() throws IllegalActionException {
724            // The following must be done before the initialize() methods
725            // of the actors is called because those methods may call fireAt().
726            // Note that previous runs may have left residual data on these lists.
727            _fireAtTimes.clear();
728            _outputFrames.clear();
729            _outputTimes.clear();
730            _inputFrames.clear();
731
732            _exception = null;
733
734            // The superclass will initialize all the actors.
735            super.initialize();
736
737            // Set a flag indicating that the first firing should
738            // initialize the _realStartTime variable. This is done
739            // in the first firing to be as late as possible, so
740            // that startup transients are minimized.
741            // FIXME: This will impede synchronization with other
742            // actors, since there won't be a common time base.
743            _realStartTime = -1L;
744
745            _inputFrames.clear();
746            _outputFrames.clear();
747
748            _synchronizeToRealTime = ((BooleanToken) synchronizeToRealTime
749                    .getToken()).booleanValue();
750
751            // Create and start the inside thread.
752            _thread = new CompositeThread();
753            _thread.setPriority(Thread.MAX_PRIORITY);
754            _thread.start();
755        }
756
757        /** Return a new instance of QueueReceiver.
758         *  @return A new instance of QueueReceiver.
759         *  @see QueueReceiver
760         */
761        @Override
762        public Receiver newReceiver() {
763            return new QueueReceiver();
764        }
765
766        /** Return true if the inside thread is alive.
767         *  @return True if the inside thread is still alive.
768         */
769        @Override
770        public boolean prefire() throws IllegalActionException {
771            // Do not call super.prefire()!
772            // Superclass sets current time of this
773            // director to that of the container.
774            // The notion of current time presented to the
775            // inside actors (which may be currently executing
776            // in another thread) must match that of the frame
777            // that the inside thread is processing.
778
779            // Have to create a new list because the previous list may
780            // not have been consumed yet.
781            _inputTokens = new LinkedList<QueuedToken>();
782
783            boolean result = _thread.isAlive() || !_outputFrames.isEmpty();
784
785            if (ThreadedComposite.this._debugging) {
786                ThreadedComposite.this._debug("Prefire returns " + result);
787            }
788            return result;
789        }
790
791        /** Consume inputs (if any) and post a frame on the queue
792         *  for the inside thread to consume. A frame will be posted
793         *  even if there are no inputs if a refiring request has
794         *  been made for the current time.
795         *  @return True if the inside thread is still alive.
796         */
797        @Override
798        public boolean postfire() throws IllegalActionException {
799
800            Time environmentTime = ThreadedComposite.this.getExecutiveDirector()
801                    .getModelTime();
802
803            if (ThreadedComposite.this._debugging) {
804                ThreadedComposite.this
805                        ._debug("Postfiring at time " + environmentTime);
806            }
807
808            // If there are inputs to be consumed, or if a refiring
809            // request has been made for this time, then create an
810            // input frame for the inside thread.
811            // We can safely remove the refire request since we
812            // are now responding to it. Note that semantically,
813            // multiple refire requests for the same time are only
814            // required to trigger a single refiring, so this is true
815            // even if there were multiple refire requests.
816            boolean refireRequested = _fireAtTimes.remove(environmentTime);
817
818            // Put a frame on the _inputFrames for the inside thread
819            // if either a refire was requested or if there are inputs.
820            if (refireRequested || !_inputTokens.isEmpty()) {
821                if (ThreadedComposite.this._debugging) {
822                    ThreadedComposite.this._debug(
823                            "Queueing input tokens for the inside thread: "
824                                    + _inputTokens.toString()
825                                    + " to be processed at time "
826                                    + environmentTime);
827                }
828                synchronized (this) {
829                    _inputFrames.add(new TokenFrame(environmentTime,
830                            _inputTokens, TokenFrame.EVENT));
831                    notifyAll();
832                    if (_delayValue >= 0.0) {
833                        // Delay value is not UNDEFINED. Schedule a firing
834                        // at current time plus the delay.
835                        Time responseTime = environmentTime.add(_delayValue);
836                        // Need to be sure to call the executive director's fireAt().
837                        // Make sure to throw an exception if the executive
838                        // director does not exactly respect this request.
839                        Time response = ThreadedComposite.this
840                                .getExecutiveDirector()
841                                .fireAt(ThreadedComposite.this, responseTime);
842
843                        if (!response.equals(responseTime)) {
844                            throw new IllegalActionException(this,
845                                    "Director is unable to fire the actor at the requested time: "
846                                            + responseTime
847                                            + ". It responds it will fire it at: "
848                                            + response);
849                        }
850
851                        // Queue an indicator to produce outputs in response to that firing.
852                        _outputTimes.add(responseTime);
853                    }
854                }
855                // Give the inside thread a chance to react.
856                Thread.yield();
857            }
858            boolean isAlive = _thread.isAlive();
859            return isAlive || !_outputTimes.isEmpty();
860        }
861
862        /** Override the base class to post a "stop frame" on the queue
863         *  for the inside thread to stop.
864         */
865        @Override
866        public void stop() {
867            super.stop();
868            Time environmentTime = ThreadedComposite.this.getExecutiveDirector()
869                    .getModelTime();
870            if (ThreadedComposite.this._debugging) {
871                ThreadedComposite.this._debug(
872                        "Queueing a stop-frame token for the inside thread with time: "
873                                + environmentTime);
874            }
875            synchronized (this) {
876                _inputFrames.add(
877                        new TokenFrame(environmentTime, null, TokenFrame.STOP));
878                notifyAll();
879            }
880        }
881
882        /** Record data from the specified input port
883         *  for transfer to the queue used to communicate these data to the
884         *  inside thread. This is called in the fire() method of
885         *  the enclosing composite actor after the prefire() method
886         *  of this director has been called and before its fire() method
887         *  is called.
888         *  @param port The port to transfer tokens from.
889         *  @return True if at least one data token is transferred.
890         *  @exception IllegalActionException If reading the inputs fails.
891         */
892        @Override
893        public boolean transferInputs(IOPort port)
894                throws IllegalActionException {
895            boolean result = false;
896            for (int i = 0; i < port.getWidth(); i++) {
897                try {
898                    if (port.isKnown(i)) {
899                        if (port.hasToken(i)) {
900                            Token token = port.get(i);
901                            _inputTokens.add(new QueuedToken(port, i, token));
902                            if (ThreadedComposite.this._debugging) {
903                                ThreadedComposite.this
904                                        ._debug("Transferring input from "
905                                                + port.getName());
906                            }
907                            result = true;
908                        }
909                    }
910                } catch (NoTokenException ex) {
911                    // this shouldn't happen.
912                    throw new InternalErrorException(this, ex, null);
913                }
914            }
915            return result;
916        }
917
918        /** Override the base class to do nothing since the fire() method of this
919         *  director directly handles producing the outputs. In particular, we
920         *  don't want to read from the inside of the output ports because the
921         *  inside thread may be concurrently writing to them for the next
922         *  iteration.
923         *  @param port The port to transfer tokens from.
924         *  @return False, indicating that no data token is produced now.
925         *  @exception IllegalActionException If writing the outputs fails.
926         */
927        @Override
928        public boolean transferOutputs(IOPort port)
929                throws IllegalActionException {
930            return false;
931        }
932
933        /** Override the base class to wait until the inside thread
934         *  terminates and then call super.wrapup().
935         *  @exception IllegalActionException If the wrapup() method of
936         *   one of the associated actors throws it.
937         */
938        @Override
939        public void wrapup() throws IllegalActionException {
940            // First, post a "stop frame" in case one has not been posted.
941            // In the case of a finite run, one will likely have not been posted.
942            Time environmentTime = ThreadedComposite.this.getExecutiveDirector()
943                    .getModelTime();
944            if (ThreadedComposite.this._debugging) {
945                ThreadedComposite.this._debug("Called wrapup. ",
946                        "Queueing a stop-frame token for the inside thread with time: "
947                                + environmentTime);
948            }
949            synchronized (this) {
950                // A "stop frame" has a null token list.
951                _inputFrames.add(
952                        new TokenFrame(environmentTime, null, TokenFrame.STOP));
953                notifyAll();
954            }
955
956            if (_exception != null) {
957                throw new IllegalActionException(ThreadedComposite.this,
958                        _exception, "Error in inside thread of actor.");
959            }
960            if (_thread != null && _thread.isAlive()) {
961                try {
962                    if (ThreadedComposite.this._debugging) {
963                        ThreadedComposite.this
964                                ._debug("Waiting for inside thread to stop.");
965                    }
966                    _thread.join();
967                    if (ThreadedComposite.this._debugging) {
968                        ThreadedComposite.this
969                                ._debug("Inside thread has stopped.");
970                    }
971                    if (_exception != null) {
972                        throw new IllegalActionException(ThreadedComposite.this,
973                                _exception, "Error in inside thread of actor.");
974                    }
975                } catch (InterruptedException e) {
976                    // Ignore.
977                }
978            }
979            super.wrapup();
980        }
981
982        //////////////////////////////////////////////////////////////
983        ////                   private variables                  ////
984
        /** If an exception occurs in the inside thread, the exception
         *  will be assigned to this member, which will cause
         *  the next invocation of the fire() or wrapup() method
         *  to throw the exception. It is reset to null in initialize().
         */
        private Throwable _exception;

        /** Record of the times for which refire requests have been made
         *  from the inside (through fireAt()) and to which postfire()
         *  has not yet responded.
         *  This set is accessed from both the director
         *  thread and the inside thread so it has to
         *  be thread safe, which is why it is a synchronized set.
         */
        private Set<Time> _fireAtTimes = Collections
                .synchronizedSet(new HashSet<Time>());

        // NOTE: Cannot use LinkedBlockingQueue for _inputFrames
        // because we have to release the lock on this director
        // while we are waiting or we get a deadlock.

        /** Queue of unprocessed input events. This is a plain
         *  LinkedList, not a blocking queue: a consumer that finds it
         *  empty waits on this director and producers call notifyAll()
         *  after adding a frame.
         *  This is accessed by both the director thread and the inside
         *  thread, and the list itself is not thread safe, so it must
         *  only be accessed while synchronized on this director.
         */
        private LinkedList<TokenFrame> _inputFrames = new LinkedList<TokenFrame>();

        /** List of input events in the current iteration. A new list is
         *  created in each prefire().
         *  This is accessed only in the director thread so it need
         *  not be thread safe.
         */
        private List<QueuedToken> _inputTokens;

        /** Queue of unprocessed output events.
         *  This queue is accessed from multiple threads and the
         *  LinkedList itself is not thread safe, so it must only be
         *  accessed while synchronized on this director.
         */
        private LinkedList<TokenFrame> _outputFrames = new LinkedList<TokenFrame>();

        /** Record of the time stamps at which
         *  to produce outputs. These are enqueued and dequeued
         *  in time stamp order. If the delay value is UNDEFINED,
         *  then this is accessed from the inside thread as well
         *  as the director thread, so it needs to be thread safe.
         *  To ensure this, it is to be accessed only within a block
         *  synchronized on this director.
         */
        private Queue<Time> _outputTimes = new LinkedList<Time>();

        /** The value of the synchronizeToRealTime parameter when
         *  initialize() was invoked.
         */
        private boolean _synchronizeToRealTime;

        /** The thread that executes the contained actors. A new thread
         *  is created and started in each call to initialize().
         */
        private Thread _thread;
1042
1043        //////////////////////////////////////////////////////////////
1044        ////                   inner inner classes                ////
1045
1046        ///////////////////////////////////////////////////////////////////
1047        //// CompositeThread
1048
1049        /** The inside thread, which executes the contained actor.
1050         */
1051        private class CompositeThread extends Thread {
1052            public CompositeThread() {
1053                super("CompositeThread_"
1054                        + ThreadedComposite.this.getFullName());
1055            }
1056
1057            @Override
1058            public void run() {
1059                while (!_stopRequested) {
1060                    try {
1061                        if (ThreadedComposite.this._debugging) {
1062                            ThreadedComposite.this._debug(
1063                                    "---- Waiting for inputs in the inside thread.");
1064                        }
1065                        TokenFrame frame = null;
1066                        synchronized (ThreadedDirector.this) {
1067                            // The following blocks this thread if the queue is empty.
1068                            while (_inputFrames.isEmpty() && !_stopRequested) {
1069                                // The timeout allows this to respond to stop()
1070                                // even if we have a deadlock for some reason.
1071                                ThreadedDirector.this.wait(1000L);
1072                            }
1073                            if (_stopRequested) {
1074                                break;
1075                            }
1076                            frame = _inputFrames.poll();
1077                        }
1078
1079                        // Check for a "stop frame" and exit the thread.
1080                        if (frame.type == TokenFrame.STOP) {
1081                            if (ThreadedComposite.this._debugging) {
1082                                ThreadedComposite.this._debug(
1083                                        "---- Read a stop frame in inside thread.");
1084                            }
1085                            break;
1086                        }
1087                        if (ThreadedComposite.this._debugging) {
1088                            ThreadedComposite.this._debug(
1089                                    "---- Reading input tokens in inside thread with time "
1090                                            + frame.time + " and value "
1091                                            + frame.tokens);
1092                        }
1093                        // Current time of the director should match the frame time.
1094                        // This is the view of time that should be presented to any inside actors.
1095                        setModelTime(frame.time);
1096
1097                        if (_synchronizeToRealTime) {
1098                            long currentRealTime = System.currentTimeMillis();
1099                            // If this is the first firing, record the start time.
1100                            if (_realStartTime < 0L) {
1101                                _realStartTime = currentRealTime;
1102                            }
1103                            long realTimeMillis = currentRealTime
1104                                    - _realStartTime;
1105                            long modelTimeMillis = Math.round(
1106                                    getModelTime().getDoubleValue() * 1000.0);
1107                            if (realTimeMillis < modelTimeMillis) {
1108                                try {
1109                                    Thread.sleep(
1110                                            modelTimeMillis - realTimeMillis);
1111                                } catch (InterruptedException e) {
1112                                    // Ignore and continue.
1113                                }
1114                            }
1115                        }
1116
1117                        // Note that there may not be any tokens here, since there
1118                        // may not be any inputs (the firing is in response to
1119                        // a pure event). We still want to fire the
1120                        // enclosed model at the specified time because the firing
1121                        // is due to the model itself having previously called
1122                        // fireAt().
1123                        for (QueuedToken token : frame.tokens) {
1124                            if (token.channel < token.port.getWidthInside()) {
1125                                token.port.sendInside(token.channel,
1126                                        token.token);
1127                            }
1128                        }
1129                        // Iterate the contained actors.
1130                        if (!iterateContainedActors()) {
1131                            // Collect the outputs so that outputs from this
1132                            // final iteration are produced, then terminate this
1133                            // thread.
1134                            _stopRequested = true;
1135                        }
1136
1137                        // If outputs are produced by the iteration, then
1138                        // we need to record those in an output frame.
1139                        List<QueuedToken> outputTokens = new LinkedList<QueuedToken>();
1140                        Iterator ports = outputPortList().iterator();
1141                        while (ports.hasNext()) {
1142                            IOPort port = (IOPort) ports.next();
1143                            for (int i = 0; i < port.getWidth(); i++) {
1144                                if (port.isKnownInside(i)
1145                                        && port.hasTokenInside(i)) {
1146                                    Token token = port.getInside(i);
1147                                    QueuedToken tokenBundle = new QueuedToken(
1148                                            port, i, token);
1149                                    outputTokens.add(tokenBundle);
1150                                    if (ThreadedComposite.this._debugging) {
1151                                        ThreadedComposite.this._debug(
1152                                                "---- Inside actor produced token "
1153                                                        + token + " for port "
1154                                                        + port.getName());
1155                                    }
1156                                }
1157                            }
1158                        }
1159                        Time responseTime = getModelTime().add(_delayValue);
1160
1161                        synchronized (ThreadedDirector.this) {
1162                            // If delay is UNDEFINED, then we have to now request a
1163                            // refiring at the first opportunity. This is because
1164                            // the postfire method won't do it.
1165                            if (_delayValue < 0.0) {
1166                                // If synchronizeToRealTime is true, then we want to use the
1167                                // greater of real-time or the current environment time.
1168                                // Otherwise, we just use the current environment time.
1169                                if (_synchronizeToRealTime) {
1170                                    long realTimeMillis = System
1171                                            .currentTimeMillis()
1172                                            - _realStartTime;
1173                                    Time realTime = new Time(
1174                                            ThreadedDirector.this,
1175                                            realTimeMillis * 0.001);
1176                                    responseTime = ThreadedDirector.this.fireAt(
1177                                            ThreadedComposite.this, realTime);
1178                                } else {
1179                                    responseTime = ThreadedDirector.this.fireAt(
1180                                            ThreadedComposite.this,
1181                                            getModelTime());
1182                                }
1183                                _outputTimes.add(responseTime);
1184                            }
1185                            TokenFrame outputFrame = new TokenFrame(
1186                                    responseTime, outputTokens,
1187                                    TokenFrame.EVENT);
1188                            _outputFrames.add(outputFrame);
1189                            if (ThreadedComposite.this._debugging) {
1190                                ThreadedComposite.this._debug(
1191                                        "---- Inside thread posted output frame.");
1192                            }
1193                            ThreadedDirector.this.notifyAll();
1194                            // Give the director thread a chance to react.
1195                            Thread.yield();
1196                        }
1197                    } catch (InterruptedException e) {
1198                        // Post a stop frame.
1199                        TokenFrame stopFrame = new TokenFrame(getModelTime(),
1200                                null, TokenFrame.STOP);
1201                        synchronized (ThreadedDirector.this) {
1202                            _outputFrames.add(stopFrame);
1203                            ThreadedDirector.this.notifyAll();
1204                        }
1205                        // Exit the thread.
1206                        break;
1207                    } catch (IllegalActionException ex) {
1208                        synchronized (ThreadedDirector.this) {
1209                            // To stop the outside firing, set this variable.
1210                            // On the next invocation of fire() or wrapup(), the
1211                            // exception will be thrown.
1212                            _exception = ex;
1213                            // Post a stop frame.
1214                            TokenFrame stopFrame = new TokenFrame(
1215                                    getModelTime(), null, TokenFrame.STOP);
1216                            _outputFrames.add(stopFrame);
1217                            ThreadedDirector.this.notifyAll();
1218                        }
1219                        break;
1220                    }
1221                }
1222            }
1223        }
1224    }
1225}