001/* 002 * Copyright (c) 2009-2010 The Regents of the University of California. 003 * All rights reserved. 004 * 005 * '$Author: crawl $' 006 * '$Date: 2015-06-17 21:53:43 +0000 (Wed, 17 Jun 2015) $' 007 * '$Revision: 33482 $' 008 * 009 * Permission is hereby granted, without written agreement and without 010 * license or royalty fees, to use, copy, modify, and distribute this 011 * software and its documentation for any purpose, provided that the above 012 * copyright notice and the following two paragraphs appear in all copies 013 * of this software. 014 * 015 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 016 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 017 * ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 018 * THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 019 * SUCH DAMAGE. 020 * 021 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 022 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 023 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 024 * PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 025 * CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 026 * ENHANCEMENTS, OR MODIFICATIONS. 027 * 028 */ 029 030package org.kepler.provenance; 031 032import java.util.Arrays; 033import java.util.HashMap; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Map; 037import java.util.Queue; 038import java.util.Vector; 039import java.util.concurrent.ConcurrentLinkedQueue; 040 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043 044import ptolemy.actor.IOPort; 045import ptolemy.actor.IOPortEvent; 046import ptolemy.actor.Receiver; 047import ptolemy.kernel.util.IllegalActionException; 048 049/** A utility class to map tokens from output to input ports. 050 * 051 * @author Daniel Crawl 052 * @version $Id: PortConnector.java 33482 2015-06-17 21:53:43Z crawl $ 053 * 054 */ 055public class PortConnector<Id> 056{ 057 058 /** Construct a new PortConnector. */ 059 public PortConnector() 060 { 061 _inputPortQueueMap = new HashMap<IOPort,Queue<Id>[]>(); 062 _lastIdMap = new HashMap<IOPort, Vector<Id>>(); 063 } 064 065 /** Clear all connections. */ 066 public void clear() { 067 _inputPortQueueMap.clear(); 068 _lastIdMap.clear(); 069 } 070 071 /** Create connections for a port. */ 072 public void createConnections(IOPort port) throws RecordingException 073 { 074 int width = -1; 075 try 076 { 077 width = port.getWidth(); 078 } 079 catch(IllegalActionException e) 080 { 081 throw new RecordingException("Error getting port width: ", e); 082 } 083 084 // see if port is already in table 085 Queue<Id>[] queues = _inputPortQueueMap.get(port); 086 Vector<Id> lastIds = _lastIdMap.get(port); 087 if(queues != null) 088 { 089 090 // clear any values that were left by the previous execution. 091 // this can happen, e.g., if the execution is stopped by the 092 // user, an error occurred, etc. 093 for(Queue<Id> queue: queues) { 094 queue.clear(); 095 } 096 097 for(int i = 0; i < lastIds.size(); i++) { 098 lastIds.set(i, null); 099 } 100 101 // see if the allocated queue array is too small 102 if(width > queues.length) 103 { 104 // increase the queue array to be the current width 105 // of the port and copy the current queues to the 106 // new array. 107 Queue<Id>[] newQueues = new ConcurrentLinkedQueue[width]; 108 for(int i = 0; i < width; i++) 109 { 110 if(i < queues.length) 111 { 112 newQueues[i] = queues[i]; 113 } 114 else 115 { 116 newQueues[i] = new ConcurrentLinkedQueue<Id>(); 117 118 // increase the vector length by one for the 119 // additional channel 120 lastIds.add(null); 121 } 122 } 123 124 _inputPortQueueMap.put(port, newQueues); 125 } 126 } 127 // register a new port 128 // NOTE: port may be output port in composite 129 else if(port.isOpaque()) 130 { 131 queues = new ConcurrentLinkedQueue[width]; 132 lastIds = new Vector<Id>(); 133 for(int i = 0; i < width; i++) 134 { 135 queues[i] = new ConcurrentLinkedQueue<Id>(); 136 //_debug("creating queue, channel " + i + ", for port " + 137 // port.getFullName() + " = " + queue[i].hashCode()); 138 139 // increase the vector length by one so the total 140 // size will be the port width. 141 lastIds.add(null); 142 } 143 _inputPortQueueMap.put(port, queues); 144 _lastIdMap.put(port, lastIds); 145 } 146 } 147 148 /** Get the next Id for an input port and channel. */ 149 public Id getNextId(IOPort port, int channel) 150 throws RecordingException 151 { 152 153 //System.out.println("getNextId for " + port.getFullName() + " " + channel); 154 155 Queue<Id> queue = _getQueueForPortAndChannel(port, channel); 156 Id retval = queue.poll(); 157 if(retval == null) 158 { 159 throw new RecordingException("No write event for token " + 160 "received by input port " + port.getFullName() + 161 " on channel " + channel); 162 } 163 164 165 // save the Id for this port and channel in case it is refilled. 166 Vector<Id> lastIds = _lastIdMap.get(port); 167 lastIds.set(channel, retval); 168 169 if(_isDebugging) 170 { 171 _log.debug("in port " + port.getFullName() + " pass = " + retval); 172 } 173 174 return retval; 175 } 176 177 /** Restore the previous Id to a port and channel. */ 178 public void refillId(IOPort port, int channel) throws RecordingException 179 { 180 Vector<Id> lastIds = _lastIdMap.get(port); 181 if(lastIds == null) 182 { 183 throw new RecordingException("No last id map for port: " + 184 port.getFullName()); 185 } 186 187 if(channel >= lastIds.size()) 188 { 189 throw new RecordingException("Invalid channel for port " + 190 port.getFullName() + ": " + channel); 191 } 192 193 if(lastIds.get(channel) == null) 194 { 195 throw new RecordingException("No previous data in port " + 196 port.getFullName() + " channel " + channel); 197 } 198 199 // since we can't place the restored Id at the front of the 200 // queue, create a new queue with the restored Id, and then 201 // add all the existing queued elements at the end. 202 // XXX this is inefficient 203 Queue<Id> queue = _inputPortQueueMap.get(port)[channel]; 204 synchronized(queue) 205 { 206 Queue<Id> queueCopy = new ConcurrentLinkedQueue<Id>(queue); 207 queue.clear(); 208 queue.add(lastIds.get(channel)); 209 queue.addAll(queueCopy); 210 } 211 } 212 213 /** A port should receive an id on a specific channel. This method should 214 * be used when a token is manually placed inside a receiver instead of 215 * when sent with one of the IOPort.send methods. 216 */ 217 public void receiveId(IOPort receiverPort, int channel, Id pass) 218 throws RecordingException 219 { 220 Queue<Id> queue = _getQueueForPortAndChannel(receiverPort, channel); 221 queue.add(pass); 222 } 223 224 /** Send an Id to an output port's connections on a channel. 225 * NOTE: the receivers must be created in the actors, otherwise 226 * this method may fail to find connected ports. 227 */ 228 public void sendIdToConnections(IOPort senderPort, int channel, Id pass) 229 throws RecordingException 230 { 231 //System.out.println("send port " + senderPort.getFullName() + 232 //" " + channel);// + 233 //" pass = " + pass); 234 235 try 236 { 237 Receiver[][] allReceivers; 238 239 // see if this an output port 240 if(senderPort.isOutput()) 241 { 242 //_debug("is output"); 243 244 // FIXME: the javadoc for getRemoteReceivers() implies 245 // that remote receivers will be created if missing, but 246 // this does not always seem to be the case. 247 allReceivers = senderPort.getRemoteReceivers(); 248 } 249 // this is not an output port, so in order to generate 250 // IOPortEvents that are writes, the port must be an 251 // opaque input port on a non-atomic entity. 252 else 253 { 254 //_debug("is input"); 255 allReceivers = senderPort.deepGetReceivers(); 256 } 257 258 // see if the port is connected 259 if(allReceivers.length > 0) 260 { 261 //_debug("has receivers, channel = " + channel); 262 List<Receiver> receiverList = new LinkedList<Receiver>(); 263 264 // see if port write was broadcast 265 if(channel == IOPortEvent.ALLCHANNELS) 266 { 267 // add all receivers on all channels to our receiver list 268 for(Receiver[] receiverArray : allReceivers) 269 { 270 // make sure there are receivers on this channel 271 if(receiverArray != null) 272 { 273 receiverList.addAll(Arrays.asList(receiverArray)); 274 } 275 } 276 } 277 else 278 { 279 // we only care about a specific channel 280 281 // make sure there are receivers on this channel 282 if(allReceivers[channel] != null) 283 { 284 receiverList = Arrays.asList(allReceivers[channel]); 285 } 286 } 287 288 for(Receiver curReceiver : receiverList) 289 { 290 //System.out.println(" receiver " + curReceiver); 291 292 // get the port and channel number of this receiver 293 IOPort recvPort = curReceiver.getContainer(); 294 295 //System.out.println("port " + recvPort); 296 297 int recvChannel = 298 recvPort.getChannelForReceiver(curReceiver); 299 300 Queue<Id>[] queueArray = _inputPortQueueMap.get(recvPort); 301 302 // see if queue array was not created or is too small. 303 // this can happen if the port or channel was created 304 // after the provenance recorder records the workflow 305 // contents. for example the multi instance composite 306 // actor creates its clones in preinitialize(). 307 if(queueArray == null || queueArray.length <= recvChannel) 308 { 309 // create the queue array or make it bigger. 310 createConnections(recvPort); 311 } 312 313 queueArray = _inputPortQueueMap.get(recvPort); 314 315 if(queueArray == null) 316 { 317 throw new RecordingException( 318 "Unable to find queue array for " + 319 recvPort.getFullName()); 320 } 321 else if(queueArray.length <= recvChannel) 322 { 323 throw new RecordingException( 324 "Invalid channel index for queue for " + 325 recvPort.getFullName() + ": " + recvChannel); 326 } 327 328 queueArray[recvChannel].add(pass); 329 330 if(_isDebugging) 331 { 332 _log.debug(" to rcvr " + recvPort.getFullName()); 333 } 334 335 //_debug("added to queue for " + recvPort.getFullName() 336 // + " channel " + recvChannel + " q " + 337 // queueArray[recvChannel].hashCode()); 338 } 339 } 340 } 341 catch(IllegalActionException e) 342 { 343 throw new RecordingException("Error getting receivers for " + 344 senderPort.getFullName() + ": ", e); 345 } 346 } 347 348 /** Return a queue for a port and channel. */ 349 private Queue<Id> _getQueueForPortAndChannel(IOPort port, int channel) 350 throws RecordingException 351 { 352 Queue<Id>[] queue = _inputPortQueueMap.get(port); 353 if(queue == null) 354 { 355 throw new RecordingException("No event queue for input port: " + 356 port.getFullName()); 357 } 358 359 //_debug("reading queue for " + port.getFullName() + " channel " + 360 // channel + " polling q " + queue[channel].hashCode()); 361 362 if(channel >= queue.length) 363 { 364 throw new RecordingException("Invalid channel for port " + 365 port.getFullName() + ": " + channel); 366 } 367 368 return queue[channel]; 369 } 370 371 /////////////////////////////////////////////////////////////////////// 372 //// private fields //// 373 374 /** A mapping of input port to an array of queues; the length of the 375 * array is the width of the port. 376 */ 377 private Map<IOPort,Queue<Id>[]> _inputPortQueueMap; 378 379 /** A mapping of port to a vector of ids; used to restore the last Id 380 * read from a port when a port refill event occurs. A vector of Ids 381 * is used instead of an array since java does not permit the creation 382 * of arrays of generics. 383 */ 384 private Map<IOPort, Vector<Id>> _lastIdMap; 385 386 private static final Log _log = LogFactory.getLog(PortConnector.class.getName()); 387 private static final boolean _isDebugging = _log.isDebugEnabled(); 388}