/* A nondeterministic merge actor for PN.

 Copyright (c) 2004-2014 The Regents of the University of California.
 All rights reserved.
 Permission is hereby granted, without written agreement and without
 license or royalty fees, to use, copy, modify, and distribute this
 software and its documentation for any purpose, provided that the above
 copyright notice and the following two paragraphs appear in all copies
 of this software.

 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.

 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 ENHANCEMENTS, OR MODIFICATIONS.

 PT_COPYRIGHT_VERSION_2
 COPYRIGHTENDKEY

 */
package ptolemy.domains.pn.kernel;

import java.io.Writer;
import java.util.Iterator;
import java.util.List;

import ptolemy.actor.Actor;
import ptolemy.actor.Director;
import ptolemy.actor.Manager;
import ptolemy.actor.TypedAtomicActor;
import ptolemy.actor.TypedCompositeActor;
import ptolemy.actor.TypedIOPort;
import ptolemy.actor.process.ProcessReceiver;
import ptolemy.data.IntToken;
import ptolemy.data.Token;
import ptolemy.data.type.BaseType;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.Port;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.KernelException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.StringAttribute;
import ptolemy.kernel.util.Workspace;

///////////////////////////////////////////////////////////////////
//// NondeterministicMerge

/**
 This actor takes any number of input streams and merges them
 nondeterministically. This actor is intended for use in the
 PN domain. It is a composite actor that
 creates its own contents. It contains an instance of PNDirector and one
 actor for each input channel (it creates these actors automatically
 when a connection is created to the input multiport). The contained
 actors are special actors (implemented as an instance of an inner class)
 that read from the port of this actor and write to the port of
 this actor. They have no ports of their own. The lifecycle of the
 contained actors (when they are started or stopped) is handled by
 the PNDirector in the usual way.

 @author Edward A. Lee, Haibo Zeng
 @version $Id$
 @since Ptolemy II 4.1
 @Pt.ProposedRating Yellow (eal)
 @Pt.AcceptedRating Red (eal)
 */
public class NondeterministicMerge extends TypedCompositeActor {
    /** Construct an actor in the specified container with the specified
     *  name. Create ports and make the input port a multiport.
     *  @param container The container.
     *  @param name The name.
     *  @exception NameDuplicationException If an actor
     *   with an identical name already exists in the container.
     *  @exception IllegalActionException If the actor cannot be contained
     *   by the proposed container.
     */
    public NondeterministicMerge(CompositeEntity container, String name)
            throws NameDuplicationException, IllegalActionException {
        super(container, name);
        _constructor();
    }

    /** Construct a TypedCompositeActor in the specified workspace with
     *  no container and an empty string as a name. You can then change
     *  the name with setName(). If the workspace argument is null, then
     *  use the default workspace. You should set the local director or
     *  executive director before attempting to send data to the actor
     *  or to execute it. Add the actor to the workspace directory.
     *  Increment the version number of the workspace.
     *  @param workspace The workspace that will list the actor.
     *  @exception NameDuplicationException If an actor
     *   with an identical name already exists in the container.
     *  @exception IllegalActionException If the actor cannot be contained
     *   by the proposed container.
     */
    public NondeterministicMerge(Workspace workspace)
            throws NameDuplicationException, IllegalActionException {
        // Added for the sake of Kepler's KAR handling, which needs this
        // constructor to instantiate composite actors.
        super(workspace);
        _constructor();
    }

    ///////////////////////////////////////////////////////////////////
    ////                     ports and parameters                  ////

    /** The input port. This base class imposes no type constraints except
     *  that the type of the input cannot be greater than the type of the
     *  output.
     */
    public TypedIOPort input;

    /** The output port. By default, the type of this output is constrained
     *  to be at least that of the input.
     */
    public TypedIOPort output;

