001/* An actor that iterates a contained actor over input arrays. 002 003 Copyright (c) 2007-2015 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.actor.lib.hoc; 029 030import java.util.Collections; 031import java.util.Iterator; 032import java.util.LinkedList; 033import java.util.List; 034import java.util.Queue; 035import java.util.concurrent.DelayQueue; 036import java.util.concurrent.Delayed; 037import java.util.concurrent.TimeUnit; 038 039import ptolemy.actor.Actor; 040import ptolemy.actor.Director; 041import ptolemy.actor.Executable; 042import ptolemy.actor.IOPort; 043import ptolemy.actor.NoTokenException; 044import ptolemy.actor.QueueReceiver; 045import ptolemy.actor.Receiver; 046import ptolemy.actor.util.Time; 047import ptolemy.data.DoubleToken; 048import ptolemy.data.Token; 049import ptolemy.data.expr.Parameter; 050import ptolemy.data.type.BaseType; 051import ptolemy.kernel.ComponentEntity; 052import ptolemy.kernel.CompositeEntity; 053import ptolemy.kernel.util.Attribute; 054import ptolemy.kernel.util.IllegalActionException; 055import ptolemy.kernel.util.InternalErrorException; 056import ptolemy.kernel.util.NameDuplicationException; 057import ptolemy.kernel.util.Settable; 058import ptolemy.kernel.util.Workspace; 059import ptolemy.util.MessageHandler; 060 061/////////////////////////////////////////////////////////////////// 062//// RealTimeComposite 063 064/** 065 This is a container for another actor that fires that other actor 066 at real times corresponding to the input time stamps. Its 067 ports are those of the contained actor. Given one or more events 068 with time stamp <i>t</i> at the input ports, it queues the events 069 to provide to a firing of the contained actor that is deferred to 070 occur when real time (since start of execution, in seconds) exceeds 071 or matches <i>t</i>. If real time already exceeds <i>t</i>, then the firing 072 may occur immediately. 073 <p> 074 In addition to the parameters of the contained actor, this actor 075 has a <i>delay</i> parameter. The value of this parameter is 076 the minimum delay (in model time) between an input event and 077 an output event that results from that input event. 078 If the enclosed actor produces no output, or if the time 079 of the outputs can be arbitrarily whatever current time 080 is in the model when they are produced, then <i>delay</i> 081 should be set to <i>UNDEFINED</i>. This is the default value. 082 With this value, the enclosed actor is 083 executed in a separate thread. 084 If the firing produces output events, then those are given time 085 stamps equal to the greater of the current model time of the 086 enclosing model and the current real time at which the outputs 087 are produced (in seconds since the start of execution). In 088 this case, the enclosed actor 089 does not regulate in any way the passage of time of the 090 enclosing model, so the time stamps of the enclosing model 091 could get arbitrarily far ahead of real time. 092 <p> 093 If the value of <i>delay</i> is 0.0 (zero), then the inside 094 model is run in the same thread as the enclosing model. 095 When this RealTimeComposite fires, the fire() method stalls 096 until real time matches the current time of the model, and 097 then invokes the enclosed model. If the enclosed model produces 098 any outputs, then those outputs have time stamps equal to the 099 time stamps of the input. Hence, from the perspective of DE 100 semantics, this actor has zero delay, even though it can 101 introduce real-time delay (which is indistinguishable from 102 just taking a long time to evaluate the fire() method). 103 Note that with <i>delay</i> = 0.0, this actor affects the 104 model in way similar to the <i>synchronizeToRealTime</i> 105 parameter of the director, except that only the events 106 provided to this actor are synchronized to real time, rather 107 than all events. 108 <p> 109 If the value of <i>delay</i> is positive, then the inside 110 model is run in a separate thread, just as if the value 111 were UNDEFINED, but in this case, this actor does 112 regulate the passage of time of the enclosing model. 113 In particular, given an event with time stamp <i>t</i> 114 it prevents model time from advancing past <i>t</i> 115 + <i>delay</i> until the firing triggered by the event 116 has completed (which will be at some real time greater 117 than <i>t</i>). Any outputs produced by that firing are 118 assigned time stamps equal to the greater of <i>t</i> 119 + <i>delay</i> and the current real time at which the 120 output is produced. 121 <p> 122 For various reasons, this actor is tricky to use. The most natural 123 domain to use it in is DE, providing it with input events with time 124 stamps that specify when to perform some action, such as an actuator 125 or display action. However, if the DE system is an open-loop system, 126 then model time of the DE system can get very far ahead of the 127 RealTimeComposite. It is helpful to use a feedback loop including 128 this RealTimeComposite to keep the DE model from getting ahead, 129 and to use the <i>delay</i> parameter judiciously as explained 130 above. 131 <p> 132 This actor may also be used in SDF and SR if the <i>period</i> parameter 133 of the director is set to something greater than zero. 134 This actor consumes its inputs and schedules execution in 135 its postfire() method, and hence in SR will behave as a strict 136 actor (all inputs must be known for anything to happen). 137 <p> 138 FIXME: For actors that are triggered by internal calls to fireAt(), 139 it seems that the delay needs to be no larger than the smallest 140 increment between calls to fireAt(). Is this correct? Why? 141 <p> 142 FIXME: If there is a PortParameter, the parameter gets updated when the 143 fire() method of this composite is invoked, which creates a nondeterminate 144 interaction with the deferred execution. See CompositeActor.fire(). 145 146 @author Edward A. Lee 147 @version $Id$ 148 @since Ptolemy II 6.1 149 @deprecated Use {@link ptolemy.actor.lib.hoc.ThreadedComposite} instead 150 @Pt.ProposedRating Yellow (eal) 151 @Pt.AcceptedRating Red (neuendor) 152 @deprecated Use ThreadedComposite instead. 153 */ 154@Deprecated 155public class RealTimeComposite extends MirrorComposite { 156 /** Create an actor with a name and a container. 157 * The container argument must not be null, or a 158 * NullPointerException will be thrown. This actor will use the 159 * workspace of the container for synchronization and version counts. 160 * If the name argument is null, then the name is set to the empty string. 161 * Increment the version of the workspace. 162 * @param container The container actor. 163 * @param name The name of this actor. 164 * @exception IllegalActionException If the container is incompatible 165 * with this actor. 166 * @exception NameDuplicationException If the name coincides with 167 * an actor already in the container. 168 */ 169 public RealTimeComposite(CompositeEntity container, String name) 170 throws IllegalActionException, NameDuplicationException { 171 super(container, name); 172 setClassName("ptolemy.actor.lib.hoc.RealTimeComposite"); 173 new RealTimeDirector(this, "RealTimeDirector"); 174 175 // Hidden parameter defining "UNDEFINED". 176 Parameter UNDEFINED = new Parameter(this, "UNDEFINED"); 177 UNDEFINED.setVisibility(Settable.EXPERT); 178 UNDEFINED.setPersistent(false); 179 UNDEFINED.setExpression("-1.0"); 180 181 delay = new Parameter(this, "delay"); 182 delay.setTypeEquals(BaseType.DOUBLE); 183 delay.setExpression("UNDEFINED"); 184 } 185 186 /////////////////////////////////////////////////////////////////// 187 //// parameters //// 188 189 /** The maximum model-time delay between the input events and the 190 * output events. This is a double that defaults to <i>UNDEFINED</i>. 191 */ 192 public Parameter delay; 193 194 /////////////////////////////////////////////////////////////////// 195 //// public methods //// 196 197 /** React to a change in an attribute. This method is called by 198 * a contained attribute when its value changes. In this base class, 199 * the method does nothing. In derived classes, this method may 200 * throw an exception, indicating that the new attribute value 201 * is invalid. It is up to the caller to restore the attribute 202 * to a valid value if an exception is thrown. 203 * @param attribute The attribute that changed. 204 * @exception IllegalActionException If the change is not acceptable 205 * to this container (not thrown in this base class). 206 */ 207 @Override 208 public void attributeChanged(Attribute attribute) 209 throws IllegalActionException { 210 if (attribute == delay) { 211 _delayValue = ((DoubleToken) delay.getToken()).doubleValue(); 212 } else { 213 super.attributeChanged(attribute); 214 } 215 } 216 217 /** Clone the object into the specified workspace. This overrides 218 * the base class to instantiate a new RealTimeDirector. 219 * @param workspace The workspace for the new object. 220 * @return A new NamedObj. 221 * @exception CloneNotSupportedException If any of the attributes 222 * cannot be cloned. 223 * @see #exportMoML(java.io.Writer, int, String) 224 */ 225 @Override 226 public Object clone(Workspace workspace) throws CloneNotSupportedException { 227 RealTimeComposite result = (RealTimeComposite) super.clone(workspace); 228 try { 229 // Remove the old inner RealTimeDirector(s) that is(are) in the wrong workspace. 230 String realTimeDirectorName = null; 231 Iterator realTimeDirectors = result 232 .attributeList(RealTimeDirector.class).iterator(); 233 while (realTimeDirectors.hasNext()) { 234 RealTimeDirector oldRealTimeDirector = (RealTimeDirector) realTimeDirectors 235 .next(); 236 if (realTimeDirectorName == null) { 237 realTimeDirectorName = oldRealTimeDirector.getName(); 238 } 239 oldRealTimeDirector.setContainer(null); 240 } 241 242 // Create a new RealTimeDirector that is in the right workspace. 243 RealTimeDirector realTimeDirector = result.new RealTimeDirector( 244 workspace); 245 realTimeDirector.setContainer(result); 246 realTimeDirector.setName(realTimeDirectorName); 247 } catch (Throwable throwable) { 248 throw new CloneNotSupportedException( 249 "Could not clone: " + throwable); 250 } 251 return result; 252 } 253 254 /** Invoke iterations on the contained actor of the 255 * container of this director repeatedly until either it runs out 256 * of input data or prefire() returns false. If postfire() of any 257 * actor returns false, then return false. Otherwise, return true. 258 * @return True to allow the thread to continue executing. 259 * @exception IllegalActionException If any called method of 260 * of the contained actor throws it, or if the contained 261 * actor is not opaque. 262 */ 263 public boolean fireContainedActors() throws IllegalActionException { 264 // Don't call "super.fire();" here, this actor contains its 265 // own director. 266 Iterator actors = entityList().iterator(); 267 boolean postfireReturns = true; 268 269 while (actors.hasNext() && !_stopRequested) { 270 Actor actor = (Actor) actors.next(); 271 272 if (!((ComponentEntity) actor).isOpaque()) { 273 throw new IllegalActionException(this, 274 "Inside actor is not opaque " 275 + "(perhaps it needs a director)."); 276 } 277 278 int result = Executable.COMPLETED; 279 280 while (result != Executable.NOT_READY) { 281 if (_debugging) { 282 _debug("Iterating actor: " + actor.getFullName()); 283 } 284 if (_debugging) { 285 _debug("---- Iterating actor in associated thread: " 286 + actor.getFullName()); 287 } 288 result = actor.iterate(1); 289 290 // Should return if there are no more input data, 291 // irrespective of return value of prefire() of 292 // the actor, which is not reliable. 293 boolean outOfData = true; 294 Iterator inPorts = actor.inputPortList().iterator(); 295 296 while (inPorts.hasNext()) { 297 IOPort port = (IOPort) inPorts.next(); 298 299 for (int i = 0; i < port.getWidth(); i++) { 300 if (port.hasToken(i)) { 301 outOfData = false; 302 break; 303 } 304 } 305 } 306 307 if (outOfData) { 308 break; 309 } 310 311 if (result == Executable.STOP_ITERATING) { 312 if (_debugging) { 313 _debug("---- Actor requests halt: " 314 + actor.getFullName()); 315 } 316 postfireReturns = false; 317 break; 318 } 319 } 320 } 321 return postfireReturns; 322 } 323 324 /////////////////////////////////////////////////////////////////// 325 //// private variables //// 326 327 /** The cached value of the <i>delay</i> parameter. */ 328 private double _delayValue = 0.0; 329 330 /** Queue of times at which inside actors have requested firings. 331 * This queue is accessed from multiple threads, so it must be 332 * thread safe. 333 */ 334 private List<Time> _fireAtTimes = Collections 335 .synchronizedList(new LinkedList<Time>()); 336 337 /** Queue of unprocessed input events. 338 */ 339 private DelayQueue<InputFrame> _inputFrames = new DelayQueue<InputFrame>(); 340 341 /** Queue of unprocessed output events. 342 * This queue is accessed from multiple threads, so it must be 343 * thread safe. 344 */ 345 private List<OutputFrame> _outputFrames = Collections 346 .synchronizedList(new LinkedList<OutputFrame>()); 347 348 /** The real time at which the model begins executing, in milliseconds. */ 349 private long _realStartTime = 0; 350 351 /** Queue of times at which responses to firings are expected. 352 * This is accessed only from the Director action methods, which run 353 * in a single thread, so it need not by thread safe. 354 */ 355 private Queue<Time> _responseTimes = new LinkedList<Time>(); 356 357 /////////////////////////////////////////////////////////////////// 358 //// inner classes //// 359 360 /////////////////////////////////////////////////////////////////// 361 //// InputFrame 362 363 /** Bundle of a token and the input port at which it arrived. 364 * Use null for <i>theTokens</i> specifies this frame as a "stop frame" to 365 * flag that no more inputs will be delivered. 366 */ 367 private class InputFrame implements Delayed { 368 369 /** Construct an input frame. 370 * @param theTime The model time of the input events. 371 * @param theTokens The tokens in the input events. 372 */ 373 public InputFrame(Time theTime, List<QueuedToken> theTokens) { 374 tokens = theTokens; 375 time = theTime; 376 } 377 378 public final Time time; 379 380 public final List<QueuedToken> tokens; 381 382 @Override 383 public long getDelay(TimeUnit unit) { 384 // Calculate time to wait. 385 long elapsedTime = System.currentTimeMillis() - _realStartTime; 386 // NOTE: We assume that the elapsed time can be 387 // safely cast to a double. This means that 388 // the DE domain has an upper limit on running 389 // time of Double.MAX_VALUE milliseconds. 390 double elapsedTimeInSeconds = elapsedTime / 1000.0; 391 long timeToWait = (long) (time.subtract(elapsedTimeInSeconds) 392 .getDoubleValue() * 1000.0); 393 return unit.convert(timeToWait, TimeUnit.MILLISECONDS); 394 } 395 396 @Override 397 public int compareTo(Delayed frame) { 398 // NOTE: We assume that only comparisons against instances 399 // of Frame will be done. Is this safe? 400 return time.compareTo(((InputFrame) frame).time); 401 } 402 403 /** Return true if this InputFrame object has the same 404 * time as the given InputFrame object. 405 * @param inputFrame The InputFrame object that this 406 * InputFrame object is compared to. 407 * @return True if the two InputFrame objects have the same time. 408 */ 409 @Override 410 public boolean equals(Object inputFrame) { 411 // See http://www.technofundo.com/tech/java/equalhash.html 412 413 /* FindBugs says that InputFrame "defined 414 * compareTo(Object) and uses Object.equals()" 415 * http://findbugs.sourceforge.net/bugDescriptions.html#EQ_COMPARETO_USE_OBJECT_EQUALS 416 * says: "This class defines a compareTo(...) method but 417 * inherits its equals() method from 418 * java.lang.Object. Generally, the value of compareTo should 419 * return zero if and only if equals returns true. If this is 420 * violated, weird and unpredictable failures will occur in 421 * classes such as PriorityQueue. In Java 5 the 422 * PriorityQueue.remove method uses the compareTo method, 423 * while in Java 6 it uses the equals method. 424 * 425 * From the JavaDoc for the compareTo method in the 426 * Comparable interface: 427 * 428 * It is strongly recommended, but not strictly required that 429 * (x.compareTo(y)==0) == (x.equals(y)). Generally speaking, 430 * any class that implements the Comparable interface and 431 * violates this condition should clearly indicate this 432 * fact. The recommended language is "Note: this class has a 433 * natural ordering that is inconsistent with equals." " 434 */ 435 if (inputFrame == this) { 436 return true; 437 } 438 if (inputFrame == null || inputFrame.getClass() != getClass()) { 439 return false; 440 } else { 441 InputFrame frame = (InputFrame) inputFrame; 442 if (compareTo(frame) == 0 443 && frame.tokens.size() == tokens.size()) { 444 return frame.tokens.equals(tokens); 445 } 446 } 447 return false; 448 } 449 450 /** Return the hash code for the InputFrame object. 451 * @return The hash code for this InputFrame object; 452 */ 453 @Override 454 public int hashCode() { 455 // See http://www.technofundo.com/tech/java/equalhash.html 456 int hashCode = 7; 457 if (time != null) { 458 hashCode = 31 * hashCode + time.hashCode(); 459 } 460 if (tokens != null) { 461 hashCode = 31 * hashCode + tokens.hashCode(); 462 } 463 return hashCode; 464 } 465 } 466 467 /////////////////////////////////////////////////////////////////// 468 //// QueuedToken 469 470 /** Bundle of a token and the input port and channel 471 * at which it arrived. 472 */ 473 private static class QueuedToken { 474 475 // FindBugs suggests making this class static so as to decrease 476 // the size of instances and avoid dangling references. 477 478 public QueuedToken(IOPort thePort, int theChannel, Token theToken) { 479 token = theToken; 480 channel = theChannel; 481 port = thePort; 482 } 483 484 public final int channel; 485 486 public final Token token; 487 488 public final IOPort port; 489 490 @Override 491 public String toString() { 492 return "token " + token + " for port " + port.getFullName() + "(" 493 + channel + ")"; 494 } 495 } 496 497 /////////////////////////////////////////////////////////////////// 498 //// OutputFrame 499 500 /** Bundle of a token and the output port at which it arrived. 501 */ 502 private static class OutputFrame { 503 504 // FindBugs suggests making this class static so as to decrease 505 // the size of instances and avoid dangling references. 506 507 /** Construct an output frame. 508 * @param theTime The model time of the output events. 509 * @param theTokens The tokens in the output events. 510 */ 511 public OutputFrame(Time theTime, List<QueuedToken> theTokens) { 512 tokens = theTokens; 513 time = theTime; 514 } 515 516 public final Time time; 517 518 public final List<QueuedToken> tokens; 519 } 520 521 /////////////////////////////////////////////////////////////////// 522 //// RealTimeDirector 523 524 /** This is a specialized director that defers firing of the 525 * contained actors until real-time matches the time stamp of 526 * provided inputs. It does this in a separate thread that 527 * blocks until the times match, then transfers the input tokens 528 * that arrived with that time stamp and fires the contained actors 529 * in the order in which they appear in the actor list repeatedly 530 * until either there is no more input data for the actor or 531 * the prefire() method of the actor returns false. If postfire() 532 * of any actor returns false, then postfire() of this director 533 * will return false, requesting a halt to execution of the model. 534 */ 535 private class RealTimeDirector extends Director { 536 /** Create a new instance of the director for RealTimeComposite. 537 * @param container The container for the director. 538 * @param name The name of the director. 539 * @exception IllegalActionException Not thrown in this base class. 540 * @exception NameDuplicationException Not thrown in this base class. 541 */ 542 public RealTimeDirector(CompositeEntity container, String name) 543 throws IllegalActionException, NameDuplicationException { 544 super(container, name); 545 setPersistent(false); 546 } 547 548 /** Construct a RealTimeDirector in the specified workspace with 549 * no container and an empty string as a name. You can then change 550 * the name with setName(). If the workspace argument is null, then 551 * use the default workspace. You should set the local director or 552 * executive director before attempting to send data to the actor 553 * or to execute it. Add the actor to the workspace directory. 554 * Increment the version number of the workspace. 555 * @param workspace The workspace that will list the actor. 556 * @exception IllegalActionException If the container is incompatible 557 * with this actor. 558 * @exception NameDuplicationException If the name coincides with 559 * an actor already in the container. 560 */ 561 public RealTimeDirector(Workspace workspace) 562 throws IllegalActionException, NameDuplicationException { 563 super(workspace); 564 setPersistent(false); 565 } 566 567 /** If current model time of the environment matches the time at which outputs 568 * that have been queued should be produced, then produce them. 569 * Yield to other threads. 570 * @exception IllegalActionException If production of an output 571 * fails (e.g. type error). 572 */ 573 @Override 574 public void fire() throws IllegalActionException { 575 if (_realStartTime < 0L) { 576 _realStartTime = System.currentTimeMillis(); 577 } 578 Time environmentTime = RealTimeComposite.this.getExecutiveDirector() 579 .getModelTime(); 580 if (_delayValue == 0.0) { 581 // Delay is zero, so wait until current time matches 582 // model time, and then treat this as an ordinary composite actor. 583 long realTimeMillis = System.currentTimeMillis() 584 - _realStartTime; 585 long modelTimeMillis = Math 586 .round(environmentTime.getDoubleValue() * 1000.0); 587 if (realTimeMillis < modelTimeMillis) { 588 try { 589 Thread.sleep(modelTimeMillis - realTimeMillis); 590 } catch (InterruptedException e) { 591 // Ignore and continue. 592 } 593 } 594 // FIXME: This isn't quite right, since this will postfire() 595 // contained actors. 596 super.fire(); 597 } else { 598 // Delay is either UNDEFINED or positive, 599 // so we are running in separate thread. 600 601 // If the delay value is positive, then we may need 602 // to stall to prevent model time from getting too 603 // far ahead of real time. 604 if (_delayValue > 0.0) { 605 // Delay value is positive. If current time matches 606 // the time at the head of the _responseTime queue, 607 // then stall until real time matches that time. 608 // Note that there is no harm in consuming the 609 // head of the queue since the side effect here 610 // is the passage of real time. 611 Time responseTime = _responseTimes.peek(); 612 if (responseTime != null 613 && responseTime.equals(environmentTime)) { 614 615 // FIXME: Findbugs says that the next line: 616 // "ignores return value of java.util.Queue.poll()" 617 _responseTimes.poll(); 618 // Time matches. Compare to real time. 619 long realTimeMillis = System.currentTimeMillis() 620 - _realStartTime; 621 long modelTimeMillis = Math.round( 622 environmentTime.getDoubleValue() * 1000.0); 623 if (realTimeMillis < modelTimeMillis) { 624 try { 625 Thread.sleep(modelTimeMillis - realTimeMillis); 626 } catch (InterruptedException e) { 627 // Ignore and continue. 628 } 629 } 630 } 631 } 632 633 // Next check for outputs to produce. 634 if (_outputFrames.size() > 0) { 635 OutputFrame frame = _outputFrames.get(0); 636 if (frame.time.equals(environmentTime)) { 637 // Current time matches the time of the first frame on 638 // the output queue. 639 // Produce the outputs on the frame. 640 for (QueuedToken token : frame.tokens) { 641 if (token.channel < token.port.getWidth()) { 642 token.port.send(token.channel, token.token); 643 } 644 } 645 } 646 } 647 Thread.yield(); 648 } 649 } 650 651 /** Delegate by calling fireAt() on the director of the container's 652 * container. 653 * @param actor The actor requesting firing. 654 * @param time The time at which to fire. 655 * @param microstep The microstep. 656 * @return The time at which the actor passed as an argument 657 * will be fired. 658 */ 659 @Override 660 public Time fireAt(Actor actor, Time time, int microstep) 661 throws IllegalActionException { 662 Time result = time; 663 Director director = RealTimeComposite.this.getExecutiveDirector(); 664 if (director != null) { 665 if (RealTimeComposite.this._debugging) { 666 RealTimeComposite.this 667 ._debug("---- Actor requests firing at time " + time 668 + ": " + actor.getFullName()); 669 } 670 result = director.fireAt(RealTimeComposite.this, time, 671 microstep); 672 } 673 if (actor != RealTimeComposite.this) { 674 // The fireAt() request is coming from the inside, so 675 // when the firing occurs, we want to post an input 676 // frame (even if there are no input events) for 677 // the associated thread. 678 _fireAtTimes.add(time); 679 } 680 return result; 681 } 682 683 /** Fire the specified actor at the first opportunity 684 * and then pass the request up to the executive director. 685 * When passing it up, request a firing at the greater of 686 * the current time of that director or the elapsed real 687 * time since the start of the model. 688 * This is useful for actors that spontaneously produce output, 689 * e.g. from sensor data or from completion of some previously 690 * started task. The firing of the actor will produce the output, 691 * sending it to the inside of the output ports of this composite, 692 * and then the firing of the composite will transfer those tokens 693 * to the outside model. 694 * @param actor The actor requesting firing (ignored). 695 */ 696 @Override 697 public Time fireAtCurrentTime(Actor actor) 698 throws IllegalActionException { 699 // Coverity Scan reports that this method does not call 700 // super.fireAtCurrentTime(), which is ok because we call 701 // it on the executive director. 702 Time environmentTime = RealTimeComposite.this.getExecutiveDirector() 703 .getModelTime(); 704 _inputFrames.put(new InputFrame(environmentTime, 705 new LinkedList<QueuedToken>())); 706 Director director = RealTimeComposite.this.getExecutiveDirector(); 707 if (director != null) { 708 // We assume that the contained actors mean "real time" by 709 // "current time". Hopefully, this will be in the future w.r.t. model time. 710 // Use fireAt() hoping that the director will not increment time 711 // too soon. 712 // FIXME: This is not right! 713 Time time = new Time(this, 714 (System.currentTimeMillis() - _realStartTime) / 1000.0); 715 if (RealTimeComposite.this._debugging) { 716 RealTimeComposite.this._debug( 717 "----- fireAtCurrentTime() request by actor " 718 + actor.getFullName() + ". Model time is " 719 + environmentTime + ", and real time is " 720 + time); 721 } 722 director.fireAt(RealTimeComposite.this, time); 723 return time; 724 } 725 return environmentTime; 726 } 727 728 /** Return the current time of the enclosing actor if the delay 729 * is zero. Otherwise, get the local notion of current time. 730 * @return The current time. 731 */ 732 @Override 733 public Time getModelTime() { 734 if (_delayValue == 0.0) { 735 return ((Actor) getContainer()).getExecutiveDirector() 736 .getModelTime(); 737 } else { 738 return super.getModelTime(); 739 } 740 } 741 742 /** Start the associated thread. 743 * @exception IllegalActionException If the initialize() method of 744 * one of the associated actors throws it. 745 */ 746 @Override 747 public void initialize() throws IllegalActionException { 748 // The following must be done before the initialize() methods 749 // of the actors is called because those methods may call fireAt(). 750 _fireAtTimes.clear(); 751 752 // The superclass will initialize all the actors. 753 super.initialize(); 754 // Set a flag indicating that the first firing should 755 // initialize the _realStartTime variable. This is done 756 // in the first firing to be as late as possible, so 757 // that startup transients are minimized. 758 // FIXME: This will impede synchronization with other 759 // actors, since there won't be a common time base. 760 _realStartTime = -1L; 761 if (_delayValue != 0) { 762 // We will be executing in a new thread. 763 // Create and start that thread. 764 _inputFrames.clear(); 765 _outputFrames.clear(); 766 _responseTimes.clear(); 767 _thread = new RealTimeThread(); 768 _thread.setPriority(Thread.MAX_PRIORITY); 769 _thread.start(); 770 } 771 } 772 773 /** Return a new instance of QueueReceiver. 774 * @return A new instance of QueueReceiver. 775 * @see QueueReceiver 776 */ 777 @Override 778 public Receiver newReceiver() { 779 return new QueueReceiver(); 780 } 781 782 /** Clear the list of input events for this iteration and return true 783 * if the associated thread is alive, if <i>delay</i> is not 0.0. 784 * Otherwise, return true. 785 * @return True if the associated thread is still alive, or true 786 * if delay == 0.0. 787 */ 788 @Override 789 public boolean prefire() throws IllegalActionException { 790 // Do not call super.prefire()! 791 // Superclass aligns current time to that of the container. 792 // The notion of current time presented to these actors 793 // should match that of the frame. 794 // super.prefire(); 795 Time environmentTime = RealTimeComposite.this.getExecutiveDirector() 796 .getModelTime(); 797 if (RealTimeComposite.this._debugging) { 798 RealTimeComposite.this 799 ._debug("----- Current environment time is: " 800 + environmentTime); 801 } 802 803 if (_delayValue != 0) { 804 // Have to create a new list because the previous list may 805 // not have been consumed yet. 806 _inputTokens = new LinkedList<QueuedToken>(); 807 return _thread.isAlive(); 808 } else { 809 return true; 810 } 811 } 812 813 /** Send all the collected tokens to the queue for consumption 814 * by the associated thread, if there is an associated thread. 815 * Otherwise, just invoke the superclass postfire(). 816 * @return True if the associated thread is still alive. 817 */ 818 @Override 819 public boolean postfire() throws IllegalActionException { 820 boolean result = super.postfire(); 821 Time environmentTime = RealTimeComposite.this.getExecutiveDirector() 822 .getModelTime(); 823 if (_delayValue != 0) { 824 // Delay is either UNDEFINED or positive. 825 // Post the inputs for consumption in the 826 // associated thread. 827 if (_inputTokens.size() > 0) { 828 if (RealTimeComposite.this._debugging) { 829 RealTimeComposite.this._debug( 830 "Queueing input tokens for the associated thread: " 831 + _inputTokens.toString() 832 + " to be processed at time " 833 + environmentTime); 834 } 835 _inputFrames 836 .put(new InputFrame(environmentTime, _inputTokens)); 837 if (_delayValue > 0.0) { 838 // Delay value is positive. Schedule a firing 839 // at current time plus the delay. 840 Time responseTime = environmentTime.add(_delayValue); 841 fireAt(RealTimeComposite.this, responseTime); 842 843 // Queue an indicator to stall when that firing occurs. 844 _responseTimes.add(responseTime); 845 } 846 } 847 // Even if _inputTokens is null, we still want to post an 848 // event if the firing is due to a call to fireAt() from the inside. 849 // Check to see whether that is the case. 850 if (_fireAtTimes.size() > 0) { 851 Time fireAtTime = _fireAtTimes.get(0); 852 if (fireAtTime.equals(environmentTime)) { 853 // Remove the time from the queue. 854 _fireAtTimes.remove(0); 855 // Queue an iteration even if there are no inputs. 856 if (_inputTokens.size() == 0) { 857 if (RealTimeComposite.this._debugging) { 858 RealTimeComposite.this._debug( 859 "Queueing pure event for the associated thread, " 860 + " to be processed at time " 861 + environmentTime); 862 } 863 _inputFrames.put(new InputFrame(environmentTime, 864 _inputTokens)); 865 if (_delayValue > 0.0) { 866 // Delay value is positive. Schedule a firing 867 // at current time plus the delay. 868 Time responseTime = environmentTime 869 .add(_delayValue); 870 fireAt(RealTimeComposite.this, responseTime); 871 872 // Queue an indicator to stall when that firing occurs. 873 _responseTimes.add(responseTime); 874 } 875 } 876 } 877 } 878 879 // If current time matches the time at the head of 880 // of the queue for outputs, then consume the data on the 881 // head of the queue. Those data were sent to the output 882 // in the fire() method. 883 if (_outputFrames.size() > 0) { 884 OutputFrame frame = _outputFrames.get(0); 885 if (frame.time.equals(environmentTime)) { 886 // Consume the outputs on the frame, which will have 887 // been sent in the fire() method. 888 _outputFrames.remove(0); 889 } 890 } 891 result = _thread.isAlive(); 892 } 893 return result; 894 } 895 896 /** Override the base class to post a "stop frame" on the queue 897 * if there is an associated thread. 898 */ 899 @Override 900 public void stop() { 901 Time environmentTime = RealTimeComposite.this.getExecutiveDirector() 902 .getModelTime(); 903 if (_delayValue != 0) { 904 if (RealTimeComposite.this._debugging) { 905 RealTimeComposite.this._debug( 906 "Queueing a stop-frame token for the associated thread with time: " 907 + environmentTime); 908 } 909 // A "stop frame" has a null token list. 910 _inputFrames.put(new InputFrame(environmentTime, null)); 911 } else { 912 super.stop(); 913 } 914 } 915 916 /** Record data from the specified input port 917 * for transfer to the queue used to communicate these data to the 918 * associated thread. 919 * @param port The port to transfer tokens from. 920 * @return True if at least one data token is transferred. 921 * @exception IllegalActionException If reading the inputs fails. 922 */ 923 @Override 924 public boolean transferInputs(IOPort port) 925 throws IllegalActionException { 926 if (_delayValue == 0) { 927 return super.transferInputs(port); 928 } 929 boolean result = false; 930 931 for (int i = 0; i < port.getWidth(); i++) { 932 try { 933 if (port.isKnown(i)) { 934 if (port.hasToken(i)) { 935 Token token = port.get(i); 936 _inputTokens.add(new QueuedToken(port, i, token)); 937 if (RealTimeComposite.this._debugging) { 938 RealTimeComposite.this._debug(getName(), 939 "transferring input from " 940 + port.getName()); 941 } 942 result = true; 943 } 944 } 945 } catch (NoTokenException ex) { 946 // this shouldn't happen. 947 throw new InternalErrorException(this, ex, null); 948 } 949 } 950 return result; 951 } 952 953 /** If real time is less than or equal to the current model time 954 * of the environment, then produce the outputs immediately at the 955 * current model time. Otherwise, collect them and queue them to 956 * be produced by the fire method when model time matches the 957 * current real time, and call fireAt() to request a firing 958 * at that time. 959 * @param port The port to transfer tokens from. 960 * @return True if at least one data token is produced now. 961 * @exception IllegalActionException If reading the inputs fails. 962 */ 963 @Override 964 public boolean transferOutputs(IOPort port) 965 throws IllegalActionException { 966 if (_delayValue == 0) { 967 return super.transferOutputs(port); 968 } 969 // Compare against the environment time. 970 Time environmentTime = RealTimeComposite.this.getExecutiveDirector() 971 .getModelTime(); 972 double realTimeInSeconds = (System.currentTimeMillis() 973 - _realStartTime) / 1000.0; 974 if (environmentTime.getDoubleValue() >= realTimeInSeconds) { 975 return super.transferOutputs(port); 976 } else { 977 // The current real time is greater than the current 978 // model time of the environment. Schedule the production 979 // of outputs at the real time. 980 environmentTime = new Time(this, realTimeInSeconds); 981 LinkedList<QueuedToken> outputTokens = new LinkedList<QueuedToken>(); 982 for (int i = 0; i < port.getWidth(); i++) { 983 try { 984 if (port.isKnownInside(i)) { 985 if (port.hasTokenInside(i)) { 986 Token token = port.getInside(i); 987 outputTokens 988 .add(new QueuedToken(port, i, token)); 989 if (RealTimeComposite.this._debugging) { 990 RealTimeComposite.this._debug(getName(), 991 "transferring output from " 992 + port.getName() 993 + " with value " + token); 994 } 995 } 996 } 997 } catch (NoTokenException ex) { 998 // this shouldn't happen. 999 throw new InternalErrorException(this, ex, null); 1000 } 1001 } 1002 if (outputTokens.size() > 0) { 1003 OutputFrame frame = new OutputFrame(environmentTime, 1004 outputTokens); 1005 _outputFrames.add(frame); 1006 // Request a firing to actually transfer the outputs to 1007 // the outside. 1008 fireAt(RealTimeComposite.this, environmentTime); 1009 } 1010 return false; 1011 } 1012 } 1013 1014 /** Override the base class to wait until the associated thread 1015 * terminates and then call super.wrapup(). 1016 * @exception IllegalActionException If the wrapup() method of 1017 * one of the associated actors throws it. 1018 */ 1019 @Override 1020 public void wrapup() throws IllegalActionException { 1021 if (_delayValue != 0) { 1022 // First, post a "stop frame" in case one has not been posted. 1023 // In the case of a finite run, one will likely have not been posted. 1024 Time environmentTime = RealTimeComposite.this 1025 .getExecutiveDirector().getModelTime(); 1026 if (RealTimeComposite.this._debugging) { 1027 RealTimeComposite.this._debug( 1028 "Queueing a stop-frame token for the associated thread with time: " 1029 + environmentTime); 1030 } 1031 // A "stop frame" has a null token list. 1032 _inputFrames.put(new InputFrame(environmentTime, null)); 1033 try { 1034 if (RealTimeComposite.this._debugging) { 1035 RealTimeComposite.this._debug( 1036 "Waiting for associated thread to stop."); 1037 } 1038 _thread.join(); 1039 if (RealTimeComposite.this._debugging) { 1040 RealTimeComposite.this 1041 ._debug("Associated thread has stopped."); 1042 } 1043 } catch (InterruptedException e) { 1044 // Ignore. 1045 } 1046 } 1047 super.wrapup(); 1048 } 1049 1050 ////////////////////////////////////////////////////////////// 1051 //// private variables //// 1052 1053 /** List of input events in the current iteration. */ 1054 private List<QueuedToken> _inputTokens; 1055 1056 /** The thread that executes the contained actors. */ 1057 private Thread _thread; 1058 1059 ////////////////////////////////////////////////////////////// 1060 //// inner inner classes //// 1061 1062 /////////////////////////////////////////////////////////////////// 1063 //// RealTimeThread 1064 1065 /** This the thread that executed the actors. 1066 */ 1067 private class RealTimeThread extends Thread { 1068 public RealTimeThread() { 1069 super("RealTimeThread"); 1070 } 1071 1072 @Override 1073 public void run() { 1074 while (!_stopRequested) { 1075 try { 1076 if (RealTimeComposite.this._debugging) { 1077 RealTimeComposite.this._debug( 1078 "---- Waiting for inputs in the associated thread."); 1079 } 1080 InputFrame frame = _inputFrames.take(); 1081 if (frame.tokens == null) { 1082 // Recognize a "stop frame" and exit the thread. 1083 if (RealTimeComposite.this._debugging) { 1084 RealTimeComposite.this._debug( 1085 "---- Read a stop frame in associated thread."); 1086 } 1087 break; 1088 } 1089 if (RealTimeComposite.this._debugging) { 1090 RealTimeComposite.this._debug( 1091 "---- Reading input tokens in associated thread with time " 1092 + frame.time + " and value " 1093 + frame.tokens); 1094 } 1095 // Current time of the director should match the frame time. 1096 // This is the view of time that should be presented to any inside actors. 1097 localClock.setLocalTime(frame.time); 1098 1099 // Note that there may not be any tokens here, since there 1100 // may not be any inputs. We still want to iterate the 1101 // enclosed model at the specified time because the firing 1102 // is due to the model itself having previously called 1103 // fireAt(). 1104 for (QueuedToken token : frame.tokens) { 1105 if (token.channel < token.port.getWidthInside()) { 1106 token.port.sendInside(token.channel, 1107 token.token); 1108 } 1109 } 1110 boolean postfireReturnsTrue = fireContainedActors(); 1111 // If outputs are produced by the firing, then 1112 // we need to trigger a transferOutputs() call. 1113 // Note that this does not have to be done if the delay 1114 // is 0.0, since it will be done by the superclass. 1115 if (_delayValue != 0.0) { 1116 Iterator ports = outputPortList().iterator(); 1117 while (ports.hasNext()) { 1118 IOPort port = (IOPort) ports.next(); 1119 boolean hasOutputs = false; 1120 for (int i = 0; i < port.getWidth(); i++) { 1121 if (port.isKnownInside(i) 1122 && port.hasTokenInside(i)) { 1123 hasOutputs = true; 1124 } 1125 } 1126 if (hasOutputs) { 1127 transferOutputs(port); 1128 } 1129 } 1130 } 1131 if (!postfireReturnsTrue) { 1132 // postfire() of the contained actors returns false. 1133 break; 1134 } 1135 } catch (InterruptedException e) { 1136 // Exit the thread. 1137 break; 1138 } catch (IllegalActionException ex) { 1139 MessageHandler.error("Error in real-time thread.", ex); 1140 } 1141 } 1142 } 1143 } 1144 } 1145}