001/* Governs the execution of a CompositeActor with timed Kahn process
002 network semantics.
003
004 Copyright (c) 1998-2014 The Regents of the University of California.
005 All rights reserved.
006 Permission is hereby granted, without written agreement and without
007 license or royalty fees, to use, copy, modify, and distribute this
008 software and its documentation for any purpose, provided that the above
009 copyright notice and the following two paragraphs appear in all copies
010 of this software.
011
012 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
013 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
014 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
015 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
016 SUCH DAMAGE.
017
018 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
019 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
020 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
021 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
022 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
023 ENHANCEMENTS, OR MODIFICATIONS.
024
025 PT_COPYRIGHT_VERSION_2
026 COPYRIGHTENDKEY
027
028 */
029package ptolemy.domains.pn.kernel;
030
031import java.util.ArrayList;
032import java.util.List;
033
034import ptolemy.actor.Actor;
035import ptolemy.actor.util.CalendarQueue;
036import ptolemy.actor.util.Time;
037import ptolemy.actor.util.TimedEvent;
038import ptolemy.kernel.CompositeEntity;
039import ptolemy.kernel.util.IllegalActionException;
040import ptolemy.kernel.util.InternalErrorException;
041import ptolemy.kernel.util.NameDuplicationException;
042import ptolemy.kernel.util.Workspace;
043
044///////////////////////////////////////////////////////////////////
045//// TimedPNDirector
046
047/**
048 A TimedPNDirector governs the execution of a CompositeActor with
049 Kahn-MacQueen process networks (PN) semantics extended by introduction of a
050 notion of global time.
051 <p>
052 The thread that calls the various execution methods (initialize, prefire, fire
053 and postfire) on the director is referred to as the <i>directing thread</i>.
054 This directing thread might be the main thread responsible for the execution
055 of the entire simulation or might be the thread created by the executive
056 director of the containing composite actor.
057 <p>
058 In the PN domain, the director creates a thread (an instance of
059 ProcessThread), representing a Kahn process, for each actor in the model.
060 The threads are created in initialize() and started in the prefire() method
061 of the ProcessDirector. A process is considered <i>active</i> from its
062 creation until its termination. An active process can block when trying to
063 read from a channel (read-blocked), when trying to write to a channel
064 (write-blocked), or when waiting for time to progress (time-blocked). Time
065 can progress for an active process in this model of computation only when the
066 process is  blocked.
067 <p>
068 A <i>deadlock</i> is when all the active processes are blocked.
069 The director is responsible for handling deadlocks during execution.
070 This director handles three different sorts of deadlocks, real deadlock, timed
071 deadlock and artificial deadlock.
072 <p>
073 A real deadlock is when all the processes are blocked on a read meaning that
074 no process can proceed until it receives new data. The execution can be
075 terminated, if desired, in such a situation. If the container of this director
076 does not have any input ports (as is in the case of a top-level composite
077 actor), then the executive director or manager terminates the execution.
078 If the container has input ports, then it is up to the
079 executive director of the container to decide on the termination of the
080 execution. To terminate the execution after detection of a real deadlock, the
081 manager or the executive director calls wrapup() on the director.
082 <p>
083 An artificial deadlock is when all processes are blocked and at least one
084 process is blocked on a write. In this case the director increases the
085 capacity of the receiver with the smallest capacity amongst all the
086 receivers on which a process is blocked on a write.
087 This breaks the deadlock and the execution can proceed.
088 <p>
089 A timed deadlock is when all the processes under the control of this
090 director are blocked, at least one process is blocked on a delay (time-blocked)
091 and no process is blocked on a write. This director supports a notion of global
092 time. All active processes that are not blocked and are executing concurrently
093 are executing at the same global time. A process that wants time to advance,
094 suspends itself by calling the fireAt() method of the director and specifies
095 the time it wants to be awakened at. Time can advance only when a timed
096 deadlock occurs. In such a case, the director advances time to the time when
097 the first timed-blocked process can be awakened.
098 <p>
099
100 @author Mudit Goel
101 @version $Id$
102 @since Ptolemy II 0.2
103 @Pt.ProposedRating Green (mudit)
104 @Pt.AcceptedRating Green (davisj)
105 */
106public class TimedPNDirector extends PNDirector {
107    /** Construct a director in the default workspace with an empty string
108     *  as its name. The director is added to the list of objects in
109     *  the workspace. Increment the version number of the workspace.
110     *  Create a director parameter "initialQueueCapacity" with the default
111     *  value 1. This sets the initial capacities of the FIFO queues in all
112     *  the receivers created in the PN domain.
113     *  @exception IllegalActionException If the name has a period in it, or
114     *   the director is not compatible with the specified container.
115     *  @exception NameDuplicationException If the container already contains
116     *   an entity with the specified name.
117     */
118    public TimedPNDirector()
119            throws IllegalActionException, NameDuplicationException {
120        super();
121    }
122
123    /**Construct a director in the  workspace with an empty name.
124     *  The director is added to the list of objects in the workspace.
125     *  Increment the version number of the workspace.
126     *  Create a director parameter "initialQueueCapacity" with the default
127     *  value 1. This sets the initial capacities of the queues in all
128     *  the receivers created in the PN domain.
129     *  @param workspace The workspace of this object.
130     *  @exception IllegalActionException If the name has a period in it, or
131     *   the director is not compatible with the specified container.
132     *  @exception NameDuplicationException If the container already contains
133     *   an entity with the specified name.
134     */
135    public TimedPNDirector(Workspace workspace)
136            throws IllegalActionException, NameDuplicationException {
137        super(workspace);
138    }
139
140    /** Construct a director in the given container with the given name.
141     *  If the container argument must not be null, or a
142     *  NullPointerException will be thrown.
143     *  If the name argument is null, then the name is set to the
144     *  empty string. Increment the version number of the workspace.
145     *  Create a director parameter "initialQueueCapacity" with the default
146     *  value 1. This sets the initial capacities of the queues in all
147     *  the receivers created in the PN domain.
148     *  @param container Container of the director.
149     *  @param name Name of this director.
150     *  @exception IllegalActionException If the director is not compatible
151     *   with the specified container.  May be thrown in derived classes.
152     *  @exception NameDuplicationException If the container not a
153     *   CompositeActor and the name collides with an entity in the container.
154     */
155    public TimedPNDirector(CompositeEntity container, String name)
156            throws IllegalActionException, NameDuplicationException {
157        super(container, name);
158    }
159
160    ///////////////////////////////////////////////////////////////////
161    ////                         public methods                    ////
162
163    /** Clone the director into the specified workspace. The new object is
164     *  <i>not</i> added to the directory of that workspace (It must be added
165     *  by the user if he wants it to be there). The result is a new director
166     *  with no container and no topology listeners. The count of active
167     *  processes is zero. The parameter "initialQueueCapacity" has the
168     *  same value as the director being cloned.
169     *
170     *  @param workspace The workspace for the cloned object.
171     *  @exception CloneNotSupportedException If one of the attributes
172     *   cannot be cloned.
173     *  @return The new TimedPNDirector.
174     */
175    @Override
176    public Object clone(Workspace workspace) throws CloneNotSupportedException {
177        TimedPNDirector newObject = (TimedPNDirector) super.clone(workspace);
178
179        // Findbugs:
180        //  [M M IS] Inconsistent synchronization [IS2_INCONSISTENT_SYNC]
181        // Actually this is not a problem since the object is
182        // being created and hence nobody else has access to it.
183
184        newObject._eventQueue = new CalendarQueue(
185                new TimedEvent.TimeComparator());
186        newObject._delayBlockCount = 0;
187        return newObject;
188    }
189
190    /** Suspend the calling process until the time has advanced to at least the
191     *  time specified by the method argument.
192     *  Add the actor corresponding to the calling process to the priority
193     *  queue and sort it by the time specified by the method argument.
194     *  Increment the count of the actors blocked on a delay. Suspend the
195     *  calling process until the time has advanced to at least the time
196     *  specified by the method argument. Resume the execution of the calling
197     *  process and return.
198     *  @param actor The actor scheduled to be fired.
199     *  @param newFiringTime The scheduled time.
200     *  @param microstep The microstep (ignored by this director).
201     *  @return the value of the newFiringTime argument.
202     *  @exception IllegalActionException If the operation is not
203     *  permissible (e.g. the given time is in the past).
204     */
205    @Override
206    public synchronized Time fireAt(Actor actor, Time newFiringTime,
207            int microstep) throws IllegalActionException {
208        if (newFiringTime.compareTo(getModelTime()) < 0) {
209            throw new IllegalActionException(this,
210                    "The process wants to " + " get fired in the past!");
211        }
212
213        _eventQueue.put(new TimedEvent(newFiringTime, actor));
214        _informOfDelayBlock();
215
216        try {
217            while (getModelTime().compareTo(newFiringTime) < 0) {
218                wait();
219            }
220        } catch (InterruptedException e) {
221            System.err.println(e.toString());
222        }
223        return newFiringTime;
224    }
225
226    /** Set a new value to the current time of the model, where
227     *  the new time must be no earlier than the current time.
228     *  @param newTime The new time of the model.
229     *  @exception IllegalActionException If an attempt is made to change the
230     *  time to less than the current time.
231     */
232    @Override
233    public void setModelTime(Time newTime) throws IllegalActionException {
234        if (newTime.compareTo(getModelTime()) < 0) {
235            throw new IllegalActionException(this,
236                    "Attempt to set the " + "time to past.");
237        } else {
238            super.setModelTime(newTime);
239        }
240    }
241
242    ///////////////////////////////////////////////////////////////////
243    ////                         protected methods                 ////
244
245    /**
246     * Reset private variables.
247     * added 7/15/08 Patricia Derler
248     */
249    @Override
250    public void wrapup() throws IllegalActionException {
251        _delayBlockCount = 0;
252        _eventQueue.clear();
253    }
254
255    /** Return true if a deadlock is detected. Return false otherwise.
256     *  Return true if all the active processes in the container are either
257     *  read-blocked, write-blocked or delay-blocked.
258     *  @return true if a deadlock is detected.
259     */
260    @Override
261    protected synchronized boolean _areThreadsDeadlocked() {
262        if (_readBlockedQueues.size() + _writeBlockedQueues.size()
263                + _delayBlockCount >= _getActiveThreadsCount()) {
264            return true;
265        } else {
266            return false;
267        }
268    }
269
270    /** Increment by 1 the count of actors waiting for the time to advance.
271     *  Check for a resultant deadlock or pausing of the
272     *  execution. If either of them is detected, then notify the directing
273     *  thread of the same.
274     */
275    protected synchronized void _informOfDelayBlock() {
276        _delayBlockCount++;
277        notifyAll();
278    }
279
280    /** Decrease by 1 the count of processes blocked on a delay.
281     */
282    protected synchronized void _informOfDelayUnblock() {
283        _delayBlockCount--;
284        return;
285    }
286
287    /** Return false on detection of a real deadlock. Otherwise break the
288     *  deadlock and return true.
289     *  On detection of a timed deadlock, advance time to the earliest
290     *  time that a delayed process is waiting for, wake up all the actors
291     *  waiting for time to advance to the new time, and remove them from
292     *  the priority queue. This method is synchronized on the director.
293     *  @return true if a real deadlock is detected, false otherwise.
294     *  @exception IllegalActionException Not thrown in this base class.
295     *  This might be thrown by derived classes.
296     */
297    @Override
298    protected boolean _resolveDeadlock() throws IllegalActionException {
299        if (_writeBlockedQueues.size() != 0) {
300            // Artificial deadlock based on write blocks.
301            _incrementLowestWriteCapacityPort();
302            return true;
303        } else if (_delayBlockCount == 0) {
304            // Real deadlock with no delayed processes.
305            return false;
306        } else {
307            // Artificial deadlock due to delayed processes.
308            // Advance time to next possible time.
309            synchronized (this) {
310                // There could be multiple events for the same
311                // actor for the same time (e.g. by sending events
312                // to this actor with same time stamps on different
313                // input ports. Thus, only _informOfDelayUnblock()
314                // for events with the same time stamp but different
315                // actors. 7/15/08 Patricia Derler
316                List unblockedActors = new ArrayList();
317                if (!_eventQueue.isEmpty()) {
318                    //Take the first time-blocked process from the queue.
319                    TimedEvent event = (TimedEvent) _eventQueue.take();
320                    unblockedActors.add(event.contents);
321                    //Advance time to the resumption time of this process.
322                    setModelTime(event.timeStamp);
323                    _informOfDelayUnblock();
324                } else {
325                    throw new InternalErrorException("Inconsistency"
326                            + " in number of actors blocked on delays count"
327                            + " and the entries in the CalendarQueue");
328                }
329
330                //Remove any other process waiting to be resumed at the new
331                //advanced time (the new currentTime).
332                boolean sameTime = true;
333
334                while (sameTime) {
335                    //If queue is not empty, then determine the resumption
336                    //time of the next process.
337                    if (!_eventQueue.isEmpty()) {
338                        //Remove the first process from the queue.
339                        TimedEvent event = (TimedEvent) _eventQueue.take();
340                        Actor actor = (Actor) event.contents;
341
342                        //Get the resumption time of the newly removed
343                        //process.
344                        Time newTime = event.timeStamp;
345
346                        //If the resumption time of the newly removed
347                        //process is the same as the newly advanced time
348                        //then unblock it. Else put the newly removed
349                        //process back on the event queue.
350                        if (newTime.equals(getModelTime())) {
351                            if (unblockedActors.contains(actor)) {
352                                continue;
353                            } else {
354                                unblockedActors.add(actor);
355                            }
356                            _informOfDelayUnblock();
357                        } else {
358                            _eventQueue.put(new TimedEvent(newTime, actor));
359                            sameTime = false;
360                        }
361                    } else {
362                        sameTime = false;
363                    }
364                }
365
366                //Wake up all delayed actors
367                notifyAll();
368            }
369        }
370
371        return true;
372    }
373
374    ///////////////////////////////////////////////////////////////////
375    ////                         protected variables               ////
376
377    /** The priority queue that stores the list of processes waiting for time
378     *  to advance. These processes are sorted by the time they want to resume
379     *  at.
380     */
381    protected CalendarQueue _eventQueue = new CalendarQueue(
382            new TimedEvent.TimeComparator());
383
384    /** The number of time-blocked processes. */
385    protected int _delayBlockCount = 0;
386}