    /** Output port used to indicate which input channel the current
     *  output came from. This has type int.
     */
    public TypedIOPort channel;

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /** Override the base class to adjust the number of contained
     *  actors, if the number is no longer correct. The adjustment is
     *  only performed here when the connections of the input port
     *  change while the manager is iterating or paused; otherwise it
     *  is deferred to {@link #initialize()}.
     *  @param port The port that has connection changes.
     */
    @Override
    public void connectionsChanged(Port port) {
        super.connectionsChanged(port);

        if (port == input) {
            /* The reason why we delay the execution of _reinitializeInnerActors:
             * What happens is that the NondeterministicMerge will call
             * getWidth in its connectionsChanged method, this will cause
             * IORelation to request a token from the width Parameter,
             * which will trigger IORelation.attributeChanged (expressions
             * are always lazy, which gives unexpected behavior with the
             * attributeChanged mechanism).
             * (Before, the cached version of the width was being used at
             * this moment, which resulted in the wrong value being used
             * (this was a very old bug).)
             * The call of IORelation.attributeChanged will set the
             * cached width, which results in IOPort.attributeChanged
             * being called, which calls NondeterministicMerge.connectionsChanged
             * (again!). This one sees that the component has not been added
             * yet and does so. Finally the functions all return and we end up
             * in the first NondeterministicMerge.connectionsChanged
             * again. At the time it called getWidth it knew it had to add
             * the component and does so, but in between this was already
             * done by the second NondeterministicMerge.connectionsChanged,
             * which results in the exception.
             * When I move the code that triggers the width and
             * adds the new actors to the initialize method of the
             * NondeterministicMerge the model
             * (ptolemy/domains/pn/demo/BrockAckerman/BrockAckerman.xml) runs
             * again, but I'm reluctant to do so, since it might mess up the
             * initialization process.
             * Moving the code to preinitialize has the disadvantage however
             * that width inference might happen multiple times (and definitely
             * will for certain type of models).
             */
            // If the model is running, create new internal actors if needed.
            Manager manager = getManager();
            if (manager != null) {
                Manager.State managerState = manager.getState();
                if (managerState == Manager.ITERATING
                        || managerState == Manager.PAUSED
                        || managerState == Manager.PAUSED_ON_BREAKPOINT) {
                    _reinitializeInnerActors();
                }
            }
        }
    }

    /** Initialize this actor. The contained channel actors are created
     *  (or woken up) first, and then the base class is initialized.
     *  @exception IllegalActionException If the parent class throws it.
     */
    @Override
    public void initialize() throws IllegalActionException {
        _reinitializeInnerActors();

        // super.initialize(); will initialize the director of this
        // composite actor (the MergeDirector), which will initialize the
        // newly created actors
        super.initialize();
    }

