/* Governs the execution of a CompositeActor with timed Kahn process
 network semantics.

 Copyright (c) 1998-2014 The Regents of the University of California.
 All rights reserved.
 Permission is hereby granted, without written agreement and without
 license or royalty fees, to use, copy, modify, and distribute this
 software and its documentation for any purpose, provided that the above
 copyright notice and the following two paragraphs appear in all copies
 of this software.

 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
 SUCH DAMAGE.

 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
 ENHANCEMENTS, OR MODIFICATIONS.

 PT_COPYRIGHT_VERSION_2
 COPYRIGHTENDKEY

 */
package ptolemy.domains.pn.kernel;

import java.util.ArrayList;
import java.util.List;

import ptolemy.actor.Actor;
import ptolemy.actor.util.CalendarQueue;
import ptolemy.actor.util.Time;
import ptolemy.actor.util.TimedEvent;
import ptolemy.kernel.CompositeEntity;
import ptolemy.kernel.util.IllegalActionException;
import ptolemy.kernel.util.InternalErrorException;
import ptolemy.kernel.util.NameDuplicationException;
import ptolemy.kernel.util.Workspace;

///////////////////////////////////////////////////////////////////
//// TimedPNDirector

/**
 A TimedPNDirector governs the execution of a CompositeActor with
 Kahn-MacQueen process networks (PN) semantics extended by introduction of a
 notion of global time.
 <p>
 The thread that calls the various execution methods (initialize, prefire, fire
 and postfire) on the director is referred to as the <i>directing thread</i>.
 This directing thread might be the main thread responsible for the execution
 of the entire simulation or might be the thread created by the executive
 director of the containing composite actor.
 <p>
 In the PN domain, the director creates a thread (an instance of
 ProcessThread), representing a Kahn process, for each actor in the model.
 The threads are created in initialize() and started in the prefire() method
 of the ProcessDirector. A process is considered <i>active</i> from its
 creation until its termination. An active process can block when trying to
 read from a channel (read-blocked), when trying to write to a channel
 (write-blocked), or when waiting for time to progress (time-blocked). Time
 can progress for an active process in this model of computation only when the
 process is blocked.
 <p>
 A <i>deadlock</i> is when all the active processes are blocked.
 The director is responsible for handling deadlocks during execution.
 This director handles three different sorts of deadlocks: real deadlock, timed
 deadlock and artificial deadlock.
 <p>
 A real deadlock is when all the processes are blocked on a read, meaning that
 no process can proceed until it receives new data. The execution can be
 terminated, if desired, in such a situation. If the container of this director
 does not have any input ports (as is the case for a top-level composite
 actor), then the executive director or manager terminates the execution.
 If the container has input ports, then it is up to the
 executive director of the container to decide on the termination of the
 execution. To terminate the execution after detection of a real deadlock, the
 manager or the executive director calls wrapup() on the director.
 <p>
 An artificial deadlock is when all processes are blocked and at least one
 process is blocked on a write. In this case the director increases the
 capacity of the receiver with the smallest capacity amongst all the
 receivers on which a process is blocked on a write.
 This breaks the deadlock and the execution can proceed.
 <p>
 A timed deadlock is when all the processes under the control of this
 director are blocked, at least one process is blocked on a delay (time-blocked)
 and no process is blocked on a write. This director supports a notion of global
 time. All active processes that are not blocked and are executing concurrently
 are executing at the same global time. A process that wants time to advance
 suspends itself by calling the fireAt() method of the director and specifies
 the time it wants to be awakened at. Time can advance only when a timed
 deadlock occurs. In such a case, the director advances time to the time when
 the first time-blocked process can be awakened.
 <p>

 @author Mudit Goel
 @version $Id$
 @since Ptolemy II 0.2
 @Pt.ProposedRating Green (mudit)
 @Pt.AcceptedRating Green (davisj)
 */
public class TimedPNDirector extends PNDirector {
    /** Construct a director in the default workspace with an empty string
     *  as its name. The director is added to the list of objects in
     *  the workspace. Increment the version number of the workspace.
     *  Create a director parameter "initialQueueCapacity" with the default
     *  value 1. This sets the initial capacities of the FIFO queues in all
     *  the receivers created in the PN domain.
     *  @exception IllegalActionException If the name has a period in it, or
     *   the director is not compatible with the specified container.
     *  @exception NameDuplicationException If the container already contains
     *   an entity with the specified name.
     */
    public TimedPNDirector()
            throws IllegalActionException, NameDuplicationException {
        super();
    }

    /** Construct a director in the workspace with an empty name.
     *  The director is added to the list of objects in the workspace.
     *  Increment the version number of the workspace.
     *  Create a director parameter "initialQueueCapacity" with the default
     *  value 1. This sets the initial capacities of the queues in all
     *  the receivers created in the PN domain.
     *  @param workspace The workspace of this object.
     *  @exception IllegalActionException If the name has a period in it, or
     *   the director is not compatible with the specified container.
     *  @exception NameDuplicationException If the container already contains
     *   an entity with the specified name.
     */
    public TimedPNDirector(Workspace workspace)
            throws IllegalActionException, NameDuplicationException {
        super(workspace);
    }

    /** Construct a director in the given container with the given name.
     *  The container argument must not be null, or a
     *  NullPointerException will be thrown.
     *  If the name argument is null, then the name is set to the
     *  empty string. Increment the version number of the workspace.
     *  Create a director parameter "initialQueueCapacity" with the default
     *  value 1. This sets the initial capacities of the queues in all
     *  the receivers created in the PN domain.
     *  @param container Container of the director.
     *  @param name Name of this director.
     *  @exception IllegalActionException If the director is not compatible
     *   with the specified container. May be thrown in derived classes.
     *  @exception NameDuplicationException If the container is not a
     *   CompositeActor and the name collides with an entity in the container.
     */
    public TimedPNDirector(CompositeEntity container, String name)
            throws IllegalActionException, NameDuplicationException {
        super(container, name);
    }

    ///////////////////////////////////////////////////////////////////
    ////                         public methods                    ////

    /** Clone the director into the specified workspace. The new object is
     *  <i>not</i> added to the directory of that workspace (it must be added
     *  by the user if it is wanted there). The result is a new director
     *  with no container and no topology listeners. The count of active
     *  processes is zero. The parameter "initialQueueCapacity" has the
     *  same value as the director being cloned.
     *
     *  @param workspace The workspace for the cloned object.
     *  @exception CloneNotSupportedException If one of the attributes
     *   cannot be cloned.
     *  @return The new TimedPNDirector.
     */
    @Override
    public Object clone(Workspace workspace) throws CloneNotSupportedException {
        TimedPNDirector newObject = (TimedPNDirector) super.clone(workspace);

        // Findbugs:
        // [M M IS] Inconsistent synchronization [IS2_INCONSISTENT_SYNC]
        // Actually this is not a problem since the object is
        // being created and hence nobody else has access to it.

        newObject._eventQueue = new CalendarQueue(
                new TimedEvent.TimeComparator());
        newObject._delayBlockCount = 0;
        return newObject;
    }

    /** Suspend the calling process until the time has advanced to at least the
     *  time specified by the method argument.
     *  To do so, add the actor corresponding to the calling process to the
     *  priority queue, ordered by the time specified by the method argument,
     *  and increment the count of the actors blocked on a delay. Then wait
     *  until the model time has advanced to at least the specified time,
     *  resume the execution of the calling process and return.
     *  @param actor The actor scheduled to be fired.
     *  @param newFiringTime The scheduled time.
     *  @param microstep The microstep (ignored by this director).
     *  @return The value of the newFiringTime argument.
     *  @exception IllegalActionException If the operation is not
     *   permissible (e.g. the given time is in the past).
     */
    @Override
    public synchronized Time fireAt(Actor actor, Time newFiringTime,
            int microstep) throws IllegalActionException {
        if (newFiringTime.compareTo(getModelTime()) < 0) {
            throw new IllegalActionException(this,
                    "The process wants to get fired in the past!");
        }

        _eventQueue.put(new TimedEvent(newFiringTime, actor));
        _informOfDelayBlock();

        try {
            // Re-check the condition after every wake-up, since notifyAll()
            // is also used for purposes other than time advancing.
            while (getModelTime().compareTo(newFiringTime) < 0) {
                wait();
            }
        } catch (InterruptedException e) {
            System.err.println(e.toString());
        }
        return newFiringTime;
    }

    /** Set a new value to the current time of the model, where
     *  the new time must be no earlier than the current time.
     *  @param newTime The new time of the model.
     *  @exception IllegalActionException If an attempt is made to change the
     *   time to less than the current time.
     */
    @Override
    public void setModelTime(Time newTime) throws IllegalActionException {
        if (newTime.compareTo(getModelTime()) < 0) {
            throw new IllegalActionException(this,
                    "Attempt to set the time to the past.");
        } else {
            super.setModelTime(newTime);
        }
    }

    ///////////////////////////////////////////////////////////////////
    ////                         protected methods                 ////

    /** Reset the count of time-blocked processes and clear the event queue.
     *  Added 7/15/08 by Patricia Derler.
     *  @exception IllegalActionException Not thrown in this class.
     */
    @Override
    public void wrapup() throws IllegalActionException {
        _delayBlockCount = 0;
        _eventQueue.clear();
    }

    /** Return true if a deadlock is detected, that is, if all the active
     *  processes in the container are either read-blocked, write-blocked
     *  or delay-blocked. Return false otherwise.
     *  @return true if a deadlock is detected.
     */
    @Override
    protected synchronized boolean _areThreadsDeadlocked() {
        return _readBlockedQueues.size() + _writeBlockedQueues.size()
                + _delayBlockCount >= _getActiveThreadsCount();
    }

    /** Increment by 1 the count of actors waiting for the time to advance.
     *  Then notify all threads waiting on this director, so that the
     *  directing thread can check for a resultant deadlock or pausing
     *  of the execution.
     */
    protected synchronized void _informOfDelayBlock() {
        _delayBlockCount++;
        notifyAll();
    }

    /** Decrease by 1 the count of processes blocked on a delay.
     */
    protected synchronized void _informOfDelayUnblock() {
        _delayBlockCount--;
    }

    /** Return false on detection of a real deadlock. Otherwise break the
     *  deadlock and return true.
     *  On detection of a timed deadlock, advance time to the earliest
     *  time that a delayed process is waiting for, wake up all the actors
     *  waiting for time to advance to the new time, and remove them from
     *  the priority queue. The timed-deadlock branch of this method is
     *  synchronized on the director.
     *  @return false if a real deadlock is detected, true otherwise.
     *  @exception IllegalActionException Not thrown in this base class.
     *   This might be thrown by derived classes.
     */
    @Override
    protected boolean _resolveDeadlock() throws IllegalActionException {
        if (_writeBlockedQueues.size() != 0) {
            // Artificial deadlock based on write blocks.
            _incrementLowestWriteCapacityPort();
            return true;
        } else if (_delayBlockCount == 0) {
            // Real deadlock with no delayed processes.
            return false;
        } else {
            // Timed deadlock due to delayed processes.
            // Advance time to next possible time.
            synchronized (this) {
                // There could be multiple events for the same
                // actor for the same time (e.g. by sending events
                // to this actor with same time stamps on different
                // input ports). Thus, call _informOfDelayUnblock()
                // only once per actor for events with the same
                // time stamp. 7/15/08 Patricia Derler
                List unblockedActors = new ArrayList();
                if (!_eventQueue.isEmpty()) {
                    //Take the first time-blocked process from the queue.
                    TimedEvent event = (TimedEvent) _eventQueue.take();
                    unblockedActors.add(event.contents);
                    //Advance time to the resumption time of this process.
                    setModelTime(event.timeStamp);
                    _informOfDelayUnblock();
                } else {
                    throw new InternalErrorException("Inconsistency"
                            + " in number of actors blocked on delays count"
                            + " and the entries in the CalendarQueue");
                }

                //Remove any other process waiting to be resumed at the new
                //advanced time (the new currentTime).
                boolean sameTime = true;

                while (sameTime) {
                    //If queue is not empty, then determine the resumption
                    //time of the next process.
                    if (!_eventQueue.isEmpty()) {
                        //Remove the first process from the queue.
                        TimedEvent event = (TimedEvent) _eventQueue.take();
                        Actor actor = (Actor) event.contents;

                        //Get the resumption time of the newly removed
                        //process.
                        Time newTime = event.timeStamp;

                        //If the resumption time of the newly removed
                        //process is the same as the newly advanced time
                        //then unblock it. Else put the newly removed
                        //process back on the event queue.
                        if (newTime.equals(getModelTime())) {
                            if (unblockedActors.contains(actor)) {
                                continue;
                            } else {
                                unblockedActors.add(actor);
                            }
                            _informOfDelayUnblock();
                        } else {
                            _eventQueue.put(new TimedEvent(newTime, actor));
                            sameTime = false;
                        }
                    } else {
                        sameTime = false;
                    }
                }

                //Wake up all delayed actors
                notifyAll();
            }
        }

        return true;
    }

    ///////////////////////////////////////////////////////////////////
    ////                         protected variables               ////

    /** The priority queue that stores the list of processes waiting for time
     *  to advance. These processes are sorted by the time they want to resume
     *  at.
     */
    protected CalendarQueue _eventQueue = new CalendarQueue(
            new TimedEvent.TimeComparator());

    /** The number of time-blocked processes. */
    protected int _delayBlockCount = 0;
}
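
///////////////////////////////////////////////////////////////////
//// TimedDeadlockSketch

// Illustrative sketch only; it is not part of the Ptolemy II sources.
// It restates the decision order used by _resolveDeadlock() above with
// plain JDK types, so the three kinds of deadlock can be tried out
// without building a model. The class name, the method names and the
// use of Double time stamps are assumptions made for this sketch.
// Unlike the director, it tracks only wake-up times, not actors, so it
// does not reproduce the "once per actor" bookkeeping of the real code.
class TimedDeadlockSketch {
    // Classify a deadlock in the same order as _resolveDeadlock():
    // write blocks first, then the absence of delay blocks, then time.
    static String classify(int writeBlocked, int delayBlocked) {
        if (writeBlocked != 0) {
            return "artificial";
        } else if (delayBlocked == 0) {
            return "real";
        } else {
            return "timed";
        }
    }

    // Remove the earliest wake-up time and every other entry equal to
    // it, and return that time as the new model time.
    static double advanceTime(java.util.PriorityQueue<Double> wakeUpTimes) {
        Double earliest = wakeUpTimes.poll();
        if (earliest == null) {
            // Mirrors the InternalErrorException case in the director.
            throw new IllegalStateException("No time-blocked process queued.");
        }
        while (!wakeUpTimes.isEmpty() && wakeUpTimes.peek().equals(earliest)) {
            wakeUpTimes.poll();
        }
        return earliest;
    }
}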