001/* An output port that publishes its data on a named channel. 002 003 Copyright (c) 1997-2014 The Regents of the University of California. 004 All rights reserved. 005 Permission is hereby granted, without written agreement and without 006 license or royalty fees, to use, copy, modify, and distribute this 007 software and its documentation for any purpose, provided that the above 008 copyright notice and the following two paragraphs appear in all copies 009 of this software. 010 011 IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY 012 FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES 013 ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF 014 THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF 015 SUCH DAMAGE. 016 017 THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, 018 INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 019 MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE 020 PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF 021 CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, 022 ENHANCEMENTS, OR MODIFICATIONS. 023 024 PT_COPYRIGHT_VERSION_2 025 COPYRIGHTENDKEY 026 027 Review vectorized methods. 028 Review broadcast/get/send/hasRoom/hasToken. 029 Review setInput/setOutput/setMultiport. 030 Review isKnown/broadcastClear/sendClear. 031 createReceivers creates inside receivers based solely on insideWidth, and 032 outsideReceivers based solely on outside width. 033 connectionsChanged: no longer validates the attributes of this port. This is 034 now done in Manager.initialize(). 035 Review sendInside, getInside, getWidthInside, transferInputs/Outputs, etc. 036 */ 037package ptolemy.actor; 038 039import java.util.regex.Pattern; 040 041import ptolemy.data.BooleanToken; 042import ptolemy.data.Token; 043import ptolemy.data.expr.StringParameter; 044import ptolemy.kernel.ComponentEntity; 045import ptolemy.kernel.CompositeEntity; 046import ptolemy.kernel.util.Attribute; 047import ptolemy.kernel.util.IllegalActionException; 048import ptolemy.kernel.util.NameDuplicationException; 049import ptolemy.kernel.util.NamedObj; 050 051/////////////////////////////////////////////////////////////////// 052//// SubscriptionAggregatorPort 053 054/** 055 Aggregate data produced by multiple publishers. 056 057 <p>This is a generalization of the {@link 058 ptolemy.actor.SubscriberPort} (the base class) where the channel name 059 is interpreted as a regular expression. Data produced by all 060 publishers that publish on a channel name that matches the regular 061 expression are aggregated using the operation given by the {@link 062 #operation} parameter.</p> 063 064 <p>Note that the {@link ptolemy.actor.SubscriberPort#channel <i>channel</i>} 065 parameter of the superclass is now a regular expression in this class. 066 One thing to watch out for is using <code>.</code> instead of <code>\.</code>. 067 For example, <code>channel.foo</code> does not mean the same thing as 068 <code>channel\.foo</code>. The latter requires a dot between channel and 069 foo, where the former does not. 070 071 <p>Note that although this is a multiport, calls to get() should only reference 072 channel 0. An exception will be thrown otherwise. The result of the get will 073 be the aggregate of what is received on all the input channels. 074 075 @author Edward A. Lee 076 @version $Id$ 077 @since Ptolemy II 10.0 078 @Pt.ProposedRating Yellow (eal) 079 @Pt.AcceptedRating Red (eal) 080 */ 081public class SubscriptionAggregatorPort extends SubscriberPort { 082 083 /** Construct a subscriber port with a containing actor and a name. 084 * This is always an input port. 085 * @param container The container actor. 086 * @param name The name of the port. 087 * @exception IllegalActionException If the port is not of an acceptable 088 * class for the container, or if the container does not implement the 089 * Actor interface. 090 * @exception NameDuplicationException If the name coincides with 091 * a port already in the container. 092 */ 093 public SubscriptionAggregatorPort(ComponentEntity container, String name) 094 throws IllegalActionException, NameDuplicationException { 095 super(container, name); 096 097 operation = new StringParameter(this, "operation"); 098 operation.addChoice("add"); 099 operation.addChoice("multiply"); 100 operation.setExpression("add"); 101 102 setMultiport(true); 103 } 104 105 /////////////////////////////////////////////////////////////////// 106 //// parameters //// 107 108 /** The operation used to aggregate the data produced by 109 * matching publishers. The choices are "add" and "multiply". 110 * Note that "multiply" is a poor choice if the data type 111 * has a non-commutative multiplication operation (e.g. 112 * matrix types) because the result will be nondeterministic. 113 * This is a string that defaults to "add". 114 */ 115 public StringParameter operation; 116 117 /////////////////////////////////////////////////////////////////// 118 //// public methods //// 119 120 /** If a publish and subscribe channel is set, then set up the connections. 121 * If an aspect is added, removed or modified update the list of 122 * aspects. 123 * @param attribute The attribute that changed. 124 * @exception IllegalActionException Thrown if the new color attribute cannot 125 * be created. 126 */ 127 @Override 128 public void attributeChanged(Attribute attribute) 129 throws IllegalActionException { 130 if (attribute == operation) { 131 String newValue = operation.stringValue(); 132 if (newValue.equals("add")) { 133 _addOperation = true; 134 } else { 135 _addOperation = false; 136 } 137 } else if (attribute == channel) { 138 // Override the base class to use the version 139 // of unlinkToPublishedPort() that takes a Pattern 140 // argument rather than a String. 141 String newValue = channel.stringValue(); 142 if (!newValue.equals(_channel)) { 143 NamedObj immediateContainer = getContainer(); 144 if (immediateContainer != null) { 145 NamedObj container = immediateContainer.getContainer(); 146 if (container instanceof CompositeActor 147 && !(_channel == null 148 || _channel.trim().equals(""))) { 149 ((CompositeActor) container).unlinkToPublishedPort( 150 _channelPattern, this, _global); 151 } 152 _channel = newValue; 153 // Don't call super here because super.attributeChanged() tries to unlink _channel 154 // as a non-regular expression string, which seems wrong. 155 // super.attributeChanged(attribute); 156 _channelPattern = Pattern.compile(_channel); 157 } 158 } 159 } else if (attribute == global) { 160 boolean newValue = ((BooleanToken) global.getToken()) 161 .booleanValue(); 162 if (newValue == false && _global == true) { 163 NamedObj immediateContainer = getContainer(); 164 if (immediateContainer != null) { 165 NamedObj container = immediateContainer.getContainer(); 166 if (container instanceof CompositeActor 167 && !(_channel == null 168 || _channel.trim().equals(""))) { 169 ((CompositeActor) container).unlinkToPublishedPort( 170 _channelPattern, this, _global); 171 } 172 } 173 } 174 _global = newValue; 175 // Do not call SubscriptionAggregator.attributeChanged() 176 // because it will remove the published port name by _channel. 177 // If _channel is set to a real name (not a regex pattern), 178 // Then chaos ensues. See test 3.0 in SubscriptionAggregator.tcl 179 } else { 180 super.attributeChanged(attribute); 181 } 182 } 183 184 /** Get a token from the specified channel. 185 * This overrides the base class to first ensure that 186 * the <i>channelIndex</i> is 0 (or an exception is 187 * thrown), and then to aggregate the tokens from all 188 * of the input channels according to the 189 * {@link #operation} parameter and return the 190 * single token result. 191 * Specifically, it reads one token from each input channel 192 * that has a token, aggregates these, and returns the aggregate. 193 * @param channelIndex The channel index. This is required to be 0. 194 * @return An aggregation of the tokens from all input channels. 195 * @exception NoTokenException If there is no token. 196 * @exception IllegalActionException If there is no director, and hence 197 * no receivers have been created, if the port is not an input port, or 198 * if the channel index is not 0. 199 */ 200 @Override 201 public Token get(int channelIndex) 202 throws NoTokenException, IllegalActionException { 203 if (channelIndex != 0) { 204 throw new IllegalActionException(this, 205 "Although it is a multiport, you can only read" 206 + " from channel 0 of a SubscriptionAggregatorPort."); 207 } 208 Token result = null; 209 for (int i = 0; i < getWidth(); i++) { 210 if (super.hasToken(i)) { 211 Token input = super.get(i); 212 if (result == null) { 213 result = input; 214 } else { 215 if (_addOperation) { 216 result = result.add(input); 217 } else { 218 result = result.multiply(input); 219 } 220 } 221 } 222 } 223 if (result == null) { 224 throw new NoTokenException(this, "No input tokens"); 225 } 226 return result; 227 } 228 229 /** Get an array of tokens from the specified channel. 230 * This overrides the base class to first ensure that 231 * the <i>channelIndex</i> is 0 (or an exception is 232 * thrown), and then to aggregate the tokens from all 233 * of the input channels according to the 234 * {@link #operation} parameter and return the 235 * single token result. 236 * Specifically, it reads one token from each input channel 237 * that has a token, aggregates these, and returns the aggregate. 238 * @param channelIndex The channel index. This is required to be 0. 239 * @param vectorLength The number of valid tokens to get in the 240 * returned array. 241 * @return A token array with length 242 * <i>vectorLength</i> aggregating the inputs. 243 * @exception NoTokenException If there is not enough tokens. 244 * @exception IllegalActionException If there is no director, and hence 245 * no receivers have been created, if the port is not an input port, or 246 * if the channel index is not 0. 247 */ 248 @Override 249 public Token[] get(int channelIndex, int vectorLength) 250 throws NoTokenException, IllegalActionException { 251 if (channelIndex != 0) { 252 throw new IllegalActionException(this, 253 "Although it is a multiport, you can only read" 254 + " from channel 0 of a SubscriptionAggregatorPort."); 255 } 256 Token[] result = null; 257 for (int i = 0; i < getWidth(); i++) { 258 if (super.hasToken(i, vectorLength)) { 259 Token[] input = super.get(i, vectorLength); 260 if (result == null) { 261 result = input; 262 } else { 263 if (_addOperation) { 264 for (int j = 0; j < vectorLength; j++) { 265 result[j] = result[j].add(input[j]); 266 } 267 } else { 268 for (int j = 0; j < vectorLength; j++) { 269 result[j] = result[j].multiply(input[j]); 270 } 271 } 272 } 273 } 274 } 275 if (result == null) { 276 throw new NoTokenException(this, "Not engouh input tokens"); 277 } 278 return result; 279 } 280 281 /** Return the inside width of this port, which in this class is 282 * always 1. 283 * @return The width of the inside of the port. 284 */ 285 @Override 286 public int getWidthInside() { 287 return 1; 288 } 289 290 /** Return true if any input channel has a token. 291 * @param channelIndex The channel index. This is required to be 0. 292 * @return True if any input channel has a token. 293 * @exception IllegalActionException If the channel index is not 0 294 * or if the superclass throws it. 295 */ 296 @Override 297 public boolean hasToken(int channelIndex) throws IllegalActionException { 298 /* Allow asking about other channels. 299 if (channelIndex != 0) { 300 throw new IllegalActionException( 301 this, 302 "Although it is a multiport, you can only read" 303 + " from channel 0 of a SubscriptionAggregatorPort."); 304 } 305 */ 306 for (int i = 0; i < getWidth(); i++) { 307 if (super.hasToken(i)) { 308 return true; 309 } 310 } 311 return false; 312 } 313 314 /** Return true if every input channel that has tokens has enough tokens. 315 * @param channelIndex The channel index. This is required to be 0. 316 * @param vectorLength The number of tokens to query the channel for. 317 * @return True if every input channel that has tokens has enough tokens. 318 * @exception IllegalActionException If the channel index is not 0 319 * or if the superclass throws it. 320 */ 321 @Override 322 public boolean hasToken(int channelIndex, int vectorLength) 323 throws IllegalActionException { 324 /* Allow asking about other channels. 325 if (channelIndex != 0) { 326 throw new IllegalActionException( 327 this, 328 "Although it is a multiport, you can only read" 329 + " from channel 0 of a SubscriptionAggregatorPort."); 330 } 331 */ 332 boolean foundOne = false; 333 for (int i = 0; i < getWidth(); i++) { 334 if (super.hasToken(i)) { 335 foundOne = true; 336 if (!super.hasToken(i, vectorLength)) { 337 return false; 338 } 339 } 340 } 341 return foundOne; 342 } 343 344 /** Check that the port is not in the top level, then 345 * call preinitialize() in the super class. 346 * @exception IllegalActionException If the port is in 347 * the top level. 348 */ 349 @Override 350 public void preinitialize() throws IllegalActionException { 351 NamedObj actor = getContainer(); 352 if (actor != null && actor.getContainer() == null) { 353 throw new IllegalActionException(this, 354 "SubscriptionAggregatorPorts cannot be used at the top level, use a SubscriptionAggregator actor instead."); 355 } 356 super.preinitialize(); 357 } 358 359 /////////////////////////////////////////////////////////////////// 360 //// protected methods //// 361 362 /** Override the base class to always return 1. 363 * @param except The relation to exclude. 364 * @return The sums of the width of the relations linked on the inside, 365 * except for the specified port. 366 */ 367 @Override 368 protected int _getInsideWidth(IORelation except) { 369 return 1; 370 } 371 372 /** Update the connection to the publishers, if there are any. 373 * @exception IllegalActionException If creating the link 374 * triggers an exception. 375 */ 376 @Override 377 protected void _updateLinks() throws IllegalActionException { 378 // This overrides the base class to Pattern version 379 // rather than the String version of linkToPublishedPort(). 380 381 // If the channel has not been set, then there is nothing 382 // to do. This is probably the first setContainer() call, 383 // before the object is fully constructed. 384 if (_channelPattern == null) { 385 return; 386 } 387 388 NamedObj immediateContainer = getContainer(); 389 if (immediateContainer != null) { 390 NamedObj container = immediateContainer.getContainer(); 391 if (container instanceof CompositeActor) { 392 try { 393 try { 394 ((CompositeActor) container).linkToPublishedPort( 395 _channelPattern, this, _global); 396 } catch (IllegalActionException ex) { 397 // If we have a LazyTypedCompositeActor that 398 // contains the Publisher, then populate() the 399 // model, expanding the LazyTypedCompositeActors 400 // and retry the link. This is computationally 401 // expensive. 402 // See $PTII/ptolemy/actor/lib/test/auto/LazyPubSub.xml 403 _updatePublisherPorts((CompositeEntity) toplevel()); 404 // Now try again. 405 ((CompositeActor) container).linkToPublishedPort( 406 _channelPattern, this, _global); 407 } 408 } catch (NameDuplicationException e) { 409 throw new IllegalActionException(this, e, 410 "Can't link SubscriptionAggregatorPort with a PublisherPort."); 411 } 412 } 413 } 414 } 415 416 /////////////////////////////////////////////////////////////////// 417 //// private variables //// 418 419 /** Indicator that the operation is "add" rather than "multiply". */ 420 private boolean _addOperation = true; 421 422 /** Regex Pattern for _channelName. */ 423 private Pattern _channelPattern; 424}