    /** Clone the object into the specified workspace. This overrides
     *  the base class to instantiate a new MergeDirector in the
     *  workspace of the clone, replacing any MergeDirector that was
     *  copied by the base class. The replacement keeps the name of the
     *  first director that was removed, if there was one.
     *  @param workspace The workspace for the new object.
     *  @return A new NamedObj.
     *  @exception CloneNotSupportedException If any of the attributes
     *   cannot be cloned, or if the new MergeDirector cannot be
     *   installed. In the latter case the underlying failure is
     *   available as the cause of the exception.
     *  @see #exportMoML(Writer, int, String)
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        NondeterministicMerge result = (NondeterministicMerge) super.clone(
                workspace);
        try {
            // Remove the old inner MergeDirector(s) that is(are) in the wrong workspace.
            String mergeDirectorName = null;
            Iterator<?> mergeDirectors = result
                    .attributeList(MergeDirector.class).iterator();
            while (mergeDirectors.hasNext()) {
                MergeDirector oldMergeDirector = (MergeDirector) mergeDirectors
                        .next();
                if (mergeDirectorName == null) {
                    mergeDirectorName = oldMergeDirector.getName();
                }
                oldMergeDirector.setContainer(null);
            }

            // Create a new MergeDirector that is in the right workspace.
            MergeDirector mergeDirector = result.new MergeDirector(workspace);
            mergeDirector.setContainer(result);
            if (mergeDirectorName != null) {
                mergeDirector.setName(mergeDirectorName);
            }
        } catch (Throwable throwable) {
            // CloneNotSupportedException has no constructor that takes a
            // cause, so attach it explicitly to keep the original stack
            // trace available to whoever handles the failure.
            CloneNotSupportedException exception = new CloneNotSupportedException(
                    "Could not clone: " + throwable);
            exception.initCause(throwable);
            throw exception;
        }
        return result;
    }

    ///////////////////////////////////////////////////////////////////
    ////                         private methods                   ////

    /** Construct a NondeterministicMerge. This creates the input
     *  multiport, the output port, the int-typed channel port, the
     *  icon description and the inner MergeDirector named "director".
     *  It is shared by both constructors.
     *  @exception NameDuplicationException If a port, attribute or
     *   director with the same name already exists in this actor.
     *  @exception IllegalActionException If a port, attribute or
     *   director cannot be created or contained by this actor.
     */
    private void _constructor()
            throws NameDuplicationException, IllegalActionException {

        input = new TypedIOPort(this, "input", true, false);
        output = new TypedIOPort(this, "output", false, true);

        input.setMultiport(true);
        output.setTypeAtLeast(input);

        channel = new TypedIOPort(this, "channel");
        channel.setOutput(true);
        channel.setTypeEquals(BaseType.INT);

        // Add an attribute to get the port placed on the bottom.
        StringAttribute channelCardinal = new StringAttribute(channel,
                "_cardinal");
        channelCardinal.setExpression("SOUTH");

        _attachText("_iconDescription",
                "<svg>\n" + "<polygon points=\"-10,20 10,10 10,-10, -10,-20\" "
                        + "style=\"fill:red\"/>\n" + "</svg>\n");

        PNDirector director = new MergeDirector(workspace());
        director.setContainer(this);
        director.setName("director");
    }

