001/* A base class for threaded DE domain actors. 002 003 Copyright (c) 1998-2015 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 */ 028package ptolemy.domains.de.kernel; 029 030import java.util.Iterator; 031 032import ptolemy.actor.IOPort; 033import ptolemy.actor.TypedCompositeActor; 034import ptolemy.kernel.util.IllegalActionException; 035import ptolemy.kernel.util.InternalErrorException; 036import ptolemy.kernel.util.KernelException; 037import ptolemy.kernel.util.NameDuplicationException; 038import ptolemy.kernel.util.PtolemyThread; 039 040/////////////////////////////////////////////////////////////////// 041//// DEThreadActor 042 043/** 044 A base class for threaded DE domain actors. 045 <P> 046 NOTE: This actor is very preliminary. It is not developed and maintained 047 for a long time. We do not recommend using it. To try multiple threads under 048 DE semantics, use DDE domain, which is another experimental domain. 049 <P> 050 This actor, upon its initialization, will start another thread. 051 The thread communicate with the DEDirector thread by placing 052 events into the DEEventQueue asynchronously. 053 <P> 054 Subclass of this class should implement the run() method. 055 The subclass is executed in an event driven way. More precisely, 056 the implementation of the run() method should call 057 waitForNewInputs() after processing all current events. The 058 calls are blocked until the next time fire() is called. 059 Recall that the Director (after putting events into the 060 receiver of the input ports) will call fire() on the actor. 061 NOTE: The synchronization mechanism is implemented in DECQEventQueue 062 to ensure the correct multi-threading behaviour. 063 <P> 064 This implementation does not change the semantics of DEReceiver, 065 but still supports an asynchronous message passing type of 066 concurrency. 067 068 @author Lukito Muliadi 069 @version $Id$ 070 @since Ptolemy II 0.2 071 @Pt.ProposedRating Red (lmuliadi) 072 @Pt.AcceptedRating Red (cxh) 073 @see DEActor 074 */ 075public abstract class DEThreadActor extends DEActor implements Runnable { 076 /** Constructor. 077 * @param container The container. 078 * @param name The name of this actor. 079 * @exception IllegalActionException If the entity cannot be contained 080 * by the proposed container. 081 * @exception NameDuplicationException If the container already has an 082 * actor with this name. 083 */ 084 public DEThreadActor(TypedCompositeActor container, String name) 085 throws NameDuplicationException, IllegalActionException { 086 super(container, name); 087 } 088 089 /////////////////////////////////////////////////////////////////// 090 //// public methods //// 091 092 /** Awake the thread running this actor. 093 */ 094 @Override 095 public void fire() { 096 097 synchronized (_monitor) { 098 // Set the flag to false, to make sure only this actor wakes up. 099 _isWaiting = false; 100 101 _monitor.notifyAll(); 102 103 // then wait until this actor go to wait. 104 while (!_isWaiting) { 105 try { 106 _monitor.wait(); 107 } catch (InterruptedException e) { 108 System.err.println(KernelException.stackTraceToString(e)); 109 } 110 } 111 } 112 } 113 114 /** Create a thread for the actor and start the thread. 115 */ 116 @Override 117 public void initialize() { 118 // start a thread. 119 _thread = new PtolemyThread(this); 120 _isWaiting = true; 121 _thread.start(); 122 } 123 124 /** Implement this method to define the job of the threaded actor. 125 */ 126 @Override 127 public abstract void run(); 128 129 /** Clear input ports then wait until 130 * input events arrive. 131 */ 132 public void waitForNewInputs() { 133 _emptyPorts(); 134 135 synchronized (_monitor) { 136 // Set the flag to true, so the director can wake up. 137 _isWaiting = true; 138 139 _monitor.notifyAll(); 140 141 while (_isWaiting) { 142 try { 143 _monitor.wait(); 144 } catch (InterruptedException e) { 145 System.err.println(KernelException.stackTraceToString(e)); 146 } 147 } 148 } 149 } 150 151 /** Wait for new inputs on the specified array of ports. 152 * @param ports The array of ports whose inputs we're interested in. 153 * @exception IllegalActionException If the specified array of ports 154 * is not all input ports. 155 */ 156 public void waitForNewInputs(IOPort[] ports) throws IllegalActionException { 157 _emptyPorts(); 158 159 while (true) { 160 waitForNewInputs(); 161 162 // check for availability of tokens in the list of ports. 163 // If any of the listed ports has at least a token, then return 164 // Otherwise, wait for more new inputs. 165 for (IOPort port : ports) { 166 for (int j = 0; j < port.getWidth(); j++) { 167 if (port.hasToken(j)) { 168 return; 169 } 170 } 171 } 172 } // while (true) 173 } 174 175 /////////////////////////////////////////////////////////////////// 176 //// private methods //// 177 // Empty all receivers of all input ports. 178 // FIXME: Shouldn't this be guaranteed by the run() of the actor? 179 private void _emptyPorts() { 180 Iterator<?> ports = inputPortList().iterator(); 181 182 while (ports.hasNext()) { 183 IOPort port = (IOPort) ports.next(); 184 185 try { 186 for (int channel = 0; channel < port.getWidth(); channel++) { 187 try { 188 while (port.hasNewToken(channel)) { 189 port.get(channel); 190 } 191 } catch (IllegalActionException ex) { 192 throw new InternalErrorException(this, ex, 193 "Failed to empty ports?"); 194 } 195 } 196 } catch (IllegalActionException ex) { 197 throw new InternalErrorException(this, ex, 198 "At this time IllegalActionExceptions are not allowed to happen.\n" 199 + "Width inference should already have been done."); 200 } 201 } 202 } 203 204 /////////////////////////////////////////////////////////////////// 205 //// private variables //// 206 private boolean _isWaiting = true; 207 208 private static Object _monitor = new Object(); 209 210 private PtolemyThread _thread; 211}