001/* A process receiver that stores tokens via a mailbox and can be used by
002 composite actors.
003
004 Copyright (c) 1997-2014 The Regents of the University of California.
005 All rights reserved.
006 Permission is hereby granted, without written agreement and without
007 license or royalty fees, to use, copy, modify, and distribute this
008 software and its documentation for any purpose, provided that the above
009 copyright notice and the following two paragraphs appear in all copies
010 of this software.
011
012 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
013 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
014 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
015 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
016 SUCH DAMAGE.
017
018 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
019 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
020 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
021 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
022 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
023 ENHANCEMENTS, OR MODIFICATIONS.
024
025 PT_COPYRIGHT_VERSION_2
026 COPYRIGHTENDKEY
027
028 */
029package ptolemy.actor.process;
030
031import ptolemy.actor.Actor;
032import ptolemy.actor.Director;
033import ptolemy.actor.IOPort;
034import ptolemy.actor.Mailbox;
035import ptolemy.data.Token;
036import ptolemy.kernel.util.IllegalActionException;
037import ptolemy.kernel.util.InvalidStateException;
038import ptolemy.kernel.util.Workspace;
039
040///////////////////////////////////////////////////////////////////
041//// MailboxBoundaryReceiver
042
043/**
044 A process receiver that stores tokens via a mailbox and can be used by
045 composite actors. This receiver extends the functionality of the mailbox
046 receiver found in the actor package in two key ways. First it facilitates
047 blocking reads and writes. If a read (a call to get()) is attempted when
048 this mailbox is empty then the call will block until a token is placed in
049 the mailbox. Similarly if a write (a call to put()) is attempted when
050 this mailbox is full (has a token) then the call will block until the
051 token is removed from the mailbox.
052 <P>
053 The second key feature of this mailbox receiver is that it can be used
054 by opaque composite actors operating in process-oriented models of
055 computation. Indeed the name "MailboxBoundaryReceiver" is used to
056 indicate that this receiver can be contained on the boundary of an
057 opaque composite actor. The get() and put() methods of mailbox boundary
058 receiver can be invoked by a Branch object. In such cases any blocks that
059 occur are registered with the calling branch. The branch will then serve
060 as a proxy by communicating to the director through the branch controller.
061 <P>
062 Note that it is not necessary for a mailbox boundary receiver to be used
063 in the ports of an opaque composite actor. It is perfectly fine for a
064 mailbox boundary receiver to be used in the ports of an atomic actor. In
065 such cases the get() and put() methods are called without the use of a
066 branch object. If blocking reads or writes occur they are registered with
067 the controlling director without the need for a branch or branch
068 controller.
069
070
071 @author John S. Davis II
072 @version $Id$
073 @since Ptolemy II 1.0
074 @Pt.ProposedRating Green (mudit)
075 @Pt.AcceptedRating Yellow (davisj)
076 @see ptolemy.actor.process.Branch
077 @see ptolemy.actor.process.BranchController
078
079 */
080public class MailboxBoundaryReceiver extends Mailbox
081        implements ProcessReceiver {
082    /** Construct an empty MailboxBoundaryReceiver with no container.
083     */
084    public MailboxBoundaryReceiver() {
085        super();
086        _boundaryDetector = new BoundaryDetector(this);
087    }
088
089    /** Construct an empty MailboxBoundaryReceiver with the specified
090     *  container.
091     *  @param container The container.
092     *  @exception IllegalActionException If the container cannot contain
093     *   this receiver.
094     */
095    public MailboxBoundaryReceiver(IOPort container)
096            throws IllegalActionException {
097        super(container);
098        _boundaryDetector = new BoundaryDetector(this);
099    }
100
101    ///////////////////////////////////////////////////////////////////
102    ////                         public methods                    ////
103
104    /** Get a token from this receiver. If the receiver is empty then
105     *  block until a token becomes available. Use the local director
106     *  to manage blocking reads that occur. If this receiver is
107     *  terminated during the execution of this method, then throw a
108     *  TerminateProcessException.
109     *
110     *  @return The token contained by this receiver.
111     */
112    @Override
113    public Token get() {
114        Workspace workspace = getContainer().workspace();
115        Token result = null;
116
117        int depth = 0;
118        try {
119            // NOTE: This used to synchronize on this, but since it calls
120            // director methods that are synchronized on the director,
121            // this can cause deadlock.
122            synchronized (_director) {
123                while (!_terminate) {
124                    // Try to read.
125                    if (super.hasToken()) {
126                        result = super.get();
127
128                        // Need to mark any thread that is write blocked
129                        // on this receiver unblocked now, before any
130                        // notification, or we will detect deadlock and
131                        // increase the buffer sizes.  Note that there is
132                        // no need to clear the _readPending reference
133                        // because that will have been cleared by the
134                        // write.
135                        if (_writePending != null) {
136                            _director.threadUnblocked(_writePending, this);
137                            _writePending = null;
138                        }
139
140                        break;
141                    }
142
143                    // Wait to try again.
144                    try {
145                        _readPending = Thread.currentThread();
146                        _director.threadBlocked(Thread.currentThread(), this);
147                        // NOTE: We cannot use workspace.wait(Object) here without
148                        // introducing a race condition, because we have to release
149                        // the lock on the _director before calling workspace.wait(_director).
150                        if (depth == 0) {
151                            depth = workspace.releaseReadPermission();
152                        }
153                        _director.wait();
154                    } catch (InterruptedException e) {
155                        _terminate = true;
156                    }
157                }
158
159                if (_terminate) {
160                    throw new TerminateProcessException("");
161                }
162            }
163        } finally {
164            if (depth > 0) {
165                workspace.reacquireReadPermission(depth);
166            }
167        }
168        return result;
169    }
170
171    /** Return the director in charge of this receiver, or null
172     *  if there is none.
173     *  @return The director in charge of this receiver.
174     */
175    public ProcessDirector getDirector() {
176        return _director;
177    }
178
179    /** Return true if this receiver is connected to a boundary port.
180     *  A boundary port is an opaque port that is contained by a
181     *  composite actor. If this receiver is connected to a boundary
182     *  port, then return true; otherwise return false.
183     *  This method is not synchronized so the caller should be.
184     *
185     *  @return True if this receiver is connected to boundary port;
186     *   return false otherwise.
187     * @exception IllegalActionException
188     */
189    @Override
190    public boolean isConnectedToBoundary() throws IllegalActionException {
191        return _boundaryDetector.isConnectedToBoundary();
192    }
193
194    /** Return true if this receiver is connected to the inside of an
195     *  input boundary port; return false otherwise. A boundary port is
196     *  an opaque port that is contained by a composite actor. This
197     *  method is not synchronized so the caller should be.
198     *
199     *  @return True if this receiver is connected to the inside of a
200     *   boundary port; return false otherwise.
201     * @exception IllegalActionException
202     * @exception InvalidStateException
203     */
204    @Override
205    public boolean isConnectedToBoundaryInside()
206            throws InvalidStateException, IllegalActionException {
207        return _boundaryDetector.isConnectedToBoundaryInside();
208    }
209
210    /** Return true if this receiver is connected to the outside of an
211     *  output boundary port; return false otherwise. A boundary port is
212     *  an opaque port that is contained by a composite actor. If this
213     *  receiver is contained on the inside of a boundary port, then return
214     *  false. This method is not synchronized so the caller should be.
215     *
216     *  @return True if this receiver is connected to the outside of a
217     *   boundary port; return false otherwise.
218     * @exception IllegalActionException
219     */
220    @Override
221    public boolean isConnectedToBoundaryOutside()
222            throws IllegalActionException {
223        return _boundaryDetector.isConnectedToBoundaryOutside();
224    }
225
226    /** Return true if this is a consumer receiver; return false otherwise.
227     *  A consumer receiver is defined as a receiver that is connected to
228     *  a boundary port.
229     *
230     *  @return True if this is a consumer receiver; return false otherwise.
231     * @exception IllegalActionException
232     */
233    @Override
234    public boolean isConsumerReceiver() throws IllegalActionException {
235        if (isConnectedToBoundary()) {
236            return true;
237        }
238
239        return false;
240    }
241
242    /** Return true if this receiver is contained on the inside of a
243     *  boundary port. A boundary port is an opaque port that is
244     *  contained by a composite actor. If this receiver is contained
245     *  on the inside of a boundary port then return true; otherwise
246     *  return false. This method is not synchronized so the caller
247     *  should be.
248     *
249     *  @return True if this receiver is contained on the inside of
250     *   a boundary port; return false otherwise.
251     */
252    @Override
253    public boolean isInsideBoundary() {
254        return _boundaryDetector.isInsideBoundary();
255    }
256
257    /** Return true if this receiver is contained on the outside of a
258     *  boundary port. A boundary port is an opaque port that is
259     *  contained by a composite actor. If this receiver is contained
260     *  on the outside of a boundary port then return true; otherwise
261     *  return false. This method is not synchronized so the caller
262     *  should be.
263     *
264     *  @return True if this receiver is contained on the outside of a
265     *   boundary port; return false otherwise.
266     */
267    @Override
268    public boolean isOutsideBoundary() {
269        return _boundaryDetector.isOutsideBoundary();
270    }
271
272    /** Return true if this is a producer receiver; return false otherwise.
273     *  A producer receiver is defined as a receiver that is connected to
274     *  a boundary port.
275     *
276     *  @return True if this is a producer receiver; return false otherwise.
277     */
278    @Override
279    public boolean isProducerReceiver() {
280        if (isOutsideBoundary() || isInsideBoundary()) {
281            return true;
282        }
283
284        return false;
285    }
286
287    /** Return a true or false to indicate whether there is a read block
288     *  on this receiver or not, respectively.
289     *  @return a boolean indicating whether a read is blocked on this
290     *  receiver or not.
291     */
292    @Override
293    public boolean isReadBlocked() {
294        // NOTE: This method used to be synchronized on this
295        // receiver, but since it is called by synchronized methods in
296        // the director, that can cause deadlock.
297        synchronized (_director) {
298            return _readPending != null;
299        }
300    }
301
302    /** Return a true or false to indicate whether there is a write block
303     *  on this receiver or not.
304     *  @return A boolean indicating whether a write is blocked  on this
305     *  receiver or not.
306     */
307    @Override
308    public boolean isWriteBlocked() {
309        // NOTE: This method used to be synchronized on this
310        // receiver, but since it is called by synchronized methods in
311        // the director, that can cause deadlock.
312        synchronized (_director) {
313            return _writePending != null;
314        }
315    }
316
317    /** Put a token into this receiver. If the receiver is full
318     *  (contains a token) then block until room becomes available.
319     *  Use the local director to manage blocking writes that occur.
320     *  If this receiver is terminated during the execution of this
321     *  method, then throw a TerminateProcessException.
322     *  If the specified token is null, this method does nothing.
323     *
324     *  @param token The token being placed in this receiver, or null
325     *   to do nothing.
326     */
327    @Override
328    public void put(Token token) {
329        if (token == null) {
330            return;
331        }
332        Workspace workspace = getContainer().workspace();
333        int depth = 0;
334        try {
335            // NOTE: This used to synchronize on this, but since it calls
336            // director methods that are synchronized on the director,
337            // this can cause deadlock.
338            synchronized (_director) {
339                while (!_terminate) {
340                    // Try to write.
341                    if (super.hasRoom()) {
342                        super.put(token);
343
344                        // If any thread is blocked on a get(), then it will become
345                        // unblocked. Notify the director now so that there isn't a
346                        // spurious deadlock detection.
347                        if (_readPending != null) {
348                            _director.threadUnblocked(_readPending, this);
349                            _readPending = null;
350                        }
351
352                        // Normally, the _writePending reference will have
353                        // been cleared by the read that unblocked this
354                        // write.  However, it might be that the director
355                        // increased the buffer size, which would also
356                        // have the affect of unblocking this
357                        // write. Hence, we clear it here if it is set.
358                        if (_writePending != null) {
359                            _director.threadUnblocked(_writePending, this);
360                            _writePending = null;
361                        }
362
363                        break;
364                    }
365
366                    // Wait to try again.
367                    try {
368                        _writePending = Thread.currentThread();
369                        _director.threadBlocked(_writePending, this);
370
371                        // NOTE: We cannot use workspace.wait(Object) here without
372                        // introducing a race condition, because we have to release
373                        // the lock on the _director before calling workspace.wait(_director).
374                        if (depth == 0) {
375                            depth = workspace.releaseReadPermission();
376                        }
377                        _director.wait();
378                    } catch (InterruptedException e) {
379                        _terminate = true;
380                    }
381                }
382
383                if (_terminate) {
384                    throw new TerminateProcessException("Process terminated.");
385                }
386            }
387        } finally {
388            if (depth > 0) {
389                workspace.reacquireReadPermission(depth);
390            }
391        }
392    }
393
394    /** Set a local flag requesting that execution of the actor
395     *  containing this receiver discontinue.
396     */
397    @Override
398    public synchronized void requestFinish() {
399        _terminate = true;
400        notifyAll();
401    }
402
403    /** Reset the local flags of this receiver. Use this method when
404     *  restarting execution.
405     */
406    @Override
407    public void reset() {
408        if (_readPending != null) {
409            _director.threadUnblocked(_readPending, this);
410        }
411
412        if (_writePending != null) {
413            _director.threadUnblocked(_writePending, this);
414        }
415
416        _terminate = false;
417        _boundaryDetector.reset();
418    }
419
420    /** Set the container. This overrides the base class to record
421     *  the director.
422     *  @param port The container.
423     *  @exception IllegalActionException If the container is not of
424     *   an appropriate subclass of IOPort, or if the container's director
425     *   is not an instance of ProcessDirector.
426     */
427    @Override
428    public void setContainer(IOPort port) throws IllegalActionException {
429        super.setContainer(port);
430
431        if (port == null) {
432            _director = null;
433        } else {
434            Actor actor = (Actor) port.getContainer();
435            Director director;
436
437            // For a composite actor,
438            // the receiver type of an input port is decided by
439            // the executive director.
440            // While the receiver type of an output is decided by the director.
441            // NOTE: getExecutiveDirector() and getDirector() yield the same
442            // result for actors that do not contain directors.
443            if (port.isInput()) {
444                director = actor.getExecutiveDirector();
445            } else {
446                director = actor.getDirector();
447            }
448
449            if (!(director instanceof ProcessDirector)) {
450                throw new IllegalActionException(port,
451                        "Cannot use an instance of PNQueueReceiver "
452                                + "since the director is not a PNDirector.");
453            }
454
455            _director = (ProcessDirector) director;
456        }
457    }
458
459    ///////////////////////////////////////////////////////////////////
460    ////                         private methods                   ////
461
462    /** The boundary detector. */
463    private BoundaryDetector _boundaryDetector;
464
465    /** The director in charge of this receiver. */
466    private ProcessDirector _director;
467
468    /** Reference to a thread that is read blocked on this receiver. */
469    private Thread _readPending = null;
470
471    /** Flag indicating that termination has been requested. */
472    private boolean _terminate = false;
473
474    /** Reference to a thread that is write blocked on this receiver. */
475    private Thread _writePending = null;
476}