001/* A base class for threaded DE domain actors.
002
003 Copyright (c) 1998-2015 The Regents of the University of California.
004 All rights reserved.
005 Permission is hereby granted, without written agreement and without
006 license or royalty fees, to use, copy, modify, and distribute this
007 software and its documentation for any purpose, provided that the above
008 copyright notice and the following two paragraphs appear in all copies
009 of this software.
010
011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
015 SUCH DAMAGE.
016
017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
022 ENHANCEMENTS, OR MODIFICATIONS.
023
024 PT_COPYRIGHT_VERSION_2
025 COPYRIGHTENDKEY
026
027 */
028package ptolemy.domains.de.kernel;
029
030import java.util.Iterator;
031
032import ptolemy.actor.IOPort;
033import ptolemy.actor.TypedCompositeActor;
034import ptolemy.kernel.util.IllegalActionException;
035import ptolemy.kernel.util.InternalErrorException;
036import ptolemy.kernel.util.KernelException;
037import ptolemy.kernel.util.NameDuplicationException;
038import ptolemy.kernel.util.PtolemyThread;
039
040///////////////////////////////////////////////////////////////////
041//// DEThreadActor
042
043/**
044 A base class for threaded DE domain actors.
045 <P>
046 NOTE: This actor is very preliminary. It is not developed and maintained
047 for a long time. We do not recommend using it. To try multiple threads under
048 DE semantics, use DDE domain, which is another experimental domain.
049 <P>
050 This actor, upon its initialization, will start another thread.
051 The thread communicate with the DEDirector thread by placing
052 events into the DEEventQueue asynchronously.
053 <P>
054 Subclass of this class should implement the run() method.
055 The subclass is executed in an event driven way. More precisely,
056 the implementation of the run() method should call
057 waitForNewInputs() after processing all current events. The
058 calls are blocked until the next time fire() is called.
059 Recall that the Director (after putting events into the
060 receiver of the input ports) will call fire() on the actor.
061 NOTE: The synchronization mechanism is implemented in DECQEventQueue
062 to ensure the correct multi-threading behaviour.
063 <P>
064 This implementation does not change the semantics of DEReceiver,
065 but still supports an asynchronous message passing type of
066 concurrency.
067
068 @author Lukito Muliadi
069 @version $Id$
070 @since Ptolemy II 0.2
071 @Pt.ProposedRating Red (lmuliadi)
072 @Pt.AcceptedRating Red (cxh)
073 @see DEActor
074 */
075public abstract class DEThreadActor extends DEActor implements Runnable {
076    /** Constructor.
077     *  @param container The container.
078     *  @param name The name of this actor.
079     *  @exception IllegalActionException If the entity cannot be contained
080     *   by the proposed container.
081     *  @exception NameDuplicationException If the container already has an
082     *   actor with this name.
083     */
084    public DEThreadActor(TypedCompositeActor container, String name)
085            throws NameDuplicationException, IllegalActionException {
086        super(container, name);
087    }
088
089    ///////////////////////////////////////////////////////////////////
090    ////                         public methods                    ////
091
092    /** Awake the thread running this actor.
093     */
094    @Override
095    public void fire() {
096
097        synchronized (_monitor) {
098            // Set the flag to false, to make sure only this actor wakes up.
099            _isWaiting = false;
100
101            _monitor.notifyAll();
102
103            // then wait until this actor go to wait.
104            while (!_isWaiting) {
105                try {
106                    _monitor.wait();
107                } catch (InterruptedException e) {
108                    System.err.println(KernelException.stackTraceToString(e));
109                }
110            }
111        }
112    }
113
114    /** Create a thread for the actor and start the thread.
115     */
116    @Override
117    public void initialize() {
118        // start a thread.
119        _thread = new PtolemyThread(this);
120        _isWaiting = true;
121        _thread.start();
122    }
123
124    /** Implement this method to define the job of the threaded actor.
125     */
126    @Override
127    public abstract void run();
128
129    /** Clear input ports then wait until
130     *  input events arrive.
131     */
132    public void waitForNewInputs() {
133        _emptyPorts();
134
135        synchronized (_monitor) {
136            // Set the flag to true, so the director can wake up.
137            _isWaiting = true;
138
139            _monitor.notifyAll();
140
141            while (_isWaiting) {
142                try {
143                    _monitor.wait();
144                } catch (InterruptedException e) {
145                    System.err.println(KernelException.stackTraceToString(e));
146                }
147            }
148        }
149    }
150
151    /** Wait for new inputs on the specified array of ports.
152     *  @param ports The array of ports whose inputs we're interested in.
153     *  @exception IllegalActionException If the specified array of ports
154     *  is not all input ports.
155     */
156    public void waitForNewInputs(IOPort[] ports) throws IllegalActionException {
157        _emptyPorts();
158
159        while (true) {
160            waitForNewInputs();
161
162            // check for availability of tokens in the list of ports.
163            // If any of the listed ports has at least a token, then return
164            // Otherwise, wait for more new inputs.
165            for (IOPort port : ports) {
166                for (int j = 0; j < port.getWidth(); j++) {
167                    if (port.hasToken(j)) {
168                        return;
169                    }
170                }
171            }
172        } // while (true)
173    }
174
175    ///////////////////////////////////////////////////////////////////
176    ////                         private methods                   ////
177    // Empty all receivers of all input ports.
178    // FIXME: Shouldn't this be guaranteed by the run() of the actor?
179    private void _emptyPorts() {
180        Iterator<?> ports = inputPortList().iterator();
181
182        while (ports.hasNext()) {
183            IOPort port = (IOPort) ports.next();
184
185            try {
186                for (int channel = 0; channel < port.getWidth(); channel++) {
187                    try {
188                        while (port.hasNewToken(channel)) {
189                            port.get(channel);
190                        }
191                    } catch (IllegalActionException ex) {
192                        throw new InternalErrorException(this, ex,
193                                "Failed to empty ports?");
194                    }
195                }
196            } catch (IllegalActionException ex) {
197                throw new InternalErrorException(this, ex,
198                        "At this time IllegalActionExceptions are not allowed to happen.\n"
199                                + "Width inference should already have been done.");
200            }
201        }
202    }
203
204    ///////////////////////////////////////////////////////////////////
205    ////                         private variables                 ////
206    private boolean _isWaiting = true;
207
208    private static Object _monitor = new Object();
209
210    private PtolemyThread _thread;
211}