001/* An actor that executes a contained actor in separate thread. 002 003 Copyright (c) 2007-2018 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 
023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.actor.lib.hoc; 029 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedList; 034import java.util.List; 035import java.util.Queue; 036import java.util.Set; 037 038import ptolemy.actor.Actor; 039import ptolemy.actor.Director; 040import ptolemy.actor.Executable; 041import ptolemy.actor.IOPort; 042import ptolemy.actor.NoTokenException; 043import ptolemy.actor.QueueReceiver; 044import ptolemy.actor.Receiver; 045import ptolemy.actor.util.BreakCausalityInterface; 046import ptolemy.actor.util.CausalityInterface; 047import ptolemy.actor.util.Time; 048import ptolemy.data.BooleanToken; 049import ptolemy.data.DoubleToken; 050import ptolemy.data.Token; 051import ptolemy.data.expr.Parameter; 052import ptolemy.data.type.BaseType; 053import ptolemy.kernel.ComponentEntity; 054import ptolemy.kernel.CompositeEntity; 055import ptolemy.kernel.util.Attribute; 056import ptolemy.kernel.util.IllegalActionException; 057import ptolemy.kernel.util.InternalErrorException; 058import ptolemy.kernel.util.NameDuplicationException; 059import ptolemy.kernel.util.Settable; 060import ptolemy.kernel.util.Workspace; 061 062/////////////////////////////////////////////////////////////////// 063//// ThreadedComposite 064 065/** 066 A container for another actor that executes that other actor 067 in a separate thread called the <i>inside thread</i>. 068 This actor starts that thread in its initialize() 069 method, which is invoked by its executive director (the director 070 in charge of firing this actor). The thread that invokes the 071 action methods of this actor 072 (initialize(), prefire(), fire(), postfire(), and wrapup()) 073 is called the <i>director thread</i>. 
074 075 <p> A paper describing the use of this actor is found at 076 <a href="http://www.eecs.berkeley.edu/Pubs/TechRpts/2008/EECS-2008-151.html#in_browser">http://www.eecs.berkeley.edu/Pubs/TechRpts/2008/EECS-2008-151.html</a>.</p> 077 078 <p> To use this actor in Vergil, drag a ThreadedComposite on to the 079 canvas and then drag the actor to be contained on to the 080 ThreadedComposite actor.</p> 081 082 <p> This actor automatically creates input and output ports to 083 match those of the inside actor. Input events provided at those 084 input ports are provided as input events to the contained actor. 085 Outputs provided by the contained actor become output events 086 of this actor. The time stamp of the input events is provided 087 by the container of this actor. The time stamp of the output events 088 depends on the <i>delay</i> parameter, as explained below.</p> 089 090 <p> 091 The inside thread blocks waiting for inputs or pure events. 092 Inputs are provided to that thread when the fire() method of 093 this actor is invoked by the director thread. 094 Pure events are provided after fireAt(), 095 fireAtCurrentTime(), or fireAtFirstValidTimeAfter() are called 096 by either the inside thread or the director thread. 097 When the time of those firing requests becomes current time, 098 the container will (presumably) fire this actor, and 099 this actor will provide a pure event to the inside thread, 100 causing it to fire the contained actor. 101 <p> 102 If the <i>synchronizeToRealTime</i> parameter is true, then 103 when the inside thread encounters an input or pure event 104 with time stamp <i>t</i>, it stalls until real time matches 105 or exceeds <i>t</i> (measured in seconds since the start of 106 execution of the inside thread). In contrast for example 107 to the <i>synchronizeToRealTime</i> parameter of the DEDirector, 108 this enables construction of models where only a portion of the 109 model synchronizes to real time. 
110 <p> 111 When the wrapup() method of this actor is called, the inside thread is 112 provided with a signal to terminate rather than to process additional 113 inputs. The inside thread will also exit if stop() is called on this 114 actor; however, in this case, which iterations are completed 115 is nondeterminate (there may be inputs left unprocessed). 116 If any inside actor returns false from postfire(), then the 117 inside thread will also terminate and this actor will return false 118 from postfire. 119 <p> 120 The parameters of this actor include all the parameters of the 121 contained actor, and setting those parameters automatically 122 sets the parameters of the contained actor. 123 <p> 124 In addition to the parameters of the contained actor, this actor 125 has a <i>delay</i> parameter. This parameter is a double that can 126 be any nonnegative value or the special value <i>UNDEFINED</i>. 127 If it is given a nonnegative value, then the value specifies 128 the model-time delay between input events and the output 129 events that result from reacting to those input events. 130 That is, if this actor is given an input event with time 131 stamp <i>t</i>, then if the contained actor produces any output 132 events in reaction to that event, those output events will be 133 produced by this actor with time stamp <i>t</i> + <i>delay</i>. 134 <p> 135 If <i>delay</i> has value <i>UNDEFINED</i>, then 136 outputs are produced at the current model time of the executive 137 director when the inside thread happens to produce those events, 138 or if <i>synchronizeToRealTime</i>, at the greater of current 139 model time and current real time (measured in seconds since 140 the start of execution). 141 This is accomplished by the inside thread calling 142 fireAtFirstValidTimeAfter() of the enclosing director, and 143 then producing the outputs when the requested firing occurs 144 in the director thread.
Note that with this value of the 145 <i>delay</i>, it is possible for the inside thread to 146 continue to execute and respond to input events after 147 the wrapup phase of the director thread has been entered. 148 The wrapup phase will stall until the inside thread has 149 completed its processing of its inputs, but any outputs 150 it produces after the wrapup phase has started will be 151 discarded. 152 <p> 153 The most common use of this actor is in the DE domain, 154 although it can also be used in CT, SR, SDF, and other domains, 155 with some care. See the above referenced memo. 156 Regardless of the value of <i>delay</i>, this actor is treated 157 by DE as introducing a delay, much like the TimedDelay actor. 158 In fact, if <i>delay</i> is 0.0, there will be a one tick delay 159 in superdense time, just as with the TimedDelay actor. 160 If the inside model also has a time delay (e.g. if you 161 put a TimedDelay actor inside a ThreadedComposite), then 162 the total delay is the sum of the two delays. 163 <p> 164 <b>Discussion:</b> 165 <p> 166 There are several useful things you can do with this model. 167 We describe some use cases here: 168 <p> 169 <i>Background execution.</i> When <i>delay</i> is greater than 170 or equal to 0.0, 171 then when this actor is fired in response to input events 172 with time stamp <i>t</i>, the actual 173 processing of those events occurs later in a separate thread. The 174 director thread is not blocked, and can continue to process events 175 with time stamps less than or equal to <i>t</i> + <i>delay</i>. 176 The director thread is blocked from processing events with larger 177 time stamps than that because this is necessary to preserve DE 178 semantics. To implement this, this actor uses fireAt() to 179 request a firing at time <i>t</i> + <i>delay</i>, and when that 180 firing occurs, it blocks the director thread until the reaction 181 is complete. 
182 <p> 183 <i>Parallel firing.</i> Note that if <i>delay</i> is set to 0.0, 184 it may seem that there is no point in using this actor, since 185 model time will not be allowed to increase past <i>t</i> until 186 the contained actor has reacted to events with time stamp <i>t</i>. 187 However, there is actually exploitable concurrency if there 188 are other actors in the model that also have pending input 189 events with time stamp <i>t</i>. Those events can be processed 190 concurrently with this actor reacting to its input event. 191 A typical use case will broadcast an event to several instances 192 of ThreadedComposite, in which case each of those several 193 inside threads can execute concurrently in reaction to those 194 input events. 195 <p> 196 <i>Real-time source.</i> If the contained actor (and hence this 197 actor) has no inputs and <i>synchronizeToRealTime</i> is true, then 198 the contained actor must call fireAt() or one of its variants so that 199 the inside thread will be provided with pure events. 200 The behavior depends on which variant of the fireAt() methods is used 201 by the inside actor. There are three cases: 202 FIXME: Describe these. In particular, delay needs to specify the 203 minimum increment between these or fireAt() could result in an 204 exception. Do we want a parameter to relax that? 205 <p> 206 One subtlety of this actor is that it cannot expose instances of ParameterPort 207 without introducing nondeterminacy in the execution. A ParameterPort 208 is an input port that sets the value of a parameter with the same name. Upon receiving 209 a token at such a port, if this actor were to set a parameter visible by the 210 inside thread, there is no assurance that the inside thread is not still 211 executing an earlier iteration. Thus, it could appear to be sending a message 212 backward in time, which would be truly bizarre.
To prevent this error, 213 this actor does not mirror such ports, and hence they appear on the outside 214 only as parameters. 215 216 @author Edward A. Lee 217 @version $Id$ 218 @since Ptolemy II 8.0 219 @Pt.ProposedRating Yellow (eal) 220 @Pt.AcceptedRating Red (eal) 221 */ 222public class ThreadedComposite extends MirrorComposite { 223 /** Create an actor with a name and a container. 224 * The container argument must not be null, or a 225 * NullPointerException will be thrown. This actor will use the 226 * workspace of the container for synchronization and version counts. 227 * If the name argument is null, then the name is set to the empty string. 228 * Increment the version of the workspace. 229 * @param container The container actor. 230 * @param name The name of this actor. 231 * @exception IllegalActionException If the container is incompatible 232 * with this actor. 233 * @exception NameDuplicationException If the name coincides with 234 * an actor already in the container. 235 */ 236 public ThreadedComposite(CompositeEntity container, String name) 237 throws IllegalActionException, NameDuplicationException { 238 // The false argument specifies that instances of ParameterPort 239 // should not be mirrored. This would make the behavior nondeterminate, 240 // so we expose these only as parameters. 241 super(container, name, false); 242 setClassName("ptolemy.actor.lib.hoc.ThreadedComposite"); 243 244 // Create the ThreadedDirector in the proper workspace. 245 ThreadedDirector threadedDirector = new ThreadedDirector(workspace()); 246 threadedDirector.setContainer(this); 247 threadedDirector.setName(uniqueName("ThreadedDirector")); 248 249 // Hidden parameter defining "UNDEFINED". 
250 Parameter UNDEFINED = new Parameter(this, "UNDEFINED"); 251 UNDEFINED.setVisibility(Settable.EXPERT); 252 UNDEFINED.setPersistent(false); 253 UNDEFINED.setExpression("-1.0"); 254 255 delay = new Parameter(this, "delay"); 256 delay.setTypeEquals(BaseType.DOUBLE); 257 delay.setExpression("0.0"); 258 259 synchronizeToRealTime = new Parameter(this, "synchronizeToRealTime"); 260 synchronizeToRealTime.setTypeEquals(BaseType.BOOLEAN); 261 synchronizeToRealTime.setExpression("false"); 262 } 263 264 /////////////////////////////////////////////////////////////////// 265 //// parameters //// 266 267 /** The model-time delay between the input events and the 268 * output events. This is a double that defaults to 0.0, 269 * indicating that outputs should have the same time stamps 270 * as the inputs that trigger them. If it has a value greater 271 * than zero, then the outputs will have larger time stamps 272 * by that amount. If it has the value <i>UNDEFINED</i> 273 * (or any negative number), then the output time stamp 274 * will be nondeterminate, and will depend on the current 275 * model time of the outside director when the output is 276 * produced or on current real time. 277 */ 278 public Parameter delay; 279 280 /** If set to true, the inside thread stalls until real time matches 281 * the time stamps of input events or pure events for each firing. 282 * In addition, if <i>delay</i> is set to undefined and this is set 283 * to true, then output events are assigned a time stamp that is the 284 * greater of current model time and real time. 285 * Time is measured since the start of the execution of the inside 286 * thread. This is a boolean that defaults to false. Changing 287 * the value of this parameter has no effect until the next 288 * execution of the model. 289 */ 290 public Parameter synchronizeToRealTime; 291 292 /////////////////////////////////////////////////////////////////// 293 //// public methods //// 294 295 /** React to a change in an attribute. 
This method is called by 296 * a contained attribute when its value changes. In this base class, 297 * the method does nothing. In derived classes, this method may 298 * throw an exception, indicating that the new attribute value 299 * is invalid. It is up to the caller to restore the attribute 300 * to a valid value if an exception is thrown. 301 * @param attribute The attribute that changed. 302 * @exception IllegalActionException If the change is not acceptable 303 * to this container (not thrown in this base class). 304 */ 305 @Override 306 public void attributeChanged(Attribute attribute) 307 throws IllegalActionException { 308 if (attribute == delay) { 309 _delayValue = ((DoubleToken) delay.getToken()).doubleValue(); 310 } else { 311 super.attributeChanged(attribute); 312 } 313 } 314 315 /** Clone the actor into the specified workspace. 316 * @param workspace The workspace for the new object. 317 * @return A new actor. 318 * @exception CloneNotSupportedException If a derived class has 319 * has an attribute that cannot be cloned. 320 */ 321 @Override 322 public Object clone(Workspace workspace) throws CloneNotSupportedException { 323 ThreadedComposite newObject = (ThreadedComposite) super.clone( 324 workspace); 325 try { 326 // Remove the old inner ThreadedDirector that is in the wrong workspace. 327 List iterateDirectors = newObject 328 .attributeList(ThreadedDirector.class); 329 ThreadedDirector oldThreadedDirector = (ThreadedDirector) iterateDirectors 330 .get(0); 331 String threadedDirectorName = oldThreadedDirector.getName(); 332 oldThreadedDirector.setContainer(null); 333 334 // Create a new ThreadedDirector that is in the right workspace. 
335 ThreadedDirector iterateDirector = newObject.new ThreadedDirector( 336 workspace); 337 iterateDirector.setContainer(newObject); 338 iterateDirector.setName(threadedDirectorName); 339 } catch (Throwable throwable) { 340 throw new CloneNotSupportedException( 341 "Could not clone: " + throwable); 342 } 343 344 newObject._causalityInterface = null; 345 newObject._realStartTime = 0L; 346 return newObject; 347 } 348 349 /** Override the base class to return a causality interface that 350 * indicates that the output does not depend (immediately) on 351 * the input. This method assumes that the director deals with BooleanDependencies 352 * and returns an instance of BreakCausalityInterface. 353 * @return A representation of the dependencies between input ports 354 * and output ports. 355 */ 356 @Override 357 public CausalityInterface getCausalityInterface() { 358 // FIXME: This will not work property with Ptides because it will effectively 359 // declare that the delay from input to output is infinite, which it is not. 360 // What we want is for the delay from input to output to be a superdense time 361 // delay of (0.0, 1). This could be implemented by a class similar to 362 // BreakCausalityInterface that does the right thing when the director 363 // provides a Dependency that is a SuperdenseTimeIdentity. 364 if (_causalityInterface == null) { 365 _causalityInterface = new BreakCausalityInterface(this, 366 getExecutiveDirector().defaultDependency()); 367 } 368 return _causalityInterface; 369 } 370 371 /** Iterate the contained actors of the 372 * container of this director. 373 * @return False if any contained actor returns false in postfire. 374 * @exception IllegalActionException If any called method of 375 * of the contained actor throws it, or if the contained 376 * actor is not opaque. 377 */ 378 public boolean iterateContainedActors() throws IllegalActionException { 379 // Don't call "super.fire();" here, this actor contains its 380 // own director. 
381 boolean result = true; 382 List<Actor> actors = entityList(); 383 for (Actor actor : actors) { 384 if (_stopRequested) { 385 break; 386 } 387 if (!((ComponentEntity) actor).isOpaque()) { 388 throw new IllegalActionException(this, 389 "Inside actor is not opaque " 390 + "(perhaps it needs a director)."); 391 } 392 if (_debugging) { 393 _debug("---- Iterating actor in inside thread: " 394 + actor.getFullName()); 395 } 396 if (actor.iterate(1) == Executable.STOP_ITERATING) { 397 result = false; 398 _debug("---- Postfire returned false: " + actor.getFullName()); 399 } 400 } 401 return result; 402 } 403 404 /////////////////////////////////////////////////////////////////// 405 //// private variables //// 406 407 /** The cached value of the <i>delay</i> parameter. */ 408 private double _delayValue = 0.0; 409 410 /** The real time at which the model begins executing, in milliseconds. */ 411 private long _realStartTime = 0; 412 413 /////////////////////////////////////////////////////////////////// 414 //// inner classes //// 415 416 /////////////////////////////////////////////////////////////////// 417 //// TokenFrame 418 419 /** Bundle data associated with ports and a time stamp. 420 * There are three types of frames: 421 * EVENT is a (possibly empty) bundle of data and a time 422 * stamp that is either provided to the inside thread from 423 * the inputs of a ThreadedComposite or provided by the 424 * inside thread to form the outputs of a ThreadedComposite. 425 * POSTFIRE is a frame indicating that the inside actor 426 * can be postfired. No tokens are provided (they are assumed 427 * to have been consumed in the firing). STOP is a frame 428 * provided to the inside thread to indicate that it should 429 * stop executing. 430 */ 431 protected static class TokenFrame { 432 // FindBugs suggests making this class static so as to decrease 433 // the size of instances and avoid dangling references. 434 435 /** Construct a TokenFrame. 
436 * @param theTime The time of this token frame. 437 * @param theTokens a list of QueueTokens. 438 * @param theType The FrameType. 439 */ 440 public TokenFrame(Time theTime, List<QueuedToken> theTokens, 441 FrameType theType) { 442 tokens = theTokens; 443 time = theTime; 444 type = theType; 445 } 446 447 /** The time. */ 448 public final Time time; 449 /** A list of tokens. */ 450 public final List<QueuedToken> tokens; 451 /** The type of the frame. */ 452 public final FrameType type; 453 454 // Final fields (FindBugs suggestion) 455 /** A (possibly empty) bundle of data and a time 456 * stamp that is either provided to the inside thread from 457 * the inputs of a ThreadedComposite or provided by the 458 * inside thread to form the outputs of a ThreadedComposite. 459 */ 460 public final static FrameType EVENT = new FrameType(); 461 462 /** POSTFIRE is a frame indicating that the inside actor 463 * can be postfired. No tokens are provided (they are assumed 464 * to have been consumed in the firing). 465 */ 466 public final static FrameType POSTFIRE = new FrameType(); 467 468 /** STOP is a frame provided to the inside thread to indicate 469 * that it should stop executing. 470 */ 471 public final static FrameType STOP = new FrameType(); 472 473 private static class FrameType { 474 private FrameType() { 475 }; 476 } 477 } 478 479 /////////////////////////////////////////////////////////////////// 480 //// QueuedToken 481 482 /** Bundle of a token and the input port and channel 483 * at which it arrived. 484 */ 485 private static class QueuedToken { 486 487 // FindBugs suggests making this class static so as to decrease 488 // the size of instances and avoid dangling references. 
489 490 public QueuedToken(IOPort thePort, int theChannel, Token theToken) { 491 token = theToken; 492 channel = theChannel; 493 port = thePort; 494 } 495 496 public final int channel; 497 498 public final Token token; 499 500 public final IOPort port; 501 502 @Override 503 public String toString() { 504 return "token " + token + " for port " + port.getFullName() + "(" 505 + channel + ")"; 506 } 507 } 508 509 /////////////////////////////////////////////////////////////////// 510 //// ThreadedDirector 511 512 /** A specialized director that fires a contained actor 513 * in a separate thread. The prefire() method returns true 514 * if the inside thread is alive. The fire() method posts 515 * input events, if any, for the current firing on a queue for 516 * the inside thread to consume. If the firing is in response 517 * to a prior refiring request by this director, then the fire() 518 * method will also wait for the inside thread to complete 519 * its firing, and will then produce outputs from that firing. 520 * The postfire() method posts 521 * a request to postfire the contained actor and also requests 522 * a refiring of this director at the current time plus the delay 523 * value (unless the delay value is UNDEFINED). The wrapup() method 524 * requests termination of the inside thread. If postfire() 525 * of the contained actor returns false, then postfire() of this director 526 * will return false, requesting a halt to execution of the model. 527 */ 528 private class ThreadedDirector extends Director { 529 530 /** Construct a new instance of the director for ThreadedComposite. 531 * The director is created in the specified workspace with 532 * no container and an empty string as a name. You can then change 533 * the name with setName(). If the workspace argument is null, then 534 * use the default workspace. You should set the local director or 535 * executive director before attempting to send data to the actor 536 * or to execute it. 
Add the actor to the workspace directory. 537 * Increment the version number of the workspace. 538 * @param workspace The workspace that will list the actor. 539 * @exception IllegalActionException If the container is incompatible 540 * with this actor. 541 * @exception NameDuplicationException If the name coincides with 542 * an actor already in the container. 543 */ 544 public ThreadedDirector(Workspace workspace) 545 throws IllegalActionException, NameDuplicationException { 546 super(workspace); 547 setPersistent(false); 548 } 549 550 /** Clone the director into the specified workspace. 551 * @param workspace The workspace for the new object. 552 * @return A new director. 553 * @exception CloneNotSupportedException If a derived class has 554 * has an attribute that cannot be cloned. 555 */ 556 @Override 557 public Object clone(Workspace workspace) 558 throws CloneNotSupportedException { 559 ThreadedDirector newObject = (ThreadedDirector) super.clone( 560 workspace); 561 newObject._exception = null; 562 newObject._inputTokens = null; 563 newObject._thread = null; 564 newObject._outputTimes = new LinkedList<Time>(); 565 newObject._fireAtTimes = Collections 566 .synchronizedSet(new HashSet<Time>()); 567 newObject._inputFrames = new LinkedList<TokenFrame>(); 568 newObject._outputFrames = new LinkedList<TokenFrame>(); 569 return newObject; 570 } 571 572 /** Produce outputs (if appropriate). 573 * @exception IllegalActionException If production of an output 574 * fails (e.g. type error), or if this thread is interrupted 575 * while we are waiting for output to produce. 576 */ 577 @Override 578 public synchronized void fire() throws IllegalActionException { 579 // NOTE: This method is synchronized to ensure that when 580 // delay is UNDEFINED and the inside thread calls 581 // fireAtFirstTimeAfter(), that the firing does not 582 // occur before _outputFrames and _outputTimes have 583 // been updated. 
584 585 if (_exception != null) { 586 throw new IllegalActionException(ThreadedComposite.this, 587 _exception, "Error in inside thread of actor."); 588 } 589 590 Time environmentTime = ThreadedComposite.this.getExecutiveDirector() 591 .getModelTime(); 592 593 if (ThreadedComposite.this._debugging) { 594 ThreadedComposite.this 595 ._debug("Firing at time " + environmentTime); 596 } 597 598 // If there is an output to be produced at this 599 // time, produce it. 600 Time nextOutputTime = _outputTimes.peek(); 601 if (environmentTime.equals(nextOutputTime)) { 602 // There is an output to be produced. 603 // First, remove that time from the pending outputs queue. 604 605 // FIXME: FindBugs "RV: Base use of return value from method, 606 // Method ignores return value." java.util.Queue.poll() returns 607 // the value, which is ignored. 608 _outputTimes.poll(); 609 // First, wait (if necessary) for output 610 // to be produced. 611 // We already know that the environment time matches 612 // the expected output time, so we can ignore the time 613 // stamp of the output frame. 614 try { 615 // NOTE: Cannot use LinkedBlockingQueue for _outputFrames 616 // because we have to release the lock on this director 617 // while we are waiting or we get a deadlock. 618 while (_outputFrames.isEmpty() && !_stopRequested) { 619 if (ThreadedComposite.this._debugging) { 620 ThreadedComposite.this._debug( 621 "Waiting for outputs from inside thread."); 622 } 623 // The timeout allows this to respond to stop() 624 // even if we have a deadlock for some reason. 625 // However, if the deadlock involves the Swing thread, then 626 // no stopping will be possible. 627 wait(1000L); 628 } 629 if (_outputFrames.isEmpty()) { 630 // A stop has been requested and there is no data to produce. 631 return; 632 } 633 TokenFrame frame = _outputFrames.poll(); 634 635 // There is now an output frame to be produced. 
636 if (ThreadedComposite.this._debugging) { 637 ThreadedComposite.this._debug("Done waiting."); 638 } 639 640 // Produce the outputs on the frame, if there are any 641 // outputs. Note that frame.tokens can only be null 642 // if the inside thread was interrupted while executing 643 // or if an exception occurred in the inside thread. 644 if (frame.tokens == null) { 645 if (_exception != null) { 646 throw new IllegalActionException(this, _exception, 647 "Inside thread had an exception."); 648 } 649 throw new IllegalActionException(this, 650 "Inside thread was interrupted."); 651 } 652 for (QueuedToken token : frame.tokens) { 653 if (token.channel < token.port.getWidth()) { 654 // There is now an output frame to be produced. 655 if (ThreadedComposite.this._debugging) { 656 ThreadedComposite.this._debug( 657 "Sending output token ", 658 token + " to port " 659 + token.port.getName()); 660 } 661 662 token.port.send(token.channel, token.token); 663 } 664 } 665 } catch (InterruptedException ex) { 666 throw new IllegalActionException(ThreadedComposite.this, ex, 667 "Director thread interrupted."); 668 } 669 } 670 } 671 672 /** Delegate by calling fireAt() on the director of the container's 673 * container (the executive director), and make a local record that 674 * a refiring request has been made for the specified time. Note that the 675 * executive director may modify the requested time. If it does, the 676 * modified value is returned. It is up to the calling actor to 677 * throw an exception if the modified time is not acceptable. 678 * @param actor The actor requesting firing. 679 * @param time The time at which to fire. 680 * @param microstep The microstep. 681 * @return The time at which the actor passed as an argument 682 * will be fired. 683 * @exception IllegalActionException If the executive director throws it. 
684 */ 685 @Override 686 public Time fireAt(Actor actor, Time time, int microstep) 687 throws IllegalActionException { 688 Time result = time; 689 Director director = ThreadedComposite.this.getExecutiveDirector(); 690 if (director != null) { 691 if (ThreadedComposite.this._debugging) { 692 ThreadedComposite.this 693 ._debug("---- Request refiring at time " + time 694 + " for actor: " + actor.getFullName()); 695 } 696 try { 697 result = director.fireAt(ThreadedComposite.this, time, 698 microstep); 699 } catch (IllegalActionException ex) { 700 throw new IllegalActionException(this, ex, "Actor " 701 + actor.getFullName() 702 + " requests refiring at time " + time 703 + ", which fails.\n" 704 + "Perhaps the delay parameter is too large?\n" 705 + "Try setting it to 0."); 706 } 707 } 708 if (actor != ThreadedComposite.this) { 709 // The fireAt() request is coming from the inside, so 710 // when the firing occurs, we want to post an input 711 // frame (even if there are no input events) for 712 // the inside thread. 713 _fireAtTimes.add(result); 714 } 715 return result; 716 } 717 718 /** Start the inside thread. 719 * @exception IllegalActionException If the initialize() method of 720 * one of the inside actors throws it. 721 */ 722 @Override 723 public synchronized void initialize() throws IllegalActionException { 724 // The following must be done before the initialize() methods 725 // of the actors is called because those methods may call fireAt(). 726 // Note that previous runs may have left residual data on these lists. 727 _fireAtTimes.clear(); 728 _outputFrames.clear(); 729 _outputTimes.clear(); 730 _inputFrames.clear(); 731 732 _exception = null; 733 734 // The superclass will initialize all the actors. 735 super.initialize(); 736 737 // Set a flag indicating that the first firing should 738 // initialize the _realStartTime variable. This is done 739 // in the first firing to be as late as possible, so 740 // that startup transients are minimized. 
741 // FIXME: This will impede synchronization with other 742 // actors, since there won't be a common time base. 743 _realStartTime = -1L; 744 745 _inputFrames.clear(); 746 _outputFrames.clear(); 747 748 _synchronizeToRealTime = ((BooleanToken) synchronizeToRealTime 749 .getToken()).booleanValue(); 750 751 // Create and start the inside thread. 752 _thread = new CompositeThread(); 753 _thread.setPriority(Thread.MAX_PRIORITY); 754 _thread.start(); 755 } 756 757 /** Return a new instance of QueueReceiver. 758 * @return A new instance of QueueReceiver. 759 * @see QueueReceiver 760 */ 761 @Override 762 public Receiver newReceiver() { 763 return new QueueReceiver(); 764 } 765 766 /** Return true if the inside thread is alive. 767 * @return True if the inside thread is still alive. 768 */ 769 @Override 770 public boolean prefire() throws IllegalActionException { 771 // Do not call super.prefire()! 772 // Superclass sets current time of this 773 // director to that of the container. 774 // The notion of current time presented to the 775 // inside actors (which may be currently executing 776 // in another thread) must match that of the frame 777 // that the inside thread is processing. 778 779 // Have to create a new list because the previous list may 780 // not have been consumed yet. 781 _inputTokens = new LinkedList<QueuedToken>(); 782 783 boolean result = _thread.isAlive() || !_outputFrames.isEmpty(); 784 785 if (ThreadedComposite.this._debugging) { 786 ThreadedComposite.this._debug("Prefire returns " + result); 787 } 788 return result; 789 } 790 791 /** Consume inputs (if any) and post a frame on the queue 792 * for the inside thread to consume. A frame will be posted 793 * even if there are no inputs if a refiring request has 794 * been made for the current time. 795 * @return True if the inside thread is still alive. 
796 */ 797 @Override 798 public boolean postfire() throws IllegalActionException { 799 800 Time environmentTime = ThreadedComposite.this.getExecutiveDirector() 801 .getModelTime(); 802 803 if (ThreadedComposite.this._debugging) { 804 ThreadedComposite.this 805 ._debug("Postfiring at time " + environmentTime); 806 } 807 808 // If there are inputs to be consumed, or if a refiring 809 // request has been made for this time, then create an 810 // input frame for the inside thread. 811 // We can safely remove the refire request since we 812 // are now responding to it. Note that semantically, 813 // multiple refire requests for the same time are only 814 // required to trigger a single refiring, so this is true 815 // even if there were multiple refire requests. 816 boolean refireRequested = _fireAtTimes.remove(environmentTime); 817 818 // Put a frame on the _inputFrames for the inside thread 819 // if either a refire was requested or if there are inputs. 820 if (refireRequested || !_inputTokens.isEmpty()) { 821 if (ThreadedComposite.this._debugging) { 822 ThreadedComposite.this._debug( 823 "Queueing input tokens for the inside thread: " 824 + _inputTokens.toString() 825 + " to be processed at time " 826 + environmentTime); 827 } 828 synchronized (this) { 829 _inputFrames.add(new TokenFrame(environmentTime, 830 _inputTokens, TokenFrame.EVENT)); 831 notifyAll(); 832 if (_delayValue >= 0.0) { 833 // Delay value is not UNDEFINED. Schedule a firing 834 // at current time plus the delay. 835 Time responseTime = environmentTime.add(_delayValue); 836 // Need to be sure to call the executive director's fireAt(). 837 // Make sure to throw an exception if the executive 838 // director does not exactly respect this request. 
839 Time response = ThreadedComposite.this 840 .getExecutiveDirector() 841 .fireAt(ThreadedComposite.this, responseTime); 842 843 if (!response.equals(responseTime)) { 844 throw new IllegalActionException(this, 845 "Director is unable to fire the actor at the requested time: " 846 + responseTime 847 + ". It responds it will fire it at: " 848 + response); 849 } 850 851 // Queue an indicator to produce outputs in response to that firing. 852 _outputTimes.add(responseTime); 853 } 854 } 855 // Give the inside thread a chance to react. 856 Thread.yield(); 857 } 858 boolean isAlive = _thread.isAlive(); 859 return isAlive || !_outputTimes.isEmpty(); 860 } 861 862 /** Override the base class to post a "stop frame" on the queue 863 * for the inside thread to stop. 864 */ 865 @Override 866 public void stop() { 867 super.stop(); 868 Time environmentTime = ThreadedComposite.this.getExecutiveDirector() 869 .getModelTime(); 870 if (ThreadedComposite.this._debugging) { 871 ThreadedComposite.this._debug( 872 "Queueing a stop-frame token for the inside thread with time: " 873 + environmentTime); 874 } 875 synchronized (this) { 876 _inputFrames.add( 877 new TokenFrame(environmentTime, null, TokenFrame.STOP)); 878 notifyAll(); 879 } 880 } 881 882 /** Record data from the specified input port 883 * for transfer to the queue used to communicate these data to the 884 * inside thread. This is called in the fire() method of 885 * the enclosing composite actor after the prefire() method 886 * of this director has been called and before its fire() method 887 * is called. 888 * @param port The port to transfer tokens from. 889 * @return True if at least one data token is transferred. 890 * @exception IllegalActionException If reading the inputs fails. 
891 */ 892 @Override 893 public boolean transferInputs(IOPort port) 894 throws IllegalActionException { 895 boolean result = false; 896 for (int i = 0; i < port.getWidth(); i++) { 897 try { 898 if (port.isKnown(i)) { 899 if (port.hasToken(i)) { 900 Token token = port.get(i); 901 _inputTokens.add(new QueuedToken(port, i, token)); 902 if (ThreadedComposite.this._debugging) { 903 ThreadedComposite.this 904 ._debug("Transferring input from " 905 + port.getName()); 906 } 907 result = true; 908 } 909 } 910 } catch (NoTokenException ex) { 911 // this shouldn't happen. 912 throw new InternalErrorException(this, ex, null); 913 } 914 } 915 return result; 916 } 917 918 /** Override the base class to do nothing since the fire() method of this 919 * director directly handles producing the outputs. In particular, we 920 * don't want to read from the inside of the output ports because the 921 * inside thread may be concurrently writing to them for the next 922 * iteration. 923 * @param port The port to transfer tokens from. 924 * @return False, indicating that no data token is produced now. 925 * @exception IllegalActionException If writing the outputs fails. 926 */ 927 @Override 928 public boolean transferOutputs(IOPort port) 929 throws IllegalActionException { 930 return false; 931 } 932 933 /** Override the base class to wait until the inside thread 934 * terminates and then call super.wrapup(). 935 * @exception IllegalActionException If the wrapup() method of 936 * one of the associated actors throws it. 937 */ 938 @Override 939 public void wrapup() throws IllegalActionException { 940 // First, post a "stop frame" in case one has not been posted. 941 // In the case of a finite run, one will likely have not been posted. 942 Time environmentTime = ThreadedComposite.this.getExecutiveDirector() 943 .getModelTime(); 944 if (ThreadedComposite.this._debugging) { 945 ThreadedComposite.this._debug("Called wrapup. 
", 946 "Queueing a stop-frame token for the inside thread with time: " 947 + environmentTime); 948 } 949 synchronized (this) { 950 // A "stop frame" has a null token list. 951 _inputFrames.add( 952 new TokenFrame(environmentTime, null, TokenFrame.STOP)); 953 notifyAll(); 954 } 955 956 if (_exception != null) { 957 throw new IllegalActionException(ThreadedComposite.this, 958 _exception, "Error in inside thread of actor."); 959 } 960 if (_thread != null && _thread.isAlive()) { 961 try { 962 if (ThreadedComposite.this._debugging) { 963 ThreadedComposite.this 964 ._debug("Waiting for inside thread to stop."); 965 } 966 _thread.join(); 967 if (ThreadedComposite.this._debugging) { 968 ThreadedComposite.this 969 ._debug("Inside thread has stopped."); 970 } 971 if (_exception != null) { 972 throw new IllegalActionException(ThreadedComposite.this, 973 _exception, "Error in inside thread of actor."); 974 } 975 } catch (InterruptedException e) { 976 // Ignore. 977 } 978 } 979 super.wrapup(); 980 } 981 982 ////////////////////////////////////////////////////////////// 983 //// private variables //// 984 985 /** If an exception occurs in the inside thread, the exception 986 * will be assigned to this member, which will cause the 987 * the next invocation of the fire() or wrapup() method 988 * to throw the exception. 989 */ 990 private Throwable _exception; 991 992 /** Record of the times which refire requests have been made 993 * and not yet processed by any of the fireAt() methods. 994 * This set is accessed from both the director 995 * thread and the inside thread so it has to 996 * be thread safe. 997 */ 998 private Set<Time> _fireAtTimes = Collections 999 .synchronizedSet(new HashSet<Time>()); 1000 1001 // NOTE: Cannot use LinkedBlockingQueue for _inputFrames 1002 // because we have to release the lock on this director 1003 // while we are waiting or we get a deadlock. 1004 1005 /** Queue of unprocessed input events. 
* This is a plain LinkedList rather than a blocking queue (see the
         * NOTE above): the inside thread waits on the monitor of this
         * director while the list is empty, and frames are added and
         * removed within code synchronized on this director. It is
         * accessed by both the director thread and the inside thread.
         */
        private LinkedList<TokenFrame> _inputFrames = new LinkedList<TokenFrame>();

        /** List of input events in the current iteration.
         * A new list is created in each prefire().
         * This is accessed only in the director thread so it need
         * not be thread safe.
         */
        private List<QueuedToken> _inputTokens;

        /** Queue of unprocessed output events.
         * This queue is accessed from multiple threads, so it must be
         * thread safe. The list itself is a plain LinkedList; the
         * inside thread adds frames to it only while synchronized on
         * this director.
         */
        private LinkedList<TokenFrame> _outputFrames = new LinkedList<TokenFrame>();

        /** Record of the time stamps at which
         * to produce outputs. These are enqueued and dequeued
         * in time stamp order. If the delay value is UNDEFINED,
         * then this is accessed from the inside thread as well
         * as the director thread, so it needs to be thread safe.
         * To ensure this, we always access it within a block
         * synchronized on this director.
         */
        private Queue<Time> _outputTimes = new LinkedList<Time>();

        /** The value of the synchronizeToRealTime parameter when
         * initialize() was invoked.
         */
        private boolean _synchronizeToRealTime;

        /** The thread that executes the contained actors.
         * It is created and started in initialize().
         */
        private Thread _thread;

        //////////////////////////////////////////////////////////////
        ////                   inner inner classes                ////

        ///////////////////////////////////////////////////////////////////
        //// CompositeThread

        /** The inside thread, which executes the contained actor.
* Each pass of the loop in run() takes one frame from the input
         * queue, sets the model time of the director to the time of that
         * frame, delivers the tokens of the frame to the inside of the
         * ports, iterates the contained actors, and posts the collected
         * outputs as an output frame. The loop ends when a stop has been
         * requested, when a "stop frame" is read, when the iteration
         * reports that execution should not continue, or when an
         * exception occurs.
         */
        private class CompositeThread extends Thread {
            /** Construct the thread, naming it after the full name of
             *  the enclosing composite.
             */
            public CompositeThread() {
                super("CompositeThread_"
                        + ThreadedComposite.this.getFullName());
            }

            /** Process input frames until told to stop. */
            @Override
            public void run() {
                while (!_stopRequested) {
                    try {
                        if (ThreadedComposite.this._debugging) {
                            ThreadedComposite.this._debug(
                                    "---- Waiting for inputs in the inside thread.");
                        }
                        TokenFrame frame = null;
                        synchronized (ThreadedDirector.this) {
                            // The following blocks this thread if the queue is empty.
                            // wait() releases the lock on the director, so the
                            // director thread can post frames in the meantime.
                            while (_inputFrames.isEmpty() && !_stopRequested) {
                                // The timeout allows this to respond to stop()
                                // even if we have a deadlock for some reason.
                                ThreadedDirector.this.wait(1000L);
                            }
                            if (_stopRequested) {
                                break;
                            }
                            frame = _inputFrames.poll();
                        }

                        // Check for a "stop frame" and exit the thread.
                        // This must come before the token loop below because
                        // a stop frame has a null token list.
                        if (frame.type == TokenFrame.STOP) {
                            if (ThreadedComposite.this._debugging) {
                                ThreadedComposite.this._debug(
                                        "---- Read a stop frame in inside thread.");
                            }
                            break;
                        }
                        if (ThreadedComposite.this._debugging) {
                            ThreadedComposite.this._debug(
                                    "---- Reading input tokens in inside thread with time "
                                            + frame.time + " and value "
                                            + frame.tokens);
                        }
                        // Current time of the director should match the frame time.
                        // This is the view of time that should be presented to any inside actors.
                        setModelTime(frame.time);

                        if (_synchronizeToRealTime) {
                            long currentRealTime = System.currentTimeMillis();
                            // If this is the first firing, record the start time.
                            // (initialize() sets _realStartTime to -1 to mark it
                            // as not yet recorded.)
                            if (_realStartTime < 0L) {
                                _realStartTime = currentRealTime;
                            }
                            long realTimeMillis = currentRealTime
                                    - _realStartTime;
                            long modelTimeMillis = Math.round(
                                    getModelTime().getDoubleValue() * 1000.0);
                            // Sleep only if real time is behind model time;
                            // if it is already ahead, proceed immediately.
                            if (realTimeMillis < modelTimeMillis) {
                                try {
                                    Thread.sleep(
                                            modelTimeMillis - realTimeMillis);
                                } catch (InterruptedException e) {
                                    // Ignore and continue.
                                }
                            }
                        }

                        // Note that there may not be any tokens here, since there
                        // may not be any inputs (the firing is in response to
                        // a pure event). We still want to fire the
                        // enclosed model at the specified time because the firing
                        // is due to the model itself having previously called
                        // fireAt().
                        for (QueuedToken token : frame.tokens) {
                            // Tokens for channels beyond the inside width of
                            // the port are dropped.
                            if (token.channel < token.port.getWidthInside()) {
                                token.port.sendInside(token.channel,
                                        token.token);
                            }
                        }
                        // Iterate the contained actors.
                        if (!iterateContainedActors()) {
                            // Collect the outputs so that outputs from this
                            // final iteration are produced, then terminate this
                            // thread.
                            _stopRequested = true;
                        }

                        // If outputs are produced by the iteration, then
                        // we need to record those in an output frame.
                        List<QueuedToken> outputTokens = new LinkedList<QueuedToken>();
                        Iterator ports = outputPortList().iterator();
                        while (ports.hasNext()) {
                            IOPort port = (IOPort) ports.next();
                            for (int i = 0; i < port.getWidth(); i++) {
                                if (port.isKnownInside(i)
                                        && port.hasTokenInside(i)) {
                                    Token token = port.getInside(i);
                                    QueuedToken tokenBundle = new QueuedToken(
                                            port, i, token);
                                    outputTokens.add(tokenBundle);
                                    if (ThreadedComposite.this._debugging) {
                                        ThreadedComposite.this._debug(
                                                "---- Inside actor produced token "
                                                        + token + " for port "
                                                        + port.getName());
                                    }
                                }
                            }
                        }
                        // Response time for a defined delay. If the delay is
                        // UNDEFINED (negative), this value is replaced below.
                        Time responseTime = getModelTime().add(_delayValue);

                        synchronized (ThreadedDirector.this) {
                            // If delay is UNDEFINED, then we have to now request a
                            // refiring at the first opportunity. This is because
                            // the postfire method won't do it.
                            if (_delayValue < 0.0) {
                                // If synchronizeToRealTime is true, then we want to use the
                                // greater of real-time or the current environment time.
                                // Otherwise, we just use the current environment time.
                                if (_synchronizeToRealTime) {
                                    long realTimeMillis = System
                                            .currentTimeMillis()
                                            - _realStartTime;
                                    Time realTime = new Time(
                                            ThreadedDirector.this,
                                            realTimeMillis * 0.001);
                                    responseTime = ThreadedDirector.this.fireAt(
                                            ThreadedComposite.this, realTime);
                                } else {
                                    responseTime = ThreadedDirector.this.fireAt(
                                            ThreadedComposite.this,
                                            getModelTime());
                                }
                                _outputTimes.add(responseTime);
                            }
                            TokenFrame outputFrame = new TokenFrame(
                                    responseTime, outputTokens,
                                    TokenFrame.EVENT);
                            _outputFrames.add(outputFrame);
                            if (ThreadedComposite.this._debugging) {
                                ThreadedComposite.this._debug(
                                        "---- Inside thread posted output frame.");
                            }
                            // Wake up any thread waiting on this director.
                            ThreadedDirector.this.notifyAll();
                            // Give the director thread a chance to react.
                            // NOTE(review): this yield happens while the lock
                            // on the director is still held; yield() does not
                            // release monitors.
                            Thread.yield();
                        }
                    } catch (InterruptedException e) {
                        // Interrupted while waiting for an input frame.
                        // Post a stop frame.
                        TokenFrame stopFrame = new TokenFrame(getModelTime(),
                                null, TokenFrame.STOP);
                        synchronized (ThreadedDirector.this) {
                            _outputFrames.add(stopFrame);
                            ThreadedDirector.this.notifyAll();
                        }
                        // Exit the thread.
                        break;
                    } catch (IllegalActionException ex) {
                        // NOTE(review): only IllegalActionException is
                        // recorded here; an unchecked exception would end
                        // this thread without posting a stop frame.
                        synchronized (ThreadedDirector.this) {
                            // To stop the outside firing, set this variable.
                            // On the next invocation of fire() or wrapup(), the
                            // exception will be thrown.
                            _exception = ex;
                            // Post a stop frame.
                            TokenFrame stopFrame = new TokenFrame(
                                    getModelTime(), null, TokenFrame.STOP);
                            _outputFrames.add(stopFrame);
                            ThreadedDirector.this.notifyAll();
                        }
                        break;
                    }
                }
            }
        }
    }
}