001/* An actor that iterates a contained actor over input arrays.
002
003 Copyright (c) 2007-2015 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.actor.lib.hoc;
029
030import java.util.Collections;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Queue;
035import java.util.concurrent.DelayQueue;
036import java.util.concurrent.Delayed;
037import java.util.concurrent.TimeUnit;
038
039import ptolemy.actor.Actor;
040import ptolemy.actor.Director;
041import ptolemy.actor.Executable;
042import ptolemy.actor.IOPort;
043import ptolemy.actor.NoTokenException;
044import ptolemy.actor.QueueReceiver;
045import ptolemy.actor.Receiver;
046import ptolemy.actor.util.Time;
047import ptolemy.data.DoubleToken;
048import ptolemy.data.Token;
049import ptolemy.data.expr.Parameter;
050import ptolemy.data.type.BaseType;
051import ptolemy.kernel.ComponentEntity;
052import ptolemy.kernel.CompositeEntity;
053import ptolemy.kernel.util.Attribute;
054import ptolemy.kernel.util.IllegalActionException;
055import ptolemy.kernel.util.InternalErrorException;
056import ptolemy.kernel.util.NameDuplicationException;
057import ptolemy.kernel.util.Settable;
058import ptolemy.kernel.util.Workspace;
059import ptolemy.util.MessageHandler;
060
061///////////////////////////////////////////////////////////////////
062//// RealTimeComposite
063
064/**
065 This is a container for another actor that fires that other actor
066 at real times corresponding to the input time stamps. Its
067 ports are those of the contained actor. Given one or more events
068 with time stamp <i>t</i> at the input ports, it queues the events
069 to provide to a firing of the contained actor that is deferred to
070 occur when real time (since start of execution, in seconds) exceeds
071 or matches <i>t</i>.  If real time already exceeds <i>t</i>, then the firing
072 may occur immediately.
073 <p>
074 In addition to the parameters of the contained actor, this actor
075 has a <i>delay</i> parameter. The value of this parameter is
076 the minimum delay (in model time) between an input event and
077 an output event that results from that input event.
078 If the enclosed actor produces no output, or if the time
079 of the outputs can be arbitrarily whatever current time
080 is in the model when they are produced, then <i>delay</i>
081 should be set to <i>UNDEFINED</i>. This is the default value.
082 With this value, the enclosed actor is
083 executed in a separate thread.
084 If the firing produces output events, then those are given time
085 stamps equal to the greater of the current model time of the
086 enclosing model and the current real time at which the outputs
087 are produced (in seconds since the start of execution). In
088 this case, the enclosed actor
089 does not regulate in any way the passage of time of the
090 enclosing model, so the time stamps of the enclosing model
091 could get arbitrarily far ahead of real time.
092 <p>
093 If the value of <i>delay</i> is 0.0 (zero), then the inside
094 model is run in the same thread as the enclosing model.
095 When this RealTimeComposite fires, the fire() method stalls
096 until real time matches the current time of the model, and
097 then invokes the enclosed model. If the enclosed model produces
098 any outputs, then those outputs have time stamps equal to the
099 time stamps of the input. Hence, from the perspective of DE
100 semantics, this actor has zero delay, even though it can
101 introduce real-time delay (which is indistinguishable from
102 just taking a long time to evaluate the fire() method).
103 Note that with <i>delay</i> = 0.0, this actor affects the
104 model in way similar to the <i>synchronizeToRealTime</i>
105 parameter of the director, except that only the events
106 provided to this actor are synchronized to real time, rather
107 than all events.
108 <p>
109 If the value of <i>delay</i> is positive, then the inside
110 model is run in a separate thread, just as if the value
111 were UNDEFINED, but in this case, this actor does
112 regulate the passage of time of the enclosing model.
113 In particular, given an event with time stamp <i>t</i>
114 it prevents model time from advancing past <i>t</i>
115 + <i>delay</i> until the firing triggered by the event
116 has completed (which will be at some real time greater
117 than <i>t</i>). Any outputs produced by that firing are
118 assigned time stamps equal to the greater of <i>t</i>
119 + <i>delay</i> and the current real time at which the
120 output is produced.
121 <p>
122 For various reasons, this actor is tricky to use. The most natural
123 domain to use it in is DE, providing it with input events with time
124 stamps that specify when to perform some action, such as an actuator
125 or display action. However, if the DE system is an open-loop system,
126 then model time of the DE system can get very far ahead of the
127 RealTimeComposite. It is helpful to use a feedback loop including
128 this RealTimeComposite to keep the DE model from getting ahead,
129 and to use the <i>delay</i> parameter judiciously as explained
130 above.
131 <p>
132 This actor may also be used in SDF and SR if the <i>period</i> parameter
133 of the director is set to something greater than zero.
134 This actor consumes its inputs and schedules execution in
135 its postfire() method, and hence in SR will behave as a strict
136 actor (all inputs must be known for anything to happen).
137 <p>
138 FIXME: For actors that are triggered by internal calls to fireAt(),
139 it seems that the delay needs to be no larger than the smallest
140 increment between calls to fireAt(). Is this correct?  Why?
141 <p>
142 FIXME: If there is a PortParameter, the parameter gets updated when the
143 fire() method of this composite is invoked, which creates a nondeterminate
144 interaction with the deferred execution. See CompositeActor.fire().
145
146 @author Edward A. Lee
147 @version $Id$
148 @since Ptolemy II 6.1
149 @deprecated Use {@link ptolemy.actor.lib.hoc.ThreadedComposite} instead
150 @Pt.ProposedRating Yellow (eal)
151 @Pt.AcceptedRating Red (neuendor)
152 @deprecated Use ThreadedComposite instead.
153 */
154@Deprecated
155public class RealTimeComposite extends MirrorComposite {
156    /** Create an actor with a name and a container.
157     *  The container argument must not be null, or a
158     *  NullPointerException will be thrown.  This actor will use the
159     *  workspace of the container for synchronization and version counts.
160     *  If the name argument is null, then the name is set to the empty string.
161     *  Increment the version of the workspace.
162     *  @param container The container actor.
163     *  @param name The name of this actor.
164     *  @exception IllegalActionException If the container is incompatible
165     *   with this actor.
166     *  @exception NameDuplicationException If the name coincides with
167     *   an actor already in the container.
168     */
169    public RealTimeComposite(CompositeEntity container, String name)
170            throws IllegalActionException, NameDuplicationException {
171        super(container, name);
172        setClassName("ptolemy.actor.lib.hoc.RealTimeComposite");
173        new RealTimeDirector(this, "RealTimeDirector");
174
175        // Hidden parameter defining "UNDEFINED".
176        Parameter UNDEFINED = new Parameter(this, "UNDEFINED");
177        UNDEFINED.setVisibility(Settable.EXPERT);
178        UNDEFINED.setPersistent(false);
179        UNDEFINED.setExpression("-1.0");
180
181        delay = new Parameter(this, "delay");
182        delay.setTypeEquals(BaseType.DOUBLE);
183        delay.setExpression("UNDEFINED");
184    }
185
186    ///////////////////////////////////////////////////////////////////
187    ////                         parameters                        ////
188
189    /** The maximum model-time delay between the input events and the
190     *  output events. This is a double that defaults to <i>UNDEFINED</i>.
191     */
192    public Parameter delay;
193
194    ///////////////////////////////////////////////////////////////////
195    ////                         public methods                    ////
196
197    /** React to a change in an attribute.  This method is called by
198     *  a contained attribute when its value changes.  In this base class,
199     *  the method does nothing.  In derived classes, this method may
200     *  throw an exception, indicating that the new attribute value
201     *  is invalid.  It is up to the caller to restore the attribute
202     *  to a valid value if an exception is thrown.
203     *  @param attribute The attribute that changed.
204     *  @exception IllegalActionException If the change is not acceptable
205     *   to this container (not thrown in this base class).
206     */
207    @Override
208    public void attributeChanged(Attribute attribute)
209            throws IllegalActionException {
210        if (attribute == delay) {
211            _delayValue = ((DoubleToken) delay.getToken()).doubleValue();
212        } else {
213            super.attributeChanged(attribute);
214        }
215    }
216
217    /** Clone the object into the specified workspace. This overrides
218     *  the base class to instantiate a new RealTimeDirector.
219     *  @param workspace The workspace for the new object.
220     *  @return A new NamedObj.
221     *  @exception CloneNotSupportedException If any of the attributes
222     *   cannot be cloned.
223     *  @see #exportMoML(java.io.Writer, int, String)
224     */
225    @Override
226    public Object clone(Workspace workspace) throws CloneNotSupportedException {
227        RealTimeComposite result = (RealTimeComposite) super.clone(workspace);
228        try {
229            // Remove the old inner RealTimeDirector(s) that is(are) in the wrong workspace.
230            String realTimeDirectorName = null;
231            Iterator realTimeDirectors = result
232                    .attributeList(RealTimeDirector.class).iterator();
233            while (realTimeDirectors.hasNext()) {
234                RealTimeDirector oldRealTimeDirector = (RealTimeDirector) realTimeDirectors
235                        .next();
236                if (realTimeDirectorName == null) {
237                    realTimeDirectorName = oldRealTimeDirector.getName();
238                }
239                oldRealTimeDirector.setContainer(null);
240            }
241
242            // Create a new RealTimeDirector that is in the right workspace.
243            RealTimeDirector realTimeDirector = result.new RealTimeDirector(
244                    workspace);
245            realTimeDirector.setContainer(result);
246            realTimeDirector.setName(realTimeDirectorName);
247        } catch (Throwable throwable) {
248            throw new CloneNotSupportedException(
249                    "Could not clone: " + throwable);
250        }
251        return result;
252    }
253
254    /** Invoke iterations on the contained actor of the
255     *  container of this director repeatedly until either it runs out
256     *  of input data or prefire() returns false. If postfire() of any
257     *  actor returns false, then return false. Otherwise, return true.
258     *  @return True to allow the thread to continue executing.
259     *  @exception IllegalActionException If any called method of
260     *   of the contained actor throws it, or if the contained
261     *   actor is not opaque.
262     */
263    public boolean fireContainedActors() throws IllegalActionException {
264        // Don't call "super.fire();" here, this actor contains its
265        // own director.
266        Iterator actors = entityList().iterator();
267        boolean postfireReturns = true;
268
269        while (actors.hasNext() && !_stopRequested) {
270            Actor actor = (Actor) actors.next();
271
272            if (!((ComponentEntity) actor).isOpaque()) {
273                throw new IllegalActionException(this,
274                        "Inside actor is not opaque "
275                                + "(perhaps it needs a director).");
276            }
277
278            int result = Executable.COMPLETED;
279
280            while (result != Executable.NOT_READY) {
281                if (_debugging) {
282                    _debug("Iterating actor: " + actor.getFullName());
283                }
284                if (_debugging) {
285                    _debug("---- Iterating actor in associated thread: "
286                            + actor.getFullName());
287                }
288                result = actor.iterate(1);
289
290                // Should return if there are no more input data,
291                // irrespective of return value of prefire() of
292                // the actor, which is not reliable.
293                boolean outOfData = true;
294                Iterator inPorts = actor.inputPortList().iterator();
295
296                while (inPorts.hasNext()) {
297                    IOPort port = (IOPort) inPorts.next();
298
299                    for (int i = 0; i < port.getWidth(); i++) {
300                        if (port.hasToken(i)) {
301                            outOfData = false;
302                            break;
303                        }
304                    }
305                }
306
307                if (outOfData) {
308                    break;
309                }
310
311                if (result == Executable.STOP_ITERATING) {
312                    if (_debugging) {
313                        _debug("---- Actor requests halt: "
314                                + actor.getFullName());
315                    }
316                    postfireReturns = false;
317                    break;
318                }
319            }
320        }
321        return postfireReturns;
322    }
323
324    ///////////////////////////////////////////////////////////////////
325    ////                         private variables                 ////
326
327    /** The cached value of the <i>delay</i> parameter. */
328    private double _delayValue = 0.0;
329
330    /** Queue of times at which inside actors have requested firings.
331     *  This queue is accessed from multiple threads, so it must be
332     *  thread safe.
333     */
334    private List<Time> _fireAtTimes = Collections
335            .synchronizedList(new LinkedList<Time>());
336
337    /** Queue of unprocessed input events.
338     */
339    private DelayQueue<InputFrame> _inputFrames = new DelayQueue<InputFrame>();
340
341    /** Queue of unprocessed output events.
342     *  This queue is accessed from multiple threads, so it must be
343     *  thread safe.
344     */
345    private List<OutputFrame> _outputFrames = Collections
346            .synchronizedList(new LinkedList<OutputFrame>());
347
348    /** The real time at which the model begins executing, in milliseconds. */
349    private long _realStartTime = 0;
350
351    /** Queue of times at which responses to firings are expected.
352     *  This is accessed only from the Director action methods, which run
353     *  in a single thread, so it need not by thread safe.
354     */
355    private Queue<Time> _responseTimes = new LinkedList<Time>();
356
357    ///////////////////////////////////////////////////////////////////
358    ////                         inner classes                     ////
359
360    ///////////////////////////////////////////////////////////////////
361    //// InputFrame
362
363    /** Bundle of a token and the input port at which it arrived.
364     *  Use null for <i>theTokens</i> specifies this frame as a "stop frame" to
365     *  flag that no more inputs will be delivered.
366     */
367    private class InputFrame implements Delayed {
368
369        /** Construct an input frame.
370         *  @param theTime The model time of the input events.
371         *  @param theTokens The tokens in the input events.
372         */
373        public InputFrame(Time theTime, List<QueuedToken> theTokens) {
374            tokens = theTokens;
375            time = theTime;
376        }
377
378        public final Time time;
379
380        public final List<QueuedToken> tokens;
381
382        @Override
383        public long getDelay(TimeUnit unit) {
384            // Calculate time to wait.
385            long elapsedTime = System.currentTimeMillis() - _realStartTime;
386            // NOTE: We assume that the elapsed time can be
387            // safely cast to a double.  This means that
388            // the DE domain has an upper limit on running
389            // time of Double.MAX_VALUE milliseconds.
390            double elapsedTimeInSeconds = elapsedTime / 1000.0;
391            long timeToWait = (long) (time.subtract(elapsedTimeInSeconds)
392                    .getDoubleValue() * 1000.0);
393            return unit.convert(timeToWait, TimeUnit.MILLISECONDS);
394        }
395
396        @Override
397        public int compareTo(Delayed frame) {
398            // NOTE: We assume that only comparisons against instances
399            // of Frame will be done.  Is this safe?
400            return time.compareTo(((InputFrame) frame).time);
401        }
402
403        /** Return true if this InputFrame object has the same
404         *  time as the given InputFrame object.
405         *  @param inputFrame The InputFrame object that this
406         *  InputFrame object is compared to.
407         *  @return True if the two InputFrame objects have the same time.
408         */
409        @Override
410        public boolean equals(Object inputFrame) {
411            // See http://www.technofundo.com/tech/java/equalhash.html
412
413            /* FindBugs says that InputFrame "defined
414             * compareTo(Object) and uses Object.equals()"
415             * http://findbugs.sourceforge.net/bugDescriptions.html#EQ_COMPARETO_USE_OBJECT_EQUALS
416             * says: "This class defines a compareTo(...) method but
417             * inherits its equals() method from
418             * java.lang.Object. Generally, the value of compareTo should
419             * return zero if and only if equals returns true. If this is
420             * violated, weird and unpredictable failures will occur in
421             * classes such as PriorityQueue. In Java 5 the
422             * PriorityQueue.remove method uses the compareTo method,
423             * while in Java 6 it uses the equals method.
424             *
425             *  From the JavaDoc for the compareTo method in the
426             *  Comparable interface:
427             *
428             * It is strongly recommended, but not strictly required that
429             * (x.compareTo(y)==0) == (x.equals(y)). Generally speaking,
430             * any class that implements the Comparable interface and
431             * violates this condition should clearly indicate this
432             * fact. The recommended language is "Note: this class has a
433             * natural ordering that is inconsistent with equals." "
434             */
435            if (inputFrame == this) {
436                return true;
437            }
438            if (inputFrame == null || inputFrame.getClass() != getClass()) {
439                return false;
440            } else {
441                InputFrame frame = (InputFrame) inputFrame;
442                if (compareTo(frame) == 0
443                        && frame.tokens.size() == tokens.size()) {
444                    return frame.tokens.equals(tokens);
445                }
446            }
447            return false;
448        }
449
450        /** Return the hash code for the InputFrame object.
451         *  @return The hash code for this InputFrame object;
452         */
453        @Override
454        public int hashCode() {
455            // See http://www.technofundo.com/tech/java/equalhash.html
456            int hashCode = 7;
457            if (time != null) {
458                hashCode = 31 * hashCode + time.hashCode();
459            }
460            if (tokens != null) {
461                hashCode = 31 * hashCode + tokens.hashCode();
462            }
463            return hashCode;
464        }
465    }
466
467    ///////////////////////////////////////////////////////////////////
468    //// QueuedToken
469
470    /** Bundle of a token and the input port and channel
471     *  at which it arrived.
472     */
473    private static class QueuedToken {
474
475        // FindBugs suggests making this class static so as to decrease
476        // the size of instances and avoid dangling references.
477
478        public QueuedToken(IOPort thePort, int theChannel, Token theToken) {
479            token = theToken;
480            channel = theChannel;
481            port = thePort;
482        }
483
484        public final int channel;
485
486        public final Token token;
487
488        public final IOPort port;
489
490        @Override
491        public String toString() {
492            return "token " + token + " for port " + port.getFullName() + "("
493                    + channel + ")";
494        }
495    }
496
497    ///////////////////////////////////////////////////////////////////
498    //// OutputFrame
499
500    /** Bundle of a token and the output port at which it arrived.
501     */
502    private static class OutputFrame {
503
504        // FindBugs suggests making this class static so as to decrease
505        // the size of instances and avoid dangling references.
506
507        /** Construct an output frame.
508         *  @param theTime The model time of the output events.
509         *  @param theTokens The tokens in the output events.
510         */
511        public OutputFrame(Time theTime, List<QueuedToken> theTokens) {
512            tokens = theTokens;
513            time = theTime;
514        }
515
516        public final Time time;
517
518        public final List<QueuedToken> tokens;
519    }
520
521    ///////////////////////////////////////////////////////////////////
522    //// RealTimeDirector
523
524    /** This is a specialized director that defers firing of the
525     *  contained actors until real-time matches the time stamp of
526     *  provided inputs. It does this in a separate thread that
527     *  blocks until the times match, then transfers the input tokens
528     *  that arrived with that time stamp and fires the contained actors
529     *  in the order in which they appear in the actor list repeatedly
530     *  until either there is no more input data for the actor or
531     *  the prefire() method of the actor returns false. If postfire()
532     *  of any actor returns false, then postfire() of this director
533     *  will return false, requesting a halt to execution of the model.
534     */
535    private class RealTimeDirector extends Director {
536        /** Create a new instance of the director for RealTimeComposite.
537         *  @param container The container for the director.
538         *  @param name The name of the director.
539         *  @exception IllegalActionException Not thrown in this base class.
540         *  @exception NameDuplicationException Not thrown in this base class.
541         */
542        public RealTimeDirector(CompositeEntity container, String name)
543                throws IllegalActionException, NameDuplicationException {
544            super(container, name);
545            setPersistent(false);
546        }
547
548        /** Construct a RealTimeDirector in the specified workspace with
549         *  no container and an empty string as a name. You can then change
550         *  the name with setName(). If the workspace argument is null, then
551         *  use the default workspace.  You should set the local director or
552         *  executive director before attempting to send data to the actor
553         *  or to execute it. Add the actor to the workspace directory.
554         *  Increment the version number of the workspace.
555         *  @param workspace The workspace that will list the actor.
556         *  @exception IllegalActionException If the container is incompatible
557         *   with this actor.
558         *  @exception NameDuplicationException If the name coincides with
559         *   an actor already in the container.
560         */
561        public RealTimeDirector(Workspace workspace)
562                throws IllegalActionException, NameDuplicationException {
563            super(workspace);
564            setPersistent(false);
565        }
566
567        /** If current model time of the environment matches the time at which outputs
568         *  that have been queued should be produced, then produce them.
569         *  Yield to other threads.
570         *  @exception IllegalActionException If production of an output
571         *   fails (e.g. type error).
572         */
573        @Override
574        public void fire() throws IllegalActionException {
575            if (_realStartTime < 0L) {
576                _realStartTime = System.currentTimeMillis();
577            }
578            Time environmentTime = RealTimeComposite.this.getExecutiveDirector()
579                    .getModelTime();
580            if (_delayValue == 0.0) {
581                // Delay is zero, so wait until current time matches
582                // model time, and then treat this as an ordinary composite actor.
583                long realTimeMillis = System.currentTimeMillis()
584                        - _realStartTime;
585                long modelTimeMillis = Math
586                        .round(environmentTime.getDoubleValue() * 1000.0);
587                if (realTimeMillis < modelTimeMillis) {
588                    try {
589                        Thread.sleep(modelTimeMillis - realTimeMillis);
590                    } catch (InterruptedException e) {
591                        // Ignore and continue.
592                    }
593                }
594                // FIXME: This isn't quite right, since this will postfire()
595                // contained actors.
596                super.fire();
597            } else {
598                // Delay is either UNDEFINED or positive,
599                // so we are running in separate thread.
600
601                // If the delay value is positive, then we may need
602                // to stall to prevent model time from getting too
603                // far ahead of real time.
604                if (_delayValue > 0.0) {
605                    // Delay value is positive. If current time matches
606                    // the time at the head of the _responseTime queue,
607                    // then stall until real time matches that time.
608                    // Note that there is no harm in consuming the
609                    // head of the queue since the side effect here
610                    // is the passage of real time.
611                    Time responseTime = _responseTimes.peek();
612                    if (responseTime != null
613                            && responseTime.equals(environmentTime)) {
614
615                        // FIXME: Findbugs says that the next line:
616                        // "ignores return value of java.util.Queue.poll()"
617                        _responseTimes.poll();
618                        // Time matches.  Compare to real time.
619                        long realTimeMillis = System.currentTimeMillis()
620                                - _realStartTime;
621                        long modelTimeMillis = Math.round(
622                                environmentTime.getDoubleValue() * 1000.0);
623                        if (realTimeMillis < modelTimeMillis) {
624                            try {
625                                Thread.sleep(modelTimeMillis - realTimeMillis);
626                            } catch (InterruptedException e) {
627                                // Ignore and continue.
628                            }
629                        }
630                    }
631                }
632
633                // Next check for outputs to produce.
634                if (_outputFrames.size() > 0) {
635                    OutputFrame frame = _outputFrames.get(0);
636                    if (frame.time.equals(environmentTime)) {
637                        // Current time matches the time of the first frame on
638                        // the output queue.
639                        // Produce the outputs on the frame.
640                        for (QueuedToken token : frame.tokens) {
641                            if (token.channel < token.port.getWidth()) {
642                                token.port.send(token.channel, token.token);
643                            }
644                        }
645                    }
646                }
647                Thread.yield();
648            }
649        }
650
651        /** Delegate by calling fireAt() on the director of the container's
652         *  container.
653         *  @param actor The actor requesting firing.
654         *  @param time The time at which to fire.
655         *  @param microstep The microstep.
656         *  @return The time at which the actor passed as an argument
657         *   will be fired.
658         */
659        @Override
660        public Time fireAt(Actor actor, Time time, int microstep)
661                throws IllegalActionException {
662            Time result = time;
663            Director director = RealTimeComposite.this.getExecutiveDirector();
664            if (director != null) {
665                if (RealTimeComposite.this._debugging) {
666                    RealTimeComposite.this
667                            ._debug("---- Actor requests firing at time " + time
668                                    + ": " + actor.getFullName());
669                }
670                result = director.fireAt(RealTimeComposite.this, time,
671                        microstep);
672            }
673            if (actor != RealTimeComposite.this) {
674                // The fireAt() request is coming from the inside, so
675                // when the firing occurs, we want to post an input
676                // frame (even if there are no input events) for
677                // the associated thread.
678                _fireAtTimes.add(time);
679            }
680            return result;
681        }
682
683        /** Fire the specified actor at the first opportunity
684         *  and then pass the request up to the executive director.
685         *  When passing it up, request a firing at the greater of
686         *  the current time of that director or the elapsed real
687         *  time since the start of the model.
688         *  This is useful for actors that spontaneously produce output,
689         *  e.g. from sensor data or from completion of some previously
690         *  started task. The firing of the actor will produce the output,
691         *  sending it to the inside of the output ports of this composite,
692         *  and then the firing of the composite will transfer those tokens
693         *  to the outside model.
694         *  @param actor The actor requesting firing (ignored).
695         */
696        @Override
697        public Time fireAtCurrentTime(Actor actor)
698                throws IllegalActionException {
699            // Coverity Scan reports that this method does not call
700            // super.fireAtCurrentTime(), which is ok because we call
701            // it on the executive director.
702            Time environmentTime = RealTimeComposite.this.getExecutiveDirector()
703                    .getModelTime();
704            _inputFrames.put(new InputFrame(environmentTime,
705                    new LinkedList<QueuedToken>()));
706            Director director = RealTimeComposite.this.getExecutiveDirector();
707            if (director != null) {
708                // We assume that the contained actors mean "real time" by
709                // "current time". Hopefully, this will be in the future w.r.t. model time.
710                // Use fireAt() hoping that the director will not increment time
711                // too soon.
712                // FIXME: This is not right!
713                Time time = new Time(this,
714                        (System.currentTimeMillis() - _realStartTime) / 1000.0);
715                if (RealTimeComposite.this._debugging) {
716                    RealTimeComposite.this._debug(
717                            "----- fireAtCurrentTime() request by actor "
718                                    + actor.getFullName() + ". Model time is "
719                                    + environmentTime + ", and real time is "
720                                    + time);
721                }
722                director.fireAt(RealTimeComposite.this, time);
723                return time;
724            }
725            return environmentTime;
726        }
727
728        /** Return the current time of the enclosing actor if the delay
729         *  is zero. Otherwise, get the local notion of current time.
730         *  @return The current time.
731         */
732        @Override
733        public Time getModelTime() {
734            if (_delayValue == 0.0) {
735                return ((Actor) getContainer()).getExecutiveDirector()
736                        .getModelTime();
737            } else {
738                return super.getModelTime();
739            }
740        }
741
742        /** Start the associated thread.
743         *  @exception IllegalActionException If the initialize() method of
744         *   one of the associated actors throws it.
745         */
746        @Override
747        public void initialize() throws IllegalActionException {
748            // The following must be done before the initialize() methods
749            // of the actors is called because those methods may call fireAt().
750            _fireAtTimes.clear();
751
752            // The superclass will initialize all the actors.
753            super.initialize();
754            // Set a flag indicating that the first firing should
755            // initialize the _realStartTime variable. This is done
756            // in the first firing to be as late as possible, so
757            // that startup transients are minimized.
758            // FIXME: This will impede synchronization with other
759            // actors, since there won't be a common time base.
760            _realStartTime = -1L;
761            if (_delayValue != 0) {
762                // We will be executing in a new thread.
763                // Create and start that thread.
764                _inputFrames.clear();
765                _outputFrames.clear();
766                _responseTimes.clear();
767                _thread = new RealTimeThread();
768                _thread.setPriority(Thread.MAX_PRIORITY);
769                _thread.start();
770            }
771        }
772
773        /** Return a new instance of QueueReceiver.
774         *  @return A new instance of QueueReceiver.
775         *  @see QueueReceiver
776         */
777        @Override
778        public Receiver newReceiver() {
779            return new QueueReceiver();
780        }
781
782        /** Clear the list of input events for this iteration and return true
783         *  if the associated thread is alive, if <i>delay</i> is not 0.0.
784         *  Otherwise, return true.
785         *  @return True if the associated thread is still alive, or true
786         *   if delay == 0.0.
787         */
788        @Override
789        public boolean prefire() throws IllegalActionException {
790            // Do not call super.prefire()!
791            // Superclass aligns current time to that of the container.
792            // The notion of current time presented to these actors
793            // should match that of the frame.
794            // super.prefire();
795            Time environmentTime = RealTimeComposite.this.getExecutiveDirector()
796                    .getModelTime();
797            if (RealTimeComposite.this._debugging) {
798                RealTimeComposite.this
799                        ._debug("----- Current environment time is: "
800                                + environmentTime);
801            }
802
803            if (_delayValue != 0) {
804                // Have to create a new list because the previous list may
805                // not have been consumed yet.
806                _inputTokens = new LinkedList<QueuedToken>();
807                return _thread.isAlive();
808            } else {
809                return true;
810            }
811        }
812
813        /** Send all the collected tokens to the queue for consumption
814         *  by the associated thread, if there is an associated thread.
815         *  Otherwise, just invoke the superclass postfire().
816         *  @return True if the associated thread is still alive.
817         */
818        @Override
819        public boolean postfire() throws IllegalActionException {
820            boolean result = super.postfire();
821            Time environmentTime = RealTimeComposite.this.getExecutiveDirector()
822                    .getModelTime();
823            if (_delayValue != 0) {
824                // Delay is either UNDEFINED or positive.
825                // Post the inputs for consumption in the
826                // associated thread.
827                if (_inputTokens.size() > 0) {
828                    if (RealTimeComposite.this._debugging) {
829                        RealTimeComposite.this._debug(
830                                "Queueing input tokens for the associated thread: "
831                                        + _inputTokens.toString()
832                                        + " to be processed at time "
833                                        + environmentTime);
834                    }
835                    _inputFrames
836                            .put(new InputFrame(environmentTime, _inputTokens));
837                    if (_delayValue > 0.0) {
838                        // Delay value is positive. Schedule a firing
839                        // at current time plus the delay.
840                        Time responseTime = environmentTime.add(_delayValue);
841                        fireAt(RealTimeComposite.this, responseTime);
842
843                        // Queue an indicator to stall when that firing occurs.
844                        _responseTimes.add(responseTime);
845                    }
846                }
847                // Even if _inputTokens is null, we still want to post an
848                // event if the firing is due to a call to fireAt() from the inside.
849                // Check to see whether that is the case.
850                if (_fireAtTimes.size() > 0) {
851                    Time fireAtTime = _fireAtTimes.get(0);
852                    if (fireAtTime.equals(environmentTime)) {
853                        // Remove the time from the queue.
854                        _fireAtTimes.remove(0);
855                        // Queue an iteration even if there are no inputs.
856                        if (_inputTokens.size() == 0) {
857                            if (RealTimeComposite.this._debugging) {
858                                RealTimeComposite.this._debug(
859                                        "Queueing pure event for the associated thread, "
860                                                + " to be processed at time "
861                                                + environmentTime);
862                            }
863                            _inputFrames.put(new InputFrame(environmentTime,
864                                    _inputTokens));
865                            if (_delayValue > 0.0) {
866                                // Delay value is positive. Schedule a firing
867                                // at current time plus the delay.
868                                Time responseTime = environmentTime
869                                        .add(_delayValue);
870                                fireAt(RealTimeComposite.this, responseTime);
871
872                                // Queue an indicator to stall when that firing occurs.
873                                _responseTimes.add(responseTime);
874                            }
875                        }
876                    }
877                }
878
879                // If current time matches the time at the head of
880                // of the queue for outputs, then consume the data on the
881                // head of the queue. Those data were sent to the output
882                // in the fire() method.
883                if (_outputFrames.size() > 0) {
884                    OutputFrame frame = _outputFrames.get(0);
885                    if (frame.time.equals(environmentTime)) {
886                        // Consume the outputs on the frame, which will have
887                        // been sent in the fire() method.
888                        _outputFrames.remove(0);
889                    }
890                }
891                result = _thread.isAlive();
892            }
893            return result;
894        }
895
896        /** Override the base class to post a "stop frame" on the queue
897         *  if there is an associated thread.
898         */
899        @Override
900        public void stop() {
901            Time environmentTime = RealTimeComposite.this.getExecutiveDirector()
902                    .getModelTime();
903            if (_delayValue != 0) {
904                if (RealTimeComposite.this._debugging) {
905                    RealTimeComposite.this._debug(
906                            "Queueing a stop-frame token for the associated thread with time: "
907                                    + environmentTime);
908                }
909                // A "stop frame" has a null token list.
910                _inputFrames.put(new InputFrame(environmentTime, null));
911            } else {
912                super.stop();
913            }
914        }
915
916        /** Record data from the specified input port
917         *  for transfer to the queue used to communicate these data to the
918         *  associated thread.
919         *  @param port The port to transfer tokens from.
920         *  @return True if at least one data token is transferred.
921         *  @exception IllegalActionException If reading the inputs fails.
922         */
923        @Override
924        public boolean transferInputs(IOPort port)
925                throws IllegalActionException {
926            if (_delayValue == 0) {
927                return super.transferInputs(port);
928            }
929            boolean result = false;
930
931            for (int i = 0; i < port.getWidth(); i++) {
932                try {
933                    if (port.isKnown(i)) {
934                        if (port.hasToken(i)) {
935                            Token token = port.get(i);
936                            _inputTokens.add(new QueuedToken(port, i, token));
937                            if (RealTimeComposite.this._debugging) {
938                                RealTimeComposite.this._debug(getName(),
939                                        "transferring input from "
940                                                + port.getName());
941                            }
942                            result = true;
943                        }
944                    }
945                } catch (NoTokenException ex) {
946                    // this shouldn't happen.
947                    throw new InternalErrorException(this, ex, null);
948                }
949            }
950            return result;
951        }
952
953        /** If real time is less than or equal to the current model time
954         *  of the environment, then produce the outputs immediately at the
955         *  current model time. Otherwise, collect them and queue them to
956         *  be produced by the fire method when model time matches the
957         *  current real time, and call fireAt() to request a firing
958         *  at that time.
959         *  @param port The port to transfer tokens from.
960         *  @return True if at least one data token is produced now.
961         *  @exception IllegalActionException If reading the inputs fails.
962         */
963        @Override
964        public boolean transferOutputs(IOPort port)
965                throws IllegalActionException {
966            if (_delayValue == 0) {
967                return super.transferOutputs(port);
968            }
969            // Compare against the environment time.
970            Time environmentTime = RealTimeComposite.this.getExecutiveDirector()
971                    .getModelTime();
972            double realTimeInSeconds = (System.currentTimeMillis()
973                    - _realStartTime) / 1000.0;
974            if (environmentTime.getDoubleValue() >= realTimeInSeconds) {
975                return super.transferOutputs(port);
976            } else {
977                // The current real time is greater than the current
978                // model time of the environment. Schedule the production
979                // of outputs at the real time.
980                environmentTime = new Time(this, realTimeInSeconds);
981                LinkedList<QueuedToken> outputTokens = new LinkedList<QueuedToken>();
982                for (int i = 0; i < port.getWidth(); i++) {
983                    try {
984                        if (port.isKnownInside(i)) {
985                            if (port.hasTokenInside(i)) {
986                                Token token = port.getInside(i);
987                                outputTokens
988                                        .add(new QueuedToken(port, i, token));
989                                if (RealTimeComposite.this._debugging) {
990                                    RealTimeComposite.this._debug(getName(),
991                                            "transferring output from "
992                                                    + port.getName()
993                                                    + " with value " + token);
994                                }
995                            }
996                        }
997                    } catch (NoTokenException ex) {
998                        // this shouldn't happen.
999                        throw new InternalErrorException(this, ex, null);
1000                    }
1001                }
1002                if (outputTokens.size() > 0) {
1003                    OutputFrame frame = new OutputFrame(environmentTime,
1004                            outputTokens);
1005                    _outputFrames.add(frame);
1006                    // Request a firing to actually transfer the outputs to
1007                    // the outside.
1008                    fireAt(RealTimeComposite.this, environmentTime);
1009                }
1010                return false;
1011            }
1012        }
1013
1014        /** Override the base class to wait until the associated thread
1015         *  terminates and then call super.wrapup().
1016         *  @exception IllegalActionException If the wrapup() method of
1017         *   one of the associated actors throws it.
1018         */
1019        @Override
1020        public void wrapup() throws IllegalActionException {
1021            if (_delayValue != 0) {
1022                // First, post a "stop frame" in case one has not been posted.
1023                // In the case of a finite run, one will likely have not been posted.
1024                Time environmentTime = RealTimeComposite.this
1025                        .getExecutiveDirector().getModelTime();
1026                if (RealTimeComposite.this._debugging) {
1027                    RealTimeComposite.this._debug(
1028                            "Queueing a stop-frame token for the associated thread with time: "
1029                                    + environmentTime);
1030                }
1031                // A "stop frame" has a null token list.
1032                _inputFrames.put(new InputFrame(environmentTime, null));
1033                try {
1034                    if (RealTimeComposite.this._debugging) {
1035                        RealTimeComposite.this._debug(
1036                                "Waiting for associated thread to stop.");
1037                    }
1038                    _thread.join();
1039                    if (RealTimeComposite.this._debugging) {
1040                        RealTimeComposite.this
1041                                ._debug("Associated thread has stopped.");
1042                    }
1043                } catch (InterruptedException e) {
1044                    // Ignore.
1045                }
1046            }
1047            super.wrapup();
1048        }
1049
1050        //////////////////////////////////////////////////////////////
1051        ////                   private variables                  ////
1052
1053        /** List of input events in the current iteration. */
1054        private List<QueuedToken> _inputTokens;
1055
1056        /** The thread that executes the contained actors. */
1057        private Thread _thread;
1058
1059        //////////////////////////////////////////////////////////////
1060        ////                   inner inner classes                ////
1061
1062        ///////////////////////////////////////////////////////////////////
1063        //// RealTimeThread
1064
1065        /** This the thread that executed the actors.
1066         */
1067        private class RealTimeThread extends Thread {
1068            public RealTimeThread() {
1069                super("RealTimeThread");
1070            }
1071
1072            @Override
1073            public void run() {
1074                while (!_stopRequested) {
1075                    try {
1076                        if (RealTimeComposite.this._debugging) {
1077                            RealTimeComposite.this._debug(
1078                                    "---- Waiting for inputs in the associated thread.");
1079                        }
1080                        InputFrame frame = _inputFrames.take();
1081                        if (frame.tokens == null) {
1082                            // Recognize a "stop frame" and exit the thread.
1083                            if (RealTimeComposite.this._debugging) {
1084                                RealTimeComposite.this._debug(
1085                                        "---- Read a stop frame in associated thread.");
1086                            }
1087                            break;
1088                        }
1089                        if (RealTimeComposite.this._debugging) {
1090                            RealTimeComposite.this._debug(
1091                                    "---- Reading input tokens in associated thread with time "
1092                                            + frame.time + " and value "
1093                                            + frame.tokens);
1094                        }
1095                        // Current time of the director should match the frame time.
1096                        // This is the view of time that should be presented to any inside actors.
1097                        localClock.setLocalTime(frame.time);
1098
1099                        // Note that there may not be any tokens here, since there
1100                        // may not be any inputs. We still want to iterate the
1101                        // enclosed model at the specified time because the firing
1102                        // is due to the model itself having previously called
1103                        // fireAt().
1104                        for (QueuedToken token : frame.tokens) {
1105                            if (token.channel < token.port.getWidthInside()) {
1106                                token.port.sendInside(token.channel,
1107                                        token.token);
1108                            }
1109                        }
1110                        boolean postfireReturnsTrue = fireContainedActors();
1111                        // If outputs are produced by the firing, then
1112                        // we need to trigger a transferOutputs() call.
1113                        // Note that this does not have to be done if the delay
1114                        // is 0.0, since it will be done by the superclass.
1115                        if (_delayValue != 0.0) {
1116                            Iterator ports = outputPortList().iterator();
1117                            while (ports.hasNext()) {
1118                                IOPort port = (IOPort) ports.next();
1119                                boolean hasOutputs = false;
1120                                for (int i = 0; i < port.getWidth(); i++) {
1121                                    if (port.isKnownInside(i)
1122                                            && port.hasTokenInside(i)) {
1123                                        hasOutputs = true;
1124                                    }
1125                                }
1126                                if (hasOutputs) {
1127                                    transferOutputs(port);
1128                                }
1129                            }
1130                        }
1131                        if (!postfireReturnsTrue) {
1132                            // postfire() of the contained actors returns false.
1133                            break;
1134                        }
1135                    } catch (InterruptedException e) {
1136                        // Exit the thread.
1137                        break;
1138                    } catch (IllegalActionException ex) {
1139                        MessageHandler.error("Error in real-time thread.", ex);
1140                    }
1141                }
1142            }
1143        }
1144    }
1145}