    /** Create the contained actors to handle the inputs.
     *  For each input channel that already has a contained actor, wake
     *  up any thread waiting on that actor and on the executive
     *  director. For each input channel without one, create a new
     *  ChannelActor.
     */
    private void _reinitializeInnerActors() {
        List<?> containedActors = entityList();
        int numberOfContainedActors = containedActors.size();

        // Create the contained actors to handle the inputs.
        int inputWidth;
        try {
            inputWidth = input.getWidth();
        } catch (IllegalActionException ex) {
            throw new InternalErrorException(this, ex,
                    "At this time IllegalActionExceptions are not allowed to happen.\n"
                            + "Width inference should already have been done.");
        }

        for (int i = 0; i < inputWidth; i++) {
            if (i < numberOfContainedActors) {
                // Local actor already exists for this channel.
                // Just wake it up.
                Object localActor = containedActors.get(i);

                synchronized (localActor) {
                    localActor.notifyAll();
                }

                // ProcessThread associated with the actor might
                // be blocked on a wait on the director.
                // So we need to notify on the director also.
                Director director = getExecutiveDirector();

                // If there is no director, then the model cannot be running,
                // so there is no need to notify.
                if (director != null) {
                    synchronized (director) {
                        director.notifyAll();
                    }
                }
            } else {
                try {
                    // The constructor registers the new actor with this
                    // container, so the reference need not be kept.
                    /*Actor localActor =*/new ChannelActor(i, this);

                    // NOTE: Probably don't want this overhead.
                    // ((NamedObj)localActor).addDebugListener(this);
                } catch (KernelException e) {
                    throw new InternalErrorException(e);
                }
            }
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         inner classes                     ////

    /** Actor to handle an input channel. It has no ports. It uses the
     *  ports of the container.
     */
    private class ChannelActor extends TypedAtomicActor {
        /** Construct an actor that services one channel of the input
         *  port of the enclosing NondeterministicMerge.
         *  @param index The index of the input channel to read from.
         *  @param container The NondeterministicMerge that contains
         *   this actor.
         *  @exception IllegalActionException If the actor cannot be
         *   contained by the proposed container.
         *  @exception NameDuplicationException If the container already
         *   has an actor named "ChannelActor" followed by the index.
         */
        public ChannelActor(int index, NondeterministicMerge container)
                throws IllegalActionException, NameDuplicationException {
            super(container, "ChannelActor" + index);
            _channelIndex = index;
            _channelValue = new IntToken(_channelIndex);
        }

        // Override the base class to not export anything.
        @Override
        public void exportMoML(Writer output, int depth, String name) {
        }

        /** If the channel handled by this actor is connected, read one
         *  token from it and send that token to the output port and the
         *  channel index to the channel port of the host actor.
         *  If the channel is not connected, wait on this actor.
         *  @exception IllegalActionException If reading from the input
         *   or sending to the outputs throws it.
         */
        @Override
        public void fire() throws IllegalActionException {
            // If there is no connection, do nothing.
            if (input.getWidth() > _channelIndex) {
                // NOTE: Reading from the input port of the host actor.
                if (!NondeterministicMerge.this._stopRequested
                        && input.hasToken(_channelIndex)) {
                    if (_debugging) {
                        NondeterministicMerge.this
                                ._debug("Waiting for input from channel "
                                        + _channelIndex);
                    }

                    // NOTE: Writing to the port of the host actor.
                    Token result = input.get(_channelIndex);

                    // We require that the send to the two output ports be
                    // atomic so that the channel port gets tokens
                    // in the same order as the output port.
                    // We synchronize on the director because the send()
                    // may call wait() on the director of the container,
                    // so synchronizing on anything else could cause deadlock.
                    synchronized (((NondeterministicMerge) getContainer())
                            .getExecutiveDirector()) {
                        output.send(0, result);
                        channel.send(0, _channelValue);
                    }

                    if (_debugging) {
                        NondeterministicMerge.this
                                ._debug("Sent " + result + " from channel "
                                        + _channelIndex + " to the output.");
                    }
                }
            } else {
                // Input channel is no longer connected.
                // We don't want to spin lock here, so we
                // wait.
                // NOTE: synchronizing is neither allowed
                // nor necessary here. See workspace().wait(Object).
                try {
                    workspace().wait(this);
                } catch (InterruptedException ex) {
                    // Ignore and continue executing.
                }
            }
        }

        // Override to return the manager associate with the host.
        @Override
        public Manager getManager() {
            return NondeterministicMerge.this.getManager();
        }

        /** The index of the input channel handled by this actor. */
        private final int _channelIndex;

        /** The channel index as a token, sent on the channel port. */
        private final IntToken _channelValue;
    }

    /** Variant of the PNDirector for the NondeterministicMerge actor.
     *  It does not keep track of threads itself; all thread bookkeeping
     *  is delegated to the executive director, which must be a
     *  PNDirector.
     */
    private class MergeDirector extends PNDirector {
        /** Construct an MergeDirector in the specified workspace with
         *  no container and an empty string as a name. You can then change
         *  the name with setName(). If the workspace argument is null, then
         *  use the default workspace. You should set the local director or
         *  executive director before attempting to send data to the actor
         *  or to execute it. Add the actor to the workspace directory.
         *  Increment the version number of the workspace.
         *  The director is marked as not persistent.
         *  @param workspace The workspace that will list the actor.
         *  @exception IllegalActionException If the container is incompatible
         *   with this actor.
         *  @exception NameDuplicationException If the name coincides with
         *   an actor already in the container.
         */
        public MergeDirector(Workspace workspace)
                throws IllegalActionException, NameDuplicationException {
            super(workspace);
            setPersistent(false);
        }

        /** Queue an initialization request with the manager.
         *  The specified actor will be initialized at an appropriate time,
         *  between iterations, by calling its preinitialize() and initialize()
         *  methods. This method is called by CompositeActor when an actor
         *  sets its container to that composite actor. Typically, that
         *  will occur when a model is first constructed, and during the
         *  execute() method of a ChangeRequest.
         *  We do nothing here in this implementation:
         *  When these actors are added during the initialization phase
         *  setContainer results in the call of this method, which will
         *  requestInitialization, which will normally delegate the action
         *  to the Manager.
         *  super.initialize() in NondeterministicMerge will however
         *  initialize the director of this
         *  composite actor (the MergeDirector), which will initialize the
         *  newly created actors. Hence we don't need to do it again here.
         *  @param actor The actor to initialize.
         */
        @Override
        public void requestInitialization(Actor actor) {
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         */
        @Override
        public synchronized void addThread(Thread thread) {
            _getExecutivePNDirector().addThread(thread);
        }

        /** Do nothing.
         */
        @Override
        public void fire() {
            // Do not call super.fire() here because ProcessDirector.fire()
            // waits until a deadlock is detected, which we don't want to do.
        }

        /** Return false since this director has nothing to do.
         *  @return False.
         */
        @Override
        public boolean postfire() {
            return false;
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         */
        @Override
        public synchronized void removeThread(Thread thread) {
            _getExecutivePNDirector().removeThread(thread);
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         *  @param receiver The receiver handling the I/O operation,
         *   or null if it is not a specific receiver.
         *  @see #threadBlocked(Thread, ProcessReceiver, boolean)
         */
        @Override
        public synchronized void threadBlocked(Thread thread,
                ProcessReceiver receiver) {
            _getExecutivePNDirector().threadBlocked(thread, receiver);
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         *  @param receiver The receiver handling the I/O operation,
         *   or null if it is not a specific receiver.
         *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
         *   to indicate whether the thread is blocked on read or write.
         *  @see #threadBlocked(Thread, ProcessReceiver)
         */
        @Override
        public synchronized void threadBlocked(Thread thread,
                ProcessReceiver receiver, boolean readOrWrite) {
            _getExecutivePNDirector().threadBlocked(thread, receiver,
                    readOrWrite);
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         */
        @Override
        public synchronized void threadHasPaused(Thread thread) {
            _getExecutivePNDirector().threadHasPaused(thread);
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         */
        @Override
        public synchronized void threadHasResumed(Thread thread) {
            _getExecutivePNDirector().threadHasResumed(thread);
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         *  @param receiver The receiver handling the I/O operation,
         *   or null if it is not a specific receiver.
         *  @see #threadBlocked(Thread, ProcessReceiver)
         */
        @Override
        public synchronized void threadUnblocked(Thread thread,
                ProcessReceiver receiver) {
            _getExecutivePNDirector().threadUnblocked(thread, receiver);
        }

        /** Override the base class to delegate to the executive director.
         *  This director does not keep track of threads.
         *  @param thread The thread.
         *  @param receiver The receiver handling the I/O operation,
         *   or null if it is not a specific receiver.
         *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
         *   to indicate whether the thread is blocked on read or write.
         *  @see #threadBlocked(Thread, ProcessReceiver, boolean)
         */
        @Override
        public synchronized void threadUnblocked(Thread thread,
                ProcessReceiver receiver, boolean readOrWrite) {
            _getExecutivePNDirector().threadUnblocked(thread, receiver,
                    readOrWrite);
        }

        /** Do nothing.
         */
        @Override
        public void wrapup() {
        }

        // Override since deadlock cannot ever occur internally.
        @Override
        protected boolean _resolveDeadlock() {
            if (_debugging) {
                _debug("Deadlock is not real as "
                        + "NondeterministicMerge can't deadlock.");
            }

            return true;
        }

        /** Return the executive director, to which all thread
         *  bookkeeping of this director is delegated.
         *  @return The executive director, as a PNDirector.
         *  @exception InternalErrorException If the executive director
         *   is null or is not a PNDirector.
         */
        private PNDirector _getExecutivePNDirector() {
            Director director = getExecutiveDirector();

            if (director instanceof PNDirector) {
                return (PNDirector) director;
            }
            throw new InternalErrorException(
                    "NondeterministicMerge actor can only execute"
                            + " under the control of a PNDirector!");
        }
    }
}