001/*
002 * Copyright (c) 2009-2010 The Regents of the University of California.
003 * All rights reserved.
004 *
005 * '$Author: crawl $'
006 * '$Date: 2015-06-17 21:53:43 +0000 (Wed, 17 Jun 2015) $' 
007 * '$Revision: 33482 $'
008 * 
009 * Permission is hereby granted, without written agreement and without
010 * license or royalty fees, to use, copy, modify, and distribute this
011 * software and its documentation for any purpose, provided that the above
012 * copyright notice and the following two paragraphs appear in all copies
013 * of this software.
014 *
015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY
016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF
018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF
019 * SUCH DAMAGE.
020 *
021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE
024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF
025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,
026 * ENHANCEMENTS, OR MODIFICATIONS.
027 *
028 */
029
030package org.kepler.provenance;
031
032import java.util.Arrays;
033import java.util.HashMap;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Map;
037import java.util.Queue;
038import java.util.Vector;
039import java.util.concurrent.ConcurrentLinkedQueue;
040
041import org.apache.commons.logging.Log;
042import org.apache.commons.logging.LogFactory;
043
044import ptolemy.actor.IOPort;
045import ptolemy.actor.IOPortEvent;
046import ptolemy.actor.Receiver;
047import ptolemy.kernel.util.IllegalActionException;
048
049/** A utility class to map tokens from output to input ports.
050 *
051 * @author Daniel Crawl
052 * @version $Id: PortConnector.java 33482 2015-06-17 21:53:43Z crawl $
053 *
054 */
055public class PortConnector<Id>
056{
057
058    /** Construct a new PortConnector. */
059    public PortConnector()
060    {
061        _inputPortQueueMap = new HashMap<IOPort,Queue<Id>[]>();
062        _lastIdMap = new HashMap<IOPort, Vector<Id>>();
063    }
064
065    /** Clear all connections. */
066    public void clear() {
067        _inputPortQueueMap.clear();
068        _lastIdMap.clear();
069    }
070    
071    /** Create connections for a port. */
072    public void createConnections(IOPort port) throws RecordingException
073    {
074        int width = -1;
075        try
076        {
077            width = port.getWidth();
078        }
079        catch(IllegalActionException e)
080        {
081            throw new RecordingException("Error getting port width: ", e);
082        }
083
084        // see if port is already in table
085        Queue<Id>[] queues = _inputPortQueueMap.get(port);
086        Vector<Id> lastIds = _lastIdMap.get(port);
087        if(queues != null)
088        {
089            
090            // clear any values that were left by the previous execution.
091            // this can happen, e.g., if the execution is stopped by the
092            // user, an error occurred, etc.
093            for(Queue<Id> queue: queues) {
094                queue.clear();
095            }
096            
097            for(int i = 0; i < lastIds.size(); i++) {
098                lastIds.set(i, null);
099            }
100            
101            // see if the allocated queue array is too small
102            if(width > queues.length)
103            {
104                // increase the queue array to be the current width
105                // of the port and copy the current queues to the
106                // new array.
107                Queue<Id>[] newQueues = new ConcurrentLinkedQueue[width];
108                for(int i = 0; i < width; i++)
109                {
110                    if(i < queues.length)
111                    {
112                        newQueues[i] = queues[i];
113                    }
114                    else
115                    {
116                        newQueues[i] = new ConcurrentLinkedQueue<Id>();
117
118                        // increase the vector length by one for the
119                        // additional channel
120                        lastIds.add(null);
121                    }
122                }
123
124                _inputPortQueueMap.put(port, newQueues);
125            }
126        }
127        // register a new port
128        // NOTE: port may be output port in composite
129        else if(port.isOpaque())
130        {
131            queues = new ConcurrentLinkedQueue[width];
132            lastIds = new Vector<Id>();
133            for(int i = 0; i < width; i++)
134            {
135                queues[i] = new ConcurrentLinkedQueue<Id>();
136                //_debug("creating queue, channel " + i + ", for port " +
137                // port.getFullName() + " = " + queue[i].hashCode());
138                
139                // increase the vector length by one so the total
140                // size will be the port width.
141                lastIds.add(null);
142            }
143            _inputPortQueueMap.put(port, queues);
144            _lastIdMap.put(port, lastIds);
145        }        
146    }
147
148    /** Get the next Id for an input port and channel. */
149    public Id getNextId(IOPort port, int channel)
150        throws RecordingException
151    {
152        
153        //System.out.println("getNextId for " + port.getFullName() + " " + channel);
154        
155        Queue<Id> queue = _getQueueForPortAndChannel(port, channel);            
156        Id retval = queue.poll();
157        if(retval == null)
158        {   
159            throw new RecordingException("No write event for token " +
160                "received by input port " + port.getFullName() +
161                " on channel " + channel);
162        }
163        
164        
165        // save the Id for this port and channel in case it is refilled.
166        Vector<Id> lastIds = _lastIdMap.get(port);
167        lastIds.set(channel, retval);
168        
169        if(_isDebugging)
170        {
171            _log.debug("in port " + port.getFullName() + " pass = " + retval);
172        }
173
174        return retval;
175    }
176
177    /** Restore the previous Id to a port and channel. */
178    public void refillId(IOPort port, int channel) throws RecordingException
179    {
180        Vector<Id> lastIds = _lastIdMap.get(port);
181        if(lastIds == null)
182        {
183            throw new RecordingException("No last id map for port: " +
184                port.getFullName());
185        }
186        
187        if(channel >= lastIds.size())
188        {
189            throw new RecordingException("Invalid channel for port " + 
190                port.getFullName() + ": " + channel);
191        }
192        
193        if(lastIds.get(channel) == null)
194        {
195            throw new RecordingException("No previous data in port " +
196                port.getFullName() + " channel " + channel);
197        }
198        
199        // since we can't place the restored Id at the front of the
200        // queue, create a new queue with the restored Id, and then
201        // add all the existing queued elements at the end.
202        // XXX this is inefficient
203        Queue<Id> queue = _inputPortQueueMap.get(port)[channel];
204        synchronized(queue)
205        {
206            Queue<Id> queueCopy = new ConcurrentLinkedQueue<Id>(queue);
207            queue.clear();
208            queue.add(lastIds.get(channel));
209            queue.addAll(queueCopy);
210        }
211    }
212    
213    /** A port should receive an id on a specific channel. This method should
214     *  be used when a token is manually placed inside a receiver instead of
215     *  when sent with one of the IOPort.send methods.
216     */
217    public void receiveId(IOPort receiverPort, int channel, Id pass)
218        throws RecordingException
219    {
220        Queue<Id> queue = _getQueueForPortAndChannel(receiverPort, channel);
221        queue.add(pass);
222    }
223    
224    /** Send an Id to an output port's connections on a channel.
225     *  NOTE: the receivers must be created in the actors, otherwise
226     *  this method may fail to find connected ports. 
227     */
228    public void sendIdToConnections(IOPort senderPort, int channel, Id pass)
229        throws RecordingException
230    {
231        //System.out.println("send port " + senderPort.getFullName() +
232                //" " + channel);// +
233                //" pass = " + pass);
234
235        try
236        {
237            Receiver[][] allReceivers;
238           
239            // see if this an output port
240            if(senderPort.isOutput())
241            {
242                //_debug("is output");
243                
244                // FIXME: the javadoc for getRemoteReceivers() implies
245                // that remote receivers will be created if missing, but
246                // this does not always seem to be the case.
247                allReceivers = senderPort.getRemoteReceivers();
248            }
249            // this is not an output port, so in order to generate
250            // IOPortEvents that are writes, the port must be an
251            // opaque input port on a non-atomic entity.
252            else
253            {
254                //_debug("is input");
255                allReceivers = senderPort.deepGetReceivers();
256            }
257
258            // see if the port is connected
259            if(allReceivers.length > 0)
260            {
261                //_debug("has receivers, channel = " + channel);
262                List<Receiver> receiverList = new LinkedList<Receiver>();
263
264                // see if port write was broadcast
265                if(channel == IOPortEvent.ALLCHANNELS)
266                {
267                    // add all receivers on all channels to our receiver list
268                    for(Receiver[] receiverArray : allReceivers)
269                    {
270                        // make sure there are receivers on this channel
271                        if(receiverArray != null)
272                        {
273                            receiverList.addAll(Arrays.asList(receiverArray));
274                        }
275                    }
276                }
277                else
278                {
279                    // we only care about a specific channel
280
281                    // make sure there are receivers on this channel
282                    if(allReceivers[channel] != null)
283                    {
284                        receiverList = Arrays.asList(allReceivers[channel]);
285                    }
286                }
287
288                for(Receiver curReceiver : receiverList)
289                {
290                    //System.out.println("  receiver " + curReceiver);
291
292                    // get the port and channel number of this receiver
293                    IOPort recvPort = curReceiver.getContainer();
294
295                    //System.out.println("port " + recvPort);
296
297                    int recvChannel =
298                        recvPort.getChannelForReceiver(curReceiver);
299
300                    Queue<Id>[] queueArray = _inputPortQueueMap.get(recvPort);
301
302                    // see if queue array was not created or is too small.
303                    // this can happen if the port or channel was created
304                    // after the provenance recorder records the workflow
305                    // contents. for example the multi instance composite
306                    // actor creates its clones in preinitialize().
307                    if(queueArray == null || queueArray.length <= recvChannel)
308                    { 
309                        // create the queue array or make it bigger.
310                        createConnections(recvPort);
311                    }
312                    
313                    queueArray = _inputPortQueueMap.get(recvPort);
314
315                    if(queueArray == null)
316                    { 
317                        throw new RecordingException(
318                            "Unable to find queue array for " +
319                            recvPort.getFullName());
320                    }
321                    else if(queueArray.length <= recvChannel)
322                    {
323                        throw new RecordingException(
324                            "Invalid channel index for queue for " +
325                            recvPort.getFullName() + ": " + recvChannel);
326                    }
327
328                    queueArray[recvChannel].add(pass);
329        
330                    if(_isDebugging)
331                    {
332                        _log.debug("    to rcvr " + recvPort.getFullName());
333                    }
334
335                    //_debug("added to queue for " + recvPort.getFullName()
336                    // + " channel " + recvChannel + " q " +
337                    // queueArray[recvChannel].hashCode());
338                }
339            }
340        }
341        catch(IllegalActionException e)
342        {
343            throw new RecordingException("Error getting receivers for " +
344                senderPort.getFullName() + ": ", e);
345        }
346    }
347    
348    /** Return a queue for a port and channel. */
349    private Queue<Id> _getQueueForPortAndChannel(IOPort port, int channel)
350        throws RecordingException
351    {
352        Queue<Id>[] queue = _inputPortQueueMap.get(port);
353        if(queue == null)
354        {
355            throw new RecordingException("No event queue for input port: " +
356                port.getFullName());
357        }   
358                
359        //_debug("reading queue for  " + port.getFullName() + " channel " +
360        // channel + " polling q " + queue[channel].hashCode());
361
362        if(channel >= queue.length)
363        {
364            throw new RecordingException("Invalid channel for port " +
365                port.getFullName() + ": " + channel);
366        }
367        
368        return queue[channel];
369    }
370    
371    ///////////////////////////////////////////////////////////////////////
372    //// private fields                                                ////
373
374    /** A mapping of input port to an array of queues; the length of the
375     *  array is the width of the port.
376     */
377    private Map<IOPort,Queue<Id>[]> _inputPortQueueMap;
378    
379    /** A mapping of port to a vector of ids; used to restore the last Id
380     *  read from a port when a port refill event occurs. A vector of Ids
381     *  is used instead of an array since java does not permit the creation
382     *  of arrays of generics.
383     */
384    private Map<IOPort, Vector<Id>> _lastIdMap;
385        
386    private static final Log _log = LogFactory.getLog(PortConnector.class.getName());
387        private static final boolean _isDebugging = _log.isDebugEnabled();
388}