001/* Director for Kahn-MacQueen process network semantics.
002
003 Copyright (c) 1998-2014 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.domains.pn.kernel;
029
030import java.lang.ref.WeakReference;
031import java.util.HashMap;
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.ListIterator;
035
036import ptolemy.actor.CompositeActor;
037import ptolemy.actor.IORelation;
038import ptolemy.actor.Receiver;
039import ptolemy.actor.process.CompositeProcessDirector;
040import ptolemy.actor.process.ProcessReceiver;
041import ptolemy.data.IntToken;
042import ptolemy.data.expr.Parameter;
043import ptolemy.data.type.BaseType;
044import ptolemy.domains.pn.kernel.event.PNProcessListener;
045import ptolemy.kernel.CompositeEntity;
046import ptolemy.kernel.util.IllegalActionException;
047import ptolemy.kernel.util.InternalErrorException;
048import ptolemy.kernel.util.NameDuplicationException;
049import ptolemy.kernel.util.Workspace;
050
051///////////////////////////////////////////////////////////////////
052//// PNDirector
053
054/**
055 <p>A PNDirector governs the execution of a CompositeActor with extended
056 Kahn-MacQueen process networks (PN) semantics. This model of computation has
057 been extended to support mutations of graphs in a non-deterministic way.
058 </p><p>
059 The thread that calls the various execution methods (initialize, prefire, fire
060 and postfire) on the director is referred to as the <i>directing thread</i>.
061 This directing thread might be the main thread responsible for the execution
062 of the entire simulation or might be the thread created by the executive
063 director of the containing composite actor.
064 </p><p>
065 In the PN domain, the director creates a thread (an instance of
066 ProcessThread), representing a Kahn process, for each actor in the model.
067 The threads are created in initialize() and started in the prefire() method
068 of the ProcessDirector. A process is considered <i>active</i> from its
069 creation until its termination. An active process can block when trying to
070 read from a channel (read-blocked), when trying to write to a channel
071 (write-blocked) or when waiting for a queued topology change request to be
072 processed (mutation-blocked).
073 </p><p>
074 A <i>deadlock</i> is when all the active processes are blocked.
075 The director is responsible for handling deadlocks during execution.
076 This director handles two different sorts of deadlocks, <i>real deadlock</i>
077 and <i>artificial deadlock</i>.
078 </p><p>
079 A real deadlock is when all the processes are blocked on a read meaning that
080 no process can proceed until it receives new data. The execution can be
081 terminated, if desired, in such a situation. If the container of this director
082 does not have any input ports (as is in the case of a top-level composite
083 actor), then the executive director or manager terminates the execution.
084 If the container has input ports, then it is up to the
085 executive director of the container to decide on the termination of the
086 execution. To terminate the execution after detection of a real deadlock, the
087 manager or the executive director calls wrapup() on the director.
088 </p><p>
089 An artificial deadlock is when all processes are blocked and at least one
090 process is blocked on a write. In this case the director increases the
091 capacity of the receiver with the smallest capacity amongst all the
092 receivers on which a process is blocked on a write.
093 This breaks the deadlock and the execution can resume.
094 If the increase results in a capacity that exceeds the value of
095 <i>maximumQueueCapacity</i>, then instead of breaking the deadlock,
096 an exception is thrown.  This can be used to detect erroneous models
097 that require unbounded queues.</p>
098
099 <p>There are at least three ways for a PN model to terminate itself:
100 <ol>
101 <li>Have the model starve itself.  Typically, a boolean switch is used.
102 See the PN OrderedMerge demo at
103  <a href="ptolemy/domains/pn/demo/OrderedMerge/OrderedMerge.xml"><code>ptolemy/domains/pn/demo/OrderedMerge/OrderedMerge.xml</code></a>
104
105 <li>Have the model call the Stop actor.  See the PN RemoveNilTokens demo at
 <a href="ptolemy/domains/pn/demo/RemoveNilTokens/RemoveNilTokens.xml"><code>ptolemy/domains/pn/demo/RemoveNilTokens/RemoveNilTokens.xml</code></a>
107
108 <li>Set the <i>firingCountLimit</i>
109 ({@link ptolemy.actor.lib.LimitedFiringSource#_firingCountLimit}) actor
110 parameter to the number of iterations desired.  Actors such as Ramp
111 extend LimitedFiringSource and have the <i>firingCountLimit</i> parameter.
112 </ol>
113
114
115 @author Mudit Goel, Edward A. Lee, Xiaowen Xin
116 @version $Id$
117 @since Ptolemy II 0.2
118 @Pt.ProposedRating Green (mudit)
119 @Pt.AcceptedRating Green (davisj)
120 */
121public class PNDirector extends CompositeProcessDirector {
122    /** Construct a director in the default workspace with an empty string
123     *  as its name. The director is added to the list of objects in
124     *  the workspace. Increment the version number of the workspace.
125     *  Create a director parameter "initialQueueCapacity" with the default
126     *  value 1. This sets the initial capacities of the queues in all
127     *  the receivers created in the PN domain.
128     *  @exception IllegalActionException If the name has a period in it, or
129     *   the director is not compatible with the specified container.
130     *  @exception NameDuplicationException If the container already contains
131     *   an entity with the specified name.
132     */
133    public PNDirector()
134            throws IllegalActionException, NameDuplicationException {
135        super();
136        _init();
137    }
138
139    /** Construct a director in the  workspace with an empty name.
140     *  The director is added to the list of objects in the workspace.
141     *  Increment the version number of the workspace.
142     *  Create a director parameter "initialQueueCapacity" with the default
143     *  value 1. This sets the initial capacities of the queues in all
144     *  the receivers created in the PN domain.
145     *  @param workspace The workspace of this object.
146     *  @exception IllegalActionException If the name has a period in it, or
147     *   the director is not compatible with the specified container.
148     *  @exception NameDuplicationException If the container already contains
149     *   an entity with the specified name.
150     */
151    public PNDirector(Workspace workspace)
152            throws IllegalActionException, NameDuplicationException {
153        super(workspace);
154        _init();
155    }
156
157    /** Construct a director in the given container with the given name.
158     *  If the container argument must not be null, or a
159     *  NullPointerException will be thrown.
160     *  If the name argument is null, then the name is set to the
161     *  empty string. Increment the version number of the workspace.
162     *
163     *  Create a director parameter "initialQueueCapacity" with the default
164     *  value 1. This sets the initial capacities of the queues in all
165     *  the receivers created in the PN domain.
166     *  @param container Container of the director.
167     *  @param name Name of this director.
168     *  @exception IllegalActionException If the director is not compatible
169     *   with the specified container.  Thrown in derived classes.
170     *  @exception NameDuplicationException If the container not a
171     *   CompositeActor and the name collides with an entity in the container.
172     */
173    public PNDirector(CompositeEntity container, String name)
174            throws IllegalActionException, NameDuplicationException {
175        super(container, name);
176        _init();
177    }
178
179    ///////////////////////////////////////////////////////////////////
180    ////                         parameters                        ////
181
182    /** The initial size of the queues for each communication channel.
183     *  This is an integer that defaults to 1.
184     */
185    public Parameter initialQueueCapacity;
186
187    /** The maximum size of the queues for each communication channel.
188     *  This is an integer that defaults to 65536.  To specify unbounded
189     *  queues, set this to 0.
190     */
191    public Parameter maximumQueueCapacity;
192
193    ///////////////////////////////////////////////////////////////////
194    ////                         public methods                    ////
195
196    /** Add a process state change listener to this director. The listener
197     *  will be notified of each change to the state of a process.
198     *  @param listener The PNProcessListener to add.
199     *  @see #removeProcessListener(PNProcessListener)
200     */
201    public void addProcessListener(PNProcessListener listener) {
202        _processListeners.add(listener);
203    }
204
205    /** Clone the director into the specified workspace. The new object is
206     *  <i>not</i> added to the directory of that workspace (It must be added
207     *  by the user if he wants it to be there).
208     *  The result is a new director with no container, no pending mutations,
209     *  and no topology listeners. The count of active processes is zero.
210     *
211     *  @param workspace The workspace for the cloned object.
212     *  @exception CloneNotSupportedException If one of the attributes
213     *   cannot be cloned.
214     *  @return The new PNDirector.
215     */
216    @Override
217    public Object clone(Workspace workspace) throws CloneNotSupportedException {
218        PNDirector newObject = (PNDirector) super.clone(workspace);
219        //System.out.println("PNDirector.clone: " + _processListeners);
220        newObject._processListeners = new LinkedList();
221        //System.out.println("PNDirector.clone: " + _processListeners + " clone: " + newObject._processListeners);
222        newObject._readBlockedQueues = new HashMap();
223        newObject._receivers = new LinkedList();
224        newObject._writeBlockedQueues = new HashMap();
225        return newObject;
226    }
227
228    /** Invoke the initialize() method of ProcessDirector. Also set all the
229     *  state variables to the their initial values. The list of process
230     *  listeners is not reset as the developer might want to reuse the
231     *  list of listeners.
232     *  @exception IllegalActionException If the initialize() method of one
233     *  of the deeply contained actors throws it.
234     */
235    @Override
236    public void initialize() throws IllegalActionException {
237        // Initialize these counts BEFORE creating threads.
238        _readBlockedQueues.clear();
239        _writeBlockedQueues.clear();
240
241        super.initialize();
242    }
243
244    /** Return a new receiver compatible with this director. The receiver
245     *  is an instance of PNQueueReceiver. Set the initial capacity
246     *  of the FIFO queue in the receiver to the value specified by the
247     *  director parameter "initialQueueCapacity". The default value
248     *  of the parameter is 1.
249     *  @return A new PNQueueReceiver.
250     */
251    @Override
252    public Receiver newReceiver() {
253        PNQueueReceiver receiver = new PNQueueReceiver();
254        _receivers.add(new WeakReference(receiver));
255
256        // Set the capacity to the default. Note that it will also
257        // be set in preinitialize().
258        try {
259            int capacity = ((IntToken) initialQueueCapacity.getToken())
260                    .intValue();
261            receiver.setCapacity(capacity);
262        } catch (IllegalActionException e) {
263            throw new InternalErrorException(e);
264        }
265
266        return receiver;
267    }
268
269    /** Return true if the containing composite actor contains active
270     *  processes and the composite actor has input ports and if stop()
271     *  has not been called. Return false otherwise. This method is
272     *  normally called only after detecting a real deadlock, or if
273     *  stopFire() is called. True is returned to indicate that the
274     *  composite actor can start its execution again if it
275     *  receives data on any of its input ports.
276     *  @return true to indicate that the composite actor can continue
277     *  executing on receiving additional input on its input ports.
278     *  @exception IllegalActionException Not thrown in this base class. May be
279     *  thrown by derived classes.
280     */
281    @Override
282    public boolean postfire() throws IllegalActionException {
283        _notDone = super.postfire();
284
285        // If the container has input ports and there are active processes
286        // in the container, then the execution might restart on receiving
287        // additional data.
288        if (!((CompositeActor) getContainer()).inputPortList().isEmpty()
289                && _getActiveThreadsCount() != 0) {
290            // Avoid returning false on detected deadlock.
291            return !_stopRequested;
292        } else {
293            return _notDone;
294        }
295    }
296
297    /** Override the base class to reset the capacities of all the receivers.
298     *  @exception IllegalActionException If the superclass throws it.
299     */
300    @Override
301    public void preinitialize() throws IllegalActionException {
302        super.preinitialize();
303
304        // Check that no relation has multiple sources of data connected to it.
305        // FIXME: This only detects the error at this level of the hierarchy.
306        // Probably need to recursively descend into composite actors.
307        CompositeEntity container = (CompositeEntity) getContainer();
308        Iterator relations = container.relationList().iterator();
309
310        while (relations.hasNext()) {
311            IORelation relation = (IORelation) relations.next();
312
313            if (relation.linkedSourcePortList().size() > 1) {
314                throw new IllegalActionException(relation,
315                        "Relation has multiple sources of data,"
316                                + " which is not allowed in PN."
317                                + " If you want nondeterministic merge,"
318                                + " use the NondeterministicMerge actor.");
319            }
320        }
321
322        // Reset the capacities of all the receivers.
323        Parameter parameter = (Parameter) getAttribute("initialQueueCapacity");
324        int capacity = ((IntToken) parameter.getToken()).intValue();
325        ListIterator receivers = _receivers.listIterator();
326
327        while (receivers.hasNext()) {
328            WeakReference reference = (WeakReference) receivers.next();
329
330            if (reference.get() == null) {
331                // Reference has been garbage collected.
332                receivers.remove();
333            } else {
334                PNQueueReceiver receiver = (PNQueueReceiver) reference.get();
335                if (receiver.getDirector() == this) {
336                    receiver.clear();
337                    receiver.setCapacity(capacity);
338                } else {
339                    // If the director is not this, then
340                    // the receiver is no longer in use and
341                    // can be removed.
342                    receivers.remove();
343                }
344            }
345        }
346    }
347
348    /** Remove a process listener from this director.
349     *  If the listener is not attached to this director, do nothing.
350     *
351     *  @param listener The PNProcessListener to be removed.
352     *  @see #addProcessListener(PNProcessListener)
353     */
354    public void removeProcessListener(PNProcessListener listener) {
355        _processListeners.remove(listener);
356    }
357
358    /** Return an array of suggested ModalModel directors  to use with
359     *  PNDirector. The default director is MultirateFSMDirector, the
360     *  alternative director is FSMDirector.
361     *  @return An array of suggested directors to be used with ModalModel.
362     *  @see ptolemy.actor.Director#suggestedModalModelDirectors()
363     */
364    @Override
365    public String[] suggestedModalModelDirectors() {
366        return new String[] {
367                "ptolemy.domains.modal.kernel.MultirateFSMDirector",
368                "ptolemy.domains.modal.kernel.FSMDirector",
369                "ptolemy.domains.modal.kernel.NonStrictFSMDirector" };
370    }
371
372    /** Return true to indicate that a ModalModel under control
373     *  of this director supports multirate firing.
374     *  @return True indicating a ModalModel under control of this director
375     *  supports multirate firing.
376     */
377    @Override
378    public boolean supportMultirateFiring() {
379        return true;
380    }
381
382    /** Notify the director that the specified thread is blocked
383     *  on an I/O operation.
384     *  @param thread The thread.
385     *  @param receiver The receiver handling the I/O operation,
386     *   or null if it is not a specific receiver.
387     *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
388     *   to indicate whether the thread is blocked on read or write.
389     *  @see CompositeProcessDirector#threadBlocked(Thread, ProcessReceiver)
390     */
391    public synchronized void threadBlocked(Thread thread,
392            ProcessReceiver receiver, boolean readOrWrite) {
393        if (readOrWrite == READ_BLOCKED) {
394            _readBlockedQueues.put(receiver, thread);
395        } else {
396            _writeBlockedQueues.put(receiver, thread);
397        }
398
399        super.threadBlocked(thread, receiver);
400    }
401
402    /** Notify the director that the specified thread is unblocked
403     *  on an I/O operation.  If the thread has
404     *  not been registered with threadBlocked(), then this call is
405     *  ignored.
406     *  @param thread The thread.
407     *  @param receiver The receiver handling the I/O operation,
408     *   or null if it is not a specific receiver.
409     *  @param readOrWrite Either READ_BLOCKED or WRITE_BLOCKED
410     *   to indicate whether the thread is blocked on read or write.
411     *  @see CompositeProcessDirector#threadUnblocked(Thread, ProcessReceiver)
412     */
413    public synchronized void threadUnblocked(Thread thread,
414            ProcessReceiver receiver, boolean readOrWrite) {
415        if (readOrWrite == READ_BLOCKED) {
416            _readBlockedQueues.remove(receiver);
417        } else {
418            _writeBlockedQueues.remove(receiver);
419        }
420
421        super.threadUnblocked(thread, receiver);
422    }
423
424    ///////////////////////////////////////////////////////////////////
425    ////                         public variables                  ////
426
427    /** Indicator that a thread is read blocked. */
428    public static final boolean READ_BLOCKED = true;
429
430    /** Indicator that a thread is write blocked. */
431    public static final boolean WRITE_BLOCKED = false;
432
433    ///////////////////////////////////////////////////////////////////
434    ////                         protected methods                 ////
435
436    /** Double the capacity of one of the queues with the smallest
437     *  capacity belonging to a receiver on which a process is blocked
438     *  while attempting to write. <p>Traverse through the list of receivers
439     *  on which a process is blocked on a write and choose the one containing
440     *  the queue with the smallest capacity. Double the capacity
441     *  if the capacity is non-negative. In case the capacity is
442     *  negative, set the capacity to 1.
443     *  Unblock the process blocked on a write to the receiver containing this
444     *  queue.
445     *  Notify the thread corresponding to the blocked process to resume
446     *  its execution and return.</p>
447     *  @exception IllegalActionException If the resulting capacity would
448     *   exceed the value of <i>maximumQueueCapacity</i>.
449     */
450    protected synchronized void _incrementLowestWriteCapacityPort()
451            throws IllegalActionException {
452        // NOTE: This is synchronized as a precaution, although in theory
453        // it gets called only within a synchronized block of the fire()
454        // method of the parent ProcessDirector. It must be synchronized
455        // because of the notifyAll() call at the end.
456        PNQueueReceiver smallestCapacityQueue = null;
457        int smallestCapacity = -1;
458        Iterator receivers = _writeBlockedQueues.keySet().iterator();
459
460        if (!receivers.hasNext()) {
461            return;
462        }
463
464        while (receivers.hasNext()) {
465            PNQueueReceiver queue = (PNQueueReceiver) receivers.next();
466
467            if (smallestCapacity == -1) {
468                smallestCapacityQueue = queue;
469                smallestCapacity = queue.getCapacity();
470            } else if (smallestCapacity > queue.getCapacity()) {
471                smallestCapacityQueue = queue;
472                smallestCapacity = queue.getCapacity();
473            }
474        }
475
476        int capacity = smallestCapacityQueue.getCapacity();
477
478        if (capacity <= 0) {
479            smallestCapacityQueue.setCapacity(1);
480            capacity = 1;
481        } else {
482            int maximumCapacity = ((IntToken) maximumQueueCapacity.getToken())
483                    .intValue();
484
485            if (maximumCapacity > 0 && capacity * 2 > maximumCapacity) {
486                int channel = smallestCapacityQueue.getContainer()
487                        .getChannelForReceiver(smallestCapacityQueue);
488                String msg = "Queue size " + capacity * 2
489                        + " exceeds the maximum capacity in port "
490                        + smallestCapacityQueue.getContainer().getFullName()
491                        + (channel > 0 ? " (channel " + channel + ")" : "")
492                        + ". Perhaps you have an unbounded queue?";
493
494                if (_debugging) {
495                    _debug(msg);
496                }
497
498                throw new IllegalActionException(
499                        smallestCapacityQueue.getContainer(), msg);
500            }
501
502            smallestCapacityQueue.setCapacity(capacity * 2);
503        }
504
505        if (_debugging) {
506            _debug("increasing the capacity of receiver "
507                    + smallestCapacityQueue.getContainer() + " to "
508                    + smallestCapacityQueue.getCapacity());
509        }
510
511        // Need to mark any thread that is blocked on
512        // this receiver unblocked now, before the notification,
513        // or we will detect deadlock all over again and
514        // again increase the buffer sizes.
515        threadUnblocked((Thread) _writeBlockedQueues.get(smallestCapacityQueue),
516                smallestCapacityQueue, WRITE_BLOCKED);
517
518        return;
519    }
520
521    /** Resolve an artificial deadlock and return true. If the
522     *  deadlock is not an artificial deadlock (it is a real deadlock),
523     *  then return false.
524     *  If it is an artificial deadlock, select the
525     *  receiver with the smallest queue capacity on which any process is
526     *  blocked on a write and increment the capacity of the contained queue.
527     *  If the capacity is non-negative, then increment the capacity by 1.
528     *  Otherwise set the capacity to 1. Unblock the process blocked on
529     *  this receiver. Notify the thread corresponding to the blocked
530     *  process and return true.
531     *  <p>
532     *  If derived classes introduce new forms of deadlocks, they should
533     *  override this method to introduce mechanisms of handling those
534     *  deadlocks. This method is called from the fire() method of the director
535     *  alone.</p>
536     *  @return True after handling an artificial deadlock. Otherwise return
537     *  false.
538     *  @exception IllegalActionException If the maximum queue capacity
539     *   is exceeded.
540     *  This might be thrown by derived classes.
541     */
542    @Override
543    protected boolean _resolveInternalDeadlock() throws IllegalActionException {
544        if (_writeBlockedQueues.isEmpty() && !_readBlockedQueues.isEmpty()) {
545            // There is a real deadlock.
546            if (_debugging) {
547                _debug("Deadlock detected: no processes blocked on write, but some are blocked on read.");
548            }
549
550            return false;
551        } else if (_getActiveThreadsCount() == 0) {
552            // There is a real deadlock as no processes are active.
553            if (_debugging) {
554                _debug("No more active processes.");
555            }
556
557            return false;
558        } else {
559            // This is an artificial deadlock. Hence find the input port with
560            // lowest capacity queue that is blocked on a write and increment
561            // its capacity;
562            if (_debugging) {
563                _debug("Artificial Deadlock - increasing queue capacity.");
564            }
565
566            _incrementLowestWriteCapacityPort();
567            return true;
568        }
569    }
570
571    ///////////////////////////////////////////////////////////////////
572    ////                         protected variables               ////
573
574    /** The set of processes blocked on a read from a receiver. */
575    protected HashMap _readBlockedQueues = new HashMap();
576
577    /** The set of receivers blocked on a write to a receiver. */
578    protected HashMap _writeBlockedQueues = new HashMap();
579
580    /** The list of all receivers that this director has created. */
581    protected LinkedList _receivers = new LinkedList();
582
583    ///////////////////////////////////////////////////////////////////
584    ////                         private methods                   ////
585    private void _init()
586            throws IllegalActionException, NameDuplicationException {
587        initialQueueCapacity = new Parameter(this, "initialQueueCapacity",
588                new IntToken(1));
589        initialQueueCapacity.setTypeEquals(BaseType.INT);
590
591        maximumQueueCapacity = new Parameter(this, "maximumQueueCapacity",
592                new IntToken(65536));
593        maximumQueueCapacity.setTypeEquals(BaseType.INT);
594    }
595
596    ///////////////////////////////////////////////////////////////////
597    ////                         private variables                 ////
598
599    /** List of process listeners. */
600    private LinkedList _processListeners = new LinkedList();
601
602}