001/* A receiver with a FIFO queue and performing blocking reads
002 and blocking writes.
003
004 Copyright (c) 1997-2015 The Regents of the University of California.
005 All rights reserved.
006 Permission is hereby granted, without written agreement and without
007 license or royalty fees, to use, copy, modify, and distribute this
008 software and its documentation for any purpose, provided that the above
009 copyright notice and the following two paragraphs appear in all copies
010 of this software.
011
012 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
013 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
014 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
015 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
016 SUCH DAMAGE.
017
018 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
019 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
020 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
021 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
022 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
023 ENHANCEMENTS, OR MODIFICATIONS.
024
025 PT_COPYRIGHT_VERSION_2
026 COPYRIGHTENDKEY
027
028 */
029package ptolemy.domains.pn.kernel;
030
031import ptolemy.actor.Actor;
032import ptolemy.actor.Director;
033import ptolemy.actor.IOPort;
034import ptolemy.actor.Manager;
035import ptolemy.actor.NoRoomException;
036import ptolemy.actor.QueueReceiver;
037import ptolemy.actor.process.BoundaryDetector;
038import ptolemy.actor.process.ProcessReceiver;
039import ptolemy.actor.process.TerminateProcessException;
040import ptolemy.data.Token;
041import ptolemy.kernel.util.IllegalActionException;
042import ptolemy.kernel.util.InvalidStateException;
043import ptolemy.kernel.util.Nameable;
044import ptolemy.kernel.util.Workspace;
045
046///////////////////////////////////////////////////////////////////
047//// PNQueueReceiver
048
049/**
050
051 A receiver with a FIFO queue that blocks the calling process on a read if the
052 FIFO queue is empty and on a write if the queue is full. Blocking read provides
053 the basic functionality of a FIFO channel in the process networks model of
054 computation. Blocking write supports the implementation suggested by Parks for
055 bounded memory execution of process networks.
056 <p>
057 Tokens are appended to the queue with the put() method, which blocks on a write
058 if the queue is full. Tokens are removed from the queue with the get() method,
059 which blocks on a read if the queue is empty.
060 In case a process blocks on a read or a write, the receiver informs the
061 director about the same.
062 The receiver also unblocks processes blocked on a read or a write. In case
063 a process is blocked on a read (read-blocked), it is unblocked on availability
064 of a token.  If a process is blocked on a write (write-blocked), it
065 is unblocked on the availability of room in the queue and informs the director
066 of the same.
067 <p>
068 This class is also responsible for pausing or terminating a process that tries
069 to read from or write to the receiver. In case of termination, the receiver
070 throws a TerminateProcessException when a process tries to read from or write
071 to the receiver. This terminates the process.
072 In case of pausing, the receiver suspends the process when it tries to read
073 from or write to the receiver and resumes it only after a request to resume the
074 process has been received.
075 <p>
076
077 @author Mudit Goel, John S. Davis II, Edward A. Lee, Xiaowen Xin
078 @version $Id$
079 @since Ptolemy II 0.2
080 @Pt.ProposedRating Yellow (eal)
081 @Pt.AcceptedRating Red (hyzheng)
082 @see QueueReceiver
083 @see ptolemy.actor.QueueReceiver
084 */
085public class PNQueueReceiver extends QueueReceiver implements ProcessReceiver {
086    /** Construct an empty receiver with no container.
087     */
088    public PNQueueReceiver() {
089        super();
090        _boundaryDetector = new BoundaryDetector(this);
091    }
092
093    /** Construct an empty receiver with the specified container.
094     *  @param container The container of this receiver.
095     *  @exception IllegalActionException If the container does
096     *   not accept this receiver.
097     */
098    public PNQueueReceiver(IOPort container) throws IllegalActionException {
099        super(container);
100        _boundaryDetector = new BoundaryDetector(this);
101    }
102
103    ///////////////////////////////////////////////////////////////////
104    ////                         public methods                    ////
105
106    /** Set the container. This overrides the base class to record
107     *  the director.
108     *  @param port The container.
109     *  @exception IllegalActionException If the container is not of
110     *   an appropriate subclass of IOPort, or if the container's director
111     *   is not an instance of PNDirector.
112     */
113    @Override
114    public void setContainer(IOPort port) throws IllegalActionException {
115        super.setContainer(port);
116        if (port == null) {
117            _director = null;
118        } else {
119            Actor actor = (Actor) port.getContainer();
120            Director director;
121
122            // For a composite actor,
123            // the receiver type of an input port is decided by
124            // the executive director.
125            // While the receiver type of an output is decided by the director.
126            // NOTE: getExecutiveDirector() and getDirector() yield the same
127            // result for actors that do not contain directors.
128            if (port.isInput()) {
129                director = actor.getExecutiveDirector();
130            } else {
131                director = actor.getDirector();
132            }
133
134            if (!(director instanceof PNDirector)) {
135                throw new IllegalActionException(port,
136                        "Cannot use an instance of PNQueueReceiver "
137                                + "since the director is not a PNDirector.");
138            }
139
140            _director = (PNDirector) director;
141        }
142    }
143
144    /** Get a token from this receiver. If the receiver is empty then
145     *  block until a token becomes available. If this receiver is
146     *  terminated during the execution of this method, then throw a
147     *  TerminateProcessException.
148     *  @return The token contained by this receiver.
149     */
150    @Override
151    public Token get() {
152        Token result = null;
153        Workspace workspace = getContainer().workspace();
154        while (!_terminate) {
155            int depth = 0;
156            try {
157                // NOTE: This used to synchronize on this, but since it calls
158                // director methods that are synchronized on the director,
159                // this can cause deadlock.
160                synchronized (_director) {
161                    // Need to check this again after acquiring the lock.
162                    // Otherwise, could end up calling wait() below _after_
163                    // notification has occurred.
164                    if (_terminate) {
165                        break;
166                    }
167
168                    // Try to read.
169                    if (super.hasToken()) {
170                        result = super.get();
171
172                        // Need to mark any thread that is write blocked on
173                        // this receiver unblocked now, before any notification,
174                        // or we will detect deadlock and increase the buffer sizes.
175                        // Note that there is no need to clear the _readPending
176                        // reference because that will have been cleared by the write.
177                        if (_writePending != null) {
178                            _director.threadUnblocked(_writePending, this,
179                                    PNDirector.WRITE_BLOCKED);
180                            _writePending = null;
181                        }
182
183                        break;
184                    }
185                    _readPending = Thread.currentThread();
186                    _director.threadBlocked(Thread.currentThread(), this,
187                            PNDirector.READ_BLOCKED);
188
189                    // NOTE: We cannot use workspace.wait(Object) here without
190                    // introducing a race condition, because we have to release
191                    // the lock on the _director before calling workspace.wait(_director).
192                    depth = workspace.releaseReadPermission();
193                    _director.wait();
194                } // release lock on _director before reacquiring read permissions.
195            } catch (InterruptedException e) {
196                _terminate = true;
197            } finally {
198                if (depth > 0) {
199                    workspace.reacquireReadPermission(depth);
200                }
201            }
202        }
203        if (_terminate) {
204            throw new TerminateProcessException("");
205        }
206        return result;
207    }
208
209    /** Return the director in charge of this receiver, or null
210     *  if there is none.
211     *  @return The director in charge of this receiver.
212     */
213    public PNDirector getDirector() {
214        return _director;
215    }
216
217    /** Return true, since a channel in the Kahn process networks
218     *  model of computation is of infinite capacity and always has room.
219     *  @return True.
220     */
221    @Override
222    public boolean hasRoom() {
223        return true;
224    }
225
226    /** Return true, since a channel in the Kahn process networks
227     *  model of computation is of infinite capacity and always has room.
228     *  @param tokens The number of tokens, which is ignored in this method.
229     *  @return True.
230     */
231    @Override
232    public boolean hasRoom(int tokens) {
233        return true;
234    }
235
236    /** Return true, since a call to the get() method of the receiver will
237     *  always return a token if the call to get() ever returns.
238     *  @return True.
239     */
240    @Override
241    public boolean hasToken() {
242        return true;
243    }
244
245    /** Return true, since a call to the get() method of the receiver will
246     *  always return a token if the call to get() ever returns.
247     *  @param tokens The number of tokens, which is ignored in this method.
248     *  @return True.
249     */
250    @Override
251    public boolean hasToken(int tokens) {
252        return true;
253    }
254
255    /** Return true if this receiver is connected to the inside of a
256     *  boundary port. A boundary port is an opaque port that is
257     *  contained by a composite actor. If this receiver is connected
258     *  to the inside of a boundary port, then return true; otherwise
259     *  return false.
260     *  @return True if this receiver is connected to the inside of
261     *   a boundary port; return false otherwise.
262     *  @exception IllegalActionException If thrown by the boundary
263     *  detector.
264     *  @see ptolemy.actor.process.BoundaryDetector
265     */
266    @Override
267    public boolean isConnectedToBoundary() throws IllegalActionException {
268        return _boundaryDetector.isConnectedToBoundary();
269    }
270
271    /** Return true if this receiver is connected to the inside of a
272     *  boundary port. A boundary port is an opaque port that is
273     *  contained by a composite actor. If this receiver is connected
274     *  to the inside of a boundary port, then return true; otherwise
275     *  return false.
276     *  @return True if this receiver is connected to the inside of
277     *   a boundary port; return false otherwise.
278     *  @exception IllegalActionException If thrown by the boundary
279     *   detector.
280     *  @exception InvalidStateException If thrown by the boundary
281     *   detector.
282     *  @see ptolemy.actor.process.BoundaryDetector
283     */
284    @Override
285    public boolean isConnectedToBoundaryInside()
286            throws InvalidStateException, IllegalActionException {
287        return _boundaryDetector.isConnectedToBoundaryInside();
288    }
289
290    /** Return true if this receiver is connected to the outside of a
291     *  boundary port. A boundary port is an opaque port that is
292     *  contained by a composite actor. If this receiver is connected
293     *  to the outside of a boundary port, then return true; otherwise
294     *  return false.
295     *  @return True if this receiver is connected to the outside of
296     *   a boundary port; return false otherwise.
297     *  @exception IllegalActionException If thrown by the boundary
298     *   detector.
299     *  @see ptolemy.actor.process.BoundaryDetector
300     */
301    @Override
302    public boolean isConnectedToBoundaryOutside()
303            throws IllegalActionException {
304        return _boundaryDetector.isConnectedToBoundaryOutside();
305    }
306
307    /** Return true if this receiver is connected to the boundary.
308     *  That is, it is in an input port that is connected on
309     *  the outside to the inside of an input port, or it is on
310     *  the inside of an output port that is connected on the
311     *  outside to an input port higher in the hierarchy.
312     *  @see #isConnectedToBoundary()
313     *  @return True if this is connected to the boundary.
314     *  @exception IllegalActionException If thrown by the boundary
315     *   detector.
316     */
317    @Override
318    public boolean isConsumerReceiver() throws IllegalActionException {
319        if (isConnectedToBoundary()) {
320            return true;
321        }
322
323        return false;
324    }
325
326    /** Return true if this receiver is contained on the inside of a
327     *  boundary port. A boundary port is an opaque port that is
328     *  contained by a composite actor. If this receiver is contained
329     *  on the inside of a boundary port then return true; otherwise
330     *  return false.
331     *  @return True if this receiver is contained on the inside of
332     *   a boundary port; return false otherwise.
333     *  @see ptolemy.actor.process.BoundaryDetector
334     */
335    @Override
336    public boolean isInsideBoundary() {
337        return _boundaryDetector.isInsideBoundary();
338    }
339
340    /** Return true if this receiver is contained on the outside of a
341     *  boundary port. A boundary port is an opaque port that is
342     *  contained by a composite actor. If this receiver is contained
343     *  on the outside of a boundary port then return true; otherwise
344     *  return false.
345     *  @return True if this receiver is contained on the outside of
346     *   a boundary port; return false otherwise.
347     *  @see BoundaryDetector
348     */
349    @Override
350    public boolean isOutsideBoundary() {
351        return _boundaryDetector.isOutsideBoundary();
352    }
353
354    /** Return true if this receiver is at a boundary.
355     *  @return True if this receiver is at a boundary.
356     */
357    @Override
358    public boolean isProducerReceiver() {
359        if (isOutsideBoundary() || isInsideBoundary()) {
360            return true;
361        }
362
363        return false;
364    }
365
366    /** Return a true or false to indicate whether there is a read block
367     *  on this receiver or not, respectively.
368     *  @return a boolean indicating whether a read is blocked on this
369     *  receiver or not.
370     */
371    @Override
372    public boolean isReadBlocked() {
373        // NOTE: This method used to be synchronized on this
374        // receiver, but since it is called by synchronized methods in
375        // the director, that can cause deadlock.
376        synchronized (_director) {
377            return _readPending != null;
378        }
379    }
380
381    /** Return a true or false to indicate whether there is a write block
382     *  on this receiver or not.
383     *  @return A boolean indicating whether a write is blocked  on this
384     *  receiver or not.
385     */
386    @Override
387    public boolean isWriteBlocked() {
388        // NOTE: This method used to be synchronized on this
389        // receiver, but since it is called by synchronized methods in
390        // the director, that can cause deadlock.
391        synchronized (_director) {
392            return _writePending != null;
393        }
394    }
395
396    /** Put a token on the queue contained in this receiver.
397     *  If the queue is full, then suspend the calling thread (blocking
398     *  write) and inform the director of the same. Resume the process on
399     *  detecting room in the queue.
400     *  If a termination is requested, then initiate the termination of the
401     *  calling process by throwing a TerminateProcessException.
402     *  On detecting a room in the queue, put a token in the queue.
403     *  Check whether any process is blocked
404     *  on a read from this receiver. If a process is indeed blocked, then
405     *  unblock the process, and inform the director of the same.
406     *  @param token The token to be put in the receiver, or null to not put anything.
407     *  @exception NoRoomException If during initialization, capacity cannot be increased
408     *   enough to accommodate initial tokens.
409     */
410    @Override
411    public void put(Token token) throws NoRoomException {
412        IOPort port = getContainer();
413        if (port == null || token == null) {
414            return; // Nothing to do.
415        }
416        Workspace workspace = port.workspace();
417        while (!_terminate) {
418            int depth = 0;
419            try {
420                // NOTE: Avoid acquiring read access on the workspace
421                // while holding the lock on the director because if
422                // some other process is trying to acquire write access,
423                // the request for read access will be deferred.
424                Nameable container = getContainer().getContainer();
425                Manager manager = ((Actor) container).getManager();
426                // NOTE: This used to synchronize on this, but since it calls
427                // director methods that are synchronized on the director,
428                // this can cause deadlock.
429                synchronized (_director) {
430                    // Need to check this again after acquiring the lock.
431                    // Otherwise, could end up calling wait() below _after_
432                    // notification has occurred.
433                    if (_terminate) {
434                        break;
435                    }
436
437                    // If we are in the initialization phase, then we may have
438                    // to increase the queue capacity before proceeding. This
439                    // may be needed to support PublisherPorts that produce
440                    // initial tokens (or, I suppose, any actor that produces
441                    // initial tokens during initialize()?).
442                    if (!super.hasRoom()) {
443                        if (container instanceof Actor) {
444                            if (manager.getState()
445                                    .equals(Manager.INITIALIZING)) {
446                                try {
447                                    _queue.setCapacity(
448                                            _queue.getCapacity() + 1);
449                                } catch (IllegalActionException e) {
450                                    throw new NoRoomException(getContainer(),
451                                            "Failed to increase queue capacity enough to accommodate initial tokens");
452                                }
453                            }
454                        }
455                    }
456                    // Try to write.
457                    if (super.hasRoom()) {
458                        super.put(token);
459
460                        // If any thread is blocked on a get(), then it will become
461                        // unblocked. Notify the director now so that there isn't a
462                        // spurious deadlock detection.
463                        if (_readPending != null) {
464                            _director.threadUnblocked(_readPending, this,
465                                    PNDirector.READ_BLOCKED);
466                            _readPending = null;
467                        }
468
469                        // Normally, the _writePending reference will have
470                        // been cleared by the read that unblocked this write.
471                        // However, it might be that the director increased the
472                        // buffer size, which would also have the affect of unblocking
473                        // this write. Hence, we clear it here if it is set.
474                        if (_writePending != null) {
475                            _director.threadUnblocked(_writePending, this,
476                                    PNDirector.WRITE_BLOCKED);
477                            _writePending = null;
478                        }
479
480                        break;
481                    }
482
483                    // Wait to try again.
484                    _writePending = Thread.currentThread();
485                    _director.threadBlocked(_writePending, this,
486                            PNDirector.WRITE_BLOCKED);
487
488                    // NOTE: We cannot use workspace.wait(Object) here without
489                    // introducing a race condition, because we have to release
490                    // the lock on the _director before calling workspace.wait(_director).
491                    depth = workspace.releaseReadPermission();
492                    _director.wait();
493                } // release lock on _director before reacquiring read permissions.
494            } catch (InterruptedException e) {
495                _terminate = true;
496            } finally {
497                if (depth > 0) {
498                    workspace.reacquireReadPermission(depth);
499                }
500            }
501        }
502
503        if (_terminate) {
504            throw new TerminateProcessException("Process terminated.");
505        }
506    }
507
508    /** Reset the state variables in the receiver.
509     */
510    @Override
511    public void reset() {
512        if (_readPending != null) {
513            _director.threadUnblocked(_readPending, this,
514                    PNDirector.READ_BLOCKED);
515        }
516
517        if (_writePending != null) {
518            _director.threadUnblocked(_writePending, this,
519                    PNDirector.WRITE_BLOCKED);
520        }
521
522        _readPending = null;
523        _writePending = null;
524        _terminate = false;
525        _boundaryDetector.reset();
526    }
527
528    /** Set a flag in the receiver to indicate the onset of termination.
529     *  This will result in termination of any process that is either blocked
530     *  on the receiver or is trying to read from or write to it.
531     */
532    @Override
533    public void requestFinish() {
534        // NOTE: This method used to be synchronized on this
535        // receiver, but since it calls synchronized methods in
536        // the director, that can cause deadlock.
537        synchronized (_director) {
538            _terminate = true;
539            _director.notifyAll();
540        }
541    }
542
543    ///////////////////////////////////////////////////////////////////
544    ////                         protected variables               ////
545
546    /** The director in charge of this receiver. */
547    protected PNDirector _director;
548
549    /** Reference to a thread that is read blocked on this receiver. */
550    protected Thread _readPending = null;
551
552    /** Reference to a thread that is write blocked on this receiver. */
553    protected Thread _writePending = null;
554
555    ///////////////////////////////////////////////////////////////////
556    ////                         private variables                 ////
557
558    /** Flag indicating whether finish has been requested. */
559    protected boolean _terminate = false;
560
561    /** A BoundaryDetector determines the topological relationship of
562     *  a Receiver with respect to boundary ports.
563     */
564    protected BoundaryDetector _boundaryDetector;
565}